mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 04:32:10 +08:00 
			
		
		
		
	feat: implement auth local cache. (#3533)
* feat: add auth local cache. * feat: implement auth local cache. * feat: improve auth localcache.
This commit is contained in:
		
							parent
							
								
									349a8cd9af
								
							
						
					
					
						commit
						6856a864d0
					
				| @ -1,3 +1,10 @@ | |||||||
|  | auth: | ||||||
|  |   topic: DELETE_CACHE_AUTH | ||||||
|  |   slotNum: 100 | ||||||
|  |   slotSize: 2000 | ||||||
|  |   successExpire: 300 | ||||||
|  |   failedExpire: 5 | ||||||
|  | 
 | ||||||
| user: | user: | ||||||
|   topic: DELETE_CACHE_USER |   topic: DELETE_CACHE_USER | ||||||
|   slotNum: 100 |   slotNum: 100 | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | |||||||
| 	github.com/gorilla/websocket v1.5.1 | 	github.com/gorilla/websocket v1.5.1 | ||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.73-alpha.12 | 	github.com/openimsdk/protocol v0.0.73-alpha.14 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.97 | 	github.com/openimsdk/tools v0.0.50-alpha.97 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | |||||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= | github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= | ||||||
| github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= | github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= | ||||||
| github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= | github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
|  | |||||||
| @ -18,10 +18,13 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"errors" | 	"errors" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/localcache" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/rpccache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| @ -46,6 +49,7 @@ import ( | |||||||
| type authServer struct { | type authServer struct { | ||||||
| 	pbauth.UnimplementedAuthServer | 	pbauth.UnimplementedAuthServer | ||||||
| 	authDatabase   controller.AuthDatabase | 	authDatabase   controller.AuthDatabase | ||||||
|  | 	AuthLocalCache *rpccache.AuthLocalCache | ||||||
| 	RegisterCenter discovery.Conn | 	RegisterCenter discovery.Conn | ||||||
| 	config         *Config | 	config         *Config | ||||||
| 	userClient     *rpcli.UserClient | 	userClient     *rpcli.UserClient | ||||||
| @ -53,11 +57,12 @@ type authServer struct { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type Config struct { | type Config struct { | ||||||
| 	RpcConfig   config.Auth | 	RpcConfig        config.Auth | ||||||
| 	RedisConfig config.Redis | 	RedisConfig      config.Redis | ||||||
| 	MongoConfig config.Mongo | 	MongoConfig      config.Mongo | ||||||
| 	Share       config.Share | 	Share            config.Share | ||||||
| 	Discovery   config.Discovery | 	LocalCacheConfig config.LocalCache | ||||||
|  | 	Discovery        config.Discovery | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error { | ||||||
| @ -78,12 +83,19 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 		} | 		} | ||||||
| 		token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) | 		token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) | ||||||
| 	} else { | 	} else { | ||||||
| 		token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) | 		token = redis2.NewTokenCacheModel(rdb, &config.LocalCacheConfig, config.RpcConfig.TokenPolicy.Expire) | ||||||
| 	} | 	} | ||||||
| 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | 	userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	authConn, err := client.GetConn(ctx, config.Discovery.RpcService.Auth) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	localcache.InitLocalCache(&config.LocalCacheConfig) | ||||||
|  | 
 | ||||||
| 	pbauth.RegisterAuthServer(server, &authServer{ | 	pbauth.RegisterAuthServer(server, &authServer{ | ||||||
| 		RegisterCenter: client, | 		RegisterCenter: client, | ||||||
| 		authDatabase: controller.NewAuthDatabase( | 		authDatabase: controller.NewAuthDatabase( | ||||||
| @ -93,9 +105,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 			config.Share.MultiLogin, | 			config.Share.MultiLogin, | ||||||
| 			config.Share.IMAdminUser.UserIDs, | 			config.Share.IMAdminUser.UserIDs, | ||||||
| 		), | 		), | ||||||
| 		config:       config, | 		AuthLocalCache: rpccache.NewAuthLocalCache(rpcli.NewAuthClient(authConn), &config.LocalCacheConfig, rdb), | ||||||
| 		userClient:   rpcli.NewUserClient(userConn), | 		config:         config, | ||||||
| 		adminUserIDs: config.Share.IMAdminUser.UserIDs, | 		userClient:     rpcli.NewUserClient(userConn), | ||||||
|  | 		adminUserIDs:   config.Share.IMAdminUser.UserIDs, | ||||||
| 	}) | 	}) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @ -121,6 +134,7 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	prommetrics.UserLoginCounter.Inc() | 	prommetrics.UserLoginCounter.Inc() | ||||||
|  | 
 | ||||||
