diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 3ebddc9ef..c6848a82f 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -33,7 +33,7 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbapi.CommonCallbackReq return req } -func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { +func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) error { callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { return callbackResp @@ -65,7 +65,7 @@ func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackRes return callbackResp } -func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { +func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { return callbackResp @@ -88,7 +88,7 @@ func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp return callbackResp } -func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { +func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) error { callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable { return callbackResp @@ -117,7 +117,7 @@ func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp return callbackResp } -func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { +func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return callbackResp @@ -140,7 +140,7 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp return callbackResp } -func callbackMsgModify(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { +func callbackMsgModify(msg *pbChat.SendMsgReq) (err error) { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 380f91d7f..dc1f9b481 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -66,24 +66,6 @@ type MsgCallBackResp struct { } } -func isMessageHasReadEnabled(pb *msg.SendMsgReq) (bool, int32, string) { - switch pb.MsgData.ContentType { - case constant.HasReadReceipt: - if config.Config.SingleMessageHasReadReceiptEnable { - return true, 0, "" - } else { - return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg - } - case constant.GroupHasReadReceipt: - if config.Config.GroupMessageHasReadReceiptEnable { - return true, 0, "" - } else { - return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg - } - } - return true, 0, "" -} - func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, isAdmin bool, err error) { groupMemberInfo, err := rocksCache.GetGroupMemberInfoFromCache(ctx, groupID, userID) if err != nil { @@ -107,7 +89,7 @@ func groupIsMuted(ctx context.Context, groupID string) (bool, error) { return false, nil } -func (rpc *msgServer) messageVerification(ctx context.Context, data *pbChat.SendMsgReq) (bool, int32, string, []string) { +func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) ([]string, error) { switch data.MsgData.SessionType { case constant.SingleChatType: if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { @@ -314,12 +296,10 @@ func (rpc *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) case constant.HasReadReceipt: - log.Info("", "this is a test start", msg, msg.Options) utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) - log.Info("", "this is a test end", msg, msg.Options) case constant.Typing: utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false) utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false) @@ -379,11 +359,10 @@ func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, return replay, nil } -func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool { +func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) (bool, error) { opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID) if err != nil { log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error()) - } switch opt { case constant.ReceiveMessage: @@ -538,7 +517,7 @@ func (rpc *msgServer) sendMsgToGroup(ctx context.Context, list []string, pb pbCh wg.Done() } -func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { +func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, sendTag *bool, wg *sync.WaitGroup) { msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} tempOptions := make(map[string]bool, 1) for k, v := range groupPB.MsgData.Options { diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 77b204f7f..8998d2438 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -4,309 +4,265 @@ import ( "Open_IM/pkg/common/constant" promePkg "Open_IM/pkg/common/prometheus" pbConversation "Open_IM/pkg/proto/conversation" - pbChat "Open_IM/pkg/proto/msg" + "Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/utils" "context" go_redis "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" "strings" "sync" - "time" ) -func (m *msgServer) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { - replay := pbChat.SendMsgResp{} - - flag, errCode, errMsg := isMessageHasReadEnabled(pb) - if !flag { - return returnMsg(&replay, pb, errCode, errMsg, "", 0) - } - t1 := time.Now() - m.encapsulateMsgData(pb.MsgData) - log.Debug(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) - msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} +func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback - t1 = time.Now() - callbackResp := callbackMsgModify(pb) - log.Debug(pb.OperationID, "callbackMsgModify ", callbackResp, "cost time: ", time.Since(t1)) - if callbackResp.ErrCode != 0 { - log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackMsgModify resp: ", callbackResp) + if err = callbackBeforeSendGroupMsg(req); err != nil { + return nil, err } - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp) - if callbackResp.ActionCode != constant.ActionAllow { - if callbackResp.ErrCode == 0 { - callbackResp.ErrCode = 201 - } - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackMsgModify result", "end rpc and return", pb.MsgData) - return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) + + if _, err = m.messageVerification(ctx, req); err != nil { + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) + return nil, err + } + msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} + err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle) + if err != nil { + return nil, err + } + // callback + if err = callbackAfterSendGroupMsg(req); err != nil { + return nil, err } - switch pb.MsgData.SessionType { - case constant.SingleChatType: - promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) - // callback - t1 = time.Now() - callbackResp := callbackBeforeSendSingleMsg(pb) - log.Debug(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1)) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - if callbackResp.ErrCode == 0 { - callbackResp.ErrCode = 201 - } - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) - promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) - } - t1 = time.Now() - flag, errCode, errMsg, _ = rpc.messageVerification(ctx, pb) - log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) - if !flag { - return returnMsg(&replay, pb, errCode, errMsg, "", 0) - } - t1 = time.Now() - isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) - log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1)) - if isSend { - msgToMQSingle.MsgData = pb.MsgData - log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - t1 = time.Now() - err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) - log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1)) - if err1 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) - promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - } - if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - t1 = time.Now() - err2 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) - log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1)) - if err2 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) - promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - } - // callback - t1 = time.Now() - callbackResp = callbackAfterSendSingleMsg(pb) - log.Info(pb.OperationID, "callbackAfterSendSingleMsg ", " cost time: ", time.Since(t1)) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) - } - promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) - return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) - case constant.GroupChatType: - // callback - promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) - callbackResp := callbackBeforeSendGroupMsg(pb) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - if callbackResp.ErrCode == 0 { - callbackResp.ErrCode = 201 - } - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) - promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) - } - var memberUserIDList []string - if flag, errCode, errMsg, memberUserIDList = rpc.messageVerification(ctx, pb); !flag { - promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, errCode, errMsg, "", 0) - } - log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList, "len: ", len(memberUserIDList)) - var addUidList []string - switch pb.MsgData.ContentType { - case constant.MemberKickedNotification: - var tips sdkws.TipsComm - var memberKickedTips sdkws.MemberKickedTips - err := proto.Unmarshal(pb.MsgData.Content, &tips) - if err != nil { - log.Error(pb.OperationID, "Unmarshal err", err.Error()) - } - err = proto.Unmarshal(tips.Detail, &memberKickedTips) - if err != nil { - log.Error(pb.OperationID, "Unmarshal err", err.Error()) - } - log.Info(pb.OperationID, "data is ", memberKickedTips) - for _, v := range memberKickedTips.KickedUserList { - addUidList = append(addUidList, v.UserID) - } - case constant.MemberQuitNotification: - addUidList = append(addUidList, pb.MsgData.SendID) - default: + promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) + resp.SendTime = msgToMQSingle.MsgData.SendTime + resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID + resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID + return resp, nil +} +func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { + msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} + err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle) + if err != nil { + return nil, err + } + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle) + if err != nil { + return nil, err } - if len(addUidList) > 0 { - memberUserIDList = append(memberUserIDList, addUidList...) - } - m := make(map[string][]string, 2) - m[constant.OnlineStatus] = memberUserIDList - t1 = time.Now() + } - //split parallel send - var wg sync.WaitGroup - var sendTag bool - var split = 20 - for k, v := range m { - remain := len(v) % split - for i := 0; i < len(v)/split; i++ { - wg.Add(1) - tmp := valueCopy(pb) - // go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) - go rpc.sendMsgToGroupOptimization(ctx, v[i*split:(i+1)*split], tmp, k, &sendTag, &wg) - } - if remain > 0 { - wg.Add(1) - tmp := valueCopy(pb) - // go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) - go rpc.sendMsgToGroupOptimization(ctx, v[split*(len(v)/split):], tmp, k, &sendTag, &wg) - } + resp.SendTime = msgToMQSingle.MsgData.SendTime + resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID + resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID + return resp, nil +} + +func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { + promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) + if err = callbackBeforeSendSingleMsg(req); err != nil { + return nil, err + } + _, err = m.messageVerification(ctx, req) + if err != nil { + return nil, err + } + isSend, err := modifyMessageByUserMessageReceiveOpt(req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req) + if err != nil { + return nil, err + } + msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} + if isSend { + err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle) + if err != nil { + return nil, constant.ErrInternalServer.Wrap("insert to mq") } - log.Debug(pb.OperationID, "send msg cost time22 ", time.Since(t1), pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) - //wg.Add(1) - //go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) - wg.Wait() - t1 = time.Now() - // callback - callbackResp = callbackAfterSendGroupMsg(pb) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg resp: ", callbackResp) + } + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle) + if err != nil { + return nil, constant.ErrInternalServer.Wrap("insert to mq") } - if !sendTag { - log.NewWarn(pb.OperationID, "send tag is ", sendTag) - promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } else { - if pb.MsgData.ContentType == constant.AtText { - go func() { - var conversationReq pbConversation.ModifyConversationFieldReq - var tag bool - var atUserID []string - conversation := pbConversation.Conversation{ - OwnerUserID: pb.MsgData.SendID, - ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), - ConversationType: constant.GroupChatType, - GroupID: pb.MsgData.GroupID, - } - conversationReq.Conversation = &conversation - conversationReq.OperationID = pb.OperationID - conversationReq.FieldType = constant.FieldGroupAtType - tagAll := utils.IsContain(constant.AtAllString, pb.MsgData.AtUserIDList) - if tagAll { - atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) - if len(atUserID) == 0 { //just @everyone - conversationReq.UserIDList = memberUserIDList - conversation.GroupAtType = constant.AtAll - } else { //@Everyone and @other people - conversationReq.UserIDList = atUserID - conversation.GroupAtType = constant.AtAllAtMe - tag = true - } - } else { - conversationReq.UserIDList = pb.MsgData.AtUserIDList - conversation.GroupAtType = constant.AtMe - } - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) - if err != nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) - return - } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) - } - if tag { - conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) - conversation.GroupAtType = constant.AtAll - etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID) - if etcdConn == nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) - return - } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) - } - } - }() - } - log.Debug(pb.OperationID, "send msg cost time3 ", time.Since(t1), pb.MsgData.ClientMsgID) - promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) - return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) + } + err = callbackAfterSendSingleMsg(req) + if err != nil { + return nil, err + } + promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) + resp.SendTime = msgToMQSingle.MsgData.SendTime + resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID + resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID + return resp, nil +} + +func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { + // callback + promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) + err = callbackBeforeSendGroupMsg(req) + if err != nil { + return nil, err + } + + var memberUserIDList []string + if memberUserIDList, err = m.messageVerification(ctx, req); err != nil { + promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) + return nil, err + } + + var addUidList []string + switch req.MsgData.ContentType { + case constant.MemberKickedNotification: + var tips sdkws.TipsComm + var memberKickedTips sdkws.MemberKickedTips + err := proto.Unmarshal(req.MsgData.Content, &tips) + if err != nil { + } - case constant.NotificationChatType: - t1 = time.Now() - msgToMQSingle.MsgData = pb.MsgData - log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) - if err1 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + err = proto.Unmarshal(tips.Detail, &memberKickedTips) + if err != nil { + } - if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) - if err2 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } + for _, v := range memberKickedTips.KickedUserList { + addUidList = append(addUidList, v.UserID) } - - log.Debug(pb.OperationID, "send msg cost time ", time.Since(t1), pb.MsgData.ClientMsgID) - return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) - case constant.SuperGroupChatType: - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) - // callback - callbackResp := callbackBeforeSendGroupMsg(pb) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg resp: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - if callbackResp.ErrCode == 0 { - callbackResp.ErrCode = 201 - } - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp) - return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) - } - if flag, errCode, errMsg, _ = rpc.messageVerification(ctx, pb); !flag { - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, errCode, errMsg, "", 0) - } - msgToMQSingle.MsgData = pb.MsgData - log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus) - if err1 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - // callback - callbackResp = callbackAfterSendGroupMsg(pb) - if callbackResp.ErrCode != 0 { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp) - } - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) - return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) + case constant.MemberQuitNotification: + addUidList = append(addUidList, req.MsgData.SendID) default: - return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0) + } + if len(addUidList) > 0 { + memberUserIDList = append(memberUserIDList, addUidList...) + } + + //split parallel send + var wg sync.WaitGroup + var sendTag bool + var split = 20 + msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} + mErr := make(map[string]error, 0) + remain := len(memberUserIDList) % split + for i := 0; i < len(memberUserIDList)/split; i++ { + wg.Add(1) + tmp := valueCopy(req) + go m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &sendTag, &wg) + } + if remain > 0 { + wg.Add(1) + tmp := valueCopy(req) + go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &sendTag, &wg) + } + + wg.Wait() + + // callback + err = callbackAfterSendGroupMsg(req) + if err != nil { + return nil, err + } + + for _, v := range mErr { + if v != nil { + return nil, v + } + } + + if req.MsgData.ContentType == constant.AtText { + go func() { + var conversationReq pbConversation.ModifyConversationFieldReq + var tag bool + var atUserID []string + conversation := pbConversation.Conversation{ + OwnerUserID: req.MsgData.SendID, + ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), + ConversationType: constant.GroupChatType, + GroupID: req.MsgData.GroupID, + } + conversationReq.Conversation = &conversation + conversationReq.OperationID = pb.OperationID + conversationReq.FieldType = constant.FieldGroupAtType + tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList) + if tagAll { + atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) + if len(atUserID) == 0 { //just @everyone + conversationReq.UserIDList = memberUserIDList + conversation.GroupAtType = constant.AtAll + } else { //@Everyone and @other people + conversationReq.UserIDList = atUserID + conversation.GroupAtType = constant.AtAllAtMe + tag = true + } + } else { + conversationReq.UserIDList = req.MsgData.AtUserIDList + conversation.GroupAtType = constant.AtMe + } + etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + if err != nil { + errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" + log.NewError(pb.OperationID, errMsg) + return + } + client := pbConversation.NewConversationClient(etcdConn) + conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + if err != nil { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) + } else if conversationReply.CommonResp.ErrCode != 0 { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + } + if tag { + conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) + conversation.GroupAtType = constant.AtAll + etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID) + if etcdConn == nil { + errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" + log.NewError(pb.OperationID, errMsg) + return + } + client := pbConversation.NewConversationClient(etcdConn) + conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + if err != nil { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) + } else if conversationReply.CommonResp.ErrCode != 0 { + log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + } + } + }() + } + + promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) + resp.SendTime = msgToMQSingle.MsgData.SendTime + resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID + resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID + return resp, nil + +} +func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) { + resp = &msg.SendMsgResp{} + flag := isMessageHasReadEnabled(req.MsgData) + if !flag { + return nil, constant.ErrMessageHasReadDisable.Wrap() + } + m.encapsulateMsgData(req.MsgData) + if err := callbackMsgModify(req); err != nil { + return nil, err + } + switch req.MsgData.SessionType { + case constant.SingleChatType: + return m.sendMsgSingleChat(ctx, req) + case constant.GroupChatType: + return m.sendMsgGroupChat(ctx, req) + case constant.NotificationChatType: + return m.sendMsgNotification(ctx, req) + case constant.SuperGroupChatType: + return m.sendMsgSuperGroupChat(ctx, req) + default: + return nil, constant.ErrArgs.Wrap("unknown sessionType") } } -func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { +func (m *msgServer) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String()) resp := new(sdkws.GetMaxAndMinSeqResp) m := make(map[string]*sdkws.MaxAndMinSeq) diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index fd0f4e9b3..1bc863994 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -1,16 +1,25 @@ package msg import ( - "Open_IM/pkg/utils" - "github.com/go-redis/redis/v8" - "gorm.io/gorm" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/proto/sdkws" ) -func IsNotFound(err error) bool { - switch utils.Unwrap(err) { - case gorm.ErrRecordNotFound, redis.Nil: - return true - default: - return false +func isMessageHasReadEnabled(msgData *sdkws.MsgData) bool { + switch msgData.ContentType { + case constant.HasReadReceipt: + if config.Config.SingleMessageHasReadReceiptEnable { + return true + } else { + return false + } + case constant.GroupHasReadReceipt: + if config.Config.GroupMessageHasReadReceiptEnable { + return true + } else { + return false + } } + return true }