OpenIM-Gordon b76816bc14
refactor: 3.7.0 code conventions. (#2148)
* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* feat: add code lint

* feat: add code lint

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Script Refactoring

* feat: code format

* Script Refactoring

* Script Refactoring

* Script Refactoring

* Adjust MinIO configuration settings

* Adjust configuration settings

* Adjust configuration settings

* refactor: config change.

* refactor: webhooks update.

* Adjust configuration settings

* refactor: webhooks update.

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* feat: s3 api addr

* refactor: webhooks update.

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* Adjust configuration settings

* refactor: webhooks update.

* refactor: kafka update.

* Simplify the Docker Compose configuration, remove unnecessary environment variables, and eliminate the gateway service.

* refactor: kafka update.

* refactor: kafka update.

* Simplify the Docker Compose configuration, remove unnecessary environment variables, and eliminate the gateway service.

* Simplify the Docker Compose configuration, remove unnecessary environment variables, and eliminate the gateway service.

* Windows can compile and run.

* Windows can compile and run.

* refactor: kafka update.

* feat: msg cache split

* refactor: webhooks update

* refactor: webhooks update

* refactor: friends update

* refactor: group update

* refactor: third update

* refactor: api update

* refactor: crontab update

* refactor: msggateway update

* mage

* mage

* refactor: all module update.

* check

* refactor: all module update.

* load config

* load config

* load config

* load config

* refactor: all module update.

* refactor: all module update.

* refactor: all module update.

* refactor: all module update.

* refactor: all module update.

* Optimize Docker configuration and script.

* refactor: all module update.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* refactor: all module update.

* Optimize Docker configuration and script.

* refactor: all module update.

* refactor: all module update.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* update tools

* update tools

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* update protocol

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* refactor: all module update.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* Optimize Docker configuration and script.

* refactor: api remove token auth by redis directly.

* Code Refactoring

* refactor: websocket auth change to call rpc of auth.

* refactor: kick online user and remove token change to call auth rpc.

* refactor: kick online user and remove token change to call auth rpc.

* refactor: remove msggateway redis.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor webhook

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor webhook

* refactor: cmd update.

* refactor: cmd update.

* fix: runtime: goroutine stack exceeds

* refactor: cmd update.

* refactor notification

* refactor notification

* refactor

* refactor: cmd update.

* refactor: cmd update.

* refactor

* refactor

* refactor

* protojson

* protojson

* protojson

* go mod

* wrapperspb

* refactor: cmd update.

* refactor: cmd update.

* refactor: cmd update.

* refactor: context update.

* refactor: websocket update info.

* refactor: websocket update info.

* refactor: websocket update info.

* refactor: websocket update info.

* refactor: api name change.

* refactor: debug info.

* refactor: debug info.

* refactor: debug info.

* fix: update file

* refactor

* refactor

* refactor: debug info.

* refactor: debug info.

* refactor: debug info.

* refactor: debug info.

* refactor: debug info.

* refactor: debug info.

* fix: callback update.

* fix: callback update.

* refactor

* fix: update message.

* fix: msg cache timeout.

* refactor

* refactor

* fix: push update.

* fix: push update.

* fix: push update.

* fix: push update.

* fix: push update.

* fix: push update.

* fix: push update.

* fix: websocket handle error remove when upgrade error.

---------

Co-authored-by: skiffer-git <44203734@qq.com>
Co-authored-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
Co-authored-by: withchao <993506633@qq.com>
2024-04-19 22:23:08 +08:00

183 lines
7.1 KiB
Go

package cache
import (
"context"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
// seqs map: key userID value minSeq
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// k: user, v: seq
SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error
// k: conversation, v :seq
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
}
func NewSeqCache(rdb redis.UniversalClient) SeqCache {
return &seqCache{rdb: rdb}
}
type seqCache struct {
rdb redis.UniversalClient
}
func (c *seqCache) getMaxSeqKey(conversationID string) string {
return maxSeq + conversationID
}
func (c *seqCache) getMinSeqKey(conversationID string) string {
return minSeq + conversationID
}
func (c *seqCache) getHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID
}
func (c *seqCache) getConversationUserMinSeqKey(conversationID, userID string) string {
return conversationUserMinSeq + conversationID + "u:" + userID
}
func (c *seqCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) {
val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
}
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
}
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs {
if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, errs.Wrap(err)
}
return val, nil
}
func (c *seqCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, userIDs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err())
}
func (c *seqCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *seqCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *seqCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(userID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error {
return c.setSeqs(ctx, hasReadSeqs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *seqCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()
if err != nil {
return 0, err
}
return val, nil
}