From 1d4dc81e553ab27af461ea21c984ea758707ef47 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 9 Jun 2022 18:08:42 +0800 Subject: [PATCH] add batch push --- internal/msg_gateway/gate/rpc_server.go | 108 ++++++++++++++++++++---- 1 file changed, 91 insertions(+), 17 deletions(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index cc16f1226..05853a04a 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -7,6 +7,7 @@ import ( "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbRelay "Open_IM/pkg/proto/relay" + sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "bytes" "context" @@ -143,30 +144,78 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser return &resp, nil } +//func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) { +// log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) +// var singleUserResult []*pbRelay.SingelMsgToUserResultList +// //r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,) +// msgBytes, _ := proto.Marshal(req.MsgData) +// mReply := Resp{ +// ReqIdentifier: constant.WSPushMsg, +// OperationID: req.OperationID, +// Data: msgBytes, +// } +// var replyBytes bytes.Buffer +// enc := gob.NewEncoder(&replyBytes) +// err := enc.Encode(mReply) +// if err != nil { +// log.NewError(req.OperationID, "data encode err", err.Error()) +// } +// for _, v := range req.PushToUserIDList { +// var resp []*pbRelay.SingleMsgToUserPlatform +// tempT := &pbRelay.SingelMsgToUserResultList{ +// UserID: v, +// } +// userConnMap := ws.getUserAllCons(v) +// for platform, userConn := range userConnMap { +// if userConn != nil { +// resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) +// if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { +// tempT.OnlinePush = true +// log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) +// temp := &pbRelay.SingleMsgToUserPlatform{ +// ResultCode: resultCode, +// RecvID: v, +// RecvPlatFormID: int32(platform), +// } +// resp = append(resp, temp) +// } +// +// } +// } +// tempT.Resp = resp +// singleUserResult = append(singleUserResult, tempT) +// +// } +// +// return &pbRelay.OnlineBatchPushOneMsgResp{ +// SinglePushResult: singleUserResult, +// }, nil +//} func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) { log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) var singleUserResult []*pbRelay.SingelMsgToUserResultList - msgBytes, _ := proto.Marshal(req.MsgData) - mReply := Resp{ - ReqIdentifier: constant.WSPushMsg, - OperationID: req.OperationID, - Data: msgBytes, - } - var replyBytes bytes.Buffer - enc := gob.NewEncoder(&replyBytes) - err := enc.Encode(mReply) - if err != nil { - log.NewError(req.OperationID, "data encode err", err.Error()) - } + for _, v := range req.PushToUserIDList { var resp []*pbRelay.SingleMsgToUserPlatform tempT := &pbRelay.SingelMsgToUserResultList{ UserID: v, } userConnMap := ws.getUserAllCons(v) - for platform, userConn := range userConnMap { - if userConn != nil { - resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) + var platformList []int + for k, _ := range userConnMap { + platformList = append(platformList, k) + } + needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) + for platform, list := range needPushMapList { + if list != nil { + for _, v := range list { + req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + } + replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) + if err != nil { + continue + } + resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { tempT.OnlinePush = true log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) @@ -178,17 +227,42 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online resp = append(resp, temp) } + } else { + if utils.IsContainInt(platform, r.pushTerminal) { + tempT.OnlinePush = true + temp := &pbRelay.SingleMsgToUserPlatform{ + ResultCode: 0, + RecvID: v, + RecvPlatFormID: int32(platform), + } + resp = append(resp, temp) + } } } tempT.Resp = resp singleUserResult = append(singleUserResult, tempT) - } - return &pbRelay.OnlineBatchPushOneMsgResp{ SinglePushResult: singleUserResult, }, nil } +func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) { + msgBytes, _ := proto.Marshal(wsData) + mReply := Resp{ + ReqIdentifier: constant.WSPushMsg, + OperationID: operationID, + Data: msgBytes, + } + var replyBytes bytes.Buffer + enc := gob.NewEncoder(&replyBytes) + err := enc.Encode(mReply) + if err != nil { + log.NewError(operationID, "data encode err", err.Error()) + return bytes.Buffer{}, err + } + return replyBytes, nil +} + func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOfflineReq) (*pbRelay.KickUserOfflineResp, error) { log.NewInfo(req.OperationID, "KickUserOffline is arriving", req.String()) for _, v := range req.KickUserIDList {