From 56c5c1f01511064cb0ef787cf4fa15dbdb1834f9 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Tue, 6 May 2025 15:10:10 +0800 Subject: [PATCH] fix: delete token by correct platformID && feat: adminToken can be retained for five minutes after deleting (#3313) --- internal/rpc/auth/auth.go | 17 +++-- pkg/common/storage/cache/cachekey/token.go | 7 +- pkg/common/storage/cache/mcache/token.go | 38 +++++++++- pkg/common/storage/cache/redis/token.go | 73 +++++++++++++++++-- pkg/common/storage/cache/token.go | 3 + pkg/common/storage/controller/auth.go | 82 ++++++++++++++-------- pkg/common/storage/database/mgo/cache.go | 2 +- 7 files changed, 177 insertions(+), 45 deletions(-) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 399d0bd4b..6697e27c0 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -159,15 +159,17 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim if err != nil { return nil, err } - isAdmin := authverify.IsManagerUserID(claims.UserID, s.config.Share.IMAdminUserID) - if isAdmin { - return claims, nil - } m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) if err != nil { return nil, err } if len(m) == 0 { + isAdmin := authverify.IsManagerUserID(claims.UserID, s.config.Share.IMAdminUserID) + if isAdmin { + if err = s.authDatabase.GetTemporaryTokensWithoutError(ctx, claims.UserID, claims.PlatformID, tokensString); err == nil { + return claims, nil + } + } return nil, servererrs.ErrTokenNotExist.Wrap() } if v, ok := m[tokensString]; ok { @@ -179,6 +181,13 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim default: return nil, errs.Wrap(errs.ErrTokenUnknown) } + } else { + isAdmin := authverify.IsManagerUserID(claims.UserID, s.config.Share.IMAdminUserID) + if isAdmin { + if err = s.authDatabase.GetTemporaryTokensWithoutError(ctx, claims.UserID, claims.PlatformID, tokensString); err == nil { + return claims, nil + } + } } return nil, servererrs.ErrTokenNotExist.Wrap() } diff --git a/pkg/common/storage/cache/cachekey/token.go b/pkg/common/storage/cache/cachekey/token.go index 83ba2f211..6fe1bdfef 100644 --- a/pkg/common/storage/cache/cachekey/token.go +++ b/pkg/common/storage/cache/cachekey/token.go @@ -1,8 +1,9 @@ package cachekey import ( - "github.com/openimsdk/protocol/constant" "strings" + + "github.com/openimsdk/protocol/constant" ) const ( @@ -13,6 +14,10 @@ func GetTokenKey(userID string, platformID int) string { return UidPidToken + userID + ":" + constant.PlatformIDToName(platformID) } +func GetTemporaryTokenKey(userID string, platformID int, token string) string { + return UidPidToken + ":TEMPORARY:" + userID + ":" + constant.PlatformIDToName(platformID) + ":" + token +} + func GetAllPlatformTokenKey(userID string) []string { res := make([]string, len(constant.PlatformID2Name)) for k := range constant.PlatformID2Name { diff --git a/pkg/common/storage/cache/mcache/token.go b/pkg/common/storage/cache/mcache/token.go index d7ae29cfc..98b9cc066 100644 --- a/pkg/common/storage/cache/mcache/token.go +++ b/pkg/common/storage/cache/mcache/token.go @@ -27,7 +27,6 @@ type tokenCache struct { func (x *tokenCache) getTokenKey(userID string, platformID int, token string) string { return cachekey.GetTokenKey(userID, platformID) + ":" + token - } func (x *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { @@ -57,6 +56,14 @@ func (x *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, p return mm, nil } +func (x *tokenCache) HasTemporaryToken(ctx context.Context, userID string, platformID int, token string) error { + key := cachekey.GetTemporaryTokenKey(userID, platformID, token) + if _, err := x.cache.Get(ctx, []string{key}); err != nil { + return err + } + return nil +} + func (x *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) { prefix := cachekey.UidPidToken + userID + ":" tokens, err := x.cache.Prefix(ctx, prefix) @@ -128,3 +135,32 @@ func (x *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, pla func (x *tokenCache) getExpireTime(t int64) time.Duration { return time.Hour * 24 * time.Duration(t) } + +func (x *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, tokens map[int][]string) error { + keys := make([]string, 0, len(tokens)) + for platformID, ts := range tokens { + for _, t := range ts { + keys = append(keys, x.getTokenKey(userID, platformID, t)) + } + } + return x.cache.Del(ctx, keys) +} + +func (x *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error { + keys := make([]string, 0, len(fields)) + for _, f := range fields { + keys = append(keys, x.getTokenKey(userID, platformID, f)) + } + if err := x.cache.Del(ctx, keys); err != nil { + return err + } + + for _, f := range fields { + k := cachekey.GetTemporaryTokenKey(userID, platformID, f) + if err := x.cache.Set(ctx, k, "", time.Minute*5); err != nil { + return errs.Wrap(err) + } + } + + return nil +} diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 510da43e3..b3870daee 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -9,6 +9,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) @@ -55,6 +56,14 @@ func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, p return mm, nil } +func (c *tokenCache) HasTemporaryToken(ctx context.Context, userID string, platformID int, token string) error { + err := c.rdb.Get(ctx, cachekey.GetTemporaryTokenKey(userID, platformID, token)).Err() + if err != nil { + return errs.Wrap(err) + } + return nil +} + func (c *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) { var ( res = make(map[int]map[string]int) @@ -101,13 +110,19 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla } func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { - pipe := c.rdb.Pipeline() - for k, v := range tokens { - pipe.HSet(ctx, k, v) - } - _, err := pipe.Exec(ctx) - if err != nil { - return errs.Wrap(err) + keys := datautil.Keys(tokens) + if err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { + pipe := c.rdb.Pipeline() + for k, v := range tokens { + pipe.HSet(ctx, k, v) + } + _, err := pipe.Exec(ctx) + if err != nil { + return errs.Wrap(err) + } + return nil + }); err != nil { + return err } return nil } @@ -119,3 +134,47 @@ func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, pla func (c *tokenCache) getExpireTime(t int64) time.Duration { return time.Hour * 24 * time.Duration(t) } + +// DeleteTokenByTokenMap tokens key is platformID, value is token slice +func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, tokens map[int][]string) error { + var ( + keys = make([]string, 0, len(tokens)) + keyMap = make(map[string][]string) + ) + for k, v := range tokens { + k1 := cachekey.GetTokenKey(userID, k) + keys = append(keys, k1) + keyMap[k1] = v + } + + if err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { + pipe := c.rdb.Pipeline() + for k, v := range tokens { + pipe.HDel(ctx, cachekey.GetTokenKey(userID, k), v...) + } + _, err := pipe.Exec(ctx) + if err != nil { + return errs.Wrap(err) + } + return nil + }); err != nil { + return err + } + + return nil +} + +func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error { + key := cachekey.GetTokenKey(userID, platformID) + if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { + return errs.Wrap(err) + } + for _, f := range fields { + k := cachekey.GetTemporaryTokenKey(userID, platformID, f) + if err := c.rdb.Set(ctx, k, "", time.Minute*5).Err(); err != nil { + return errs.Wrap(err) + } + } + + return nil +} diff --git a/pkg/common/storage/cache/token.go b/pkg/common/storage/cache/token.go index e5e0a9383..441c08939 100644 --- a/pkg/common/storage/cache/token.go +++ b/pkg/common/storage/cache/token.go @@ -9,8 +9,11 @@ type TokenModel interface { // SetTokenFlagEx set token and flag with expire time SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) + HasTemporaryToken(ctx context.Context, userID string, platformID int, token string) error GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error + DeleteTokenByTokenMap(ctx context.Context, userID string, tokens map[int][]string) error + DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 013d8b155..488a116c3 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -17,6 +17,8 @@ import ( type AuthDatabase interface { // If the result is empty, no error is returned. GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) + + GetTemporaryTokensWithoutError(ctx context.Context, userID string, platformID int, token string) error // Create token CreateToken(ctx context.Context, userID string, platformID int) (string, error) @@ -52,6 +54,10 @@ func (a *authDatabase) GetTokensWithoutError(ctx context.Context, userID string, return a.cache.GetTokensWithoutError(ctx, userID, platformID) } +func (a *authDatabase) GetTemporaryTokensWithoutError(ctx context.Context, userID string, platformID int, token string) error { + return a.cache.HasTemporaryToken(ctx, userID, platformID, token) +} + func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { return a.cache.SetTokenMapByUidPid(ctx, userID, platformID, m) } @@ -85,23 +91,30 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", err } - deleteTokenKey, kickedTokenKey, err := a.checkToken(ctx, tokens, platformID) + deleteTokenKey, kickedTokenKey, adminTokens, err := a.checkToken(ctx, tokens, platformID) if err != nil { return "", err } if len(deleteTokenKey) != 0 { - err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) + err = a.cache.DeleteTokenByTokenMap(ctx, userID, deleteTokenKey) if err != nil { return "", err } } if len(kickedTokenKey) != 0 { - for _, k := range kickedTokenKey { - err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) - if err != nil { - return "", err + for plt, ks := range kickedTokenKey { + for _, k := range ks { + err := a.cache.SetTokenFlagEx(ctx, userID, plt, k, constant.KickedToken) + if err != nil { + return "", err + } + log.ZDebug(ctx, "kicked token in create token", "token", k) } - log.ZDebug(ctx, "kicked token in create token", "token", k) + } + } + if len(adminTokens) != 0 { + if err = a.cache.DeleteAndSetTemporary(ctx, userID, constant.AdminPlatformID, adminTokens); err != nil { + return "", err } } @@ -119,12 +132,13 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return tokenString, nil } -func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string]int, platformID int) ([]string, []string, error) { - // todo: Move the logic for handling old data to another location. +// checkToken will check token by tokenPolicy and return deleteToken,kickToken,deleteAdminToken +func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string]int, platformID int) (map[int][]string, map[int][]string, []string, error) { + // todo: Asynchronous deletion of old data. var ( loginTokenMap = make(map[int][]string) // The length of the value of the map must be greater than 0 - deleteToken = make([]string, 0) - kickToken = make([]string, 0) + deleteToken = make(map[int][]string) + kickToken = make(map[int][]string) adminToken = make([]string, 0) unkickTerminal = "" ) @@ -133,7 +147,7 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string for k, v := range tks { _, err := tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) if err != nil || v != constant.NormalToken { - deleteToken = append(deleteToken, k) + deleteToken[plfID] = append(deleteToken[plfID], k) } else { if plfID != constant.AdminPlatformID { loginTokenMap[plfID] = append(loginTokenMap[plfID], k) @@ -153,14 +167,15 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string } limit := a.multiLogin.MaxNumOneEnd if l > limit { - kickToken = append(kickToken, ts[:l-limit]...) + kickToken[plt] = ts[:l-limit] } } case constant.AllLoginButSameTermKick: for plt, ts := range loginTokenMap { - kickToken = append(kickToken, ts[:len(ts)-1]...) + kickToken[plt] = ts[:len(ts)-1] + if plt == platformID { - kickToken = append(kickToken, ts[len(ts)-1]) + kickToken[plt] = append(kickToken[plt], ts[len(ts)-1]) } } case constant.PCAndOther: @@ -168,29 +183,33 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string if constant.PlatformIDToClass(platformID) != unkickTerminal { for plt, ts := range loginTokenMap { if constant.PlatformIDToClass(plt) != unkickTerminal { - kickToken = append(kickToken, ts...) + kickToken[plt] = ts } } } else { var ( - preKick []string - isReserve = true + preKickToken string + preKickPlt int + reserveToken = false ) for plt, ts := range loginTokenMap { if constant.PlatformIDToClass(plt) != unkickTerminal { // Keep a token from another end - if isReserve { - isReserve = false - kickToken = append(kickToken, ts[:len(ts)-1]...) - preKick = append(preKick, ts[len(ts)-1]) + if !reserveToken { + reserveToken = true + kickToken[plt] = ts[:len(ts)-1] + preKickToken = ts[len(ts)-1] + preKickPlt = plt continue } else { // Prioritize keeping Android if plt == constant.AndroidPlatformID { - kickToken = append(kickToken, preKick...) - kickToken = append(kickToken, ts[:len(ts)-1]...) + if preKickToken != "" { + kickToken[preKickPlt] = append(kickToken[preKickPlt], preKickToken) + } + kickToken[plt] = ts[:len(ts)-1] } else { - kickToken = append(kickToken, ts...) + kickToken[plt] = ts } } } @@ -203,19 +222,19 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string for plt, ts := range loginTokenMap { if constant.PlatformIDToClass(plt) == constant.PlatformIDToClass(platformID) { - kickToken = append(kickToken, ts...) + kickToken[plt] = ts } else { if _, ok := reserved[constant.PlatformIDToClass(plt)]; !ok { reserved[constant.PlatformIDToClass(plt)] = struct{}{} - kickToken = append(kickToken, ts[:len(ts)-1]...) + kickToken[plt] = ts[:len(ts)-1] continue } else { - kickToken = append(kickToken, ts...) + kickToken[plt] = ts } } } default: - return nil, nil, errs.New("unknown multiLogin policy").Wrap() + return nil, nil, nil, errs.New("unknown multiLogin policy").Wrap() } //var adminTokenMaxNum = a.multiLogin.MaxNumOneEnd @@ -226,8 +245,9 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string //if l > adminTokenMaxNum { // kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) //} + var deleteAdminToken []string if platformID == constant.AdminPlatformID { - kickToken = append(kickToken, adminToken...) + deleteAdminToken = adminToken } - return deleteToken, kickToken, nil + return deleteToken, kickToken, deleteAdminToken, nil } diff --git a/pkg/common/storage/database/mgo/cache.go b/pkg/common/storage/database/mgo/cache.go index bcf86cd56..991dfa874 100644 --- a/pkg/common/storage/database/mgo/cache.go +++ b/pkg/common/storage/database/mgo/cache.go @@ -127,7 +127,7 @@ func (x *CacheMgo) Del(ctx context.Context, key []string) error { return nil } _, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}}) - return err + return errs.Wrap(err) } func (x *CacheMgo) lockKey(key string) string {