mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-27 03:58:55 +08:00
push
This commit is contained in:
parent
76565cded1
commit
8bdf86172f
@ -287,13 +287,13 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
|
|||||||
|
|
||||||
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
|
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
|
||||||
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
|
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
|
||||||
msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, req.UserID, req.Seqs)
|
msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, req.UserID, req.Seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.List = msgs
|
resp.List = msgs
|
||||||
for userID, list := range req.GroupSeqList {
|
for userID, list := range req.GroupSeqList {
|
||||||
msgs, err := m.MsgInterface.GetMessageListBySeq(ctx, userID, req.Seqs)
|
msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, userID, req.Seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
4
pkg/common/db/cache/msg.go
vendored
4
pkg/common/db/cache/msg.go
vendored
@ -65,7 +65,7 @@ type MsgCache interface {
|
|||||||
|
|
||||||
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||||
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
||||||
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
||||||
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
||||||
@ -191,7 +191,7 @@ func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platf
|
|||||||
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
|
return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgCache) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
|
func (m *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
|
||||||
var errResult error
|
var errResult error
|
||||||
for _, v := range seqList {
|
for _, v := range seqList {
|
||||||
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
|
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
|
||||||
|
2
pkg/common/db/cache/redis.go
vendored
2
pkg/common/db/cache/redis.go
vendored
@ -59,7 +59,7 @@ type Cache interface {
|
|||||||
|
|
||||||
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||||
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error)
|
||||||
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error
|
||||||
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
|
||||||
|
4
pkg/common/db/cache/redis_test.go
vendored
4
pkg/common/db/cache/redis_test.go
vendored
@ -77,7 +77,7 @@ func Test_NewSetMessageToCache(t *testing.T) {
|
|||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
}
|
}
|
||||||
func Test_NewGetMessageListBySeq(t *testing.T) {
|
func Test_NewGetMessagesBySeq(t *testing.T) {
|
||||||
var msg pbChat.MsgDataToMQ
|
var msg pbChat.MsgDataToMQ
|
||||||
var data common.MsgData
|
var data common.MsgData
|
||||||
uid := "test_uid"
|
uid := "test_uid"
|
||||||
@ -85,7 +85,7 @@ func Test_NewGetMessageListBySeq(t *testing.T) {
|
|||||||
data.ClientMsgID = "23jwhjsdf"
|
data.ClientMsgID = "23jwhjsdf"
|
||||||
msg.MsgData = &data
|
msg.MsgData = &data
|
||||||
|
|
||||||
seqMsg, failedSeqList, err := DB.GetMessageListBySeq(uid, []uint32{1212}, "cacheTest")
|
seqMsg, failedSeqList, err := DB.GetMessagesBySeq(uid, []uint32{1212}, "cacheTest")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
fmt.Println(seqMsg, failedSeqList)
|
fmt.Println(seqMsg, failedSeqList)
|
||||||
|
|
||||||
|
@ -458,7 +458,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs)
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != redis.Nil {
|
if err != redis.Nil {
|
||||||
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||||
@ -479,7 +479,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
|
||||||
successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs)
|
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != redis.Nil {
|
if err != redis.Nil {
|
||||||
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user