From 7285e2b2c988aad062ff91ab74d07fe5473012cc Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Sat, 28 May 2022 18:10:08 +0800 Subject: [PATCH] add super group --- internal/msg_gateway/gate/logic.go | 48 +-- internal/msg_gateway/gate/validate.go | 12 + internal/msg_transfer/logic/init.go | 3 +- .../logic/offline_history_msg_handler.go | 313 ------------------ .../logic/online_history_msg_handler.go | 61 ++-- internal/rpc/msg/pull_message.go | 40 ++- internal/rpc/msg/send_msg.go | 87 +++-- pkg/common/constant/constant.go | 6 +- pkg/common/db/mongoModel.go | 7 + 9 files changed, 179 insertions(+), 398 deletions(-) delete mode 100644 internal/msg_transfer/logic/offline_history_msg_handler.go diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 884f5f842..af6fa025d 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -64,28 +64,37 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { } func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data) - rpcReq := pbChat.GetMaxAndMinSeqReq{} - nReply := new(pbChat.GetMaxAndMinSeqResp) - rpcReq.UserID = m.SendID - rpcReq.OperationID = m.OperationID - grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) - msgClient := pbChat.NewChatClient(grpcConn) - rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) - if err != nil { - log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err, rpcReq.String()) - nReply.ErrCode = 500 - nReply.ErrMsg = err.Error() - ws.getSeqResp(conn, m, nReply) + nReply := new(sdk_ws.GetMaxAndMinSeqResp) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq) + if isPass { + rpcReq := sdk_ws.GetMaxAndMinSeqReq{} + rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList + rpcReq.UserID = m.SendID + rpcReq.OperationID = m.OperationID + log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList) + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) + if err != nil { + log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String()) + nReply.ErrCode = 500 + nReply.ErrMsg = err.Error() + ws.getSeqResp(conn, m, nReply) + } else { + log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String()) + ws.getSeqResp(conn, m, rpcReply) + } } else { - log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String()) - ws.getSeqResp(conn, m, rpcReply) + nReply.ErrCode = errCode + nReply.ErrMsg = errMsg + ws.getSeqResp(conn, m, nReply) + } + } -func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) { - var mReplyData sdk_ws.GetMaxAndMinSeqResp - mReplyData.MaxSeq = pb.GetMaxSeq() - mReplyData.MinSeq = pb.GetMinSeq() - b, _ := proto.Marshal(&mReplyData) +func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) { + log.Debug(m.OperationID, "getSeqResp come here ", pb.String()) + b, _ := proto.Marshal(pb) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, MsgIncr: m.MsgIncr, @@ -146,6 +155,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { sendMsgAllCountLock.Lock() sendMsgAllCount++ sendMsgAllCountLock.Unlock() + //stat.GaugeVecApiMethod.WithLabelValues("ws_send_message_count").Inc() log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) nReply := new(pbChat.SendMsgResp) diff --git a/internal/msg_gateway/gate/validate.go b/internal/msg_gateway/gate/validate.go index 31198a918..0f4950728 100644 --- a/internal/msg_gateway/gate/validate.go +++ b/internal/msg_gateway/gate/validate.go @@ -59,6 +59,18 @@ type SeqListData struct { func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) { switch r { + case constant.WSGetNewestSeq: + data := open_im_sdk.GetMaxAndMinSeqReq{} + if err := proto.Unmarshal(m.Data, &data); err != nil { + log.Error("", "Decode Data struct err", err.Error(), r) + return false, 203, err.Error(), nil + } + if err := validate.Struct(data); err != nil { + log.Error("", "data args validate err", err.Error(), r) + return false, 204, err.Error(), nil + + } + return true, 0, "", data case constant.WSSendMsg: data := open_im_sdk.MsgData{} if err := proto.Unmarshal(m.Data, &data); err != nil { diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index f8c424dcc..8829906e4 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -14,14 +14,13 @@ const OnlineTopicBusy = 1 const OnlineTopicVacancy = 0 const Msg = 2 const ConsumerMsgs = 3 -const UserMessages = 4 +const AggregationMessages = 4 const MongoMessages = 5 const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler historyCH OnlineHistoryConsumerHandler - offlineHistoryCH OfflineHistoryConsumerHandler producer *kafka.Producer cmdCh chan Cmd2Value onlineTopicStatus int diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go deleted file mode 100644 index b7ee3b2e3..000000000 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ /dev/null @@ -1,313 +0,0 @@ -package logic - -import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - kfk "Open_IM/pkg/common/kafka" - "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/chat" - "Open_IM/pkg/utils" - "github.com/Shopify/sarama" - "github.com/golang/protobuf/proto" - "time" -) - -type OfflineHistoryConsumerHandler struct { - msgHandle map[string]fcb - historyConsumerGroup *kfk.MConsumerGroup - cmdCh chan Cmd2Value - msgCh chan Cmd2Value - chArrays [ChannelNum]chan Cmd2Value - msgDistributionCh chan Cmd2Value -} - -func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { - mc.msgHandle = make(map[string]fcb) - mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel - go mc.MessagesDistributionHandle() - mc.cmdCh = cmdCh - mc.msgCh = make(chan Cmd2Value, 1000) - for i := 0; i < ChannelNum; i++ { - mc.chArrays[i] = make(chan Cmd2Value, 1000) - go mc.Run(i) - } - if config.Config.ReliableStorage { - mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo - } else { - mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability - - } - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, - config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) - -} -func (och *OfflineHistoryConsumerHandler) Run(channelID int) { - for { - select { - case cmd := <-och.chArrays[channelID]: - switch cmd.Cmd { - case UserMessages: - msgChannelValue := cmd.Value.(MsgChannelValue) - msgList := msgChannelValue.msgList - triggerID := msgChannelValue.triggerID - storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) - for _, v := range msgList { - log.Debug(triggerID, "msg come to storage center", v.String()) - isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) - isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) - if isHistory { - storageMsgList = append(storageMsgList, v) - } - if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { - pushMsgList = append(pushMsgList, v) - } - } - - //switch msgChannelValue.msg.MsgData.SessionType { - //case constant.SingleChatType: - //case constant.GroupChatType: - //case constant.NotificationChatType: - //default: - // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) - // return - //} - - err, _ := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) - if err != nil { - singleMsgFailedCount += uint64(len(storageMsgList)) - log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) - } else { - singleMsgSuccessCountMutex.Lock() - singleMsgSuccessCount += uint64(len(storageMsgList)) - singleMsgSuccessCountMutex.Unlock() - for _, v := range pushMsgList { - sendMessageToPush(v, msgChannelValue.userID) - } - - } - } - } - } -} -func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() { - UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) - for { - select { - case cmd := <-och.msgDistributionCh: - switch cmd.Cmd { - case ConsumerMsgs: - triggerChannelValue := cmd.Value.(TriggerChannelValue) - triggerID := triggerChannelValue.triggerID - consumerMessages := triggerChannelValue.cmsgList - //Aggregation map[userid]message list - log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) - for i := 0; i < len(consumerMessages); i++ { - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) - if err != nil { - log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) - return - } - log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String()) - if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { - oldM = append(oldM, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM - } else { - m := make([]*pbMsg.MsgDataToMQ, 0, 100) - m = append(m, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = m - } - } - log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) - for userID, v := range UserAggregationMsgs { - if len(v) >= 0 { - channelID := getHashCode(userID) % ChannelNum - go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} - }(channelID, userID, v) - } - } - } - } - } - -} -func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { - msg := cMsg.Value - now := time.Now() - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(msg, &msgFromMQ) - if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) - return - } - operationID := msgFromMQ.OperationID - log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) - //Control whether to store offline messages (mongo) - isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - //Control whether to store history messages (mysql) - isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) - isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - switch msgFromMQ.MsgData.SessionType { - case constant.SingleChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgKey, &msgFromMQ) - if err != nil { - singleMsgFailedCount++ - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - return - } - singleMsgSuccessCountMutex.Lock() - singleMsgSuccessCount++ - singleMsgSuccessCountMutex.Unlock() - log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now)) - case constant.GroupChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) - if err != nil { - log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) - return - } - groupMsgCount++ - } - go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) - log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now)) - - case constant.NotificationChatType: - log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) - if isHistory { - err := saveUserChat(msgKey, &msgFromMQ) - if err != nil { - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - return - } - log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } - if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { - } else { - go sendMessageToPush(&msgFromMQ, msgKey) - } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) - default: - log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) - return - } - sess.MarkMessage(cMsg, "") - log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) -} -func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { - msg := cMsg.Value - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(msg, &msgFromMQ) - if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) - return - } - operationID := msgFromMQ.OperationID - log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) - //Control whether to store offline messages (mongo) - isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) - isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) - if isHistory { - seq, err := db.DB.IncrUserSeq(msgKey) - if err != nil { - log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) - return - } - sess.MarkMessage(cMsg, "") - msgFromMQ.MsgData.Seq = uint32(seq) - log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) - //mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} - //err := saveUserChat(msgKey, &msgFromMQ) - //if err != nil { - // singleMsgFailedCount++ - // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) - // return - //} - //singleMsgSuccessCountMutex.Lock() - //singleMsgSuccessCount++ - //singleMsgSuccessCountMutex.Unlock() - //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) - } else { - if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { - go sendMessageToPush(&msgFromMQ, msgKey) - } - } -} - -func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } - -//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, -// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group -// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) -// //for msg := range claim.Messages() { -// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") -// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) -// //} -// for msg := range claim.Messages() { -// if GetOnlineTopicStatus() == OnlineTopicVacancy { -// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) -// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) -// } else { -// select { -// case <-mc.cmdCh: -// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) -// case <-time.After(time.Millisecond * time.Duration(100)): -// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) -// } -// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) -// } -// } -// -// return nil -//} -func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 500) - t := time.NewTicker(time.Duration(500) * time.Millisecond) - var triggerID string - for msg := range claim.Messages() { - //och.TriggerCmd(OnlineTopicBusy) - cMsg = append(cMsg, msg) - select { - case <-t.C: - if len(cMsg) >= 0 { - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: cMsg}} - sess.MarkMessage(msg, "") - cMsg = cMsg[0:0] - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) - } - default: - if len(cMsg) >= 500 { - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: cMsg}} - sess.MarkMessage(msg, "") - cMsg = cMsg[0:0] - log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) - } - - } - log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - - } - return nil -} diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index fb5b33ce9..62f0ea0e8 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -22,10 +22,10 @@ import ( ) type MsgChannelValue struct { - userID string - triggerID string - msgList []*pbMsg.MsgDataToMQ - lastSeq uint64 + aggregationID string //maybe userID or super groupID + triggerID string + msgList []*pbMsg.MsgDataToMQ + lastSeq uint64 } type TriggerChannelValue struct { triggerID string @@ -98,13 +98,13 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { select { case cmd := <-och.chArrays[channelID]: switch cmd.Cmd { - case UserMessages: + case AggregationMessages: msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) + notStoragePushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.aggregationID, len(msgList)) for _, v := range msgList { log.Debug(triggerID, "msg come to storage center", v.String()) isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) @@ -113,9 +113,10 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { storageMsgList = append(storageMsgList, v) //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) } else { - if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { - notStoragepushMsgList = append(notStoragepushMsgList, v) + if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) { + notStoragePushMsgList = append(notStoragePushMsgList, v) } + } } @@ -128,8 +129,8 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // return //} - log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList)) - err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) + err, lastSeq := saveUserChatList(msgChannelValue.aggregationID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) @@ -137,28 +138,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) + och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) go func(push, storage []*pbMsg.MsgDataToMQ) { for _, v := range storage { - sendMessageToPush(v, msgChannelValue.userID) + sendMessageToPush(v, msgChannelValue.aggregationID) } for _, x := range push { - sendMessageToPush(x, msgChannelValue.userID) + sendMessageToPush(x, msgChannelValue.aggregationID) } - }(notStoragepushMsgList, storageMsgList) + }(notStoragePushMsgList, storageMsgList) } } } } } -func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { - hashCode := getHashCode(userID) +func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { + hashCode := getHashCode(aggregationID) channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID) //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} + och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} } func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { for { @@ -169,9 +170,9 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID - userID := msgChannelValue.userID + aggregationID := msgChannelValue.aggregationID lastSeq := msgChannelValue.lastSeq - err := db.DB.BatchInsertChat2DB(userID, msgList, triggerID, lastSeq) + err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq) if err != nil { log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) } @@ -202,7 +203,7 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for { - UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) select { case cmd := <-och.msgDistributionCh: switch cmd.Cmd { @@ -220,23 +221,23 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { return } log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) - if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + aggregationMsgs[string(consumerMessages[i].Key)] = oldM } else { m := make([]*pbMsg.MsgDataToMQ, 0, 100) m = append(m, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = m + aggregationMsgs[string(consumerMessages[i].Key)] = m } } - log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) - for userID, v := range UserAggregationMsgs { + log.Debug(triggerID, "generate map list users len", len(aggregationMsgs)) + for aggregationID, v := range aggregationMsgs { if len(v) >= 0 { - hashCode := getHashCode(userID) + hashCode := getHashCode(aggregationID) channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID) //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} + och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: v, triggerID: triggerID}} //}(channelID, userID, v) } } diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 1811d3847..d9bb49344 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -6,18 +6,25 @@ import ( commonDB "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" - pbMsg "Open_IM/pkg/proto/chat" open_im_sdk "Open_IM/pkg/proto/sdk_ws" ) -func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) { +func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *open_im_sdk.GetMaxAndMinSeqReq) (*open_im_sdk.GetMaxAndMinSeqResp, error) { log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String()) + resp := new(open_im_sdk.GetMaxAndMinSeqResp) + m := make(map[string]*open_im_sdk.MaxAndMinSeq) //seq, err := model.GetBiggestSeqFromReceive(in.UserID) maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID) minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID) - resp := new(pbMsg.GetMaxAndMinSeqResp) if err1 == nil { resp.MaxSeq = uint32(maxSeq) + for _, v := range in.GroupIDList { + x := new(open_im_sdk.MaxAndMinSeq) + maxSeq, _ := commonDB.DB.GetUserMaxSeq(v) + x.MaxSeq = uint32(maxSeq) + m[v] = x + } + resp.GroupMaxAndMinSeq = m } else if err1 == redis.ErrNil { resp.MaxSeq = 0 } else { @@ -39,6 +46,7 @@ func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeq func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.PullMessageBySeqListReq) (*open_im_sdk.PullMessageBySeqListResp, error) { log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := new(open_im_sdk.PullMessageBySeqListResp) + m := make(map[string]*open_im_sdk.MsgDataList) //msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) if err != nil { @@ -60,6 +68,32 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull } else { resp.List = redisMsgList } + for k, v := range in.GroupSeqList { + x := new(open_im_sdk.MsgDataList) + redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID) + if err != nil { + if err != redis.ErrNil { + log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) + } else { + log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) + } + msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(k, failedSeqList, in.OperationID) + if err1 != nil { + log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err.Error()) + resp.ErrCode = 201 + resp.ErrMsg = err.Error() + return resp, nil + } else { + redisMsgList = append(redisMsgList, msgList...) + x.MsgDataList = redisMsgList + m[k] = x + } + } else { + x.MsgDataList = redisMsgList + m[k] = x + } + } + resp.GroupMsgDataList = m //respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) //respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) return resp, nil diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index a97dd84ed..00a20ae13 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -50,46 +50,49 @@ type MsgCallBackResp struct { } func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string) { - if data.MsgData.SessionType == constant.GroupChatType { - return true, 0, "" - } - log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify) - reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID} - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) - cacheClient := cacheRpc.NewCacheClient(etcdConn) - cacheResp, err := cacheClient.GetBlackIDListFromCache(context.Background(), reqGetBlackIDListFromCache) - if err != nil { - log.NewError(data.OperationID, "GetBlackIDListFromCache rpc call failed ", err.Error()) - } else { - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(data.OperationID, "GetBlackIDListFromCache rpc logic call failed ", cacheResp.String()) - } else { - if utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { - return false, 600, "in black list" - } - } - } - log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify) - if config.Config.MessageVerify.FriendVerify { - reqGetFriendIDListFromCache := &cacheRpc.GetFriendIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID} + if data.MsgData.SessionType == constant.SingleChatType { + log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify) + reqGetBlackIDListFromCache := &cacheRpc.GetBlackIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID} etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) cacheClient := cacheRpc.NewCacheClient(etcdConn) - cacheResp, err := cacheClient.GetFriendIDListFromCache(context.Background(), reqGetFriendIDListFromCache) + cacheResp, err := cacheClient.GetBlackIDListFromCache(context.Background(), reqGetBlackIDListFromCache) if err != nil { - log.NewError(data.OperationID, "GetFriendIDListFromCache rpc call failed ", err.Error()) + log.NewError(data.OperationID, "GetBlackIDListFromCache rpc call failed ", err.Error()) } else { if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(data.OperationID, "GetFriendIDListFromCache rpc logic call failed ", cacheResp.String()) + log.NewError(data.OperationID, "GetBlackIDListFromCache rpc logic call failed ", cacheResp.String()) } else { - if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { - return false, 601, "not friend" + if utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { + return false, 600, "in black list" } } } - return true, 0, "" + log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify) + if config.Config.MessageVerify.FriendVerify { + reqGetFriendIDListFromCache := &cacheRpc.GetFriendIDListFromCacheReq{UserID: data.MsgData.RecvID, OperationID: data.OperationID} + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + cacheClient := cacheRpc.NewCacheClient(etcdConn) + cacheResp, err := cacheClient.GetFriendIDListFromCache(context.Background(), reqGetFriendIDListFromCache) + if err != nil { + log.NewError(data.OperationID, "GetFriendIDListFromCache rpc call failed ", err.Error()) + } else { + if cacheResp.CommonResp.ErrCode != 0 { + log.NewError(data.OperationID, "GetFriendIDListFromCache rpc logic call failed ", cacheResp.String()) + } else { + if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { + return false, 601, "not friend" + } + } + } + return true, 0, "" + } else { + return true, 0, "" + } + } else { return true, 0, "" } + } func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { msg.ServerMsgID = GetMsgID(msg.SendID) @@ -368,6 +371,34 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) + case constant.SuperGroupChatType: + // callback + callbackResp := callbackBeforeSendSingleMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg resp: ", callbackResp) + } + if callbackResp.ActionCode != constant.ActionAllow { + if callbackResp.ErrCode == 0 { + callbackResp.ErrCode = 201 + } + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp) + return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) + } + msgToMQSingle.MsgData = pb.MsgData + log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) + if err1 != nil { + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + + // callback + callbackResp = callbackAfterSendSingleMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) + } + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) + default: return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0) } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 5878c3851..05906924a 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -105,9 +105,9 @@ const ( SysMsgType = 200 //SessionType - SingleChatType = 1 - GroupChatType = 2 - + SingleChatType = 1 + GroupChatType = 2 + SuperGroupChatType = 3 NotificationChatType = 4 //token NormalToken = 0 diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index fb87df4e0..97d8885a6 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -949,6 +949,10 @@ func getSeqUid(uid string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return indexGen(uid, seqSuffix) } +func getSeqSuperGroupID(groupID string, seq uint32) string { + seqSuffix := seq / singleGocMsgNum + return superGroupIndexGen(groupID, seqSuffix) +} func GetSeqUid(uid string, seq uint32) string { return getSeqUid(uid, seq) @@ -986,3 +990,6 @@ func isNotContainInt32(target uint32, List []uint32) bool { func indexGen(uid string, seqSuffix uint32) string { return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) } +func superGroupIndexGen(groupID string, seqSuffix uint32) string { + return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) +}