notification

This commit is contained in:
withchao 2023-05-04 12:11:29 +08:00
parent 0935f1d57b
commit 3e1f97e300
26 changed files with 948 additions and 195 deletions

View File

@ -2,8 +2,6 @@ package msgtransfer
import ( import (
"fmt" "fmt"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@ -43,14 +41,16 @@ func StartTransfer(prometheusPort int) error {
if err != nil { if err != nil {
return err return err
} }
cacheModel := cache.NewCacheModel(rdb) msgModel := cache.NewMsgCacheModel(rdb)
notificationModel := cache.NewNotificationCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
notificationDocModel := unrelation.NewNotificationMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db)) chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient())) extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel) msgDatabase := controller.NewMsgDatabase(msgDocModel, msgModel)
notificationDatabase := controller.NewNotificationDatabase(msgDocModel, cacheModel) notificationDatabase := controller.NewNotificationDatabase(notificationDocModel, notificationModel)
conversationRpcClient := rpcclient.NewConversationClient(client) conversationRpcClient := rpcclient.NewConversationClient(client)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase, conversationRpcClient) msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase, conversationRpcClient)
@ -77,8 +77,8 @@ func (m *MsgTransfer) initPrometheus() {
} }
func (m *MsgTransfer) Start(prometheusPort int) error { func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup //var wg sync.WaitGroup
wg.Add(4) //wg.Add(4)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if config.Config.ChatPersistenceMysql { if config.Config.ChatPersistenceMysql {
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH) go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
@ -92,6 +92,6 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
if err != nil { if err != nil {
return err return err
} }
wg.Wait() //wg.Wait()
return nil return nil
} }

View File

@ -20,10 +20,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan
type Fcm struct { type Fcm struct {
fcmMsgCli *messaging.Client fcmMsgCli *messaging.Client
cache cache.Model cache cache.MsgModel
} }
func NewClient(cache cache.Model) *Fcm { func NewClient(cache cache.MsgModel) *Fcm {
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
fcmApp, err := firebase.NewApp(context.Background(), nil, opt) fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil { if err != nil {

View File

@ -9,7 +9,7 @@ import (
) )
func Test_Push(t *testing.T) { func Test_Push(t *testing.T) {
var redis cache.Model var redis cache.MsgModel
offlinePusher := NewClient(redis) offlinePusher := NewClient(redis)
err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &offlinepush.Opts{}) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &offlinepush.Opts{})
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -38,12 +38,12 @@ const (
) )
type Client struct { type Client struct {
cache cache.Model cache cache.MsgModel
tokenExpireTime int64 tokenExpireTime int64
taskIDTTL int64 taskIDTTL int64
} }
func NewClient(cache cache.Model) *Client { func NewClient(cache cache.MsgModel) *Client {
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
} }

View File

@ -21,7 +21,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil { if err != nil {
return err return err
} }
cacheModel := cache.NewCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel) offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel) database := controller.NewPushDatabase(cacheModel)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client))

View File

@ -44,7 +44,7 @@ func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl
} }
} }
func NewOfflinePusher(cache cache.Model) offlinepush.OfflinePusher { func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
var offlinePusher offlinepush.OfflinePusher var offlinePusher offlinepush.OfflinePusher
if config.Config.Push.Getui.Enable { if config.Config.Push.Getui.Enable {
offlinePusher = getui.NewClient(cache) offlinePusher = getui.NewClient(cache)

View File

@ -32,7 +32,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
pbAuth.RegisterAuthServer(server, &authServer{ pbAuth.RegisterAuthServer(server, &authServer{
userRpcClient: rpcclient.NewUserClient(client), userRpcClient: rpcclient.NewUserClient(client),
RegisterCenter: client, RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), authDatabase: controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
}) })
return nil return nil
} }

View File

@ -15,10 +15,10 @@ type MessageLocker interface {
UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error) UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
} }
type LockerMessage struct { type LockerMessage struct {
cache cache.Model cache cache.MsgModel
} }
func NewLockerMessage(cache cache.Model) *LockerMessage { func NewLockerMessage(cache cache.MsgModel) *LockerMessage {
return &LockerMessage{cache: cache} return &LockerMessage{cache: cache}
} }
func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) { func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) {

View File

@ -57,7 +57,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil { if err != nil {
return err return err
} }
cacheModel := cache.NewCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())

View File

@ -37,7 +37,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err return err
} }
third.RegisterThirdServer(server, &thirdServer{ third.RegisterThirdServer(server, &thirdServer{
thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)), thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb)),
userRpcClient: rpcclient.NewUserClient(client), userRpcClient: rpcclient.NewUserClient(client),
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u), s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u),
}) })

View File

@ -63,7 +63,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
if err != nil { if err != nil {
return return
} }
cacheModel := cache.NewCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName()) mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName())
ctx := context.Background() ctx := context.Background()

View File

@ -14,13 +14,13 @@ const (
blackExpireTime = time.Second * 60 * 60 * 12 blackExpireTime = time.Second * 60 * 60 * 12
) )
// args fn will exec when no data in cache // args fn will exec when no data in msgCache
type BlackCache interface { type BlackCache interface {
//get blackIDs from cache //get blackIDs from msgCache
metaCache metaCache
NewCache() BlackCache NewCache() BlackCache
GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error)
//del user's blackIDs cache, exec when a user's black list changed //del user's blackIDs msgCache, exec when a user's black list changed
DelBlackIDs(ctx context.Context, userID string) BlackCache DelBlackIDs(ctx context.Context, userID string) BlackCache
} }

View File

