Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-06-11 16:52:16 +08:00
commit 493e9b0570
6 changed files with 34 additions and 2 deletions

View File

@ -65,11 +65,12 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
ctx: ctx,
}
}
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress bool, longConnServer LongConnServer) {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = utils.StringToInt(ctx.GetPlatformID())
c.IsCompress = isCompress
c.IsBackground = isBackground
c.UserID = ctx.GetUserID()
c.ctx = ctx
c.longConnServer = longConnServer

View File

@ -11,6 +11,7 @@ const (
OperationID = "operationID"
Compression = "compression"
GzipCompressionProtocol = "gzip"
BackgroundStatus = "isBackground"
)
const (
WebSocket = iota + 1

View File

@ -91,3 +91,11 @@ func (c *UserConnContext) GetPlatformID() string {
func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
}
func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))
if err != nil {
return false
} else {
return b
}
}

View File

@ -223,7 +223,7 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
}
}
client := ws.clientPool.Get().(*Client)
client.ResetClient(context, wsLongConn, compression, ws)
client.ResetClient(context, wsLongConn, context.GetBackground(), compression, ws)
ws.registerChan <- client
go client.readMessage()
}

View File

@ -61,7 +61,10 @@ type SeqCache interface {
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}
type thirdCache interface {
@ -236,12 +239,22 @@ func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversatio
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64())
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())

View File

@ -343,9 +343,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
isNew = true
}
lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64)
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
userSeqMap[m.SendID] = m.Seq
}
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
if err != nil {
@ -360,6 +362,13 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} else {
prome.Inc(prome.SeqSetSuccessCounter)
}
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
prome.Inc(prome.SeqSetFailedCounter)
} else {
prome.Inc(prome.SeqSetSuccessCounter)
}
return lastMaxSeq, isNew, utils.Wrap(err, "")
}