This commit is contained in:
wangchuxiao 2023-05-15 17:28:37 +08:00
parent 32c2e7f5b1
commit 10c1e1df5a
3 changed files with 28 additions and 27 deletions

View File

@ -46,9 +46,9 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
var err error var err error
switch msgFromMQ.MsgData.SessionType { switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.MsgToSuperGroupUser(ctx, pbData.ConversationID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
err = c.pusher.MsgToUser(ctx, pbData.ConversationID, pbData.MsgData) err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData)
} }
if err != nil { if err != nil {
log.ZError(ctx, "push failed", err, "msg", pbData.String()) log.ZError(ctx, "push failed", err, "msg", pbData.String())

View File

@ -47,9 +47,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
func (r *pushServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) { func (r *pushServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) {
switch pbData.MsgData.SessionType { switch pbData.MsgData.SessionType {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = r.pusher.MsgToSuperGroupUser(ctx, pbData.ConversationID, pbData.MsgData) err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
err = r.pusher.MsgToUser(ctx, pbData.ConversationID, pbData.MsgData) err = r.pusher.Push2User(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}, pbData.MsgData)
} }
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -58,9 +58,8 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
return offlinePusher return offlinePusher
} }
func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgData) error { func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
var userIDs = []string{userID} log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userID", userID, "msg", msg.String())
// callback // callback
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue { if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue {
return err return err
@ -71,36 +70,38 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
return err return err
} }
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userID) log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
p.successCount++ p.successCount++
if isOfflinePush && userID != msg.SendID { for _, userID := range userIDs {
// save invitation info for offline push if isOfflinePush && userID != msg.SendID {
for _, v := range wsResults { // save invitation info for offline push
if v.OnlinePush { for _, v := range wsResults {
return nil if v.OnlinePush {
return nil
}
} }
} if msg.ContentType == constant.SignalingNotification {
if msg.ContentType == constant.SignalingNotification { isSend, err := p.database.HandleSignalInvite(ctx, msg, userID)
isSend, err := p.database.HandleSignalInvite(ctx, msg, userID) if err != nil {
return err
}
if !isSend {
return nil
}
}
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, userID, msg, userIDs)
if err != nil { if err != nil {
return err return err
} }
if !isSend {
return nil
}
}
if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, userID, msg, userIDs)
if err != nil {
return err
} }
} }
return nil return nil
} }
func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
operationID := mcontext.GetOperationID(ctx) operationID := mcontext.GetOperationID(ctx)
log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID) log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID)
var pushToUserIDs []string var pushToUserIDs []string