mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 12:42:12 +08:00 
			
		
		
		
	feat: improve auth localcache.
This commit is contained in:
		
							parent
							
								
									8c4e69ec41
								
							
						
					
					
						commit
						721a89cdfa
					
				| @ -83,7 +83,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 		} | 		} | ||||||
| 		token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) | 		token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) | ||||||
| 	} else { | 	} else { | ||||||
| 		token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) | 		token = redis2.NewTokenCacheModel(rdb, &config.LocalCacheConfig, config.RpcConfig.TokenPolicy.Expire) | ||||||
| 	} | 	} | ||||||
| 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -135,9 +135,6 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke | |||||||
| 
 | 
 | ||||||
| 	prommetrics.UserLoginCounter.Inc() | 	prommetrics.UserLoginCounter.Inc() | ||||||
| 
 | 
 | ||||||
| 	// Remove local cache for the token |  | ||||||
| 	s.AuthLocalCache.RemoveLocalTokenCache(ctx, req.UserID, int(constant.AdminPlatformID)) |  | ||||||
| 
 |  | ||||||
| 	resp.Token = token | 	resp.Token = token | ||||||
| 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | ||||||
| 	return &resp, nil | 	return &resp, nil | ||||||
| @ -169,9 +166,6 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Remove local cache for the token |  | ||||||
| 	s.AuthLocalCache.RemoveLocalTokenCache(ctx, req.UserID, int(req.PlatformID)) |  | ||||||
| 
 |  | ||||||
| 	resp.Token = token | 	resp.Token = token | ||||||
| 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | ||||||
| 	return &resp, nil | 	return &resp, nil | ||||||
|  | |||||||
							
								
								
									
										79
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										79
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							| @ -2,13 +2,16 @@ package redis | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/redis/go-redis/v9" | 	"github.com/redis/go-redis/v9" | ||||||
| ) | ) | ||||||
| @ -16,16 +19,26 @@ import ( | |||||||
| type tokenCache struct { | type tokenCache struct { | ||||||
| 	rdb          redis.UniversalClient | 	rdb          redis.UniversalClient | ||||||
| 	accessExpire time.Duration | 	accessExpire time.Duration | ||||||
|  | 	localCache   *config.LocalCache | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { | func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel { | ||||||
| 	c := &tokenCache{rdb: rdb} | 	c := &tokenCache{rdb: rdb, localCache: localCache} | ||||||
| 	c.accessExpire = c.getExpireTime(accessExpire) | 	c.accessExpire = c.getExpireTime(accessExpire) | ||||||
| 	return c | 	return c | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { | func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { | ||||||
| 	return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) | 	key := cachekey.GetTokenKey(userID, platformID) | ||||||
|  | 	if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // SetTokenFlagEx set token and flag with expire time | // SetTokenFlagEx set token and flag with expire time | ||||||
| @ -37,6 +50,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform | |||||||
| 	if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { | 	if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -106,7 +124,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla | |||||||
| 	for k, v := range m { | 	for k, v := range m { | ||||||
| 		mm[k] = v | 		mm[k] = v | ||||||
| 	} | 	} | ||||||
| 	return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()) | 
 | ||||||
|  | 	err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { | func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { | ||||||
| @ -124,11 +152,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st | |||||||
| 	}); err != nil { | 	}); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, keys...) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { | func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { | ||||||
| 	return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) | 	key := cachekey.GetTokenKey(userID, platformID) | ||||||
|  | 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) getExpireTime(t int64) time.Duration { | func (c *tokenCache) getExpireTime(t int64) time.Duration { | ||||||
| @ -161,6 +201,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// Remove local cache for the token | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, keys...) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -175,5 +220,29 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p | |||||||
| 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) { | ||||||
|  | 	if len(keys) == 0 { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	topic := c.localCache.Auth.Topic | ||||||
|  | 	if topic == "" { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data, err := json.Marshal(keys) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys) | ||||||
|  | 	} else { | ||||||
|  | 		if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil { | ||||||
|  | 			log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
| @ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, | |||||||
| 			} | 			} | ||||||
| 			for _, group := range groups { | 			for _, group := range groups { | ||||||
| 				c = c.DelGroupsInfo(group.GroupID). | 				c = c.DelGroupsInfo(group.GroupID). | ||||||
| 					DelGroupMembersHash(group.GroupID). |  | ||||||
| 					DelGroupMembersHash(group.GroupID). | 					DelGroupMembersHash(group.GroupID). | ||||||
| 					DelGroupsMemberNum(group.GroupID). | 					DelGroupsMemberNum(group.GroupID). | ||||||
| 					DelGroupMemberIDs(group.GroupID). | 					DelGroupMemberIDs(group.GroupID). | ||||||
|  | |||||||
| @ -2,6 +2,7 @@ package rpccache | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||||
| @ -49,12 +50,13 @@ func (a *AuthLocalCache) GetExistingToken(ctx context.Context, userID string, pl | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) { | func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) { | ||||||
|  | 	start := time.Now() | ||||||
| 	log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID) | 	log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID) | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "userID", userID, "platformID", platformID) | 			log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "cost", time.Since(start), "userID", userID, "platformID", platformID) | ||||||
| 		} else { | 		} else { | ||||||
| 			log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "userID", userID, "platformID", platformID, "val", val) | 			log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "cost", time.Since(start), "userID", userID, "platformID", platformID, "val", val) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| @ -65,9 +67,3 @@ func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, pl | |||||||
| 		return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)})) | 		return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)})) | ||||||
| 	})) | 	})) | ||||||
| } | } | ||||||
| 
 |  | ||||||
| func (a *AuthLocalCache) RemoveLocalTokenCache(ctx context.Context, userID string, platformID int) { |  | ||||||
| 	key := cachekey.GetTokenKey(userID, platformID) |  | ||||||
| 	a.local.DelLocal(ctx, key) |  | ||||||
| 	log.ZDebug(ctx, "AuthLocalCache RemoveLocalTokenCache", "userID", userID, "platformID", platformID, "key", key) |  | ||||||
| } |  | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user