@ -24,22 +24,22 @@ const (
conversationExpireTime = time.Second * 60 * 60 * 12 conversationExpireTime = time.Second * 60 * 60 * 12
) )
// arg fn will exec when no data in cache // arg fn will exec when no data in msgCache
type ConversationCache interface { type ConversationCache interface {
metaCache metaCache
NewCache() ConversationCache NewCache() ConversationCache
// get user's conversationIDs from cache // get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
DelConversationIDs(userIDs []string) ConversationCache DelConversationIDs(userIDs []string) ConversationCache
// get one conversation from cache // get one conversation from msgCache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
// get one conversation from cache // get one conversation from msgCache
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
// get one user's all conversations from cache // get one user's all conversations from msgCache
GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
// get user conversation recv msg from cache // get user conversation recv msg from msgCache
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
// get one super group recv msg but do not notification userID list // get one super group recv msg but do not notification userID list

View File

@ -17,14 +17,14 @@ const (
friendKey = "FRIEND_INFO:" friendKey = "FRIEND_INFO:"
) )
// args fn will exec when no data in cache // args fn will exec when no data in msgCache
type FriendCache interface { type FriendCache interface {
metaCache metaCache
NewCache() FriendCache NewCache() FriendCache
GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error)
// call when friendID List changed // call when friendID List changed
DelFriendIDs(ownerUserID ...string) FriendCache DelFriendIDs(ownerUserID ...string) FriendCache
// get single friendInfo from cache // get single friendInfo from msgCache
GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)
// del friend when friend info changed // del friend when friend info changed
DelFriend(ownerUserID, friendUserID string) FriendCache DelFriend(ownerUserID, friendUserID string) FriendCache

View File

@ -20,26 +20,25 @@ import (
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
const ( //const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq // userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN" // userMinSeq = "REDIS_USER_MIN_SEQ:"
userMinSeq = "REDIS_USER_MIN_SEQ:" // getuiToken = "GETUI_TOKEN"
getuiToken = "GETUI_TOKEN" // getuiTaskID = "GETUI_TASK_ID"
getuiTaskID = "GETUI_TASK_ID" // messageCache = "MESSAGE_CACHE:"
messageCache = "MESSAGE_CACHE:" // signalCache = "SIGNAL_CACHE:"
signalCache = "SIGNAL_CACHE:" // signalListCache = "SIGNAL_LIST_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:" // FcmToken = "FCM_TOKEN:"
FcmToken = "FCM_TOKEN:" // groupUserMinSeq = "GROUP_USER_MIN_SEQ:"
groupUserMinSeq = "GROUP_USER_MIN_SEQ:" // groupMaxSeq = "GROUP_MAX_SEQ:"
groupMaxSeq = "GROUP_MAX_SEQ:" // groupMinSeq = "GROUP_MIN_SEQ:"
groupMinSeq = "GROUP_MIN_SEQ:" // sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" // userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" // exTypeKeyLocker = "EX_LOCK:"
exTypeKeyLocker = "EX_LOCK:" // uidPidToken = "UID_PID_TOKEN_STATUS:"
uidPidToken = "UID_PID_TOKEN_STATUS:" //)
)
type Model interface { type MsgModel interface {
IncrUserSeq(ctx context.Context, userID string) (int64, error) IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error) GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
@ -87,16 +86,16 @@ type Model interface {
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
} }
func NewCacheModel(client redis.UniversalClient) Model { func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
return &cache{rdb: client} return &msgCache{rdb: client}
} }
type cache struct { type msgCache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
} }
// 兼容老版本调用 // 兼容老版本调用
func (c *cache) DelKeys() { func (c *msgCache) DelKeys() {
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:",
"GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} {
fName := utils.GetSelfFuncName() fName := utils.GetSelfFuncName()
@ -127,66 +126,66 @@ func (c *cache) DelKeys() {
} }
} }
func (c *cache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { func (c *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64())
} }
func (c *cache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { func (c *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64())
} }
func (c *cache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { func (c *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err())
} }
func (c *cache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { func (c *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err())
} }
func (c *cache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { func (c *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, userMinSeq+userID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, userMinSeq+userID).Int64())
} }
func (c *cache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { func (c *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err())
} }
func (c *cache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { func (c *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := groupUserMinSeq + "g:" + groupID + "u:" + userID key := groupUserMinSeq + "g:" + groupID + "u:" + userID
return utils.Wrap2(c.rdb.Get(ctx, key).Int64()) return utils.Wrap2(c.rdb.Get(ctx, key).Int64())
} }
func (c *cache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { func (c *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, groupMaxSeq+groupID).Int64())
} }
func (c *cache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { func (c *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, groupMinSeq+groupID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, groupMinSeq+groupID).Int64())
} }
func (c *cache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { func (c *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
seq, err := c.rdb.Incr(ctx, key).Uint64() seq, err := c.rdb.Incr(ctx, key).Uint64()
return int64(seq), errs.Wrap(err) return int64(seq), errs.Wrap(err)
} }
func (c *cache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { func (c *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := groupMaxSeq + groupID key := groupMaxSeq + groupID
return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err())
} }
func (c *cache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { func (c *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := groupMinSeq + groupID key := groupMinSeq + groupID
return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err())
} }
func (c *cache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
} }
func (c *cache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
key := uidPidToken + userID + ":" + platformID key := uidPidToken + userID + ":" + platformID
m, err := c.rdb.HGetAll(ctx, key).Result() m, err := c.rdb.HGetAll(ctx, key).Result()
if err != nil { if err != nil {
@ -199,7 +198,7 @@ func (c *cache) GetTokensWithoutError(ctx context.Context, userID, platformID st
return mm, nil return mm, nil
} }
func (c *cache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error {
key := uidPidToken + userID + ":" + platform key := uidPidToken + userID + ":" + platform
mm := make(map[string]interface{}) mm := make(map[string]interface{})
for k, v := range m { for k, v := range m {
@ -208,20 +207,20 @@ func (c *cache) SetTokenMapByUidPid(ctx context.Context, userID string, platform
return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err())
} }
func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error {
key := uidPidToken + userID + ":" + platform key := uidPidToken + userID + ":" + platform
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
} }
func (c *cache) getMessageCacheKey(sourceID string, seq int64) string { func (c *msgCache) getMessageCacheKey(sourceID string, seq int64) string {
return messageCache + sourceID + "_" + strconv.Itoa(int(seq)) return messageCache + sourceID + "_" + strconv.Itoa(int(seq))
} }
func (c *cache) allMessageCacheKey(sourceID string) string { func (c *msgCache) allMessageCacheKey(sourceID string) string {
return messageCache + sourceID + "_*" return messageCache + sourceID + "_*"
} }
func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { func (c *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
for _, v := range seqs { for _, v := range seqs {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
@ -247,7 +246,7 @@ func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int6
return seqMsgs, failedSeqs, err return seqMsgs, failedSeqs, err
} }
func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { func (c *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
var failedMsgs []sdkws.MsgData var failedMsgs []sdkws.MsgData
for _, msg := range msgList { for _, msg := range msgList {
@ -262,13 +261,13 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
} }
} }
if len(failedMsgs) != 0 { if len(failedMsgs) != 0 {
return len(failedMsgs), fmt.Errorf("set msg to cache failed, failed lists: %v, %s", failedMsgs, userID) return len(failedMsgs), fmt.Errorf("set msg to msgCache failed, failed lists: %v, %s", failedMsgs, userID)
} }
_, err := pipe.Exec(ctx) _, err := pipe.Exec(ctx)
return 0, err return 0, err
} }
func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error {
pipe := c.rdb.Pipeline() pipe := c.rdb.Pipeline()
for _, v := range msgList { for _, v := range msgList {
if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil {
@ -279,7 +278,7 @@ func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgLi
return errs.Wrap(err) return errs.Wrap(err)
} }
func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { func (c *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result() vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result()
if err == redis.Nil { if err == redis.Nil {
return nil return nil
@ -297,7 +296,7 @@ func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
return errs.Wrap(err) return errs.Wrap(err)
} }
func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { func (c *msgCache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &sdkws.SignalReq{} req := &sdkws.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil { if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, errs.Wrap(err) return false, errs.Wrap(err)
@ -349,7 +348,7 @@ func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, push
return true, nil return true, nil
} }
func (c *cache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) { func (c *msgCache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) {
bytes, err := c.rdb.Get(ctx, signalCache+clientMsgID).Bytes() bytes, err := c.rdb.Get(ctx, signalCache+clientMsgID).Bytes()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -370,7 +369,7 @@ func (c *cache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, client
return signalInviteReq, nil return signalInviteReq, nil
} }
func (c *cache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) { func (c *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) {
key, err := c.rdb.LPop(ctx, signalListCache+userID).Result() key, err := c.rdb.LPop(ctx, signalListCache+userID).Result()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -382,11 +381,11 @@ func (c *cache) GetAvailableSignalInvitationInfo(ctx context.Context, userID str
return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID)) return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID))
} }
func (c *cache) DelUserSignalList(ctx context.Context, userID string) error { func (c *msgCache) DelUserSignalList(ctx context.Context, userID string) error {
return errs.Wrap(c.rdb.Del(ctx, signalListCache+userID).Err()) return errs.Wrap(c.rdb.Del(ctx, signalListCache+userID).Err())
} }
func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs { for _, seq := range seqs {
key := c.getMessageCacheKey(userID, seq) key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result() result, err := c.rdb.Get(ctx, key).Result()
@ -412,67 +411,67 @@ func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64
return nil return nil
} }
func (c *cache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err())
} }
func (c *cache) GetGetuiToken(ctx context.Context) (string, error) { func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result()) return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result())
} }
func (c *cache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
} }
func (c *cache) GetGetuiTaskID(ctx context.Context) (string, error) { func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result()) return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result())
} }
func (c *cache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err())
} }
func (c *cache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int() result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int()
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
func (c *cache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) return errs.Wrap(c.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
} }
func (c *cache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) return utils.Wrap2(c.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result())
} }
func (c *cache) DelFcmToken(ctx context.Context, account string, platformID int) error { func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) return errs.Wrap(c.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err())
} }
func (c *cache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err) return int(seq), errs.Wrap(err)
} }
func (c *cache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err())
} }
func (c *cache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int())
} }
func (c *cache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
} }
func (c *cache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.Del(ctx, key).Err()) return errs.Wrap(c.rdb.Del(ctx, key).Err())
} }
func (c *cache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType { switch sessionType {
case constant.SingleChatType: case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID return "EX_SINGLE_" + clientMsgID
@ -486,7 +485,7 @@ func (c *cache) getMessageReactionExPrefix(clientMsgID string, sessionType int32
return "" return ""
} }
func (c *cache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil { if err != nil {
return false, utils.Wrap(err, "") return false, utils.Wrap(err, "")
@ -494,22 +493,22 @@ func (c *cache) JudgeMessageReactionExist(ctx context.Context, clientMsgID strin
return n > 0, nil return n > 0, nil
} }
func (c *cache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
} }
func (c *cache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
} }
func (c *cache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
} }
func (c *cache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
} }
func (c *cache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
} }

