diff --git a/config/config.yaml b/config/config.yaml
index a3e836a15..85c5bc9f1 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -47,6 +47,9 @@ kafka:
   ws2mschatoffline:
     addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
     topic: "ws2ms_chat_offline"
+  msgtomongo:
+    addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
+    topic: "msg_to_mongo"
   ms2pschat:
     addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
     topic: "ms2ps_chat"
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 4a419ae1b..e1d2a98b2 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -75,7 +75,7 @@ services:
       TZ: Asia/Shanghai
       KAFKA_BROKER_ID: 0
       KAFKA_ZOOKEEPER_CONNECT: 127.0.0.1:2181
-      KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1"
+      KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1,msg_to_mongo:2:1"
       KAFKA_ADVERTISED_LISTENERS: INSIDE://127.0.0.1:9092,OUTSIDE://103.116.45.174:9093
       KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go
index 8829906e4..65457878c 100644
--- a/internal/msg_transfer/logic/init.go
+++ b/internal/msg_transfer/logic/init.go
@@ -22,6 +22,7 @@ var (
 	persistentCH      PersistentConsumerHandler
 	historyCH         OnlineHistoryConsumerHandler
 	producer          *kafka.Producer
+	producerToMongo   *kafka.Producer
 	cmdCh             chan Cmd2Value
 	onlineTopicStatus int
 	w                 *sync.Mutex
@@ -43,6 +44,7 @@ func Init() {
 	statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
 	statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
 	producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
+	producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
 }
 func Run() {
 	//register mysqlConsumerHandler to
diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go
index afacf097c..42684c7ff 100644
--- a/internal/msg_transfer/logic/online_history_msg_handler.go
+++ b/internal/msg_transfer/logic/online_history_msg_handler.go
@@ -155,11 +155,17 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 	}
 }
 func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
-	hashCode := getHashCode(aggregationID)
-	channelID := hashCode % ChannelNum
-	log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
-	//go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
-	och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
+	pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
+	if err != nil {
+		log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
+	} else {
+		// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
+	}
+	//hashCode := getHashCode(aggregationID)
+	//channelID := hashCode % ChannelNum
+	//log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
+	////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
+	//och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
 }
 func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
 	for {
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index dff8a693e..0618ca7d7 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -203,6 +203,10 @@ type config struct {
 			Addr  []string `yaml:"addr"`
 			Topic string   `yaml:"topic"`
 		}
+		MsgToMongo struct {
+			Addr  []string `yaml:"addr"`
+			Topic string   `yaml:"topic"`
+		}
 		Ms2pschat struct {
 			Addr  []string `yaml:"addr"`
 			Topic string   `yaml:"topic"`
diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go
index 2418e7658..92bbca353 100644
--- a/pkg/common/db/model.go
+++ b/pkg/common/db/model.go
@@ -28,7 +28,7 @@ type DataBases struct {
 	mgoSession *mgo.Session
 	//redisPool *redis.Pool
 	mongoClient *mongo.Client
-	rdb         *go_redis.Client
+	rdb         *go_redis.ClusterClient
 }
 
 func key(dbAddress, dbName string) string {
@@ -112,17 +112,17 @@ func init() {
 	//		)
 	//	},
 	//}
-	DB.rdb = go_redis.NewClient(&go_redis.Options{
-		Addr:     config.Config.Redis.DBAddress,
-		Password: config.Config.Redis.DBPassWord, // no password set
-		DB:       0,                              // use default DB
-		PoolSize: 100,                            // 连接池大小
-	})
-	//DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
-	//	Addrs:    []string{config.Config.Redis.DBAddress},
-	//	PoolSize: 100,
-	//	Password: config.Config.Redis.DBPassWord,
+	//DB.rdb = go_redis.NewClient(&go_redis.Options{
+	//	Addr:     config.Config.Redis.DBAddress,
+	//	Password: config.Config.Redis.DBPassWord, // no password set
+	//	DB:       0,                              // use default DB
+	//	PoolSize: 100,                            // 连接池大小
 	//})
+	DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
+		Addrs:    []string{config.Config.Redis.DBAddress},
+		PoolSize: 50,
+		//Password: config.Config.Redis.DBPassWord,
+	})
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	_, err = DB.rdb.Ping(ctx).Result()
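
Note on the msg_transfer change: SendMessageToMongoCH no longer dispatches batches onto the in-process chMongoArrays channels; it wraps them in pbMsg.MsgDataToMongoByMQ and produces them to the new msg_to_mongo topic, keyed by aggregationID. A minimal sketch of how the pieces added above fit together; the import paths are assumptions for illustration, while the identifiers and field names come from the diff itself:

package logic

import (
	"Open_IM/pkg/common/config"   // assumed import path for the config package edited above
	"Open_IM/pkg/common/kafka"    // assumed import path for the kafka producer wrapper
	"Open_IM/pkg/common/log"      // assumed import path for the log package
	pbMsg "Open_IM/pkg/proto/msg" // assumed import path for the pbMsg protobuf package
)

var producerToMongo *kafka.Producer

func Init() {
	// config.Config.Kafka.MsgToMongo is the struct added in config.go,
	// populated from the msgtomongo section added in config.yaml.
	producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
}

func sendToMongoTopic(aggregationID, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
	// Keying by aggregationID keeps each aggregation on one partition, so
	// per-aggregation ordering is preserved once the batch leaves the process.
	pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{
		LastSeq:       lastSeq,
		AggregationID: aggregationID,
		MessageList:   messages,
		TriggerID:     triggerID,
	}, aggregationID, triggerID)
	if err != nil {
		log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error())
	}
}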
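
Note on the model.go change: rdb is now a go_redis.ClusterClient, but ClusterOptions.Addrs is still fed the single config.Config.Redis.DBAddress, so the Redis it points at needs to be running in cluster mode for the Ping below to succeed. A standalone sketch, assuming the go-redis v8 module (implied by the context-aware rdb.Ping(ctx) call) and placeholder node addresses:

package main

import (
	"context"
	"fmt"
	"time"

	go_redis "github.com/go-redis/redis/v8" // assumed module; matches the go_redis alias used in model.go
)

func main() {
	// Placeholder node addresses; the diff passes only config.Config.Redis.DBAddress.
	rdb := go_redis.NewClusterClient(&go_redis.ClusterOptions{
		Addrs:    []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
		PoolSize: 50,
		// Password: "", // left commented out, as in the diff
	})
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := rdb.Ping(ctx).Err(); err != nil {
		fmt.Println("redis cluster ping failed:", err)
		return
	}
	fmt.Println("redis cluster reachable")
}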