From 29dbb369613dd397ff8847e607bd4d9c44b0cf90 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 16 Jun 2022 19:33:35 +0800 Subject: [PATCH 1/3] user proto modify --- cmd/open_im_api/main.go | 3 ++- internal/api/user/user.go | 38 ++++++++++++++++++++++++++++++++ internal/rpc/user/user.go | 21 ++++++++++++++++++ pkg/base_info/user_api_struct.go | 8 ++++++- script/start_all.sh | 2 +- 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index e8cf2e799..8b3818aa8 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -38,7 +38,8 @@ func main() { // user routing group, which handles user registration and login services userRouterGroup := r.Group("/user") { - userRouterGroup.POST("/update_user_info", user.UpdateUserInfo) //1 + userRouterGroup.POST("/update_user_info", user.UpdateUserInfo) //1 + userRouterGroup.POST("/set_global_msg_recv_opt", user.SetGlobalRecvMessageOpt) userRouterGroup.POST("/get_users_info", user.GetUsersInfo) //1 userRouterGroup.POST("/get_self_user_info", user.GetSelfUserInfo) //1 userRouterGroup.POST("/get_users_online_status", user.GetUsersOnlineStatus) //1 diff --git a/internal/api/user/user.go b/internal/api/user/user.go index af93416b4..ac40059d3 100644 --- a/internal/api/user/user.go +++ b/internal/api/user/user.go @@ -233,6 +233,44 @@ func UpdateUserInfo(c *gin.Context) { log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp) c.JSON(http.StatusOK, resp) } +func SetGlobalRecvMessageOpt(c *gin.Context) { + params := api.SetGlobalRecvMessageOptReq{} + if err := c.BindJSON(¶ms); err != nil { + log.NewError("0", "BindJSON failed ", err.Error()) + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + req := &rpc.SetGlobalRecvMessageOptReq{} + utils.CopyStructFields(req, ¶ms) + req.OperationID = params.OperationID + var ok bool + var errInfo string + ok, req.UserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID) + if !ok { + errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token") + log.NewError(req.OperationID, errMsg) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg}) + return + } + log.NewInfo(params.OperationID, "SetGlobalRecvMessageOpt args ", req.String()) + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID) + if etcdConn == nil { + errMsg := req.OperationID + "getcdv3.GetConn == nil" + log.NewError(req.OperationID, errMsg) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg}) + return + } + client := rpc.NewUserClient(etcdConn) + RpcResp, err := client.SetGlobalRecvMessageOpt(context.Background(), req) + if err != nil { + log.NewError(req.OperationID, "SetGlobalRecvMessageOpt failed ", err.Error(), req.String()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call rpc server failed"}) + return + } + resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}} + log.NewInfo(req.OperationID, "SetGlobalRecvMessageOpt api return ", resp) + c.JSON(http.StatusOK, resp) +} func GetSelfUserInfo(c *gin.Context) { params := api.GetSelfUserInfoReq{} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 86bd07c5b..fa733cbe4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -433,7 +433,28 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI //} return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{}}, nil } +func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.SetGlobalRecvMessageOptReq) (*pbUser.SetGlobalRecvMessageOptResp, error) { + log.NewInfo(req.OperationID, "SetGlobalRecvMessageOpt args ", req.String()) + var user db.User + user.UserID = req.UserID + m := make(map[string]interface{}, 1) + + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), req.GlobalRecvMsgOpt, "set GlobalRecvMsgOpt") + m["global_recv_msg_opt"] = req.GlobalRecvMsgOpt + err := db.DB.SetUserGlobalMsgRecvOpt(user.UserID, req.GlobalRecvMsgOpt) + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetGlobalRecvMessageOpt failed ", err.Error(), user) + return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil + } + err = imdb.UpdateUserInfoByMap(user, m) + if err != nil { + log.NewError(req.OperationID, "SetGlobalRecvMessageOpt failed ", err.Error(), user) + return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil + } + chat.UserInfoUpdatedNotification(req.OperationID, req.UserID, req.UserID) + return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{}}, nil +} func (s *userServer) SyncJoinedGroupMemberFaceURL(userID string, faceURL string, operationID string, opUserID string) { joinedGroupIDList, err := imdb.GetJoinedGroupIDListByUserID(userID) if err != nil { diff --git a/pkg/base_info/user_api_struct.go b/pkg/base_info/user_api_struct.go index b517f24de..45c8cb22d 100644 --- a/pkg/base_info/user_api_struct.go +++ b/pkg/base_info/user_api_struct.go @@ -18,7 +18,13 @@ type UpdateSelfUserInfoReq struct { ApiUserInfo OperationID string `json:"operationID" binding:"required"` } - +type SetGlobalRecvMessageOptReq struct { + OperationID string `json:"operationID" binding:"required"` + GlobalRecvMsgOpt *int32 `json:"globalRecvMsgOpt" binding:"omitempty,oneof=0 1 2"` +} +type SetGlobalRecvMessageOptResp struct { + CommResp +} type UpdateUserInfoResp struct { CommResp } diff --git a/script/start_all.sh b/script/start_all.sh index bf7170d68..9fcbd1c6a 100644 --- a/script/start_all.sh +++ b/script/start_all.sh @@ -5,10 +5,10 @@ #fixme Put the shell script name here need_to_start_server_shell=( start_rpc_service.sh - msg_gateway_start.sh push_start.sh msg_transfer_start.sh sdk_svr_start.sh + msg_gateway_start.sh demo_svr_start.sh ) time=`date +"%Y-%m-%d %H:%M:%S"` From 903c39206ffb04731d24af52538cd56d71e35d9d Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 16 Jun 2022 20:35:27 +0800 Subject: [PATCH 2/3] msg transfer handle change --- config/config.yaml | 2 +- internal/msg_transfer/logic/init.go | 6 +- .../logic/online_history_msg_handler.go | 136 +++++++----------- .../logic/online_msg_to_mongo_handler.go | 79 ++++++++++ pkg/common/config/config.go | 8 +- 5 files changed, 139 insertions(+), 92 deletions(-) create mode 100644 internal/msg_transfer/logic/online_msg_to_mongo_handler.go diff --git a/config/config.yaml b/config/config.yaml index 4d9569d07..3143f3a0b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -55,8 +55,8 @@ kafka: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ms2ps_chat" consumergroupid: + msgToRedis: redis msgToMongo: mongo - msgToMongoOffline: mongo_offline msgToMySql: mysql msgToPush: push diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 65457878c..177745c6f 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -4,7 +4,6 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" - "Open_IM/pkg/common/log" "Open_IM/pkg/statistics" "fmt" "sync" @@ -20,7 +19,8 @@ const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler - historyCH OnlineHistoryConsumerHandler + historyCH OnlineHistoryRedisConsumerHandler + historyMongoCH OnlineHistoryMongoConsumerHandler producer *kafka.Producer producerToMongo *kafka.Producer cmdCh chan Cmd2Value @@ -39,7 +39,6 @@ func Init() { persistentCH.Init() historyCH.Init(cmdCh) onlineTopicStatus = OnlineTopicVacancy - log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) //offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) @@ -54,6 +53,7 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) + go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } func SetOnlineTopicStatus(status int) { diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index a1544549d..908d28b27 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -9,10 +9,8 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" - server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" - "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "hash/crc32" @@ -36,28 +34,21 @@ type Cmd2Value struct { Cmd int Value interface{} } -type OnlineHistoryConsumerHandler struct { +type OnlineHistoryRedisConsumerHandler struct { msgHandle map[string]fcb historyConsumerGroup *kfk.MConsumerGroup - cmdCh chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value - chMongoArrays [ChannelNum]chan Cmd2Value msgDistributionCh chan Cmd2Value } -func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { +func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel go och.MessagesDistributionHandle() - och.cmdCh = cmdCh for i := 0; i < ChannelNum; i++ { och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } - for i := 0; i < ChannelNum; i++ { - och.chMongoArrays[i] = make(chan Cmd2Value, 10000) - go och.MongoMessageRun(i) - } if config.Config.ReliableStorage { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo } else { @@ -66,34 +57,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { } och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, - config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) } -func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) { - operationID := utils.OperationIDGenerator() - err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) - if err != nil { - log.Error(operationID, "TriggerCmd failed ", err.Error(), status) - return - } - log.Debug(operationID, "TriggerCmd success", status) - -} -func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { - var flag = 0 - select { - case ch <- value: - flag = 1 - case <-time.After(time.Second * time.Duration(timeout)): - flag = 2 - } - if flag == 1 { - return nil - } else { - return errors.New("send cmd timeout") - } -} -func (och *OnlineHistoryConsumerHandler) Run(channelID int) { +func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { for { select { case cmd := <-och.chArrays[channelID]: @@ -154,7 +121,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } } -func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { +func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID) if err != nil { log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) @@ -167,47 +134,48 @@ func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID stri ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { //och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} } -func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { - for { - select { - case cmd := <-och.chMongoArrays[channelID]: - switch cmd.Cmd { - case MongoMessages: - msgChannelValue := cmd.Value.(MsgChannelValue) - msgList := msgChannelValue.msgList - triggerID := msgChannelValue.triggerID - aggregationID := msgChannelValue.aggregationID - lastSeq := msgChannelValue.lastSeq - err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq) - if err != nil { - log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) - } - for _, v := range msgList { - if v.MsgData.ContentType == constant.DeleteMessageNotification { - tips := server_api_params.TipsComm{} - DeleteMessageTips := server_api_params.DeleteMessageTips{} - err := proto.Unmarshal(v.MsgData.Content, &tips) - if err != nil { - log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String()) - continue - } - err = proto.Unmarshal(tips.Detail, &DeleteMessageTips) - if err != nil { - log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String()) - continue - } - if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { - log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList) - } - } - } - } - } - } -} +//func (och *OnlineHistoryRedisConsumerHandler) MongoMessageRun(channelID int) { +// for { +// select { +// case cmd := <-och.chMongoArrays[channelID]: +// switch cmd.Cmd { +// case MongoMessages: +// msgChannelValue := cmd.Value.(MsgChannelValue) +// msgList := msgChannelValue.msgList +// triggerID := msgChannelValue.triggerID +// aggregationID := msgChannelValue.aggregationID +// lastSeq := msgChannelValue.lastSeq +// err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq) +// if err != nil { +// log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) +// } +// for _, v := range msgList { +// if v.MsgData.ContentType == constant.DeleteMessageNotification { +// tips := server_api_params.TipsComm{} +// DeleteMessageTips := server_api_params.DeleteMessageTips{} +// err := proto.Unmarshal(v.MsgData.Content, &tips) +// if err != nil { +// log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String()) +// continue +// } +// err = proto.Unmarshal(tips.Detail, &DeleteMessageTips) +// if err != nil { +// log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String()) +// continue +// } +// if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { +// log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList) +// } +// +// } +// } +// } +// } +// } +//} -func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { +func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { for { aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) select { @@ -253,7 +221,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { } } -func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} @@ -325,7 +293,7 @@ func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consumer log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } -func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -365,10 +333,10 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg * } } -func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, // claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group // log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) // for msg := range claim.Messages() { @@ -385,7 +353,7 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error // return nil //} -func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group for { @@ -480,7 +448,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS return nil } -//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, // claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group // // for { diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go new file mode 100644 index 000000000..1d71b8a1c --- /dev/null +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -0,0 +1,79 @@ +package logic + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" + kfk "Open_IM/pkg/common/kafka" + "Open_IM/pkg/common/log" + pbMsg "Open_IM/pkg/proto/chat" + server_api_params "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" +) + +type OnlineHistoryMongoConsumerHandler struct { + msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup +} + +func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) { + och.msgHandle = make(map[string]fcb) + och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo + och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic}, + config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + +} +func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { + msg := cMsg.Value + msgFromMQ := pbMsg.MsgDataToMongoByMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq) + if err != nil { + log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList) + } + for _, v := range msgFromMQ.MessageList { + if v.MsgData.ContentType == constant.DeleteMessageNotification { + tips := server_api_params.TipsComm{} + DeleteMessageTips := server_api_params.DeleteMessageTips{} + err := proto.Unmarshal(v.MsgData.Content, &tips) + if err != nil { + log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String()) + continue + } + err = proto.Unmarshal(tips.Detail, &DeleteMessageTips) + if err != nil { + log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String()) + continue + } + if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { + log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList) + } + + } + } +} + +func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +func (och *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + for msg := range claim.Messages() { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + if len(msg.Value) != 0 { + och.msgHandle[msg.Topic](msg, string(msg.Key), sess) + } else { + log.Error("", "mongo msg get from kafka but is nil", msg.Key) + } + sess.MarkMessage(msg, "") + } + return nil +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 185236744..de0d1597e 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -213,10 +213,10 @@ type config struct { Topic string `yaml:"topic"` } ConsumerGroupID struct { - MsgToMongo string `yaml:"msgToMongo"` - MsgToMongoOffline string `yaml:"msgToMongoOffline"` - MsgToMySql string `yaml:"msgToMySql"` - MsgToPush string `yaml:"msgToPush"` + MsgToRedis string `yaml:"msgToRedis"` + MsgToMongo string `yaml:"msgToMongo"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` } } Secret string `yaml:"secret"` From 8e3f090ffeae9b3832916c3062ef0e1cfac62f96 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 16 Jun 2022 21:15:24 +0800 Subject: [PATCH 3/3] msg transfer handle change --- internal/msg_transfer/logic/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 177745c6f..168bfe785 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -53,7 +53,7 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) - go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) + //go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH) //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } func SetOnlineTopicStatus(status int) {