diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 4be38cee7..bee3cc78d 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -12,6 +12,7 @@ import ( cacheRpc "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbChat "Open_IM/pkg/proto/msg" + pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -237,27 +238,19 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { } func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} - newTime := db.GetCurrentTimestampByMill() - t1 := time.Now() log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String()) flag, errCode, errMsg := isMessageHasReadEnabled(pb) - log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } - //flag, errCode, errMsg, _ = messageVerification(pb) - //log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) - //if !flag { - // return returnMsg(&replay, pb, errCode, errMsg, "", 0) - //} - t1 = time.Now() + t1 := time.Now() rpc.encapsulateMsgData(pb.MsgData) - log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) + log.Debug(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} // callback t1 = time.Now() callbackResp := callbackWordFilter(pb) - log.Info(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1)) + log.Debug(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1)) if callbackResp.ErrCode != 0 { log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) } @@ -275,7 +268,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S // callback t1 = time.Now() callbackResp := callbackBeforeSendSingleMsg(pb) - log.Info(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1)) + log.Debug(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1)) if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) } @@ -287,8 +280,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } + t1 = time.Now() flag, errCode, errMsg, _ = messageVerification(pb) - log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) + log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } @@ -324,7 +318,6 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if callbackResp.ErrCode != 0 { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) } - log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: @@ -375,8 +368,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } m := make(map[string][]string, 2) m[constant.OnlineStatus] = memberUserIDList - log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) - newTime = db.GetCurrentTimestampByMill() + t1 = time.Now() //split parallel send var wg sync.WaitGroup @@ -397,11 +389,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg) } } - log.Debug(pb.OperationID, "send msg cost time22 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) + log.Debug(pb.OperationID, "send msg cost time22 ", time.Since(t1), pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) //wg.Add(1) //go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) wg.Wait() - newTime = db.GetCurrentTimestampByMill() + t1 = time.Now() // callback callbackResp = callbackAfterSendGroupMsg(pb) if callbackResp.ErrCode != 0 { @@ -473,11 +465,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } }() } - log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + log.Debug(pb.OperationID, "send msg cost time3 ", time.Since(t1), pb.MsgData.ClientMsgID) promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) } case constant.NotificationChatType: + t1 = time.Now() msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) @@ -494,7 +487,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } - log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) + log.Debug(pb.OperationID, "send msg cost time ", time.Since(t1), pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.SuperGroupChatType: promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) @@ -539,6 +532,22 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { switch status { case constant.OnlineStatus: + if m.MsgData.ContentType == constant.SignalingNotification { + rpcPushMsg := pbPush.PushMsgReq{OperationID: m.OperationID, MsgData: m.MsgData, PushToUserID: key} + grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID) + if grpcConn != nil { + log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) + return errors.New("grpcConn is nil") + } + msgClient := pbPush.NewPushMsgServiceClient(grpcConn) + _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) + if err != nil { + log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) + return err + } else { + return nil + } + } pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)