514
pkg/common/db/cache/notification.go vendored Normal file
View File

@ -0,0 +1,514 @@
package cache
import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"strconv"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/gogo/protobuf/jsonpb"
"google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
)
const (
NotificationUserIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
NotificationUserMinSeq = "REDIS_USER_MIN_SEQ:"
NotificationGetuiToken = "GETUI_TOKEN"
NotificationGetuiTaskID = "GETUI_TASK_ID"
NotificationMessageCache = "MESSAGE_CACHE:"
NotificationSignalCache = "SIGNAL_CACHE:"
NotificationSignalListCache = "SIGNAL_LIST_CACHE:"
NotificationFcmToken = "FCM_TOKEN:"
NotificationGroupUserMinSeq = "GROUP_USER_MIN_SEQ:"
NotificationGroupMaxSeq = "GROUP_MAX_SEQ:"
NotificationGroupMinSeq = "GROUP_MIN_SEQ:"
NotificationSendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
NotificationUserBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
NotificationExTypeKeyLocker = "EX_LOCK:"
NotificationUidPidToken = "UID_PID_TOKEN_STATUS:"
)
type NotificationModel interface {
IncrUserSeq(ctx context.Context, userID string) (int64, error)
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error
GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error)
DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error
CleanUpOneUserAllMsg(ctx context.Context, userID string) error
HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error)
GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error)
GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error)
DelUserSignalList(ctx context.Context, userID string) error
DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
SetGetuiToken(ctx context.Context, token string, expireTime int64) error
GetGetuiToken(ctx context.Context) (string, error)
SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error
GetGetuiTaskID(ctx context.Context) (string, error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error)
GetFcmToken(ctx context.Context, account string, platformID int) (string, error)
DelFcmToken(ctx context.Context, account string, platformID int) error
IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error
GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
}
func NewNotificationCacheModel(client redis.UniversalClient) NotificationModel {
return &notificationCache{rdb: client}
}
type notificationCache struct {
rdb redis.UniversalClient
}
// 兼容老版本调用
func (c *notificationCache) DelKeys() {
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:",
"GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} {
fName := utils.GetSelfFuncName()
var cursor uint64
var n int
for {
var keys []string
var err error
keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
if err != nil {
panic(err.Error())
}
n += len(keys)
// for each for redis cluster
for _, key := range keys {
if err = c.rdb.Del(context.Background(), key).Err(); err != nil {
log.NewError("", fName, key, err.Error())
err = c.rdb.Del(context.Background(), key).Err()
if err != nil {
panic(err.Error())
}
}
}
if cursor == 0 {
break
}
}
}
}
func (c *notificationCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64())
}
func (c *notificationCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64())
}
func (c *notificationCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error {
return errs.Wrap(c.rdb.Set(ctx, NotificationUserIncrSeq+userID, maxSeq, 0).Err())
}
func (c *notificationCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, NotificationUserMinSeq+userID, minSeq, 0).Err())
}
func (c *notificationCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationUserMinSeq+userID).Int64())
}
func (c *notificationCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID
return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (c *notificationCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID
return utils.Wrap2(c.rdb.Get(ctx, key).Int64())
}
func (c *notificationCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMaxSeq+groupID).Int64())
}
func (c *notificationCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMinSeq+groupID).Int64())
}
func (c *notificationCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
key := NotificationGroupMaxSeq + groupID
seq, err := c.rdb.Incr(ctx, key).Uint64()
return int64(seq), errs.Wrap(err)
}
func (c *notificationCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error {
key := NotificationGroupMaxSeq + groupID
return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err())
}
func (c *notificationCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error {
key := NotificationGroupMinSeq + groupID
return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err())
}
func (c *notificationCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := NotificationUidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
}
func (c *notificationCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) {
key := NotificationUidPidToken + userID + ":" + platformID
m, err := c.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, errs.Wrap(err)
}
mm := make(map[string]int)
for k, v := range m {
mm[k] = utils.StringToInt(v)
}
return mm, nil
}
func (c *notificationCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error {
key := NotificationUidPidToken + userID + ":" + platform
mm := make(map[string]interface{})
for k, v := range m {
mm[k] = v
}
return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err())
}
func (c *notificationCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error {
key := NotificationUidPidToken + userID + ":" + platform
return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err())
}
func (c *notificationCache) getMessageCacheKey(sourceID string, seq int64) string {
return NotificationMessageCache + sourceID + "_" + strconv.Itoa(int(seq))
}
func (c *notificationCache) allMessageCacheKey(sourceID string) string {
return NotificationMessageCache + sourceID + "_*"
}
func (c *notificationCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
for _, v := range seqs {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := c.getMessageCacheKey(userID, v)
if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
return nil, nil, err
}
}
result, err := pipe.Exec(ctx)
for i, v := range result {
if v.Err() != nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(v.String(), &msg)
if err != nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
seqMsgs = append(seqMsgs, &msg)
}
}
}
return seqMsgs, failedSeqs, err
}
func (c *notificationCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) {
pipe := c.rdb.Pipeline()
var failedMsgs []sdkws.MsgData
for _, msg := range msgList {
key := c.getMessageCacheKey(userID, msg.Seq)
s, err := utils.Pb2String(msg)
if err != nil {
return 0, errs.Wrap(err)
}
err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
if err != nil {
return 0, errs.Wrap(err)
}
}
if len(failedMsgs) != 0 {
return len(failedMsgs), fmt.Errorf("set msg to notificationCache failed, failed lists: %v, %s", failedMsgs, userID)
}
_, err := pipe.Exec(ctx)
return 0, err
}
func (c *notificationCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error {
pipe := c.rdb.Pipeline()
for _, v := range msgList {
if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil {
return errs.Wrap(err)
}
}
_, err := pipe.Exec(ctx)
return errs.Wrap(err)
}
func (c *notificationCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result()
if err == redis.Nil {
return nil
}
if err != nil {
return errs.Wrap(err)
}
pipe := c.rdb.Pipeline()
for _, v := range vals {
if err := pipe.Del(ctx, v).Err(); err != nil {
return errs.Wrap(err)
}
}
_, err = pipe.Exec(ctx)
return errs.Wrap(err)
}
func (c *notificationCache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
req := &sdkws.SignalReq{}
if err := proto.Unmarshal(msg.Content, req); err != nil {
return false, errs.Wrap(err)
}
var inviteeUserIDs []string
var isInviteSignal bool
switch signalInfo := req.Payload.(type) {
case *sdkws.SignalReq_Invite:
inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList
isInviteSignal = true
case *sdkws.SignalReq_InviteInGroup:
inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
isInviteSignal = true
if !utils.Contain(pushToUserID, inviteeUserIDs...) {
return false, nil
}
case *sdkws.SignalReq_HungUp, *sdkws.SignalReq_Cancel, *sdkws.SignalReq_Reject, *sdkws.SignalReq_Accept:
return false, errs.Wrap(errors.New("signalInfo do not need offlinePush"))
default:
return false, nil
}
if isInviteSignal {
pipe := c.rdb.Pipeline()
for _, userID := range inviteeUserIDs {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, errs.Wrap(err)
}
keys := NotificationSignalListCache + userID
err = pipe.LPush(ctx, keys, msg.ClientMsgID).Err()
if err != nil {
return false, errs.Wrap(err)
}
err = pipe.Expire(ctx, keys, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, errs.Wrap(err)
}
key := NotificationSignalCache + msg.ClientMsgID
err = pipe.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, errs.Wrap(err)
}
}
_, err := pipe.Exec(ctx)
if err != nil {
return false, errs.Wrap(err)
}
}
return true, nil
}
func (c *notificationCache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) {
bytes, err := c.rdb.Get(ctx, NotificationSignalCache+clientMsgID).Bytes()
if err != nil {
return nil, errs.Wrap(err)
}
signalReq := &sdkws.SignalReq{}
if err = proto.Unmarshal(bytes, signalReq); err != nil {
return nil, errs.Wrap(err)
}
signalInviteReq = &sdkws.SignalInviteReq{}
switch req := signalReq.Payload.(type) {
case *sdkws.SignalReq_Invite:
signalInviteReq.Invitation = req.Invite.Invitation
signalInviteReq.OpUserID = req.Invite.OpUserID
case *sdkws.SignalReq_InviteInGroup:
signalInviteReq.Invitation = req.InviteInGroup.Invitation
signalInviteReq.OpUserID = req.InviteInGroup.OpUserID
}
return signalInviteReq, nil
}
func (c *notificationCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) {
key, err := c.rdb.LPop(ctx, NotificationSignalListCache+userID).Result()
if err != nil {
return nil, errs.Wrap(err)
}
invitationInfo, err = c.GetSignalInvitationInfoByClientMsgID(ctx, key)
if err != nil {
return nil, err
}
return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID))
}
func (c *notificationCache) DelUserSignalList(ctx context.Context, userID string) error {
return errs.Wrap(c.rdb.Del(ctx, NotificationSignalListCache+userID).Err())
}
func (c *notificationCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs {
key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
continue
}
return errs.Wrap(err)
}
var msg sdkws.MsgData
if err := jsonpb.UnmarshalString(result, &msg); err != nil {
return err
}
msg.Status = constant.MsgDeleted
s, err := utils.Pb2String(&msg)
if err != nil {
return errs.Wrap(err)
}
if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (c *notificationCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiToken, token, time.Duration(expireTime)*time.Second).Err())
}
func (c *notificationCache) GetGetuiToken(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiToken).Result())
}
func (c *notificationCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error {
return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err())
}
func (c *notificationCache) GetGetuiTaskID(ctx context.Context) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiTaskID).Result())
}
func (c *notificationCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, NotificationSendMsgFailedFlag+id, status, time.Hour*24).Err())
}
func (c *notificationCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
result, err := c.rdb.Get(ctx, NotificationSendMsgFailedFlag+id).Int()
return int32(result), errs.Wrap(err)
}
func (c *notificationCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) {
return errs.Wrap(c.rdb.Set(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err())
}
func (c *notificationCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Result())
}
func (c *notificationCache) DelFcmToken(ctx context.Context, account string, platformID int) error {
return errs.Wrap(c.rdb.Del(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Err())
}
func (c *notificationCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
seq, err := c.rdb.Incr(ctx, NotificationUserBadgeUnreadCountSum+userID).Result()
return int(seq), errs.Wrap(err)
}
func (c *notificationCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error {
return errs.Wrap(c.rdb.Set(ctx, NotificationUserBadgeUnreadCountSum+userID, value, 0).Err())
}
func (c *notificationCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) {
return utils.Wrap2(c.rdb.Get(ctx, NotificationUserBadgeUnreadCountSum+userID).Int())
}
func (c *notificationCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
}
func (c *notificationCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey
return errs.Wrap(c.rdb.Del(ctx, key).Err())
}
func (c *notificationCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return "EX_SINGLE_" + clientMsgID
case constant.GroupChatType:
return "EX_GROUP_" + clientMsgID
case constant.SuperGroupChatType:
return "EX_SUPER_GROUP_" + clientMsgID
case constant.NotificationChatType:
return "EX_NOTIFICATION" + clientMsgID
}
return ""
}
func (c *notificationCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
if err != nil {
return false, utils.Wrap(err, "")
}
return n > 0, nil
}
func (c *notificationCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
}
func (c *notificationCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result())
}
func (c *notificationCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result())
}
func (c *notificationCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result())
}
func (c *notificationCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}