| 	resp.Token = token | 	resp.Token = token | ||||||
| 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | ||||||
| 	return &resp, nil | 	return &resp, nil | ||||||
| @ -151,20 +165,34 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	resp.Token = token | 	resp.Token = token | ||||||
| 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | 	resp.ExpireTimeSeconds = s.config.RpcConfig.TokenPolicy.Expire * 24 * 60 * 60 | ||||||
| 	return &resp, nil | 	return &resp, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (s *authServer) GetExistingToken(ctx context.Context, req *pbauth.GetExistingTokenReq) (*pbauth.GetExistingTokenResp, error) { | ||||||
|  | 	m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return &pbauth.GetExistingTokenResp{ | ||||||
|  | 		TokenStates: convert.TokenMapDB2Pb(m), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { | func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { | ||||||
| 	claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret)) | 	claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) | 
 | ||||||
|  | 	m, err := s.AuthLocalCache.GetExistingToken(ctx, claims.UserID, claims.PlatformID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	if len(m) == 0 { | 	if len(m) == 0 { | ||||||
| 		isAdmin := authverify.CheckUserIsAdmin(ctx, claims.UserID) | 		isAdmin := authverify.CheckUserIsAdmin(ctx, claims.UserID) | ||||||
| 		if isAdmin { | 		if isAdmin { | ||||||
|  | |||||||
| @ -40,6 +40,7 @@ func NewAuthRpcCmd() *AuthRpcCmd { | |||||||
| 		config.RedisConfigFileName:      &authConfig.RedisConfig, | 		config.RedisConfigFileName:      &authConfig.RedisConfig, | ||||||
| 		config.MongodbConfigFileName:    &authConfig.MongoConfig, | 		config.MongodbConfigFileName:    &authConfig.MongoConfig, | ||||||
| 		config.ShareFileName:            &authConfig.Share, | 		config.ShareFileName:            &authConfig.Share, | ||||||
|  | 		config.LocalCacheConfigFileName: &authConfig.LocalCacheConfig, | ||||||
| 		config.DiscoveryConfigFilename:  &authConfig.Discovery, | 		config.DiscoveryConfigFilename:  &authConfig.Discovery, | ||||||
| 	} | 	} | ||||||
| 	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) | 	ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) | ||||||
|  | |||||||
| @ -43,6 +43,7 @@ type CacheConfig struct { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type LocalCache struct { | type LocalCache struct { | ||||||
|  | 	Auth         CacheConfig `yaml:"auth"` | ||||||
| 	User         CacheConfig `yaml:"user"` | 	User         CacheConfig `yaml:"user"` | ||||||
| 	Group        CacheConfig `yaml:"group"` | 	Group        CacheConfig `yaml:"group"` | ||||||
| 	Friend       CacheConfig `yaml:"friend"` | 	Friend       CacheConfig `yaml:"friend"` | ||||||
|  | |||||||
							
								
								
									
										25
									
								
								pkg/common/convert/auth.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								pkg/common/convert/auth.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,25 @@ | |||||||
|  | package convert | ||||||
|  | 
 | ||||||
|  | func TokenMapDB2Pb(tokenMapDB map[string]int) map[string]int32 { | ||||||
|  |     if tokenMapDB == nil { | ||||||
|  |         return nil | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     tokenMapPB := make(map[string]int32, len(tokenMapDB)) | ||||||
|  |     for k, v := range tokenMapDB { | ||||||
|  |         tokenMapPB[k] = int32(v) | ||||||
|  |     } | ||||||
|  |     return tokenMapPB | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TokenMapPb2DB(tokenMapPB map[string]int32) map[string]int { | ||||||
|  |     if tokenMapPB == nil { | ||||||
|  |         return nil | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     tokenMapDB := make(map[string]int, len(tokenMapPB)) | ||||||
|  |     for k, v := range tokenMapPB { | ||||||
|  |         tokenMapDB[k] = int(v) | ||||||
|  |     } | ||||||
|  |     return tokenMapDB | ||||||
|  | } | ||||||
							
								
								
									
										79
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										79
									
								
								pkg/common/storage/cache/redis/token.go
									
									
									
									
										vendored
									
									
								
							| @ -2,13 +2,16 @@ package redis | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/utils/datautil" | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/redis/go-redis/v9" | 	"github.com/redis/go-redis/v9" | ||||||
| ) | ) | ||||||
| @ -16,16 +19,26 @@ import ( | |||||||
| type tokenCache struct { | type tokenCache struct { | ||||||
| 	rdb          redis.UniversalClient | 	rdb          redis.UniversalClient | ||||||
| 	accessExpire time.Duration | 	accessExpire time.Duration | ||||||
|  | 	localCache   *config.LocalCache | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel { | func NewTokenCacheModel(rdb redis.UniversalClient, localCache *config.LocalCache, accessExpire int64) cache.TokenModel { | ||||||
| 	c := &tokenCache{rdb: rdb} | 	c := &tokenCache{rdb: rdb, localCache: localCache} | ||||||
| 	c.accessExpire = c.getExpireTime(accessExpire) | 	c.accessExpire = c.getExpireTime(accessExpire) | ||||||
| 	return c | 	return c | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { | func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { | ||||||
| 	return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err()) | 	key := cachekey.GetTokenKey(userID, platformID) | ||||||
|  | 	if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // SetTokenFlagEx set token and flag with expire time | // SetTokenFlagEx set token and flag with expire time | ||||||
| @ -37,6 +50,11 @@ func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platform | |||||||
| 	if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { | 	if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil { | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -106,7 +124,17 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla | |||||||
| 	for k, v := range m { | 	for k, v := range m { | ||||||
| 		mm[k] = v | 		mm[k] = v | ||||||
| 	} | 	} | ||||||
| 	return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()) | 
 | ||||||
|  | 	err := c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, cachekey.GetTokenKey(userID, platformID)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { | func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { | ||||||
| @ -124,11 +152,23 @@ func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[st | |||||||
| 	}); err != nil { | 	}); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, keys...) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { | func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { | ||||||
| 	return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) | 	key := cachekey.GetTokenKey(userID, platformID) | ||||||
|  | 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | ||||||
|  | 		return errs.Wrap(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *tokenCache) getExpireTime(t int64) time.Duration { | func (c *tokenCache) getExpireTime(t int64) time.Duration { | ||||||
| @ -161,6 +201,11 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// Remove local cache for the token | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, keys...) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -175,5 +220,29 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p | |||||||
| 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | 	if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { | ||||||
| 		return errs.Wrap(err) | 		return errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if c.localCache != nil { | ||||||
|  | 		c.removeLocalTokenCache(ctx, key) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *tokenCache) removeLocalTokenCache(ctx context.Context, keys ...string) { | ||||||
|  | 	if len(keys) == 0 { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	topic := c.localCache.Auth.Topic | ||||||
|  | 	if topic == "" { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data, err := json.Marshal(keys) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys) | ||||||
|  | 	} else { | ||||||
|  | 		if err := c.rdb.Publish(ctx, topic, string(data)).Err(); err != nil { | ||||||
|  | 			log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
| @ -194,7 +194,6 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, | |||||||
| 			} | 			} | ||||||
| 			for _, group := range groups { | 			for _, group := range groups { | ||||||
| 				c = c.DelGroupsInfo(group.GroupID). | 				c = c.DelGroupsInfo(group.GroupID). | ||||||
| 					DelGroupMembersHash(group.GroupID). |  | ||||||
| 					DelGroupMembersHash(group.GroupID). | 					DelGroupMembersHash(group.GroupID). | ||||||
| 					DelGroupsMemberNum(group.GroupID). | 					DelGroupsMemberNum(group.GroupID). | ||||||
| 					DelGroupMemberIDs(group.GroupID). | 					DelGroupMemberIDs(group.GroupID). | ||||||
|  | |||||||
| @ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] { | |||||||
| 	if opt.localSlotNum > 0 && opt.localSlotSize > 0 { | 	if opt.localSlotNum > 0 && opt.localSlotSize > 0 { | ||||||
| 		createSimpleLRU := func() lru.LRU[string, V] { | 		createSimpleLRU := func() lru.LRU[string, V] { | ||||||
| 			if opt.expirationEvict { | 			if opt.expirationEvict { | ||||||
| 				return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) | 				return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) | ||||||
| 			} else { | 			} else { | ||||||
| 				return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) | 				return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		if opt.localSlotNum == 1 { | 		if opt.localSlotNum == 1 { | ||||||
| 			c.local = createSimpleLRU() | 			c.local = createSimpleLRU() | ||||||
| 		} else { | 		} else { | ||||||
| 			c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU) | 			c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU) | ||||||
| 		} | 		} | ||||||
| 		if opt.linkSlotNum > 0 { | 		if opt.linkSlotNum > 0 { | ||||||
| 			c.link = link.New(opt.linkSlotNum) | 			c.link = link.New(opt.linkSlotNum) | ||||||
| @ -71,6 +71,8 @@ type cache[V any] struct { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *cache[V]) onEvict(key string, value V) { | func (c *cache[V]) onEvict(key string, value V) { | ||||||
|  | 	_ = value | ||||||
|  | 
 | ||||||
| 	if c.link != nil { | 	if c.link != nil { | ||||||
| 		lks := c.link.Del(key) | 		lks := c.link.Del(key) | ||||||
| 		for k := range lks { | 		for k := range lks { | ||||||
|  | |||||||
| @ -15,10 +15,11 @@ | |||||||
| package localcache | package localcache | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" |  | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( | var ( | ||||||
| @ -32,6 +33,10 @@ func InitLocalCache(localCache *config.LocalCache) { | |||||||
| 			Local config.CacheConfig | 			Local config.CacheConfig | ||||||
| 			Keys  []string | 			Keys  []string | ||||||
| 		}{ | 		}{ | ||||||
|  | 			{ | ||||||
|  | 				Local: localCache.Auth, | ||||||
|  | 				Keys:  []string{cachekey.UidPidToken}, | ||||||
|  | 			}, | ||||||
| 			{ | 			{ | ||||||
| 				Local: localCache.User, | 				Local: localCache.User, | ||||||
| 				Keys:  []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, | 				Keys:  []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, | ||||||
|  | |||||||
							
								
								
									
										69
									
								
								pkg/rpccache/auth.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								pkg/rpccache/auth.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,69 @@ | |||||||
|  | package rpccache | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/localcache" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||||
|  | 	"github.com/openimsdk/protocol/auth" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
|  | 	"github.com/redis/go-redis/v9" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func NewAuthLocalCache(client *rpcli.AuthClient, localCache *config.LocalCache, cli redis.UniversalClient) *AuthLocalCache { | ||||||
|  | 	lc := localCache.Auth | ||||||
|  | 	log.ZDebug(context.Background(), "AuthLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) | ||||||
|  | 	x := &AuthLocalCache{ | ||||||
|  | 		client: client, | ||||||
|  | 		local: localcache.New[[]byte]( | ||||||
|  | 			localcache.WithLocalSlotNum(lc.SlotNum), | ||||||
|  | 			localcache.WithLocalSlotSize(lc.SlotSize), | ||||||
|  | 			localcache.WithLinkSlotNum(lc.SlotNum), | ||||||
|  | 			localcache.WithLocalSuccessTTL(lc.Success()), | ||||||
|  | 			localcache.WithLocalFailedTTL(lc.Failed()), | ||||||
|  | 		), | ||||||
|  | 	} | ||||||
|  | 	if lc.Enable() { | ||||||
|  | 		go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) | ||||||
|  | 	} | ||||||
|  | 	return x | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type AuthLocalCache struct { | ||||||
|  | 	client *rpcli.AuthClient | ||||||
|  | 	local  localcache.Cache[[]byte] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *AuthLocalCache) GetExistingToken(ctx context.Context, userID string, platformID int) (val map[string]int, err error) { | ||||||
|  | 	resp, err := a.getExistingToken(ctx, userID, platformID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	res := convert.TokenMapPb2DB(resp.TokenStates) | ||||||
|  | 
 | ||||||
|  | 	return res, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (a *AuthLocalCache) getExistingToken(ctx context.Context, userID string, platformID int) (val *auth.GetExistingTokenResp, err error) { | ||||||
|  | 	start := time.Now() | ||||||
|  | 	log.ZDebug(ctx, "AuthLocalCache GetExistingToken req", "userID", userID, "platformID", platformID) | ||||||
|  | 	defer func() { | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZError(ctx, "AuthLocalCache GetExistingToken error", err, "cost", time.Since(start), "userID", userID, "platformID", platformID) | ||||||
|  | 		} else { | ||||||
|  | 			log.ZDebug(ctx, "AuthLocalCache GetExistingToken resp", "cost", time.Since(start), "userID", userID, "platformID", platformID, "val", val) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	var cache cacheProto[auth.GetExistingTokenResp] | ||||||
|  | 
 | ||||||
|  | 	return cache.Unmarshal(a.local.Get(ctx, cachekey.GetTokenKey(userID, platformID), func(ctx context.Context) ([]byte, error) { | ||||||
|  | 		log.ZDebug(ctx, "AuthLocalCache GetExistingToken call rpc", "userID", userID, "platformID", platformID) | ||||||
|  | 		return cache.Marshal(a.client.AuthClient.GetExistingToken(ctx, &auth.GetExistingTokenReq{UserID: userID, PlatformID: int32(platformID)})) | ||||||
|  | 	})) | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user