mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-25 11:06:43 +08:00
mark as read
This commit is contained in:
parent
52fa046346
commit
086835a31b
13
pkg/common/db/cache/msg.go
vendored
13
pkg/common/db/cache/msg.go
vendored
@ -61,7 +61,10 @@ type SeqCache interface {
|
|||||||
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
|
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
|
||||||
// has read seq
|
// has read seq
|
||||||
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||||||
|
// k: user, v: seq
|
||||||
|
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
|
||||||
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
|
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
|
||||||
|
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type thirdCache interface {
|
type thirdCache interface {
|
||||||
@ -236,12 +239,22 @@ func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversatio
|
|||||||
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
|
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
|
||||||
|
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
|
||||||
|
return c.getHasReadSeqKey(conversationID, userID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
|
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
|
||||||
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
|
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
|
||||||
return c.getHasReadSeqKey(conversationID, userID)
|
return c.getHasReadSeqKey(conversationID, userID)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
|
||||||
|
return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64())
|
||||||
|
}
|
||||||
|
|
||||||
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||||
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
||||||
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
|
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
|
||||||
|
@ -343,9 +343,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
|||||||
isNew = true
|
isNew = true
|
||||||
}
|
}
|
||||||
lastMaxSeq := currentMaxSeq
|
lastMaxSeq := currentMaxSeq
|
||||||
|
userSeqMap := make(map[string]int64)
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
currentMaxSeq++
|
currentMaxSeq++
|
||||||
m.Seq = currentMaxSeq
|
m.Seq = currentMaxSeq
|
||||||
|
userSeqMap[m.SendID] = m.Seq
|
||||||
}
|
}
|
||||||
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
|
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -360,6 +362,13 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
|||||||
} else {
|
} else {
|
||||||
prome.Inc(prome.SeqSetSuccessCounter)
|
prome.Inc(prome.SeqSetSuccessCounter)
|
||||||
}
|
}
|
||||||
|
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||||
|
prome.Inc(prome.SeqSetFailedCounter)
|
||||||
|
} else {
|
||||||
|
prome.Inc(prome.SeqSetSuccessCounter)
|
||||||
|
}
|
||||||
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user