diff --git a/config/config.yaml b/config/config.yaml index 736b15bce..1cfee4e1d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -22,12 +22,12 @@ mongo: dbUri: ""#当dbUri值不为空则直接使用该值 dbAddress: [ 127.0.0.1:37017 ] #mongo地址 目前仅支持单机,默认即可 dbDirect: false - dbTimeout: 10 + dbTimeout: 60 dbDatabase: openIM #mongo db 默认即可 dbSource: admin dbUserName: #mongo用户名,建议先不设置 dbPassword: #mongo密码,建议先不设置 - dbMaxPoolSize: 20 + dbMaxPoolSize: 100 dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 redis: @@ -222,7 +222,9 @@ secret: tuoyun multiloginpolicy: 1 #chat log insert to db -chatPersistenceMysql: true +chatpersistencemysql: true +#可靠性存储 +reliablestorage: false #token config tokenpolicy: diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 5644f1cb3..884f5f842 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -147,6 +147,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { sendMsgAllCount++ sendMsgAllCountLock.Unlock() log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) + nReply := new(pbChat.SendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) if isPass { diff --git a/internal/msg_transfer/logic/db.go b/internal/msg_transfer/logic/db.go index 806b782c0..7a2af5404 100644 --- a/internal/msg_transfer/logic/db.go +++ b/internal/msg_transfer/logic/db.go @@ -19,5 +19,10 @@ func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error { pbSaveData.MsgData = msg.MsgData log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) -// return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) + // return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) +} + +func saveUserChatList(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { + log.Info(operationID, utils.GetSelfFuncName(), "args ", userID, len(msgList)) + return db.DB.BatchInsertChat(userID, msgList, operationID) } diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 8f5cc0695..f3f1b138e 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -12,6 +12,10 @@ import ( const OnlineTopicBusy = 1 const OnlineTopicVacancy = 0 +const Msg = 2 +const ConsumerMsgs = 3 +const UserMessages = 4 +const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler @@ -35,7 +39,7 @@ func Init() { historyCH.Init(cmdCh) onlineTopicStatus = OnlineTopicVacancy log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) - offlineHistoryCH.Init(cmdCh) + //offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) @@ -48,7 +52,7 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) - go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) + //go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } func SetOnlineTopicStatus(status int) { w.Lock() diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 876beb30e..78c867851 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -3,6 +3,7 @@ package logic import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" @@ -14,21 +15,129 @@ import ( type OfflineHistoryConsumerHandler struct { msgHandle map[string]fcb - cmdCh chan Cmd2Value historyConsumerGroup *kfk.MConsumerGroup + cmdCh chan Cmd2Value + msgCh chan Cmd2Value + chArrays [ChannelNum]chan Cmd2Value + msgDistributionCh chan Cmd2Value } func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { mc.msgHandle = make(map[string]fcb) + mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + go mc.MessagesDistributionHandle() mc.cmdCh = cmdCh - mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo + mc.msgCh = make(chan Cmd2Value, 1000) + for i := 0; i < ChannelNum; i++ { + mc.chArrays[i] = make(chan Cmd2Value, 1000) + go mc.Run(i) + } + if config.Config.ReliableStorage { + mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo + } else { + mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2MongoLowReliability + + } mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) } +func (och *OfflineHistoryConsumerHandler) Run(channelID int) { + for { + select { + case cmd := <-och.chArrays[channelID]: + switch cmd.Cmd { + case UserMessages: + msgChannelValue := cmd.Value.(MsgChannelValue) + msgList := msgChannelValue.msgList + triggerID := msgChannelValue.triggerID + storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) + for _, v := range msgList { + log.Debug(triggerID, "msg come to storage center", v.String()) + isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) + if isHistory { + storageMsgList = append(storageMsgList, v) + } + if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { + pushMsgList = append(pushMsgList, v) + } + } -func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { + //switch msgChannelValue.msg.MsgData.SessionType { + //case constant.SingleChatType: + //case constant.GroupChatType: + //case constant.NotificationChatType: + //default: + // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + // return + //} + + err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + if err != nil { + singleMsgFailedCount += uint64(len(storageMsgList)) + log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) + } else { + singleMsgSuccessCountMutex.Lock() + singleMsgSuccessCount += uint64(len(storageMsgList)) + singleMsgSuccessCountMutex.Unlock() + for _, v := range pushMsgList { + sendMessageToPush(v, msgChannelValue.userID) + } + + } + } + } + } +} +func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + for { + select { + case cmd := <-och.msgDistributionCh: + switch cmd.Cmd { + case ConsumerMsgs: + triggerChannelValue := cmd.Value.(TriggerChannelValue) + triggerID := triggerChannelValue.triggerID + consumerMessages := triggerChannelValue.cmsgList + //Aggregation map[userid]message list + log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + if err != nil { + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) + return + } + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String()) + if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + oldM = append(oldM, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + } else { + m := make([]*pbMsg.MsgDataToMQ, 0, 100) + m = append(m, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = m + } + } + log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) + for userID, v := range UserAggregationMsgs { + if len(v) >= 0 { + channelID := getHashCode(userID) % ChannelNum + go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} + }(channelID, userID, v) + } + } + } + } + } + +} +func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -95,34 +204,110 @@ func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) return } + sess.MarkMessage(cMsg, "") log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } +func (mc *OfflineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + operationID := msgFromMQ.OperationID + log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return + } + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + //mc.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // singleMsgFailedCount++ + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //singleMsgSuccessCountMutex.Lock() + //singleMsgSuccessCount++ + //singleMsgSuccessCountMutex.Unlock() + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } else { + if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { + go sendMessageToPush(&msgFromMQ, msgKey) + } + } +} func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - //for msg := range claim.Messages() { - // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") - // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - //} - for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } else { - select { - case <-mc.cmdCh: - log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - case <-time.After(time.Millisecond * time.Duration(100)): - log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - } - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } - } +//func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group +// //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) +// //for msg := range claim.Messages() { +// // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") +// // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) +// //} +// for msg := range claim.Messages() { +// if GetOnlineTopicStatus() == OnlineTopicVacancy { +// log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// } else { +// select { +// case <-mc.cmdCh: +// log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// case <-time.After(time.Millisecond * time.Duration(100)): +// log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) +// } +// mc.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// } +// } +// +// return nil +//} +func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + cMsg := make([]*sarama.ConsumerMessage, 0, 500) + t := time.NewTicker(time.Duration(500) * time.Millisecond) + var triggerID string + for msg := range claim.Messages() { + //och.TriggerCmd(OnlineTopicBusy) + cMsg = append(cMsg, msg) + select { + case <-t.C: + if len(cMsg) >= 0 { + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + } + default: + if len(cMsg) >= 500 { + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} + sess.MarkMessage(msg, "") + cMsg = cMsg[0:0] + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + } + + } + log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } return nil } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index dd1f22634..8e4e3c2a2 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -3,6 +3,7 @@ package logic import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" kfk "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -13,11 +14,21 @@ import ( "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + "hash/crc32" "strings" "time" ) -type fcb func(msg []byte, msgKey string) +type MsgChannelValue struct { + userID string + triggerID string + msgList []*pbMsg.MsgDataToMQ +} +type TriggerChannelValue struct { + triggerID string + cmsgList []*sarama.ConsumerMessage +} +type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type Cmd2Value struct { Cmd int Value interface{} @@ -26,12 +37,27 @@ type OnlineHistoryConsumerHandler struct { msgHandle map[string]fcb historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value + msgCh chan Cmd2Value + chArrays [ChannelNum]chan Cmd2Value + msgDistributionCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) + och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + go och.MessagesDistributionHandle() och.cmdCh = cmdCh - och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + och.msgCh = make(chan Cmd2Value, 1000) + for i := 0; i < ChannelNum; i++ { + och.chArrays[i] = make(chan Cmd2Value, 1000) + go och.Run(i) + } + if config.Config.ReliableStorage { + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + } else { + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2MongoLowReliability + + } och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) @@ -61,7 +87,184 @@ func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { return errors.New("send cmd timeout") } } -func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { +func (och *OnlineHistoryConsumerHandler) Run(channelID int) { + for { + select { + case cmd := <-och.chArrays[channelID]: + switch cmd.Cmd { + case UserMessages: + msgChannelValue := cmd.Value.(MsgChannelValue) + msgList := msgChannelValue.msgList + triggerID := msgChannelValue.triggerID + storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) + for _, v := range msgList { + log.Debug(triggerID, "msg come to storage center", v.String()) + isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) + if isHistory { + storageMsgList = append(storageMsgList, v) + //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) + } else { + if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { + noStoragepushMsgList = append(noStoragepushMsgList, v) + } + } + + } + + //switch msgChannelValue.msg.MsgData.SessionType { + //case constant.SingleChatType: + //case constant.GroupChatType: + //case constant.NotificationChatType: + //default: + // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + // return + //} + log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) + err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + if err != nil { + singleMsgFailedCount += uint64(len(storageMsgList)) + log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) + } else { + singleMsgSuccessCountMutex.Lock() + singleMsgSuccessCount += uint64(len(storageMsgList)) + singleMsgSuccessCountMutex.Unlock() + go func(push, storage []*pbMsg.MsgDataToMQ) { + for _, v := range storage { + sendMessageToPush(v, msgChannelValue.userID) + } + for _, x := range push { + sendMessageToPush(x, msgChannelValue.userID) + } + + }(noStoragepushMsgList, storageMsgList) + + } + } + } + } +} + +//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { +// msg := cMsg.Value +// now := time.Now() +// msgFromMQ := pbMsg.MsgDataToMQ{} +// err := proto.Unmarshal(msg, &msgFromMQ) +// if err != nil { +// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) +// return +// } +// operationID := msgFromMQ.OperationID +// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) +// //Control whether to store offline messages (mongo) +// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) +// //Control whether to store history messages (mysql) +// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) +// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) +// switch msgFromMQ.MsgData.SessionType { +// case constant.SingleChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// singleMsgFailedCount++ +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// singleMsgSuccessCountMutex.Lock() +// singleMsgSuccessCount++ +// singleMsgSuccessCountMutex.Unlock() +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) +// case constant.GroupChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) +// return +// } +// groupMsgCount++ +// } +// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) +// case constant.NotificationChatType: +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) +// if isHistory { +// err := saveUserChat(msgKey, &msgFromMQ) +// if err != nil { +// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) +// return +// } +// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) +// } +// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { +// } else { +// go sendMessageToPush(&msgFromMQ, msgKey) +// } +// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) +// default: +// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) +// return +// } +// sess.MarkMessage(cMsg, "") +// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +//} + +func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { + for { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) + select { + case cmd := <-och.msgDistributionCh: + switch cmd.Cmd { + case ConsumerMsgs: + triggerChannelValue := cmd.Value.(TriggerChannelValue) + triggerID := triggerChannelValue.triggerID + consumerMessages := triggerChannelValue.cmsgList + //Aggregation map[userid]message list + log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) + if err != nil { + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) + return + } + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) + if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + oldM = append(oldM, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + } else { + m := make([]*pbMsg.MsgDataToMQ, 0, 100) + m = append(m, &msgFromMQ) + UserAggregationMsgs[string(consumerMessages[i].Key)] = m + } + } + log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) + for userID, v := range UserAggregationMsgs { + if len(v) >= 0 { + hashCode := getHashCode(userID) + channelID := hashCode % ChannelNum + log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} + //}(channelID, userID, v) + } + } + } + } + + } + +} +func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -95,7 +298,7 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s } else { go sendMessageToPush(&msgFromMQ, msgKey) } - log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + log.NewDebug(operationID, "saveSingleMsg cost time ", time.Since(now)) case constant.GroupChatType: log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) if isHistory { @@ -107,6 +310,8 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + log.NewDebug(operationID, "saveGroupMsg cost time ", time.Since(now)) + case constant.NotificationChatType: log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) if isHistory { @@ -126,26 +331,125 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey s log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) return } + sess.MarkMessage(cMsg, "") log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } +func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { + msg := cMsg.Value + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + operationID := msgFromMQ.OperationID + log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) + isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) + if isHistory { + seq, err := db.DB.IncrUserSeq(msgKey) + if err != nil { + log.NewError(operationID, "data insert to redis err", err.Error(), string(msg)) + return + } + sess.MarkMessage(cMsg, "") + msgFromMQ.MsgData.Seq = uint32(seq) + log.Debug(operationID, "send ch msg is ", msgFromMQ.String()) + //och.msgCh <- Cmd2Value{Cmd: Msg, Value: MsgChannelValue{msgKey, msgFromMQ}} + //err := saveUserChat(msgKey, &msgFromMQ) + //if err != nil { + // singleMsgFailedCount++ + // log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + // return + //} + //singleMsgSuccessCountMutex.Lock() + //singleMsgSuccessCount++ + //singleMsgSuccessCountMutex.Unlock() + //log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } else { + if !(!isSenderSync && msgKey == msgFromMQ.MsgData.SendID) { + go sendMessageToPush(&msgFromMQ, msgKey) + } + } +} + func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group +// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) +// for msg := range claim.Messages() { +// SetOnlineTopicStatus(OnlineTopicBusy) +// //och.TriggerCmd(OnlineTopicBusy) +// log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) +// och.msgHandle[msg.Topic](msg, string(msg.Key), sess) +// if claim.HighWaterMarkOffset()-msg.Offset <= 1 { +// log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) +// SetOnlineTopicStatus(OnlineTopicVacancy) +// och.TriggerCmd(OnlineTopicVacancy) +// } +// } +// return nil +//} + func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - for msg := range claim.Messages() { - SetOnlineTopicStatus(OnlineTopicBusy) - //och.TriggerCmd(OnlineTopicBusy) - log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) - och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - if claim.HighWaterMarkOffset()-msg.Offset <= 1 { - log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) - SetOnlineTopicStatus(OnlineTopicVacancy) - och.TriggerCmd(OnlineTopicVacancy) + + for { + if sess == nil { + log.NewWarn("", " sess == nil, waiting ") + time.Sleep(100 * time.Millisecond) + } else { + break } } + + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + cMsg := make([]*sarama.ConsumerMessage, 0, 1000) + t := time.NewTicker(time.Duration(100) * time.Millisecond) + var triggerID string + for { + //och.TriggerCmd(OnlineTopicBusy) + select { + case msg := <-claim.Messages(): + log.NewDebug("", "claim.Messages ", msg) + cMsg = append(cMsg, msg) + if len(cMsg) >= 1000 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg}} + sess.MarkMessage(msg, "") + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + } + + case <-t.C: + if len(cMsg) > 0 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + } + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg}} + sess.MarkMessage(cMsg[len(cMsg)-1], "") + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) + } + + } + //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + + } return nil } func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { @@ -174,3 +478,12 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { } } + +// String hashes a string to a unique hashcode. +// +// crc32 returns a uint32, but for our use we need +// and non negative integer. Here we cast to an integer +// and invert it if the result is negative. +func getHashCode(s string) uint32 { + return crc32.ChecksumIEEE([]byte(s)) +} diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 6e330a664..7418e629e 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -32,7 +32,8 @@ func (pc *PersistentConsumerHandler) Init() { } -func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) { +func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { + msg := cMsg.Value log.NewInfo("msg come here mysql!!!", "", "msg", string(msg)) var tag bool msgFromMQ := pbMsg.MsgDataToMQ{} @@ -71,7 +72,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - pc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) sess.MarkMessage(msg, "") } return nil diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index b8125be5a..4ee83aa11 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -57,6 +57,9 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { for _, v := range wsResult { if v.ResultCode == 0 { + if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { + break + } continue } if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 16e4c5ab6..4fc66bbaf 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -142,7 +142,8 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { } func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} - log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String()) + newTime := db.GetCurrentTimestampByMill() + log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) flag, errCode, errMsg := userRelationshipVerification(pb) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) @@ -265,24 +266,34 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S m[constant.OnlineStatus] = memberUserIDList } + log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + newTime = db.GetCurrentTimestampByMill() + //split parallel send var wg sync.WaitGroup var sendTag bool - var split = 50 + var split = 20 for k, v := range m { remain := len(v) % split for i := 0; i < len(v)/split; i++ { wg.Add(1) - go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) + tmp := valueCopy(pb) + // go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) + go rpc.sendMsgToGroupOptimization(v[i*split:(i+1)*split], tmp, k, &sendTag, &wg) } if remain > 0 { wg.Add(1) - go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) + tmp := valueCopy(pb) + // go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) + go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg) } } + log.Debug(pb.OperationID, "send msg cost time22 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) wg.Add(1) go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) wg.Wait() + log.Debug(pb.OperationID, "send msg cost time2 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + newTime = db.GetCurrentTimestampByMill() // callback if err := callbackAfterSendGroupMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) @@ -341,6 +352,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } }() } + log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } @@ -360,9 +372,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } + + log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: - return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) + return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0) } } @@ -372,10 +386,12 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str pid, offset, err := rpc.onlineProducer.SendMessage(m, key) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) + } else { + // log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID) } return err case constant.OfflineStatus: - pid, offset, err := rpc.offlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } @@ -420,6 +436,29 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i return true } +func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool { + conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) + opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID) + if err != nil && err != redis.ErrNil { + log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error()) + return true + } + + switch opt { + case constant.ReceiveMessage: + return true + case constant.NotReceiveMessage: + return false + case constant.ReceiveNotNotifyMessage: + if *options == nil { + *options = make(map[string]bool, 10) + } + utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false) + return true + } + return true +} + type NotificationMsg struct { SendID string RecvID string @@ -737,6 +776,23 @@ func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, o m[constant.OfflineStatus] = offlUserIDList } +func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq { + offlinePushInfo := sdk_ws.OfflinePushInfo{} + if pb.MsgData.OfflinePushInfo != nil { + offlinePushInfo = *pb.MsgData.OfflinePushInfo + } + msgData := sdk_ws.MsgData{} + msgData = *pb.MsgData + msgData.OfflinePushInfo = &offlinePushInfo + + options := make(map[string]bool, 10) + for key, value := range pb.MsgData.Options { + options[key] = value + } + msgData.Options = options + return &pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} +} + func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { // log.Debug(pb.OperationID, "split userID ", list) offlinePushInfo := sdk_ws.OfflinePushInfo{} @@ -772,3 +828,22 @@ func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status s } wg.Done() } + +func (rpc *rpcChat) sendMsgToGroupOptimization(list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + for _, v := range list { + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + if isSend { + err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + if err != nil { + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) + } else { + *sendTag = true + } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + } + } + wg.Done() +} diff --git a/internal/rpc/msg/tag_send_msg.go b/internal/rpc/msg/tag_send_msg.go index 195640498..a79a90ec6 100644 --- a/internal/rpc/msg/tag_send_msg.go +++ b/internal/rpc/msg/tag_send_msg.go @@ -27,6 +27,7 @@ func TagSendMessage(operationID string, user *db.User, recvID, content string, s msgData.SenderNickname = user.Nickname msgData.Options = map[string]bool{} msgData.Options[constant.IsSenderConversationUpdate] = false + msgData.Options[constant.IsSenderNotificationPush] = false msgData.CreateTime = utils.GetCurrentTimestampByMill() msgData.ClientMsgID = utils.GetMsgID(user.UserID) msgData.SenderPlatformID = senderPlatformID diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index b60794a12..290e63ec2 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -216,7 +216,8 @@ type config struct { } Secret string `yaml:"secret"` MultiLoginPolicy int `yaml:"multiloginpolicy"` - ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` + ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` + ReliableStorage bool `yaml:"reliablestorage"` TokenPolicy struct { AccessSecret string `yaml:"accessSecret"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 032108ee9..66a4439e3 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -144,6 +144,7 @@ const ( IsSenderSync = "senderSync" IsNotPrivate = "notPrivate" IsSenderConversationUpdate = "senderConversationUpdate" + IsSenderNotificationPush = "senderNotificationPush" //GroupStatus GroupOk = 0 diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go new file mode 100644 index 000000000..b5fca3502 --- /dev/null +++ b/pkg/common/db/batch_insert_chat.go @@ -0,0 +1,106 @@ +package db + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/log" + pbMsg "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" + "context" + "errors" + "github.com/garyburd/redigo/redis" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/bson" +) + +func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { + newTime := getCurrentTimestampByMill() + if len(msgList) > GetSingleGocMsgNum() { + return errors.New("too large") + } + isInit := false + currentMaxSeq, err := d.GetUserMaxSeq(userID) + if err == nil { + + } else if err == redis.ErrNil { + isInit = true + currentMaxSeq = 0 + } else { + return utils.Wrap(err, "") + } + var remain uint64 + //if currentMaxSeq < uint64(GetSingleGocMsgNum()) { + // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum())) + //} else { + // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum())) + //} + + blk0 := uint64(GetSingleGocMsgNum() - 1) + if currentMaxSeq < uint64(GetSingleGocMsgNum()) { + remain = blk0 - currentMaxSeq + } else { + excludeBlk0 := currentMaxSeq - blk0 + remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum()) + } + + insertCounter := uint64(0) + msgListToMongo := make([]MsgInfo, 0) + msgListToMongoNext := make([]MsgInfo, 0) + seqUid := "" + seqUidNext := "" + log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList)) + //4998 remain ==1 + //4999 + for _, m := range msgList { + log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) + currentMaxSeq++ + sMsg := MsgInfo{} + sMsg.SendTime = m.MsgData.SendTime + m.MsgData.Seq = uint32(currentMaxSeq) + if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { + return utils.Wrap(err, "") + } + if isInit { + msgListToMongoNext = append(msgListToMongoNext, sMsg) + seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + continue + } + if insertCounter < remain { + msgListToMongo = append(msgListToMongo, sMsg) + insertCounter++ + seqUid = getSeqUid(userID, uint32(currentMaxSeq)) + log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + } else { + msgListToMongoNext = append(msgListToMongoNext, sMsg) + seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + } + } + // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + + ctx := context.Background() + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + + if seqUid != "" { + filter := bson.M{"uid": seqUid} + log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) + err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() + if err != nil { + log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) + return utils.Wrap(err, "") + } + } + if seqUidNext != "" { + filter := bson.M{"uid": seqUidNext} + sChat := UserChat{} + sChat.UID = seqUidNext + sChat.Msg = msgListToMongoNext + log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext) + if _, err = c.InsertOne(ctx, &sChat); err != nil { + log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) + return utils.Wrap(err, "") + } + } + log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) + return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "") +} diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 08d605113..80e9d3400 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -31,6 +31,10 @@ const cWorkMoment = "work_moment" const cCommentMsg = "comment_msg" const singleGocMsgNum = 5000 +func GetSingleGocMsgNum() int { + return singleGocMsgNum +} + type MsgInfo struct { SendTime int64 Msg []byte @@ -351,7 +355,7 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD return utils.Wrap(err, "") } err = c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": sMsg}}).Err() - log.NewDebug(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) + log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) if err != nil { sChat := UserChat{} sChat.UID = seqUid @@ -368,6 +372,47 @@ func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgD return nil } +// +//func (d *DataBases) SaveUserChatListMongo2(uid string, sendTime int64, msgList []*pbMsg.MsgDataToDB) error { +// ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) +// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) +// newTime := getCurrentTimestampByMill() +// operationID := "" +// seqUid := "" +// msgListToMongo := make([]MsgInfo, 0) +// +// for _, m := range msgList { +// seqUid = getSeqUid(uid, m.MsgData.Seq) +// var err error +// sMsg := MsgInfo{} +// sMsg.SendTime = sendTime +// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { +// return utils.Wrap(err, "") +// } +// msgListToMongo = append(msgListToMongo, sMsg) +// } +// +// filter := bson.M{"uid": seqUid} +// log.NewDebug(operationID, "filter ", seqUid) +// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() +// log.NewWarn(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) +// if err != nil { +// sChat := UserChat{} +// sChat.UID = seqUid +// sChat.Msg = msgListToMongo +// +// if _, err = c.InsertOne(ctx, &sChat); err != nil { +// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) +// return utils.Wrap(err, "") +// } +// } else { +// log.NewDebug(operationID, "FindOneAndUpdate ok", filter) +// } +// +// log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime) +// return nil +//} + func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { var seqUid string newTime := getCurrentTimestampByMill() @@ -839,6 +884,9 @@ func generateWorkMomentCommentID(workMomentID string) string { func getCurrentTimestampByMill() int64 { return time.Now().UnixNano() / 1e6 } +func GetCurrentTimestampByMill() int64 { + return time.Now().UnixNano() / 1e6 +} func getSeqUid(uid string, seq uint32) string { seqSuffix := seq / singleGocMsgNum diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index 9996ea356..46318ac4a 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -68,6 +68,13 @@ func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) { return redis.Uint64(d.Exec("GET", key)) } +//set the largest Seq +func (d *DataBases) SetUserMaxSeq(uid string, maxSeq uint64) error { + key := userIncrSeq + uid + _, err := d.Exec("SET", key, maxSeq) + return err +} + //Set the user's minimum seq func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) { key := userMinSeq + uid diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 800f5864b..63653c42f 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -9,7 +9,7 @@ type Statistics struct { AllCount *uint64 ModuleName string PrintArgs string - SleepTime int + SleepTime uint64 } func (s *Statistics) output() { @@ -17,6 +17,7 @@ func (s *Statistics) output() { t := time.NewTicker(time.Duration(s.SleepTime) * time.Second) defer t.Stop() var sum uint64 + var timeIntervalNum uint64 for { sum = *s.AllCount select { @@ -27,12 +28,13 @@ func (s *Statistics) output() { } else { intervalCount = *s.AllCount - sum } - log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, intervalCount, "total:", *s.AllCount) + timeIntervalNum++ + log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, intervalCount, "total:", *s.AllCount, "intervalNum", timeIntervalNum, "avg", (*s.AllCount)/(timeIntervalNum)/s.SleepTime) } } func NewStatistics(allCount *uint64, moduleName, printArgs string, sleepTime int) *Statistics { - p := &Statistics{AllCount: allCount, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} + p := &Statistics{AllCount: allCount, ModuleName: moduleName, SleepTime: uint64(sleepTime), PrintArgs: printArgs} go p.output() return p } diff --git a/pkg/utils/map.go b/pkg/utils/map.go index 66ca27471..7a5fb2d6b 100644 --- a/pkg/utils/map.go +++ b/pkg/utils/map.go @@ -124,6 +124,7 @@ func GetSwitchFromOptions(Options map[string]bool, key string) (result bool) { } return false } + func SetSwitchFromOptions(options map[string]bool, key string, value bool) { if options == nil { options = make(map[string]bool, 5)