diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index f8c424dcc..96412fa40 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -16,7 +16,7 @@ const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 const MongoMessages = 5 -const ChannelNum = 100 +const ChannelNum = 10 var ( persistentCH PersistentConsumerHandler diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 805eef805..138292fdc 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -9,6 +9,7 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" + server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" "errors" @@ -103,7 +104,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) for _, v := range msgList { log.Debug(triggerID, "msg come to storage center", v.String()) @@ -114,7 +115,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { //log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) } else { if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { - noStoragepushMsgList = append(noStoragepushMsgList, v) + notStoragepushMsgList = append(notStoragepushMsgList, v) } } @@ -128,7 +129,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // return //} - log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) + log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList)) err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) @@ -146,7 +147,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { sendMessageToPush(x, msgChannelValue.userID) } - }(noStoragepushMsgList, storageMsgList) + }(notStoragepushMsgList, storageMsgList) } } @@ -175,6 +176,26 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { if err != nil { log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList) } + for _, v := range msgList { + if v.MsgData.ContentType == constant.DeleteMessageNotification { + tips := server_api_params.TipsComm{} + DeleteMessageTips := server_api_params.DeleteMessageTips{} + err := proto.Unmarshal(v.MsgData.Content, &tips) + if err != nil { + log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String()) + continue + } + err = proto.Unmarshal(tips.Detail, &DeleteMessageTips) + if err != nil { + log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String()) + continue + } + if err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { + log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error()) + } + + } + } } } } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 66a4439e3..d8d305f95 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -46,8 +46,8 @@ const ( GroupMsg = 201 //SysRelated - NotificationBegin = 1000 - + NotificationBegin = 1000 + DeleteMessageNotification = 1100 FriendApplicationApprovedNotification = 1201 //add_friend_response FriendApplicationRejectedNotification = 1202 //add_friend_response FriendApplicationNotification = 1203 //add_friend