Error code standardization

This commit is contained in:
skiffer-git 2023-02-13 15:52:41 +08:00
parent 03a7048d31
commit 6b47c471f7
4 changed files with 261 additions and 317 deletions

View File

@ -33,7 +33,7 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbapi.CommonCallbackReq
return req return req
} }
func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) error {
callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
return callbackResp return callbackResp
@ -65,7 +65,7 @@ func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackRes
return callbackResp return callbackResp
} }
func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error {
callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
return callbackResp return callbackResp
@ -88,7 +88,7 @@ func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp
return callbackResp return callbackResp
} }
func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) error {
callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable { if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable {
return callbackResp return callbackResp
@ -117,7 +117,7 @@ func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp
return callbackResp return callbackResp
} }
func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error {
callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
return callbackResp return callbackResp
@ -140,7 +140,7 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp
return callbackResp return callbackResp
} }
func callbackMsgModify(msg *pbChat.SendMsgReq) cbapi.CommonCallbackResp { func callbackMsgModify(msg *pbChat.SendMsgReq) (err error) {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg)
callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID} callbackResp := cbapi.CommonCallbackResp{OperationID: msg.OperationID}
if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text {

View File

@ -66,24 +66,6 @@ type MsgCallBackResp struct {
} }
} }
func isMessageHasReadEnabled(pb *msg.SendMsgReq) (bool, int32, string) {
switch pb.MsgData.ContentType {
case constant.HasReadReceipt:
if config.Config.SingleMessageHasReadReceiptEnable {
return true, 0, ""
} else {
return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg
}
case constant.GroupHasReadReceipt:
if config.Config.GroupMessageHasReadReceiptEnable {
return true, 0, ""
} else {
return false, constant.ErrMessageHasReadDisable.ErrCode, constant.ErrMessageHasReadDisable.ErrMsg
}
}
return true, 0, ""
}
func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, isAdmin bool, err error) { func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, isAdmin bool, err error) {
groupMemberInfo, err := rocksCache.GetGroupMemberInfoFromCache(ctx, groupID, userID) groupMemberInfo, err := rocksCache.GetGroupMemberInfoFromCache(ctx, groupID, userID)
if err != nil { if err != nil {
@ -107,7 +89,7 @@ func groupIsMuted(ctx context.Context, groupID string) (bool, error) {
return false, nil return false, nil
} }
func (rpc *msgServer) messageVerification(ctx context.Context, data *pbChat.SendMsgReq) (bool, int32, string, []string) { func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) ([]string, error) {
switch data.MsgData.SessionType { switch data.MsgData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
@ -314,12 +296,10 @@ func (rpc *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
case constant.HasReadReceipt: case constant.HasReadReceipt:
log.Info("", "this is a test start", msg, msg.Options)
utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
log.Info("", "this is a test end", msg, msg.Options)
case constant.Typing: case constant.Typing:
utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false) utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false)
utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false) utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false)
@ -379,11 +359,10 @@ func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32,
return replay, nil return replay, nil
} }
func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool { func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) (bool, error) {
opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID) opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID)
if err != nil { if err != nil {
log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error()) log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error())
} }
switch opt { switch opt {
case constant.ReceiveMessage: case constant.ReceiveMessage:
@ -538,7 +517,7 @@ func (rpc *msgServer) sendMsgToGroup(ctx context.Context, list []string, pb pbCh
wg.Done() wg.Done()
} }
func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { func (rpc *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, sendTag *bool, wg *sync.WaitGroup) {
msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData}
tempOptions := make(map[string]bool, 1) tempOptions := make(map[string]bool, 1)
for k, v := range groupPB.MsgData.Options { for k, v := range groupPB.MsgData.Options {

View File

@ -4,309 +4,265 @@ import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prometheus"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
pbChat "Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils"
"context" "context"
go_redis "github.com/go-redis/redis/v8" go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"strings" "strings"
"sync" "sync"
"time"
) )
func (m *msgServer) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
replay := pbChat.SendMsgResp{} promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
t1 := time.Now()
m.encapsulateMsgData(pb.MsgData)
log.Debug(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1))
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
// callback // callback
t1 = time.Now() if err = callbackBeforeSendGroupMsg(req); err != nil {
callbackResp := callbackMsgModify(pb) return nil, err
log.Debug(pb.OperationID, "callbackMsgModify ", callbackResp, "cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackMsgModify resp: ", callbackResp)
} }
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp)
if callbackResp.ActionCode != constant.ActionAllow { if _, err = m.messageVerification(ctx, req); err != nil {
if callbackResp.ErrCode == 0 { promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
callbackResp.ErrCode = 201 return nil, err
} }
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackMsgModify result", "end rpc and return", pb.MsgData) msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle)
if err != nil {
return nil, err
}
// callback
if err = callbackAfterSendGroupMsg(req); err != nil {
return nil, err
} }
switch pb.MsgData.SessionType {
case constant.SingleChatType:
promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter)
// callback
t1 = time.Now()
callbackResp := callbackBeforeSendSingleMsg(pb)
log.Debug(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
t1 = time.Now()
flag, errCode, errMsg, _ = rpc.messageVerification(ctx, pb)
log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
t1 = time.Now()
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1))
if isSend {
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
t1 = time.Now()
err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1))
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
t1 = time.Now()
err2 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
log.Info(pb.OperationID, "sendMsgToWriter ", " cost time: ", time.Since(t1))
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
// callback
t1 = time.Now()
callbackResp = callbackAfterSendSingleMsg(pb)
log.Info(pb.OperationID, "callbackAfterSendSingleMsg ", " cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
}
promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.GroupChatType:
// callback
promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter)
callbackResp := callbackBeforeSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
var memberUserIDList []string
if flag, errCode, errMsg, memberUserIDList = rpc.messageVerification(ctx, pb); !flag {
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
log.Debug(pb.OperationID, "GetGroupAllMember userID list", memberUserIDList, "len: ", len(memberUserIDList))
var addUidList []string
switch pb.MsgData.ContentType {
case constant.MemberKickedNotification:
var tips sdkws.TipsComm
var memberKickedTips sdkws.MemberKickedTips
err := proto.Unmarshal(pb.MsgData.Content, &tips)
if err != nil {
log.Error(pb.OperationID, "Unmarshal err", err.Error())
}
err = proto.Unmarshal(tips.Detail, &memberKickedTips)
if err != nil {
log.Error(pb.OperationID, "Unmarshal err", err.Error())
}
log.Info(pb.OperationID, "data is ", memberKickedTips)
for _, v := range memberKickedTips.KickedUserList {
addUidList = append(addUidList, v.UserID)
}
case constant.MemberQuitNotification:
addUidList = append(addUidList, pb.MsgData.SendID)
default: promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
}
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle)
if err != nil {
return nil, err
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
err = m.MsgInterface.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle)
if err != nil {
return nil, err
} }
if len(addUidList) > 0 { }
memberUserIDList = append(memberUserIDList, addUidList...)
}
m := make(map[string][]string, 2)
m[constant.OnlineStatus] = memberUserIDList
t1 = time.Now()
//split parallel send resp.SendTime = msgToMQSingle.MsgData.SendTime
var wg sync.WaitGroup resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
var sendTag bool resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
var split = 20 return resp, nil
for k, v := range m { }
remain := len(v) % split
for i := 0; i < len(v)/split; i++ { func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
wg.Add(1) promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter)
tmp := valueCopy(pb) if err = callbackBeforeSendSingleMsg(req); err != nil {
// go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) return nil, err
go rpc.sendMsgToGroupOptimization(ctx, v[i*split:(i+1)*split], tmp, k, &sendTag, &wg) }
} _, err = m.messageVerification(ctx, req)
if remain > 0 { if err != nil {
wg.Add(1) return nil, err
tmp := valueCopy(pb) }
// go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) isSend, err := modifyMessageByUserMessageReceiveOpt(req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req)
go rpc.sendMsgToGroupOptimization(ctx, v[split*(len(v)/split):], tmp, k, &sendTag, &wg) if err != nil {
} return nil, err
}
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
if isSend {
err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle)
if err != nil {
return nil, constant.ErrInternalServer.Wrap("insert to mq")
} }
log.Debug(pb.OperationID, "send msg cost time22 ", time.Since(t1), pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) }
//wg.Add(1) if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
//go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) err = m.MsgInterface.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle)
wg.Wait() if err != nil {
t1 = time.Now() return nil, constant.ErrInternalServer.Wrap("insert to mq")
// callback
callbackResp = callbackAfterSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg resp: ", callbackResp)
} }
if !sendTag { }
log.NewWarn(pb.OperationID, "send tag is ", sendTag) err = callbackAfterSendSingleMsg(req)
promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) return nil, err
} else { }
if pb.MsgData.ContentType == constant.AtText { promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter)
go func() { resp.SendTime = msgToMQSingle.MsgData.SendTime
var conversationReq pbConversation.ModifyConversationFieldReq resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
var tag bool resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
var atUserID []string return resp, nil
conversation := pbConversation.Conversation{ }
OwnerUserID: pb.MsgData.SendID,
ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType), func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
ConversationType: constant.GroupChatType, // callback
GroupID: pb.MsgData.GroupID, promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter)
} err = callbackBeforeSendGroupMsg(req)
conversationReq.Conversation = &conversation if err != nil {
conversationReq.OperationID = pb.OperationID return nil, err
conversationReq.FieldType = constant.FieldGroupAtType }
tagAll := utils.IsContain(constant.AtAllString, pb.MsgData.AtUserIDList)
if tagAll { var memberUserIDList []string
atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList) if memberUserIDList, err = m.messageVerification(ctx, req); err != nil {
if len(atUserID) == 0 { //just @everyone promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter)
conversationReq.UserIDList = memberUserIDList return nil, err
conversation.GroupAtType = constant.AtAll }
} else { //@Everyone and @other people
conversationReq.UserIDList = atUserID var addUidList []string
conversation.GroupAtType = constant.AtAllAtMe switch req.MsgData.ContentType {
tag = true case constant.MemberKickedNotification:
} var tips sdkws.TipsComm
} else { var memberKickedTips sdkws.MemberKickedTips
conversationReq.UserIDList = pb.MsgData.AtUserIDList err := proto.Unmarshal(req.MsgData.Content, &tips)
conversation.GroupAtType = constant.AtMe if err != nil {
}
etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(pb.OperationID, errMsg)
return
}
client := pbConversation.NewConversationClient(etcdConn)
conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq)
if err != nil {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error())
} else if conversationReply.CommonResp.ErrCode != 0 {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String())
}
if tag {
conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList)
conversation.GroupAtType = constant.AtAll
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID)
if etcdConn == nil {
errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(pb.OperationID, errMsg)
return
}
client := pbConversation.NewConversationClient(etcdConn)
conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq)
if err != nil {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error())
} else if conversationReply.CommonResp.ErrCode != 0 {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String())
}
}
}()
}
log.Debug(pb.OperationID, "send msg cost time3 ", time.Since(t1), pb.MsgData.ClientMsgID)
promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
} }
case constant.NotificationChatType: err = proto.Unmarshal(tips.Detail, &memberKickedTips)
t1 = time.Now() if err != nil {
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
} }
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself for _, v := range memberKickedTips.KickedUserList {
err2 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) addUidList = append(addUidList, v.UserID)
if err2 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
} }
case constant.MemberQuitNotification:
log.Debug(pb.OperationID, "send msg cost time ", time.Since(t1), pb.MsgData.ClientMsgID) addUidList = append(addUidList, req.MsgData.SendID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.SuperGroupChatType:
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
// callback
callbackResp := callbackBeforeSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSuperGroupMsg result", "end rpc and return", callbackResp)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
}
if flag, errCode, errMsg, _ = rpc.messageVerification(ctx, pb); !flag {
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
}
msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToWriter(ctx, &msgToMQSingle, msgToMQSingle.MsgData.GroupID, constant.OnlineStatus)
if err1 != nil {
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String())
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
// callback
callbackResp = callbackAfterSendGroupMsg(pb)
if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSuperGroupMsg resp: ", callbackResp)
}
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
default: default:
return returnMsg(&replay, pb, 203, "unknown sessionType", "", 0) }
if len(addUidList) > 0 {
memberUserIDList = append(memberUserIDList, addUidList...)
}
//split parallel send
var wg sync.WaitGroup
var sendTag bool
var split = 20
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
mErr := make(map[string]error, 0)
remain := len(memberUserIDList) % split
for i := 0; i < len(memberUserIDList)/split; i++ {
wg.Add(1)
tmp := valueCopy(req)
go m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &sendTag, &wg)
}
if remain > 0 {
wg.Add(1)
tmp := valueCopy(req)
go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &sendTag, &wg)
}
wg.Wait()
// callback
err = callbackAfterSendGroupMsg(req)
if err != nil {
return nil, err
}
for _, v := range mErr {
if v != nil {
return nil, v
}
}
if req.MsgData.ContentType == constant.AtText {
go func() {
var conversationReq pbConversation.ModifyConversationFieldReq
var tag bool
var atUserID []string
conversation := pbConversation.Conversation{
OwnerUserID: req.MsgData.SendID,
ConversationID: utils.GetConversationIDBySessionType(pb.MsgData.GroupID, constant.GroupChatType),
ConversationType: constant.GroupChatType,
GroupID: req.MsgData.GroupID,
}
conversationReq.Conversation = &conversation
conversationReq.OperationID = pb.OperationID
conversationReq.FieldType = constant.FieldGroupAtType
tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList)
if tagAll {
atUserID = utils.DifferenceString([]string{constant.AtAllString}, pb.MsgData.AtUserIDList)
if len(atUserID) == 0 { //just @everyone
conversationReq.UserIDList = memberUserIDList
conversation.GroupAtType = constant.AtAll
} else { //@Everyone and @other people
conversationReq.UserIDList = atUserID
conversation.GroupAtType = constant.AtAllAtMe
tag = true
}
} else {
conversationReq.UserIDList = req.MsgData.AtUserIDList
conversation.GroupAtType = constant.AtMe
}
etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(pb.OperationID, errMsg)
return
}
client := pbConversation.NewConversationClient(etcdConn)
conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq)
if err != nil {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error())
} else if conversationReply.CommonResp.ErrCode != 0 {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String())
}
if tag {
conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList)
conversation.GroupAtType = constant.AtAll
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, pb.OperationID)
if etcdConn == nil {
errMsg := pb.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(pb.OperationID, errMsg)
return
}
client := pbConversation.NewConversationClient(etcdConn)
conversationReply, err := client.ModifyConversationField(context.Background(), &conversationReq)
if err != nil {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), err.Error())
} else if conversationReply.CommonResp.ErrCode != 0 {
log.NewError(conversationReq.OperationID, "ModifyConversationField rpc failed, ", conversationReq.String(), conversationReply.String())
}
}
}()
}
promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter)
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
}
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, constant.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
if err := callbackMsgModify(req); err != nil {
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.GroupChatType:
return m.sendMsgGroupChat(ctx, req)
case constant.NotificationChatType:
return m.sendMsgNotification(ctx, req)
case constant.SuperGroupChatType:
return m.sendMsgSuperGroupChat(ctx, req)
default:
return nil, constant.ErrArgs.Wrap("unknown sessionType")
} }
} }
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { func (m *msgServer) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String()) log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String())
resp := new(sdkws.GetMaxAndMinSeqResp) resp := new(sdkws.GetMaxAndMinSeqResp)
m := make(map[string]*sdkws.MaxAndMinSeq) m := make(map[string]*sdkws.MaxAndMinSeq)

View File

@ -1,16 +1,25 @@
package msg package msg
import ( import (
"Open_IM/pkg/utils" "Open_IM/pkg/common/config"
"github.com/go-redis/redis/v8" "Open_IM/pkg/common/constant"
"gorm.io/gorm" "Open_IM/pkg/proto/sdkws"
) )
func IsNotFound(err error) bool { func isMessageHasReadEnabled(msgData *sdkws.MsgData) bool {
switch utils.Unwrap(err) { switch msgData.ContentType {
case gorm.ErrRecordNotFound, redis.Nil: case constant.HasReadReceipt:
return true if config.Config.SingleMessageHasReadReceiptEnable {
default: return true
return false } else {
return false
}
case constant.GroupHasReadReceipt:
if config.Config.GroupMessageHasReadReceiptEnable {
return true
} else {
return false
}
} }
return true
} }