From d2309aab3230f928f071bf50e170c3a3785deb98 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Mon, 13 Feb 2023 16:15:16 +0800 Subject: [PATCH] Error code standardization --- internal/rpc/msg/send_msg.go | 62 ++++++++--------------------------- internal/rpc/msg/send_pull.go | 50 +++++++++++++--------------- 2 files changed, 37 insertions(+), 75 deletions(-) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index dc1f9b481..c1ceb3133 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -7,6 +7,7 @@ import ( rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/common/tokenverify" + "Open_IM/pkg/common/tracelog" cacheRpc "Open_IM/pkg/proto/cache" "Open_IM/pkg/proto/msg" pbPush "Open_IM/pkg/proto/push" @@ -359,7 +360,7 @@ func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, return replay, nil } -func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) (bool, error) { +func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *msg.SendMsgReq) (bool, error) { opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID) if err != nil { log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error()) @@ -481,44 +482,8 @@ func valueCopy(pb *pbChat.SendMsgReq) *pbChat.SendMsgReq { return &pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} } -func (rpc *msgServer) sendMsgToGroup(ctx context.Context, list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { - // log.Debug(pb.OperationID, "split userID ", list) - offlinePushInfo := sdkws.OfflinePushInfo{} - if pb.MsgData.OfflinePushInfo != nil { - offlinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgData := sdkws.MsgData{} - msgData = *pb.MsgData - msgData.OfflinePushInfo = &offlinePushInfo - - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} - msgToMQGroup := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: &msgData} - for _, v := range list { - options := make(map[string]bool, 10) - for key, value := range pb.MsgData.Options { - options[key] = value - } - groupPB.MsgData.RecvID = v - groupPB.MsgData.Options = options - isSend := modifyMessageByUserMessageReceiveOpt(v, msgData.GroupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToWriter, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - *sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) - } - } - wg.Done() -} - -func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, sendTag *bool, wg *sync.WaitGroup) { - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} +func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, wg *sync.WaitGroup) error { + msgToMQGroup := msg.MsgDataToMQ{OperationID: tracelog.GetOperationID(ctx), MsgData: groupPB.MsgData} tempOptions := make(map[string]bool, 1) for k, v := range groupPB.MsgData.Options { tempOptions[k] = v @@ -530,21 +495,22 @@ func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []str options[k] = v } groupPB.MsgData.Options = options - isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + isSend, err := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, groupPB) + if err != nil { + wg.Done() + return err + } if isSend { if v == "" || groupPB.MsgData.SendID == "" { - log.Error(msgToMQGroup.OperationID, "sendMsgToGroupOptimization userID nil ", msgToMQGroup.String()) - continue + return constant.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty") } - err := rpc.sendMsgToWriter(ctx, &msgToMQGroup, v, status) + err := m.MsgInterface.MsgToMQ(ctx, v, &msgToMQGroup) if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - *sendTag = true + wg.Done() + return err } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToWriter, ", v) } } wg.Done() + return nil } diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 8998d2438..1900cbe85 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -10,7 +10,6 @@ import ( "context" go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" - "strings" "sync" ) @@ -142,12 +141,21 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( var sendTag bool var split = 20 msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - mErr := make(map[string]error, 0) + mErr := make([]error, 0) + var mutex sync.RWMutex remain := len(memberUserIDList) % split for i := 0; i < len(memberUserIDList)/split; i++ { wg.Add(1) tmp := valueCopy(req) - go m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &sendTag, &wg) + go func() { + err := m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &sendTag, &wg) + if err != nil { + mutex.Lock() + mErr = append(mErr, err) + mutex.Unlock() + } + + }() } if remain > 0 { wg.Add(1) @@ -176,16 +184,15 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( var atUserID []string conversation := pbConversation.Conversation{ OwnerUserID: req.MsgData.SendID, - ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), + ConversationID: utils.GetConversationIDBySessionType(req.MsgData.GroupID, constant.GroupChatType), ConversationType: constant.GroupChatType, GroupID: req.MsgData.GroupID, } conversationReq.Conversation = &conversation - conversationReq.OperationID = pb.OperationID conversationReq.FieldType = constant.FieldGroupAtType tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList) if tagAll { - atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) + atUserID = utils.DifferenceString([]string{constant.AtAllString}, req.MsgData.AtUserIDList) if len(atUserID) == 0 { //just @everyone conversationReq.UserIDList = memberUserIDList conversation.GroupAtType = constant.AtAll @@ -198,46 +205,35 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( conversationReq.UserIDList = req.MsgData.AtUserIDList conversation.GroupAtType = constant.AtMe } - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + + _, err := m.ModifyConversationField(context.Background(), &conversationReq) if err != nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) return } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) - if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) - } + if tag { conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList) conversation.GroupAtType = constant.AtAll - etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID) - if etcdConn == nil { - errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pb.OperationID, errMsg) - return - } - client := pbConversation.NewConversationClient(etcdConn) - conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq) + _, err := m.ModifyConversationField(context.Background(), &conversationReq) if err != nil { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error()) - } else if conversationReply.CommonResp.ErrCode != 0 { - log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String()) + return } } }() } + // promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) resp.SendTime = msgToMQSingle.MsgData.SendTime resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID return resp, nil +} + +func (m *msgServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { } + func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) { resp = &msg.SendMsgResp{} flag := isMessageHasReadEnabled(req.MsgData)