From 4f184a6723619b451ec5bcbc71efa0a4c77d0f44 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Thu, 9 Jun 2022 14:24:24 +0800 Subject: [PATCH] batch push --- internal/msg_gateway/gate/batch_push.go | 47 +++++++++++++++++-------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/internal/msg_gateway/gate/batch_push.go b/internal/msg_gateway/gate/batch_push.go index 6a7e7886f..089c91e4c 100644 --- a/internal/msg_gateway/gate/batch_push.go +++ b/internal/msg_gateway/gate/batch_push.go @@ -1,7 +1,13 @@ package gate import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbChat "Open_IM/pkg/proto/chat" sdk_ws "Open_IM/pkg/proto/sdk_ws" + "context" + "strings" ) var MaxPullMsgNum = 100 @@ -9,21 +15,34 @@ var MaxPullMsgNum = 100 func (r *RPCServer) GenPullSeqList(currentSeq uint32, operationID string, userID string) ([]uint32, error) { return nil, nil } - -func (r *RPCServer) GetSingleMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID string) []*sdk_ws.MsgData { - //seqList, err := r.GenPullSeqList(msgData.Seq, operationID, pushToUserID) - //if err != nil { - // return nil - //} - //rpcReq := sdk_ws.PullMessageBySeqListReq{} - //rpcReq.SeqList = seqList - //rpcReq.UserID = pushToUserID - //rpcReq.OperationID = operationID - //grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) - //msgClient := pbChat.NewChatClient(grpcConn) - //reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq) - +func (r *RPCServer) GetMergeSingleMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData { return nil + //ws.getUserConn(pushToUserID, platformID) + //msgData.Seq + //msgList := r.GetSingleMsgForPush(operationID, msgData, pushToUserID, platformID) + +} +func (r *RPCServer) GetSingleMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID string) []*sdk_ws.MsgData { + seqList, err := r.GenPullSeqList(msgData.Seq, operationID, pushToUserID) + if err != nil { + log.Error(operationID, "GenPullSeqList failed ", err.Error(), msgData.Seq, pushToUserID) + return nil + } + rpcReq := sdk_ws.PullMessageBySeqListReq{} + rpcReq.SeqList = seqList + rpcReq.UserID = pushToUserID + rpcReq.OperationID = operationID + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq) + if err != nil { + log.Error(operationID, "PullMessageBySeqList failed ", err.Error(), rpcReq.String()) + return nil + } + if len(reply.List) == 0 { + return nil + } + return reply.List } func (r *RPCServer) GetBatchMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserIDList []string, platformID string) map[string][]*sdk_ws.MsgData {