media message change to push directly

This commit is contained in:
Gordon 2022-09-19 22:16:45 +08:00
parent a99295f594
commit 00d2ff810b

View File

@ -12,6 +12,7 @@ import (
cacheRpc "Open_IM/pkg/proto/cache" cacheRpc "Open_IM/pkg/proto/cache"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
pbChat "Open_IM/pkg/proto/msg" pbChat "Open_IM/pkg/proto/msg"
pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
sdk_ws "Open_IM/pkg/proto/sdk_ws" sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
@ -237,27 +238,19 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
} }
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
replay := pbChat.SendMsgResp{} replay := pbChat.SendMsgResp{}
newTime := db.GetCurrentTimestampByMill()
t1 := time.Now()
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String()) log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
flag, errCode, errMsg := isMessageHasReadEnabled(pb) flag, errCode, errMsg := isMessageHasReadEnabled(pb)
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
if !flag { if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0) return returnMsg(&replay, pb, errCode, errMsg, "", 0)
} }
//flag, errCode, errMsg, _ = messageVerification(pb) t1 := time.Now()
//log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
//if !flag {
// return returnMsg(&replay, pb, errCode, errMsg, "", 0)
//}
t1 = time.Now()
rpc.encapsulateMsgData(pb.MsgData) rpc.encapsulateMsgData(pb.MsgData)
log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1)) log.Debug(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1))
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
// callback // callback
t1 = time.Now() t1 = time.Now()
callbackResp := callbackWordFilter(pb) callbackResp := callbackWordFilter(pb)
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1)) log.Debug(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
} }
@ -275,7 +268,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
// callback // callback
t1 = time.Now() t1 = time.Now()
callbackResp := callbackBeforeSendSingleMsg(pb) callbackResp := callbackBeforeSendSingleMsg(pb)
log.Info(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1)) log.Debug(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1))
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
} }
@ -287,8 +280,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter) promePkg.PromeInc(promePkg.SingleChatMsgProcessFailedCounter)
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
} }
t1 = time.Now()
flag, errCode, errMsg, _ = messageVerification(pb) flag, errCode, errMsg, _ = messageVerification(pb)
log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1)) log.Debug(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
if !flag { if !flag {
return returnMsg(&replay, pb, errCode, errMsg, "", 0) return returnMsg(&replay, pb, errCode, errMsg, "", 0)
} }
@ -324,7 +318,6 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
} }
log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.GroupChatType: case constant.GroupChatType:
@ -375,8 +368,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
} }
m := make(map[string][]string, 2) m := make(map[string][]string, 2)
m[constant.OnlineStatus] = memberUserIDList m[constant.OnlineStatus] = memberUserIDList
log.Debug(pb.OperationID, "send msg cost time1 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, pb) t1 = time.Now()
newTime = db.GetCurrentTimestampByMill()
//split parallel send //split parallel send
var wg sync.WaitGroup var wg sync.WaitGroup
@ -397,11 +389,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg) go rpc.sendMsgToGroupOptimization(v[split*(len(v)/split):], tmp, k, &sendTag, &wg)
} }
} }
log.Debug(pb.OperationID, "send msg cost time22 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID, "uidList : ", len(addUidList)) log.Debug(pb.OperationID, "send msg cost time22 ", time.Since(t1), pb.MsgData.ClientMsgID, "uidList : ", len(addUidList))
//wg.Add(1) //wg.Add(1)
//go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) //go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg)
wg.Wait() wg.Wait()
newTime = db.GetCurrentTimestampByMill() t1 = time.Now()
// callback // callback
callbackResp = callbackAfterSendGroupMsg(pb) callbackResp = callbackAfterSendGroupMsg(pb)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
@ -473,11 +465,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
} }
}() }()
} }
log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) log.Debug(pb.OperationID, "send msg cost time3 ", time.Since(t1), pb.MsgData.ClientMsgID)
promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
} }
case constant.NotificationChatType: case constant.NotificationChatType:
t1 = time.Now()
msgToMQSingle.MsgData = pb.MsgData msgToMQSingle.MsgData = pb.MsgData
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
@ -494,7 +487,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
} }
} }
log.Debug(pb.OperationID, "send msg cost time ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) log.Debug(pb.OperationID, "send msg cost time ", time.Since(t1), pb.MsgData.ClientMsgID)
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
@ -539,6 +532,22 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error {
switch status { switch status {
case constant.OnlineStatus: case constant.OnlineStatus:
if m.MsgData.ContentType == constant.SignalingNotification {
rpcPushMsg := pbPush.PushMsgReq{OperationID: m.OperationID, MsgData: m.MsgData, PushToUserID: key}
grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID)
if grpcConn != nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
return errors.New("grpcConn is nil")
}
msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
return err
} else {
return nil
}
}
pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)