Compare commits

...

3 Commits

Author SHA1 Message Date
xuzhijvn
b6e623eb99
Merge 8cd57d68535f83c6495423e777f6a30c88cbe0f1 into d817cb0ffd4ad9c540a36b358c71c081e800920b 2025-08-08 17:13:56 +08:00
chao
d817cb0ffd
fix: admin token in standalone mode (#3499)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode
2025-08-01 02:48:25 +00:00
xuzhijvn
8cd57d6853 fix: the loop querying the db in func batchGetCache2 (#3301) 2025-04-24 15:32:41 +08:00
3 changed files with 33 additions and 12 deletions

View File

@ -97,6 +97,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
case BestSpeed:
r.Use(gzip.Gzip(gzip.BestSpeed))
}
if config.Standalone() {
r.Use(func(c *gin.Context) {
c.Set(authverify.CtxAdminUserIDsKey, cfg.Share.IMAdminUser.UserIDs)
})
}
r.Use(api.GinLogger(), prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(),
mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn)), setGinIsAdmin(cfg.Share.IMAdminUser.UserIDs))

View File

@ -76,6 +76,27 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
if err != nil {
return nil, err
}
values, err := fn(ctx, ids)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "ids", ids)
return nil, err
}
idToValue := make(map[K]*V)
for _, value := range values {
idToValue[vId(value)] = value
}
getSlotValues := func(slotIds []K) []*V {
if len(slotIds) == 0 {
return nil
}
slotValues := make([]*V, 0, len(slotIds))
for _, id := range slotIds {
if value, ok := idToValue[id]; ok {
slotValues = append(slotValues, value)
}
}
return slotValues
}
result := make([]*V, 0, len(findKeys))
for _, keys := range slotKeys {
indexCache, err := rcClient.GetClient().FetchBatch2(ctx, keys, expire, func(idx []int) (map[int]string, error) {
@ -86,16 +107,12 @@ func batchGetCache2[K comparable, V any](ctx context.Context, rcClient *rocksCac
idIndex[id] = index
queryIds = append(queryIds, id)
}
values, err := fn(ctx, queryIds)
if err != nil {
log.ZError(ctx, "batchGetCache query database failed", err, "keys", keys, "queryIds", queryIds)
return nil, err
}
if len(values) == 0 {
slotValues := getSlotValues(queryIds)
if len(slotValues) == 0 {
return map[int]string{}, nil
}
cacheIndex := make(map[int]string)
for _, value := range values {
for _, value := range slotValues {
id := vId(value)
index, ok := idIndex[id]
if !ok {

View File

@ -165,16 +165,15 @@ func (c *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, t
}
func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error {
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
for _, f := range fields {
k := cachekey.GetTemporaryTokenKey(userID, platformID, f)
if err := c.rdb.Set(ctx, k, "", time.Minute*5).Err(); err != nil {
return errs.Wrap(err)
}
}
key := cachekey.GetTokenKey(userID, platformID)
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
return nil
}