diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index dc1f9b481..e12f4440a 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -7,17 +7,15 @@ import ( rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/common/tokenverify" + "Open_IM/pkg/common/tracelog" cacheRpc "Open_IM/pkg/proto/cache" "Open_IM/pkg/proto/msg" - pbPush "Open_IM/pkg/proto/push" - pbRelay "Open_IM/pkg/proto/relay" sdkws "Open_IM/pkg/proto/sdkws" "Open_IM/pkg/utils" "context" "errors" "math/rand" "strconv" - "strings" "sync" "time" @@ -311,58 +309,15 @@ func (rpc *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { } } -func (rpc *msgServer) sendMsgToWriter(ctx context.Context, m *pbChat.MsgDataToMQ, key string, status string) error { - switch status { - case constant.OnlineStatus: - if m.MsgData.ContentType == constant.SignalingNotification { - rpcPushMsg := pbPush.PushMsgReq{OperationID: m.OperationID, MsgData: m.MsgData, PushToUserID: key} - grpcConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImPushName) - if err != nil { - return err - } - msgClient := pbPush.NewPushMsgServiceClient(grpcConn) - _, err = msgClient.PushMsg(context.Background(), &rpcPushMsg) - if err != nil { - log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) - return err - } else { - return nil - } - } - pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID) - if err != nil { - log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) - } else { - // log.NewWarn(m.OperationID, "sendMsgToWriter client msgID ", m.MsgData.ClientMsgID) - } - return err - case constant.OfflineStatus: - pid, offset, err := rpc.messageWriter.SendMessage(m, key, m.OperationID) - if err != nil { - log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) - } - return err - } - return errors.New("status error") -} func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int())) } -func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) { - replay.ErrCode = errCode - replay.ErrMsg = errMsg - replay.ServerMsgID = serverMsgID - replay.ClientMsgID = pb.MsgData.ClientMsgID - replay.SendTime = sendTime - return replay, nil -} - -func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) (bool, error) { +func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *msg.SendMsgReq) (bool, error) { opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID) if err != nil { - log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error()) + return false, err } switch opt { case constant.ReceiveMessage: @@ -400,71 +355,7 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i return true } -func modifyMessageByUserMessageReceiveOptoptimization(userID, sourceID string, sessionType int, operationID string, options *map[string]bool) bool { - conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) - opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID) - if err != nil && err != go_redis.Nil { - log.NewError(operationID, "GetSingleConversationMsgOpt from redis err", userID, conversationID, err.Error()) - return true - } - - switch opt { - case constant.ReceiveMessage: - return true - case constant.NotReceiveMessage: - return false - case constant.ReceiveNotNotifyMessage: - if *options == nil { - *options = make(map[string]bool, 10) - } - utils.SetSwitchFromOptions(*options, constant.IsOfflinePush, false) - return true - } - return true -} - -func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, operationID string) { - var onllUserIDList, offlUserIDList []string - var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult - req := &pbRelay.GetUsersOnlineStatusReq{} - req.UserIDList = memberList - req.OperationID = operationID - req.OpUserID = config.Config.Manager.AppManagerUid[0] - flag := false - grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), operationID) - for _, v := range grpcCons { - client := pbRelay.NewRelayClient(v) - reply, err := client.GetUsersOnlineStatus(context.Background(), req) - if err != nil { - log.NewError(operationID, "GetUsersOnlineStatus rpc err", req.String(), err.Error()) - continue - } else { - if reply.ErrCode == 0 { - wsResult = append(wsResult, reply.SuccessResult...) - } - } - } - log.NewInfo(operationID, "call GetUsersOnlineStatus rpc server is success", wsResult) - //Online data merge of each node - for _, v1 := range memberList { - flag = false - - for _, v2 := range wsResult { - if v2.UserID == v1 { - flag = true - onllUserIDList = append(onllUserIDList, v1) - } - - } - if !flag { - offlUserIDList = append(offlUserIDList, v1) - } - } - m[constant.OnlineStatus] = onllUserIDList - m[constant.OfflineStatus] = offlUserIDList -} - -func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq { +func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq { offlinePushInfo := sdkws.OfflinePushInfo{} if pb.MsgData.OfflinePushInfo != nil { offlinePushInfo = *pb.MsgData.OfflinePushInfo @@ -478,47 +369,11 @@ func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq { options[key] = value } msgData.Options = options - return &pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} + return &msg.SendMsgReq{MsgData: &msgData} } -func (rpc *msgServer) sendMsgToGroup(ctx context.Context, list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { - // log.Debug(pb.OperationID, "split userID ", list) - offlinePushInfo := sdkws.OfflinePushInfo{} - if pb.MsgData.OfflinePushInfo != nil { - offlinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgData := sdkws.MsgData{} - msgData = *pb.MsgData - msgData.OfflinePushInfo = &offlinePushInfo - - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} - msgToMQGroup := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} - for _, v := range list { - options := make(map[string]bool, 10) - for key, value := range pb.MsgData.Options { - options[key] = value - } - groupPB.MsgData.RecvID = v - groupPB.MsgData.Options = options - isSend := modifyMessageByUserMessageReceiveOpt(v, msgData.GroupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToWriter, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - *sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) - } - } - wg.Done() -} - -func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, sendTag *bool, wg *sync.WaitGroup) { - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} +func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, wg *sync.WaitGroup) error { + msgToMQGroup := msg.MsgDataToMQ{OperationID: tracelog.GetOperationID(ctx), MsgData: groupPB.MsgData} tempOptions := make(map[string]bool, 1) for k, v := range groupPB.MsgData.Options { tempOptions[k] = v @@ -530,21 +385,22 @@ func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []str options[k] = v } groupPB.MsgData.Options = options - isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + isSend, err := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + if err != nil { + wg.Done() + return err + } if isSend { if v == "" || groupPB.MsgData.SendID == "" { - log.Error(msgToMQGroup.OperationID, "sendMsgToGroupOptimization userID nil ", msgToMQGroup.String()) - continue + return constant.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty") } - err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status) + err := m.MsgInterface.MsgToMQ(ctx, v, &msgToMQGroup) if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - *sendTag = true + wg.Done() + return err } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) } } wg.Done() + return nil } diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 480603da9..51298fe99 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -9,7 +9,6 @@ import ( "Open_IM/pkg/utils" "context" "github.com/golang/protobuf/proto" - "strings" "sync" ) @@ -138,20 +137,28 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( //split parallel send var wg sync.WaitGroup - var sendTag bool var split = 20 msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - mErr := make(map[string]error, 0) + mErr := make([]error, 0) + var mutex sync.RWMutex remain := len(memberUserIDList) % split for i := 0; i < len(memberUserIDList)/split; i++ { wg.Add(1) tmp := valueCopy(req) - go m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &sendTag, &wg) + go func() { + err := m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &wg) + if err != nil { + mutex.Lock() + mErr = append(mErr, err) + mutex.Unlock() + } + + }() } if remain > 0 { wg.Add(1) tmp := valueCopy(req) - go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &sendTag, &wg) + go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &wg) } wg.Wait() @@ -175,16 +182,15 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( var atUserID []string conversation := pbConversation.Conversation{ OwnerUserID: req.MsgData.SendID, - ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), + ConversationID: utils.GetConversationIDBySessionType(req.MsgData.GroupID, constant.GroupChatType), ConversationType: constant.GroupChatType, GroupID: req.MsgData.GroupID, } conversationReq.Conversation = &conversation - conversationReq.OperationID = pb.OperationID conversationReq.FieldType = constant.FieldGroupAtType tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList) if tagAll { - atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) + atUserID = utils.DifferenceString([]string{constant.AtAllString}, req.MsgData.AtUserIDList) if len(atUserID) == 0 { //just @everyone conversationReq.UserIDList = memberUserIDList conversation.GroupAtType = constant.AtAll @@ -197,46 +203,35 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( conversationReq.UserIDList = req.MsgData.AtUserIDList conversation.GroupAtType = constant.AtMe } - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + + _, err := m.ModifyConversationField(context.Background(), &conversationReq) if err != nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) return } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) - } + if tag { conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) conversation.GroupAtType = constant.AtAll - etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID) - if etcdConn == nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) - return - } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + _, err := m.ModifyConversationField(context.Background(), &conversationReq) if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + return } } }() } + // promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) resp.SendTime = msgToMQSingle.MsgData.SendTime resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID return resp, nil +} + +func (m *msgServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { } + func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) { resp = &msg.SendMsgResp{} flag := isMessageHasReadEnabled(req.MsgData)