diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 8998d2438..480603da9 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -8,7 +8,6 @@ import ( "Open_IM/pkg/proto/sdkws" "Open_IM/pkg/utils" "context" - go_redis "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "strings" "sync" @@ -262,98 +261,109 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg } } -func (m *msgServer) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { - log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String()) +func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { resp := new(sdkws.GetMaxAndMinSeqResp) - m := make(map[string]*sdkws.MaxAndMinSeq) - var maxSeq, minSeq uint64 - var err1, err2 error - maxSeq, err1 = commonDB.DB.GetUserMaxSeq(in.UserID) - minSeq, err2 = commonDB.DB.GetUserMinSeq(in.UserID) - if (err1 != nil && err1 != go_redis.Nil) || (err2 != nil && err2 != go_redis.Nil) { - log.NewError(in.OperationID, "getMaxSeq from redis error", in.String()) - if err1 != nil { - log.NewError(in.OperationID, utils.GetSelfFuncName(), err1.Error()) - } - if err2 != nil { - log.NewError(in.OperationID, utils.GetSelfFuncName(), err2.Error()) - } - resp.ErrCode = 200 - resp.ErrMsg = "redis get err" - return resp, nil - } - resp.MaxSeq = uint32(maxSeq) - resp.MinSeq = uint32(minSeq) - for _, groupID := range in.GroupIDList { - x := new(sdkws.MaxAndMinSeq) - maxSeq, _ := commonDB.DB.GetGroupMaxSeq(groupID) - minSeq, _ := commonDB.DB.GetGroupUserMinSeq(groupID, in.UserID) - x.MaxSeq = uint32(maxSeq) - x.MinSeq = uint32(minSeq) - m[groupID] = x - } - resp.GroupMaxAndMinSeq = m - return resp, nil -} - -func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { - log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) - resp := new(sdkws.PullMessageBySeqListResp) - m := make(map[string]*sdkws.MsgDataList) - redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID) + m2 := make(map[string]*sdkws.MaxAndMinSeq) + maxSeq, err := m.MsgInterface.GetUserMaxSeq(ctx, req.UserID) if err != nil { - if err != go_redis.Nil { - promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) - log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) - } else { - log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) - } - msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, failedSeqList, in.OperationID) - if err1 != nil { - promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) - log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) - resp.ErrCode = 201 - resp.ErrMsg = err1.Error() - return resp, nil - } else { - promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) - redisMsgList = append(redisMsgList, msgList...) - resp.List = redisMsgList - } - } else { - promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) - resp.List = redisMsgList + return nil, err } - - for k, v := range in.GroupSeqList { - x := new(sdkws.MsgDataList) - redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID) - if err != nil { - if err != go_redis.Nil { - promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) - log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) - } else { - log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) + minSeq, err := m.MsgInterface.GetUserMinSeq(ctx, req.UserID) + if err != nil { + return nil, err + } + resp.MaxSeq = maxSeq + resp.MinSeq = minSeq + if len(req.GroupIDList) > 0 { + resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq) + for _, groupID := range req.GroupIDList { + maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, groupID) + if err != nil { + return nil, err } - msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) - if err1 != nil { - promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) - log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) - resp.ErrCode = 201 - resp.ErrMsg = err1.Error() - return resp, nil - } else { - promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) - redisMsgList = append(redisMsgList, msgList...) - x.MsgDataList = redisMsgList - m[k] = x + minSeq, err := m.MsgInterface.GetGroupMinSeq(ctx, groupID) + if err != nil { + return nil, err + } + m2[groupID] = &sdkws.MaxAndMinSeq{ + MaxSeq: maxSeq, + MinSeq: minSeq, } - } else { - promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) - x.MsgDataList = redisMsgList - m[k] = x } } - resp.GroupMsgDataList = m + return resp, nil +} + +func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { + resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)} + msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, req.UserID, req.SeqList) + if err != nil { + return nil, err + } + resp.List = msgs + for userID, list := range req.GroupSeqList { + msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, userID, list.SeqList) + if err != nil { + return nil, err + } + resp.GroupMsgDataList[userID] = &sdkws.MsgDataList{ + MsgDataList: msgs, + } + } + + //redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.UserID, req.SeqList, req.OperationID) + //if err != nil { + // if err != go_redis.Nil { + // promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) + // log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList) + // } else { + // log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) + // } + // msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(req.UserID, failedSeqList, req.OperationID) + // if err1 != nil { + // promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) + // log.Error(req.OperationID, "PullMessageBySeqList data error", req.String(), err1.Error()) + // resp.ErrCode = 201 + // resp.ErrMsg = err1.Error() + // return resp, nil + // } else { + // promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) + // redisMsgList = append(redisMsgList, msgList...) + // resp.List = redisMsgList + // } + //} else { + // promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) + // resp.List = redisMsgList + //} + // + //for k, v := range req.GroupSeqList { + // x := new(sdkws.MsgDataList) + // redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, req.OperationID) + // if err != nil { + // if err != go_redis.Nil { + // promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) + // log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList) + // } else { + // log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) + // } + // msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, req.OperationID) + // if err1 != nil { + // promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) + // log.Error(req.OperationID, "PullMessageBySeqList data error", req.String(), err1.Error()) + // resp.ErrCode = 201 + // resp.ErrMsg = err1.Error() + // return resp, nil + // } else { + // promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) + // redisMsgList = append(redisMsgList, msgList...) + // x.MsgDataList = redisMsgList + // m[k] = x + // } + // } else { + // promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) + // x.MsgDataList = redisMsgList + // m[k] = x + // } + //} return resp, nil } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 3445e8dd1..cf10162cc 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -3,6 +3,7 @@ package controller import ( "Open_IM/pkg/proto/msg" pbMsg "Open_IM/pkg/proto/msg" + "Open_IM/pkg/proto/sdkws" "context" ) @@ -25,8 +26,13 @@ type MsgInterface interface { // delete DelMsgFromCache(ctx context.Context, userID string, seqs []uint32) error GetGroupMaxSeq(ctx context.Context, groupID string) (uint32, error) + GetGroupMinSeq(ctx context.Context, groupID string) (uint32, error) SetGroupUserMinSeq(ctx context.Context, groupID string, seq uint32) error DelUserAllSeq(ctx context.Context, userID string) error // redis and mongodb + GetUserMaxSeq(ctx context.Context, userID string) (uint32, error) + GetUserMinSeq(ctx context.Context, userID string) (uint32, error) + + GetMessageListBySeq(ctx context.Context, userID string, seqs []uint32) ([]*sdkws.MsgData, error) } type MsgDatabaseInterface interface {