View File

@ -87,11 +87,11 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
return t, nil return t, nil
} }
if v == "" { if v == "" {
return t, errs.ErrRecordNotFound.Wrap("cache is not found") return t, errs.ErrRecordNotFound.Wrap("msgCache is not found")
} }
err = json.Unmarshal([]byte(v), &t) err = json.Unmarshal([]byte(v), &t)
if err != nil { if err != nil {
log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) log.ZError(ctx, "msgCache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire)
return t, utils.Wrap(err, "") return t, utils.Wrap(err, "")
} }
return t, nil return t, nil

View File

@ -18,13 +18,13 @@ type AuthDatabase interface {
} }
type authDatabase struct { type authDatabase struct {
cache cache.Model cache cache.MsgModel
accessSecret string accessSecret string
accessExpire int64 accessExpire int64
} }
func NewAuthDatabase(cache cache.Model, accessSecret string, accessExpire int64) AuthDatabase { func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64) AuthDatabase {
return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire} return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire}
} }

View File

@ -85,7 +85,7 @@ type MsgDatabase interface {
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error
} }
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) MsgDatabase {
return &msgDatabase{ return &msgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
cache: cacheModel, cache: cacheModel,
@ -97,7 +97,7 @@ func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel ca
} }
func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase { func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase {
cacheModel := cache.NewCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(database) msgDocModel := unrelation.NewMsgMongoDriver(database)
msgDatabase := NewMsgDatabase(msgDocModel, cacheModel) msgDatabase := NewMsgDatabase(msgDocModel, cacheModel)
return msgDatabase return msgDatabase
@ -106,7 +106,7 @@ func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDat
type msgDatabase struct { type msgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
cache cache.Model cache cache.MsgModel
producer *kafka.Producer producer *kafka.Producer
producerToMongo *kafka.Producer producerToMongo *kafka.Producer
producerToModify *kafka.Producer producerToModify *kafka.Producer

View File

@ -85,7 +85,7 @@ type NotificationDatabase interface {
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error
} }
func NewNotificationDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) NotificationDatabase { func NewNotificationDatabase(msgDocModel unRelationTb.NotificationDocModelInterface, cacheModel cache.NotificationModel) NotificationDatabase {
return &notificationDatabase{ return &notificationDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
cache: cacheModel, cache: cacheModel,
@ -97,22 +97,23 @@ func NewNotificationDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cach
} }
func InitNotificationDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase { func InitNotificationDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase {
cacheModel := cache.NewCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(database) msgDocModel := unrelation.NewMsgMongoDriver(database)
msgDatabase := NewMsgDatabase(msgDocModel, cacheModel) msgDatabase := NewMsgDatabase(msgDocModel, cacheModel)
return msgDatabase return msgDatabase
} }
type notificationDatabase struct { type notificationDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface msgDocDatabase unRelationTb.NotificationDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
cache cache.Model cache cache.NotificationModel
producer *kafka.Producer producer *kafka.Producer
producerToMongo *kafka.Producer producerToMongo *kafka.Producer
producerToModify *kafka.Producer producerToModify *kafka.Producer
producerToPush *kafka.Producer producerToPush *kafka.Producer
// model // model
msg unRelationTb.MsgDocModel //msg unRelationTb.MsgDocModel
msg unRelationTb.NotificationDocModel
extendMsgSetModel unRelationTb.ExtendMsgSetModel extendMsgSetModel unRelationTb.ExtendMsgSetModel
} }
@ -231,30 +232,30 @@ func (db *notificationDatabase) GetGroupMinSeq(ctx context.Context, groupID stri
func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill() //newTime := utils.GetCurrentTimestampByMill()
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { if int64(len(msgList)) > db.msg.GetsingleGocNotificationNum() {
return errors.New("too large") return errors.New("too large")
} }
var remain int64 var remain int64
blk0 := db.msg.GetSingleGocMsgNum() - 1 blk0 := db.msg.GetsingleGocNotificationNum() - 1
//currentMaxSeq 4998 //currentMaxSeq 4998
if currentMaxSeq < db.msg.GetSingleGocMsgNum() { if currentMaxSeq < db.msg.GetsingleGocNotificationNum() {
remain = blk0 - currentMaxSeq //1 remain = blk0 - currentMaxSeq //1
} else { } else {
excludeBlk0 := currentMaxSeq - blk0 //=1 excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999 //(5000-1)%5000 == 4999
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum() remain = (db.msg.GetsingleGocNotificationNum() - (excludeBlk0 % db.msg.GetsingleGocNotificationNum())) % db.msg.GetsingleGocNotificationNum()
} }
//remain=1 //remain=1
var insertCounter int64 var insertCounter int64
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0) msgsToMongo := make([]unRelationTb.NotificationInfoModel, 0)
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0) msgsToMongoNext := make([]unRelationTb.NotificationInfoModel, 0)
docID := "" docID := ""
docIDNext := "" docIDNext := ""
var err error var err error
for _, m := range msgList { for _, m := range msgList {
//log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) //log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
currentMaxSeq++ currentMaxSeq++
sMsg := unRelationTb.MsgInfoModel{} sMsg := unRelationTb.NotificationInfoModel{}
sMsg.SendTime = m.SendTime sMsg.SendTime = m.SendTime
m.Seq = currentMaxSeq m.Seq = currentMaxSeq
if sMsg.Msg, err = proto.Marshal(m); err != nil { if sMsg.Msg, err = proto.Marshal(m); err != nil {
@ -279,7 +280,7 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo) err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
if err != nil { if err != nil {
if err == mongo.ErrNoDocuments { if err == mongo.ErrNoDocuments {
doc := &unRelationTb.MsgDocModel{} doc := &unRelationTb.NotificationDocModel{}
doc.DocID = docID doc.DocID = docID
doc.Msg = msgsToMongo doc.Msg = msgsToMongo
if err = db.msgDocDatabase.Create(ctx, doc); err != nil { if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
@ -298,7 +299,7 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID
} }
} }
if docIDNext != "" { if docIDNext != "" {
nextDoc := &unRelationTb.MsgDocModel{} nextDoc := &unRelationTb.NotificationDocModel{}
nextDoc.DocID = docIDNext nextDoc.DocID = docIDNext
nextDoc.Msg = msgsToMongoNext nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) //log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
@ -324,7 +325,7 @@ func (db *notificationDatabase) NotificationBatchInsertChat2Cache(ctx context.Co
func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error) { func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill() //newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList) lenList := len(msgList)
if int64(lenList) > db.msg.GetSingleGocMsgNum() { if int64(lenList) > db.msg.GetsingleGocNotificationNum() {
return 0, errors.New("too large") return 0, errors.New("too large")
} }
if lenList < 1 { if lenList < 1 {
@ -454,7 +455,7 @@ func (db *notificationDatabase) GetOldestMsg(ctx context.Context, sourceID strin
return db.unmarshalMsg(msgInfo) return db.unmarshalMsg(msgInfo)
} }
func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.NotificationInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{} msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb) err = proto.Unmarshal(msgInfo.Msg, msgPb)
if err != nil { if err != nil {
@ -628,7 +629,7 @@ func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, sourceID
return delStruct.getSetMinSeq() + 1, nil return delStruct.getSetMinSeq() + 1, nil
} }
//log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) //log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { if int64(len(msgs.Msg)) > db.msg.GetsingleGocNotificationNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
} }
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {

View File

@ -12,10 +12,10 @@ type PushDatabase interface {
} }
type pushDataBase struct { type pushDataBase struct {
cache cache.Model cache cache.MsgModel
} }
func NewPushDatabase(cache cache.Model) PushDatabase { func NewPushDatabase(cache cache.MsgModel) PushDatabase {
return &pushDataBase{cache: cache} return &pushDataBase{cache: cache}
} }

View File

@ -14,10 +14,10 @@ type ThirdDatabase interface {
} }
type thirdDatabase struct { type thirdDatabase struct {
cache cache.Model cache cache.MsgModel
} }
func NewThirdDatabase(cache cache.Model) ThirdDatabase { func NewThirdDatabase(cache cache.MsgModel) ThirdDatabase {
return &thirdDatabase{cache: cache} return &thirdDatabase{cache: cache}
} }

View File

@ -1,28 +1,133 @@
package unrelation package unrelation
import "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"strconv"
"strings"
)
type MsgModel struct { const (
SendID string `bson:"send_id"` singleGocMsgNum = 5000
RecvID string `bson:"recv_id"` Msg = "msg"
GroupID string `bson:"group_id"` OldestList = 0
ClientMsgID string `bson:"client_msg_id"` // 客户端消息ID NewestList = -1
ServerMsgID string `bson:"server_msg_id"` // 服务端消息ID )
SenderPlatformID int32 `bson:"sender_platform_id"`
SenderNickname string `bson:"sender_nickname"` type MsgDocModel struct {
SenderFaceURL string `bson:"sender_face_url"` DocID string `bson:"uid"`
SessionType int32 `bson:"session_type"` Msg []MsgInfoModel `bson:"msg"`
MsgFrom int32 `bson:"msg_from"` }
ContentType int32 `bson:"contentType"`
Content []byte `bson:"content"` type MsgInfoModel struct {
Seq int64 `bson:"seq"` SendTime int64 `bson:"sendtime"`
SendTime int64 `bson:"sendTime"` Msg []byte `bson:"msg"`
CreateTime int64 `bson:"createTime"` }
Status int32 `bson:"status"`
Options map[string]bool `bson:"options"` type MsgDocModelInterface interface {
OfflinePushInfo *sdkws.OfflinePushInfo `bson:"offlinePushInfo"` PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
AtUserIDList []string `bson:"atUserIDList"` Create(ctx context.Context, model *MsgDocModel) error
MsgDataList []byte `bson:"msgDataList"` UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
AttachedInfo string `bson:"attachedInfo"` FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
Ex string `bson:"ex"` GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error)
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error
}
func (MsgDocModel) TableName() string {
return Msg
}
func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum
}
func (m *MsgDocModel) IsFull() bool {
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
if index == 0 {
if len(m.Msg) >= singleGocMsgNum-1 {
return true
}
}
if len(m.Msg) >= singleGocMsgNum {
return true
}
return false
}
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix)
}
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID)
}
return seqUserIDs
}
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix)
}
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok {
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
}
}
return t
}
func (m MsgDocModel) getMsgIndex(seq uint32) int {
seqSuffix := seq / singleGocMsgNum
var index uint32
if seqSuffix == 0 {
index = (seq - seqSuffix*singleGocMsgNum) - 1
} else {
index = seq - seqSuffix*singleGocMsgNum
}
return int(index)
}
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
msg.GroupID = groupID
msg.SessionType = constant.SuperGroupChatType
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
} }

