diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index b0897aa3a..fddfa3034 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit b0897aa3abe719729c2ce099404f08806917bfe3 +Subproject commit fddfa30348d77fe7d5767ef410b1f30d042062bf diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 4ea631b07..013ee16cc 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -146,7 +146,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) rpc.encapsulateMsgData(pb.MsgData) log.Info("", "this is a test MsgData ", pb.MsgData) - msgToMQ := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} + msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} //options := utils.JsonStringToMap(pbData.Options) isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory) mReq := MsgCallBackReq{ @@ -186,18 +186,18 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { - msgToMQ.MsgData = pb.MsgData - log.NewInfo(msgToMQ.OperationID, msgToMQ) - err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) + msgToMQSingle.MsgData = pb.MsgData + log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) if err1 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) if err2 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } @@ -205,7 +205,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if err := callbackAfterSendSingleMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error()) } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback canSend, err := callbackBeforeSendGroupMsg(pb) @@ -268,14 +268,15 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S for i := 0; i < len(memberUserIDList)/split; i++ { wg.Add(1) go func(list []string) { + msgToMQGroup := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} for _, v := range list { pb.MsgData.RecvID = v isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQGroup.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQGroup, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { sendTag = true } @@ -288,13 +289,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S wg.Add(1) go func(list []string) { for _, v := range list { + msgToMQGroup := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} pb.MsgData.RecvID = v isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQGroup.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQGroup, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) } else { sendTag = true } @@ -305,16 +307,16 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } wg.Wait() - log.Info(msgToMQ.OperationID, "addUidList", addUidList) + log.Info(msgToMQSingle.OperationID, "addUidList", addUidList) for _, v := range addUidList { pb.MsgData.RecvID = v isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, pb) - log.Info(msgToMQ.OperationID, "isSend", isSend) + log.Info(msgToMQSingle.OperationID, "isSend", isSend) if isSend { - msgToMQ.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQ, v) + msgToMQSingle.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQSingle, v) if err != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:UserId", v, msgToMQSingle.String()) } else { sendTag = true } @@ -378,26 +380,26 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } }() } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } case constant.NotificationChatType: - msgToMQ.MsgData = pb.MsgData - log.NewInfo(msgToMQ.OperationID, msgToMQ) - err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) + msgToMQSingle.MsgData = pb.MsgData + log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) if err1 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } - if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) + if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) if err2 != nil { - log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) default: return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) }