diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 9df53d20c..e7d794324 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -65,11 +65,12 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client { ctx: ctx, } } -func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress bool, longConnServer LongConnServer) { +func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer) { c.w = new(sync.Mutex) c.conn = conn c.PlatformID = utils.StringToInt(ctx.GetPlatformID()) c.IsCompress = isCompress + c.IsBackground = isBackground c.UserID = ctx.GetUserID() c.ctx = ctx c.longConnServer = longConnServer diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 5f2fa2b17..58ee6e940 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -11,6 +11,7 @@ const ( OperationID = "operationID" Compression = "compression" GzipCompressionProtocol = "gzip" + BackgroundStatus = "isBackground" ) const ( WebSocket = iota + 1 diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index cbb47fdfd..5baa11fdd 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -91,3 +91,11 @@ func (c *UserConnContext) GetPlatformID() string { func (c *UserConnContext) GetOperationID() string { return c.Req.URL.Query().Get(OperationID) } +func (c *UserConnContext) GetBackground() bool { + b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) + if err != nil { + return false + } else { + return b + } +} diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index bc15de73e..749287e7f 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -223,7 +223,7 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { } } client := ws.clientPool.Get().(*Client) - client.ResetClient(context, wsLongConn, compression, ws) + client.ResetClient(context, wsLongConn, context.GetBackground(), compression, ws) ws.registerChan <- client go client.readMessage() } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index be44f36f4..d3bb47b91 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -61,7 +61,10 @@ type SeqCache interface { SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error // has read seq SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error + // k: user, v: seq + SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) + GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) } type thirdCache interface { @@ -236,12 +239,22 @@ func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversatio return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) } +func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { + return c.setSeqs(ctx, hasReadSeqs, func(userID string) string { + return c.getHasReadSeqKey(conversationID, userID) + }) +} + func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return c.getSeqs(ctx, conversationIDs, func(conversationID string) string { return c.getHasReadSeqKey(conversationID, userID) }) } +func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()) +} + func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index bbf37e996..53448682b 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -343,9 +343,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa isNew = true } lastMaxSeq := currentMaxSeq + userSeqMap := make(map[string]int64) for _, m := range msgs { currentMaxSeq++ m.Seq = currentMaxSeq + userSeqMap[m.SendID] = m.Seq } failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) if err != nil { @@ -360,6 +362,13 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prome.Inc(prome.SeqSetSuccessCounter) } + err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) + if err != nil { + log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) + prome.Inc(prome.SeqSetFailedCounter) + } else { + prome.Inc(prome.SeqSetSuccessCounter) + } return lastMaxSeq, isNew, utils.Wrap(err, "") }