View File

@ -9,63 +9,63 @@ import (
) )
const ( const (
singleGocMsgNum = 5000 singleGocNotificationNum = 5000
Msg = "msg" Notification = "notification"
OldestList = 0 //OldestList = 0
NewestList = -1 //NewestList = -1
) )
type MsgDocModel struct { type NotificationDocModel struct {
DocID string `bson:"uid"` DocID string `bson:"uid"`
Msg []MsgInfoModel `bson:"msg"` Msg []NotificationInfoModel `bson:"msg"`
} }
type MsgInfoModel struct { type NotificationInfoModel struct {
SendTime int64 `bson:"sendtime"` SendTime int64 `bson:"sendtime"`
Msg []byte `bson:"msg"` Msg []byte `bson:"msg"`
} }
type MsgDocModelInterface interface { type NotificationDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []NotificationInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error Create(ctx context.Context, model *NotificationDocModel) error
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) FindOneByDocID(ctx context.Context, docID string) (*NotificationDocModel, error)
GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) GetNewestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error)
GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error)
Delete(ctx context.Context, docIDs []string) error Delete(ctx context.Context, docIDs []string) error
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error) GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*NotificationDocModel, error)
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error UpdateOneDoc(ctx context.Context, msg *NotificationDocModel) error
} }
func (MsgDocModel) TableName() string { func (NotificationDocModel) TableName() string {
return Msg return Notification
} }
func (MsgDocModel) GetSingleGocMsgNum() int64 { func (NotificationDocModel) GetsingleGocNotificationNum() int64 {
return singleGocMsgNum return singleGocNotificationNum
} }
func (m *MsgDocModel) IsFull() bool { func (m *NotificationDocModel) IsFull() bool {
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1]) index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
if index == 0 { if index == 0 {
if len(m.Msg) >= singleGocMsgNum-1 { if len(m.Msg) >= singleGocNotificationNum-1 {
return true return true
} }
} }
if len(m.Msg) >= singleGocMsgNum { if len(m.Msg) >= singleGocNotificationNum {
return true return true
} }
return false return false
} }
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string { func (m NotificationDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocNotificationNum
return m.indexGen(sourceID, seqSuffix) return m.indexGen(sourceID, seqSuffix)
} }
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { func (m NotificationDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum seqMaxSuffix := maxSeq / singleGocNotificationNum
var seqUserIDs []string var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ { for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, int64(i)) seqUserID := m.indexGen(userID, int64(i))
@ -74,16 +74,16 @@ func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
return seqUserIDs return seqUserIDs
} }
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { func (m NotificationDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocNotificationNum
return m.superGroupIndexGen(groupID, seqSuffix) return m.superGroupIndexGen(groupID, seqSuffix)
} }
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { func (m NotificationDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
} }
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 { func (m NotificationDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64) t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ { for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i]) docID := m.GetDocID(sourceID, seqs[i])
@ -97,22 +97,22 @@ func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][
return t return t
} }
func (m MsgDocModel) getMsgIndex(seq uint32) int { func (m NotificationDocModel) getMsgIndex(seq uint32) int {
seqSuffix := seq / singleGocMsgNum seqSuffix := seq / singleGocNotificationNum
var index uint32 var index uint32
if seqSuffix == 0 { if seqSuffix == 0 {
index = (seq - seqSuffix*singleGocMsgNum) - 1 index = (seq - seqSuffix*singleGocNotificationNum) - 1
} else { } else {
index = seq - seqSuffix*singleGocMsgNum index = seq - seqSuffix*singleGocNotificationNum
} }
return int(index) return int(index)
} }
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string { func (m NotificationDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10) return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { func (NotificationDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs { for _, v := range seqs {
msg := new(sdkws.MsgData) msg := new(sdkws.MsgData)
msg.Seq = v msg.Seq = v
@ -121,7 +121,7 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkw
return exceptionMsg return exceptionMsg
} }
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) { func (NotificationDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs { for _, v := range seqs {
msg := new(sdkws.MsgData) msg := new(sdkws.MsgData)
msg.Seq = v msg.Seq = v

View File

@ -0,0 +1,134 @@
package unrelation
import (
"context"
"errors"
"fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/protobuf/proto"
)
var ErrNotificationListNotExist = errors.New("user not have msg in mongoDB")
var ErrNotificationNotFound = errors.New("msg not found")
type NotificationMongoDriver struct {
MsgCollection *mongo.Collection
msg table.NotificationDocModel
}
func NewNotificationMongoDriver(database *mongo.Database) table.NotificationDocModelInterface {
return &NotificationMongoDriver{MsgCollection: database.Collection(table.NotificationDocModel{}.TableName())}
}
func (m *NotificationMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.NotificationInfoModel) error {
filter := bson.M{"uid": docID}
return m.MsgCollection.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
}
func (m *NotificationMongoDriver) Create(ctx context.Context, model *table.NotificationDocModel) error {
_, err := m.MsgCollection.InsertOne(ctx, model)
return err
}
func (m *NotificationMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
msg.Status = status
bytes, err := proto.Marshal(msg)
if err != nil {
return utils.Wrap(err, "")
}
_, err = m.MsgCollection.UpdateOne(ctx, bson.M{"uid": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msg.%d.msg", seqIndex): bytes}})
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
func (m *NotificationMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.NotificationDocModel, error) {
doc := &table.NotificationDocModel{}
err := m.MsgCollection.FindOne(ctx, bson.M{"uid": docID}).Decode(doc)
return doc, err
}
func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*table.NotificationDocModel, error) {
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1})
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: fmt.Sprintf("^%s:", sourceID)}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var msgs []table.NotificationDocModel
err = cursor.All(context.Background(), &msgs)
if err != nil {
return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
if len(msgs) > 0 {
return &msgs[0], nil
}
return nil, ErrMsgListNotExist
}
func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) {
var msgDocs []table.NotificationDocModel
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": -1}))
if err != nil {
return nil, utils.Wrap(err, "")
}
err = cursor.All(ctx, &msgDocs)
if err != nil {
return nil, utils.Wrap(err, "")
}
if len(msgDocs) > 0 {
if len(msgDocs[0].Msg) > 0 {
return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil
}
return nil, errors.New("len(msgDocs[0].Msg) < 0")
}
return nil, ErrMsgNotFound
}
func (m *NotificationMongoDriver) GetOldestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) {
var msgDocs []table.NotificationDocModel
cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": 1}))
if err != nil {
return nil, err
}
err = cursor.All(ctx, &msgDocs)
if err != nil {
return nil, utils.Wrap(err, "")
}
var oldestMsg table.NotificationInfoModel
if len(msgDocs) > 0 {
for _, v := range msgDocs[0].Msg {
if v.SendTime != 0 {
oldestMsg = v
break
}
}
if len(oldestMsg.Msg) == 0 {
if len(msgDocs[0].Msg) > 0 {
oldestMsg = msgDocs[0].Msg[0]
}
}
return &oldestMsg, nil
}
return nil, ErrMsgNotFound
}
func (m *NotificationMongoDriver) Delete(ctx context.Context, docIDs []string) error {
if docIDs == nil {
return nil
}
_, err := m.MsgCollection.DeleteMany(ctx, bson.M{"uid": bson.M{"$in": docIDs}})
return err
}
func (m *NotificationMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.NotificationDocModel) error {
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"uid": msg.DocID}, bson.M{"$set": bson.M{"msg": msg.Msg}})
return err
}

View File

@ -109,7 +109,7 @@ func GinParseOperationID() gin.HandlerFunc {
} }
} }
func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc { func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
dataBase := controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire) dataBase := controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire)
return func(c *gin.Context) { return func(c *gin.Context) {
switch c.Request.Method { switch c.Request.Method {
case http.MethodPost: case http.MethodPost: