msg send pull

This commit is contained in:
withchao 2023-02-13 16:36:05 +08:00
parent 6b47c471f7
commit 884d28b9a8
2 changed files with 103 additions and 87 deletions

View File

@ -8,7 +8,6 @@ import (
"Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"strings" "strings"
"sync" "sync"
@ -262,98 +261,109 @@ func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg
} }
} }
func (m *msgServer) GetMaxAndMinSeq(_ context.Context, in *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) { func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
log.NewInfo(in.OperationID, "rpc getMaxAndMinSeq is arriving", in.String())
resp := new(sdkws.GetMaxAndMinSeqResp) resp := new(sdkws.GetMaxAndMinSeqResp)
m := make(map[string]*sdkws.MaxAndMinSeq) m2 := make(map[string]*sdkws.MaxAndMinSeq)
var maxSeq, minSeq uint64 maxSeq, err := m.MsgInterface.GetUserMaxSeq(ctx, req.UserID)
var err1, err2 error if err != nil {
maxSeq, err1 = commonDB.DB.GetUserMaxSeq(in.UserID) return nil, err
minSeq, err2 = commonDB.DB.GetUserMinSeq(in.UserID)
if (err1 != nil && err1 != go_redis.Nil) || (err2 != nil && err2 != go_redis.Nil) {
log.NewError(in.OperationID, "getMaxSeq from redis error", in.String())
if err1 != nil {
log.NewError(in.OperationID, utils.GetSelfFuncName(), err1.Error())
} }
if err2 != nil { minSeq, err := m.MsgInterface.GetUserMinSeq(ctx, req.UserID)
log.NewError(in.OperationID, utils.GetSelfFuncName(), err2.Error()) if err != nil {
return nil, err
}
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
if len(req.GroupIDList) > 0 {
resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq)
for _, groupID := range req.GroupIDList {
maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return nil, err
}
minSeq, err := m.MsgInterface.GetGroupMinSeq(ctx, groupID)
if err != nil {
return nil, err
}
m2[groupID] = &sdkws.MaxAndMinSeq{
MaxSeq: maxSeq,
MinSeq: minSeq,
} }
resp.ErrCode = 200
resp.ErrMsg = "redis get err"
return resp, nil
} }
resp.MaxSeq = uint32(maxSeq)
resp.MinSeq = uint32(minSeq)
for _, groupID := range in.GroupIDList {
x := new(sdkws.MaxAndMinSeq)
maxSeq, _ := commonDB.DB.GetGroupMaxSeq(groupID)
minSeq, _ := commonDB.DB.GetGroupUserMinSeq(groupID, in.UserID)
x.MaxSeq = uint32(maxSeq)
x.MinSeq = uint32(minSeq)
m[groupID] = x
} }
resp.GroupMaxAndMinSeq = m
return resp, nil return resp, nil
} }
func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
resp := new(sdkws.PullMessageBySeqListResp) msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, req.UserID, req.SeqList)
m := make(map[string]*sdkws.MsgDataList)
redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(in.UserID, in.SeqList, in.OperationID)
if err != nil { if err != nil {
if err != go_redis.Nil { return nil, err
promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList))
log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList)
} else {
log.Debug(in.OperationID, "get message from redis is nil", failedSeqList)
} }
msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, failedSeqList, in.OperationID) resp.List = msgs
if err1 != nil { for userID, list := range req.GroupSeqList {
promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, userID, list.SeqList)
log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) if err != nil {
resp.ErrCode = 201 return nil, err
resp.ErrMsg = err1.Error() }
return resp, nil resp.GroupMsgDataList[userID] = &sdkws.MsgDataList{
} else { MsgDataList: msgs,
promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList))
redisMsgList = append(redisMsgList, msgList...)
resp.List = redisMsgList
} }
} else {
promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList))
resp.List = redisMsgList
} }
for k, v := range in.GroupSeqList { //redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.UserID, req.SeqList, req.OperationID)
x := new(sdkws.MsgDataList) //if err != nil {
redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, in.OperationID) // if err != go_redis.Nil {
if err != nil { // promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList))
if err != go_redis.Nil { // log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList)
promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) // } else {
log.Error(in.OperationID, "get message from redis exception", err.Error(), failedSeqList) // log.Debug(req.OperationID, "get message from redis is nil", failedSeqList)
} else { // }
log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) // msgList, err1 := commonDB.DB.GetMsgBySeqListMongo2(req.UserID, failedSeqList, req.OperationID)
} // if err1 != nil {
msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) // promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
if err1 != nil { // log.Error(req.OperationID, "PullMessageBySeqList data error", req.String(), err1.Error())
promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) // resp.ErrCode = 201
log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) // resp.ErrMsg = err1.Error()
resp.ErrCode = 201 // return resp, nil
resp.ErrMsg = err1.Error() // } else {
return resp, nil // promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList))
} else { // redisMsgList = append(redisMsgList, msgList...)
promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) // resp.List = redisMsgList
redisMsgList = append(redisMsgList, msgList...) // }
x.MsgDataList = redisMsgList //} else {
m[k] = x // promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList))
} // resp.List = redisMsgList
} else { //}
promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) //
x.MsgDataList = redisMsgList //for k, v := range req.GroupSeqList {
m[k] = x // x := new(sdkws.MsgDataList)
} // redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(k, v.SeqList, req.OperationID)
} // if err != nil {
resp.GroupMsgDataList = m // if err != go_redis.Nil {
// promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList))
// log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList)
// } else {
// log.Debug(req.OperationID, "get message from redis is nil", failedSeqList)
// }
// msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, req.OperationID)
// if err1 != nil {
// promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
// log.Error(req.OperationID, "PullMessageBySeqList data error", req.String(), err1.Error())
// resp.ErrCode = 201
// resp.ErrMsg = err1.Error()
// return resp, nil
// } else {
// promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList))
// redisMsgList = append(redisMsgList, msgList...)
// x.MsgDataList = redisMsgList
// m[k] = x
// }
// } else {
// promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList))
// x.MsgDataList = redisMsgList
// m[k] = x
// }
//}
return resp, nil return resp, nil
} }

View File

@ -3,6 +3,7 @@ package controller
import ( import (
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
pbMsg "Open_IM/pkg/proto/msg" pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdkws"
"context" "context"
) )
@ -25,8 +26,13 @@ type MsgInterface interface {
// delete // delete
DelMsgFromCache(ctx context.Context, userID string, seqs []uint32) error DelMsgFromCache(ctx context.Context, userID string, seqs []uint32) error
GetGroupMaxSeq(ctx context.Context, groupID string) (uint32, error) GetGroupMaxSeq(ctx context.Context, groupID string) (uint32, error)
GetGroupMinSeq(ctx context.Context, groupID string) (uint32, error)
SetGroupUserMinSeq(ctx context.Context, groupID string, seq uint32) error SetGroupUserMinSeq(ctx context.Context, groupID string, seq uint32) error
DelUserAllSeq(ctx context.Context, userID string) error // redis and mongodb DelUserAllSeq(ctx context.Context, userID string) error // redis and mongodb
GetUserMaxSeq(ctx context.Context, userID string) (uint32, error)
GetUserMinSeq(ctx context.Context, userID string) (uint32, error)
GetMessageListBySeq(ctx context.Context, userID string, seqs []uint32) ([]*sdkws.MsgData, error)
} }
type MsgDatabaseInterface interface { type MsgDatabaseInterface interface {