diff --git a/internal/push/push.go b/internal/push/push.go index 8a1882b62..13818e93d 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -7,8 +7,11 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" pbpush "github.com/openimsdk/protocol/push" @@ -28,6 +31,7 @@ type pushServer struct { type Config struct { RpcConfig config.Push RedisConfig config.Redis + MongoConfig config.Mongo KafkaConfig config.Kafka NotificationConfig config.Notification Share config.Share @@ -46,12 +50,25 @@ func (p pushServer) DelUserPushToken(ctx context.Context, } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - dbb := dbbuild.NewBuilder(nil, &config.RedisConfig) + dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { return err } - cacheModel := redis.NewThirdCache(rdb) + var cacheModel cache.ThirdCache + if rdb == nil { + mdb, err := dbb.Mongo(ctx) + if err != nil { + return err + } + mc, err := mgo.NewCacheMgo(mdb.GetDB()) + if err != nil { + return err + } + cacheModel = mcache.NewThirdCache(mc) + } else { + cacheModel = redis.NewThirdCache(rdb) + } offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel, string(config.FcmConfigPath)) if err != nil { return err diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2bf86cb10..399d0bd4b 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -18,6 +18,9 @@ import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" @@ -51,16 +54,31 @@ type authServer struct { type Config struct { RpcConfig config.Auth RedisConfig config.Redis + MongoConfig config.Mongo Share config.Share Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { - dbb := dbbuild.NewBuilder(nil, &config.RedisConfig) + dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig) rdb, err := dbb.Redis(ctx) if err != nil { return err } + var token cache.TokenModel + if rdb == nil { + mdb, err := dbb.Mongo(ctx) + if err != nil { + return err + } + mc, err := mgo.NewCacheMgo(mdb.GetDB()) + if err != nil { + return err + } + token = mcache.NewTokenCacheModel(mc, config.RpcConfig.TokenPolicy.Expire) + } else { + token = redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire) + } userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err @@ -68,7 +86,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr pbauth.RegisterAuthServer(server, &authServer{ RegisterCenter: client, authDatabase: controller.NewAuthDatabase( - redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), + token, config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, config.Share.MultiLogin, diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 8796fe824..97206dd6d 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,11 +19,12 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "path" "strconv" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -37,7 +38,10 @@ import ( ) func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) { - limit := t.s3dataBase.PartLimit() + limit, err := t.s3dataBase.PartLimit() + if err != nil { + return nil, err + } return &third.PartLimitResp{ MinPartSize: limit.MinPartSize, MaxPartSize: limit.MaxPartSize, diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 377e694c2..0afd54014 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -19,8 +19,11 @@ import ( "fmt" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/tools/s3/disable" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -79,15 +82,31 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } - + var thirdCache cache.ThirdCache + if rdb == nil { + tc, err := mgo.NewCacheMgo(mgocli.GetDB()) + if err != nil { + return err + } + thirdCache = mcache.NewThirdCache(tc) + } else { + thirdCache = redis.NewThirdCache(rdb) + } // Select the oss method according to the profile policy - enable := config.RpcConfig.Object.Enable - var ( - o s3.Interface - ) - switch enable { + var o s3.Interface + switch enable := config.RpcConfig.Object.Enable; enable { case "minio": - o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build()) + var minioCache minio.Cache + if rdb == nil { + mc, err := mgo.NewCacheMgo(mgocli.GetDB()) + if err != nil { + return err + } + minioCache = mcache.NewMinioCache(mc) + } else { + minioCache = redis.NewMinioCache(rdb) + } + o, err = minio.NewMinio(ctx, minioCache, *config.MinioConfig.Build()) case "cos": o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build()) case "oss": @@ -96,6 +115,8 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build()) case "aws": o, err = aws.NewAws(*config.RpcConfig.Object.Aws.Build()) + case "": + o = disable.NewDisable() default: err = fmt.Errorf("invalid object enable: %s", enable) } @@ -108,7 +129,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ - thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), + thirdDatabase: controller.NewThirdDatabase(thirdCache, logdb), s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index b09d4153f..6e59b6dc6 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -38,6 +38,7 @@ func NewAuthRpcCmd() *AuthRpcCmd { ret.configMap = map[string]any{ config.OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, config.RedisConfigFileName: &authConfig.RedisConfig, + config.MongodbConfigFileName: &authConfig.MongoConfig, config.ShareFileName: &authConfig.Share, config.DiscoveryConfigFilename: &authConfig.Discovery, } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index f9f5beb70..7cd3c481e 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -38,6 +38,7 @@ func NewPushRpcCmd() *PushRpcCmd { ret.configMap = map[string]any{ config.OpenIMPushCfgFileName: &pushConfig.RpcConfig, config.RedisConfigFileName: &pushConfig.RedisConfig, + config.MongodbConfigFileName: &pushConfig.MongoConfig, config.KafkaConfigFileName: &pushConfig.KafkaConfig, config.ShareFileName: &pushConfig.Share, config.NotificationFileName: &pushConfig.NotificationConfig, diff --git a/pkg/common/redispubsub/doc.go b/pkg/common/redispubsub/doc.go deleted file mode 100644 index 19b2e38f2..000000000 --- a/pkg/common/redispubsub/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redispubsub // import "github.com/openimsdk/open-im-server/v3/pkg/common/redispubsub" diff --git a/pkg/common/redispubsub/redispubliser.go b/pkg/common/redispubsub/redispubliser.go deleted file mode 100644 index 6e41af73a..000000000 --- a/pkg/common/redispubsub/redispubliser.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redispubsub - -import "github.com/redis/go-redis/v9" - -type Publisher struct { - client redis.UniversalClient - channel string -} - -func NewPublisher(client redis.UniversalClient, channel string) *Publisher { - return &Publisher{client: client, channel: channel} -} - -func (p *Publisher) Publish(message string) error { - return p.client.Publish(ctx, p.channel, message).Err() -} diff --git a/pkg/common/redispubsub/redissubscriber.go b/pkg/common/redispubsub/redissubscriber.go deleted file mode 100644 index aea99b318..000000000 --- a/pkg/common/redispubsub/redissubscriber.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redispubsub - -import ( - "context" - - "github.com/redis/go-redis/v9" -) - -var ctx = context.Background() - -type Subscriber struct { - client redis.UniversalClient - channel string -} - -func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber { - return &Subscriber{client: client, channel: channel} -} - -func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error { - messageChannel := s.client.Subscribe(ctx, s.channel).Channel() - - go func() { - for { - select { - case <-ctx.Done(): - return - case msg := <-messageChannel: - callback(msg.Payload) - } - } - }() - - return nil -} diff --git a/pkg/common/storage/cache/mcache/minio.go b/pkg/common/storage/cache/mcache/minio.go new file mode 100644 index 000000000..ecee54aa5 --- /dev/null +++ b/pkg/common/storage/cache/mcache/minio.go @@ -0,0 +1,105 @@ +package mcache + +import ( + "context" + "encoding/json" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/s3/minio" +) + +func NewMinioCache(cache database.Cache) minio.Cache { + return &minioCache{ + cache: cache, + expireTime: time.Hour * 24 * 7, + } +} + +type minioCache struct { + cache database.Cache + expireTime time.Duration +} + +func (g *minioCache) getObjectImageInfoKey(key string) string { + return cachekey.GetObjectImageInfoKey(key) +} + +func (g *minioCache) getMinioImageThumbnailKey(key string, format string, width int, height int) string { + return cachekey.GetMinioImageThumbnailKey(key, format, width, height) +} + +func (g *minioCache) DelObjectImageInfoKey(ctx context.Context, keys ...string) error { + ks := make([]string, 0, len(keys)) + for _, key := range keys { + ks = append(ks, g.getObjectImageInfoKey(key)) + } + return g.cache.Del(ctx, ks) +} + +func (g *minioCache) DelImageThumbnailKey(ctx context.Context, key string, format string, width int, height int) error { + return g.cache.Del(ctx, []string{g.getMinioImageThumbnailKey(key, format, width, height)}) +} + +func (g *minioCache) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*minio.ImageInfo, error)) (*minio.ImageInfo, error) { + return getCache[*minio.ImageInfo](ctx, g.cache, g.getObjectImageInfoKey(key), g.expireTime, fn) +} + +func (g *minioCache) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) { + return getCache[string](ctx, g.cache, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache) +} + +func getCache[V any](ctx context.Context, cache database.Cache, key string, expireTime time.Duration, fn func(ctx context.Context) (V, error)) (V, error) { + getDB := func() (V, bool, error) { + res, err := cache.Get(ctx, []string{key}) + if err != nil { + var val V + return val, false, err + } + var val V + if str, ok := res[key]; ok { + if json.Unmarshal([]byte(str), &val) != nil { + return val, false, err + } + return val, true, nil + } + return val, false, nil + } + dbVal, ok, err := getDB() + if err != nil { + return dbVal, err + } + if ok { + return dbVal, nil + } + lockValue, err := cache.Lock(ctx, key, time.Minute) + if err != nil { + return dbVal, err + } + defer func() { + if err := cache.Unlock(ctx, key, lockValue); err != nil { + log.ZError(ctx, "unlock cache key", err, "key", key, "value", lockValue) + } + }() + dbVal, ok, err = getDB() + if err != nil { + return dbVal, err + } + if ok { + return dbVal, nil + } + val, err := fn(ctx) + if err != nil { + return val, err + } + data, err := json.Marshal(val) + if err != nil { + return val, err + } + if err := cache.Set(ctx, key, string(data), expireTime); err != nil { + return val, err + } + return val, nil +} diff --git a/pkg/common/storage/cache/mcache/online.go b/pkg/common/storage/cache/mcache/online.go new file mode 100644 index 000000000..546ccef7a --- /dev/null +++ b/pkg/common/storage/cache/mcache/online.go @@ -0,0 +1,74 @@ +package mcache + +import ( + "context" + "sync" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" +) + +func NewOnlineCache() cache.OnlineCache { + return &onlineCache{ + user: make(map[string]map[int32]struct{}), + } +} + +type onlineCache struct { + lock sync.RWMutex + user map[string]map[int32]struct{} +} + +func (x *onlineCache) GetOnline(ctx context.Context, userID string) ([]int32, error) { + x.lock.RLock() + defer x.lock.RUnlock() + pSet, ok := x.user[userID] + if !ok { + return nil, nil + } + res := make([]int32, 0, len(pSet)) + for k := range pSet { + res = append(res, k) + } + return res, nil +} + +func (x *onlineCache) SetUserOnline(ctx context.Context, userID string, online, offline []int32) error { + x.lock.Lock() + defer x.lock.Unlock() + pSet, ok := x.user[userID] + if ok { + for _, p := range offline { + delete(pSet, p) + } + } + if len(online) > 0 { + if !ok { + pSet = make(map[int32]struct{}) + x.user[userID] = pSet + } + for _, p := range online { + pSet[p] = struct{}{} + } + } + if len(pSet) == 0 { + delete(x.user, userID) + } + return nil +} + +func (x *onlineCache) GetAllOnlineUsers(ctx context.Context, cursor uint64) (map[string][]int32, uint64, error) { + if cursor != 0 { + return nil, 0, nil + } + x.lock.RLock() + defer x.lock.RUnlock() + res := make(map[string][]int32) + for k, v := range x.user { + pSet := make([]int32, 0, len(v)) + for p := range v { + pSet = append(pSet, p) + } + res[k] = pSet + } + return res, 0, nil +} diff --git a/pkg/common/storage/cache/mcache/third.go b/pkg/common/storage/cache/mcache/third.go new file mode 100644 index 000000000..6918ae784 --- /dev/null +++ b/pkg/common/storage/cache/mcache/third.go @@ -0,0 +1,98 @@ +package mcache + +import ( + "context" + "strconv" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" +) + +func NewThirdCache(cache database.Cache) cache.ThirdCache { + return &thirdCache{ + cache: cache, + } +} + +type thirdCache struct { + cache database.Cache +} + +func (c *thirdCache) getGetuiTokenKey() string { + return cachekey.GetGetuiTokenKey() +} + +func (c *thirdCache) getGetuiTaskIDKey() string { + return cachekey.GetGetuiTaskIDKey() +} + +func (c *thirdCache) getUserBadgeUnreadCountSumKey(userID string) string { + return cachekey.GetUserBadgeUnreadCountSumKey(userID) +} + +func (c *thirdCache) getFcmAccountTokenKey(account string, platformID int) string { + return cachekey.GetFcmAccountTokenKey(account, platformID) +} + +func (c *thirdCache) get(ctx context.Context, key string) (string, error) { + res, err := c.cache.Get(ctx, []string{key}) + if err != nil { + return "", err + } + if val, ok := res[key]; ok { + return val, nil + } + return "", errs.Wrap(redis.Nil) +} + +func (c *thirdCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return errs.Wrap(c.cache.Set(ctx, c.getFcmAccountTokenKey(account, platformID), fcmToken, time.Duration(expireTime)*time.Second)) +} + +func (c *thirdCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + return c.get(ctx, c.getFcmAccountTokenKey(account, platformID)) +} + +func (c *thirdCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return c.cache.Del(ctx, []string{c.getFcmAccountTokenKey(account, platformID)}) +} + +func (c *thirdCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + return c.cache.Incr(ctx, c.getUserBadgeUnreadCountSumKey(userID), 1) +} + +func (c *thirdCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return c.cache.Set(ctx, c.getUserBadgeUnreadCountSumKey(userID), strconv.Itoa(value), 0) +} + +func (c *thirdCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + str, err := c.get(ctx, c.getUserBadgeUnreadCountSumKey(userID)) + if err != nil { + return 0, err + } + val, err := strconv.Atoi(str) + if err != nil { + return 0, errs.WrapMsg(err, "strconv.Atoi", "str", str) + } + return val, nil +} + +func (c *thirdCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return c.cache.Set(ctx, c.getGetuiTokenKey(), token, time.Duration(expireTime)*time.Second) +} + +func (c *thirdCache) GetGetuiToken(ctx context.Context) (string, error) { + return c.get(ctx, c.getGetuiTokenKey()) +} + +func (c *thirdCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return c.cache.Set(ctx, c.getGetuiTaskIDKey(), taskID, time.Duration(expireTime)*time.Second) +} + +func (c *thirdCache) GetGetuiTaskID(ctx context.Context) (string, error) { + return c.get(ctx, c.getGetuiTaskIDKey()) +} diff --git a/pkg/common/storage/cache/mcache/token.go b/pkg/common/storage/cache/mcache/token.go new file mode 100644 index 000000000..d7ae29cfc --- /dev/null +++ b/pkg/common/storage/cache/mcache/token.go @@ -0,0 +1,130 @@ +package mcache + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" +) + +func NewTokenCacheModel(cache database.Cache, accessExpire int64) cache.TokenModel { + c := &tokenCache{cache: cache} + c.accessExpire = c.getExpireTime(accessExpire) + return c +} + +type tokenCache struct { + cache database.Cache + accessExpire time.Duration +} + +func (x *tokenCache) getTokenKey(userID string, platformID int, token string) string { + return cachekey.GetTokenKey(userID, platformID) + ":" + token + +} + +func (x *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { + return x.cache.Set(ctx, x.getTokenKey(userID, platformID, token), strconv.Itoa(flag), x.accessExpire) +} + +// SetTokenFlagEx set token and flag with expire time +func (x *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error { + return x.SetTokenFlag(ctx, userID, platformID, token, flag) +} + +func (x *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) { + prefix := x.getTokenKey(userID, platformID, "") + m, err := x.cache.Prefix(ctx, prefix) + if err != nil { + return nil, errs.Wrap(err) + } + mm := make(map[string]int) + for k, v := range m { + state, err := strconv.Atoi(v) + if err != nil { + log.ZError(ctx, "token value is not int", err, "value", v, "userID", userID, "platformID", platformID) + continue + } + mm[strings.TrimPrefix(k, prefix)] = state + } + return mm, nil +} + +func (x *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) { + prefix := cachekey.UidPidToken + userID + ":" + tokens, err := x.cache.Prefix(ctx, prefix) + if err != nil { + return nil, err + } + res := make(map[int]map[string]int) + for key, flagStr := range tokens { + flag, err := strconv.Atoi(flagStr) + if err != nil { + log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID) + continue + } + arr := strings.SplitN(strings.TrimPrefix(key, prefix), ":", 2) + if len(arr) != 2 { + log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID) + continue + } + platformID, err := strconv.Atoi(arr[0]) + if err != nil { + log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID) + continue + } + token := arr[1] + if token == "" { + log.ZError(ctx, "token value is not int", err, "key", key, "value", flagStr, "userID", userID) + continue + } + tk, ok := res[platformID] + if !ok { + tk = make(map[string]int) + res[platformID] = tk + } + tk[token] = flag + } + return res, nil +} + +func (x *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { + for token, flag := range m { + err := x.SetTokenFlag(ctx, userID, platformID, token, flag) + if err != nil { + return err + } + } + return nil +} + +func (x *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]any) error { + for prefix, tokenFlag := range tokens { + for token, flag := range tokenFlag { + flagStr := fmt.Sprintf("%v", flag) + if err := x.cache.Set(ctx, prefix+":"+token, flagStr, x.accessExpire); err != nil { + return err + } + } + } + return nil +} + +func (x *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { + keys := make([]string, 0, len(fields)) + for _, token := range fields { + keys = append(keys, x.getTokenKey(userID, platformID, token)) + } + return x.cache.Del(ctx, keys) +} + +func (x *tokenCache) getExpireTime(t int64) time.Duration { + return time.Hour * 24 * time.Duration(t) +} diff --git a/pkg/common/storage/cache/redis/online.go b/pkg/common/storage/cache/redis/online.go index b6c90264e..b2eb60c69 100644 --- a/pkg/common/storage/cache/redis/online.go +++ b/pkg/common/storage/cache/redis/online.go @@ -3,18 +3,23 @@ package redis import ( "context" "fmt" + "strconv" + "strings" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" - "strconv" - "strings" - "time" ) func NewUserOnline(rdb redis.UniversalClient) cache.OnlineCache { + if rdb == nil { + return mcache.NewOnlineCache() + } return &userOnline{ rdb: rdb, expire: cachekey.OnlineExpire, diff --git a/pkg/common/storage/cache/redis/todo.go b/pkg/common/storage/cache/redis/todo.go deleted file mode 100644 index 13438d6d4..000000000 --- a/pkg/common/storage/cache/redis/todo.go +++ /dev/null @@ -1,3 +0,0 @@ -package redis - -// todo: token online third minio diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 2885b985a..7cf0f9e0a 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -60,10 +60,10 @@ func (a *authDatabase) BatchSetTokenMapByUidPid(ctx context.Context, tokens []st setMap := make(map[string]map[string]any) for _, token := range tokens { claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(a.accessSecret)) - key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID) if err != nil { continue } else { + key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID) if v, ok := setMap[key]; ok { v[token] = constant.KickedToken } else { diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 6693d2dde..30d8d20ec 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -30,7 +30,7 @@ import ( ) type S3Database interface { - PartLimit() *s3.PartLimit + PartLimit() (*s3.PartLimit, error) PartSize(ctx context.Context, size int64) (int64, error) AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) @@ -65,7 +65,7 @@ func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { return s.s3.PartSize(ctx, size) } -func (s *s3Database) PartLimit() *s3.PartLimit { +func (s *s3Database) PartLimit() (*s3.PartLimit, error) { return s.s3.PartLimit() } diff --git a/pkg/common/storage/database/cache.go b/pkg/common/storage/database/cache.go new file mode 100644 index 000000000..c57aea86d --- /dev/null +++ b/pkg/common/storage/database/cache.go @@ -0,0 +1,16 @@ +package database + +import ( + "context" + "time" +) + +type Cache interface { + Get(ctx context.Context, key []string) (map[string]string, error) + Prefix(ctx context.Context, prefix string) (map[string]string, error) + Set(ctx context.Context, key string, value string, expireAt time.Duration) error + Incr(ctx context.Context, key string, value int) (int, error) + Del(ctx context.Context, key []string) error + Lock(ctx context.Context, key string, duration time.Duration) (string, error) + Unlock(ctx context.Context, key string, value string) error +} diff --git a/pkg/common/storage/database/mgo/cache.go b/pkg/common/storage/database/mgo/cache.go new file mode 100644 index 000000000..bcf86cd56 --- /dev/null +++ b/pkg/common/storage/database/mgo/cache.go @@ -0,0 +1,183 @@ +package mgo + +import ( + "context" + "strconv" + "time" + + "github.com/google/uuid" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/errs" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewCacheMgo(db *mongo.Database) (*CacheMgo, error) { + coll := db.Collection(database.CacheName) + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "key", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "expire_at", Value: 1}, + }, + Options: options.Index().SetExpireAfterSeconds(0), + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &CacheMgo{coll: coll}, nil +} + +type CacheMgo struct { + coll *mongo.Collection +} + +func (x *CacheMgo) findToMap(res []model.Cache, now time.Time) map[string]string { + kv := make(map[string]string) + for _, re := range res { + if re.ExpireAt != nil && re.ExpireAt.Before(now) { + continue + } + kv[re.Key] = re.Value + } + return kv + +} + +func (x *CacheMgo) Get(ctx context.Context, key []string) (map[string]string, error) { + if len(key) == 0 { + return nil, nil + } + now := time.Now() + res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{ + "key": bson.M{"$in": key}, + "$or": []bson.M{ + {"expire_at": bson.M{"$gt": now}}, + {"expire_at": nil}, + }, + }) + if err != nil { + return nil, err + } + return x.findToMap(res, now), nil +} + +func (x *CacheMgo) Prefix(ctx context.Context, prefix string) (map[string]string, error) { + now := time.Now() + res, err := mongoutil.Find[model.Cache](ctx, x.coll, bson.M{ + "key": bson.M{"$regex": "^" + prefix}, + "$or": []bson.M{ + {"expire_at": bson.M{"$gt": now}}, + {"expire_at": nil}, + }, + }) + if err != nil { + return nil, err + } + return x.findToMap(res, now), nil +} + +func (x *CacheMgo) Set(ctx context.Context, key string, value string, expireAt time.Duration) error { + cv := &model.Cache{ + Key: key, + Value: value, + } + if expireAt > 0 { + now := time.Now().Add(expireAt) + cv.ExpireAt = &now + } + opt := options.Update().SetUpsert(true) + return mongoutil.UpdateOne(ctx, x.coll, bson.M{"key": key}, bson.M{"$set": cv}, false, opt) +} + +func (x *CacheMgo) Incr(ctx context.Context, key string, value int) (int, error) { + pipeline := mongo.Pipeline{ + { + {"$set", bson.M{ + "value": bson.M{ + "$toString": bson.M{ + "$add": bson.A{ + bson.M{"$toInt": "$value"}, + value, + }, + }, + }, + }}, + }, + } + opt := options.FindOneAndUpdate().SetReturnDocument(options.After) + res, err := mongoutil.FindOneAndUpdate[model.Cache](ctx, x.coll, bson.M{"key": key}, pipeline, opt) + if err != nil { + return 0, err + } + return strconv.Atoi(res.Value) +} + +func (x *CacheMgo) Del(ctx context.Context, key []string) error { + if len(key) == 0 { + return nil + } + _, err := x.coll.DeleteMany(ctx, bson.M{"key": bson.M{"$in": key}}) + return err +} + +func (x *CacheMgo) lockKey(key string) string { + return "LOCK_" + key +} + +func (x *CacheMgo) Lock(ctx context.Context, key string, duration time.Duration) (string, error) { + tmp, err := uuid.NewUUID() + if err != nil { + return "", err + } + if duration <= 0 || duration > time.Minute*10 { + duration = time.Minute * 10 + } + cv := &model.Cache{ + Key: x.lockKey(key), + Value: tmp.String(), + ExpireAt: nil, + } + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + wait := func() error { + timeout := time.NewTimer(time.Millisecond * 100) + defer timeout.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeout.C: + return nil + } + } + for { + if err := mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": key, "expire_at": bson.M{"$lt": time.Now()}}); err != nil { + return "", err + } + expireAt := time.Now().Add(duration) + cv.ExpireAt = &expireAt + if err := mongoutil.InsertMany[*model.Cache](ctx, x.coll, []*model.Cache{cv}); err != nil { + if mongo.IsDuplicateKeyError(err) { + if err := wait(); err != nil { + return "", err + } + continue + } + return "", err + } + return cv.Value, nil + } +} + +func (x *CacheMgo) Unlock(ctx context.Context, key string, value string) error { + return mongoutil.DeleteOne(ctx, x.coll, bson.M{"key": x.lockKey(key), "value": value}) +} diff --git a/pkg/common/storage/database/mgo/cache_test.go b/pkg/common/storage/database/mgo/cache_test.go new file mode 100644 index 000000000..ac8b22951 --- /dev/null +++ b/pkg/common/storage/database/mgo/cache_test.go @@ -0,0 +1,133 @@ +package mgo + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func TestName1111(t *testing.T) { + coll := Mongodb().Collection("temp") + + //updatePipeline := mongo.Pipeline{ + // { + // {"$set", bson.M{ + // "age": bson.M{ + // "$toString": bson.M{ + // "$add": bson.A{ + // bson.M{"$toInt": "$age"}, + // 1, + // }, + // }, + // }, + // }}, + // }, + //} + + pipeline := mongo.Pipeline{ + { + {"$set", bson.M{ + "value": bson.M{ + "$toString": bson.M{ + "$add": bson.A{ + bson.M{"$toInt": "$value"}, + 1, + }, + }, + }, + }}, + }, + } + + opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After) + res, err := mongoutil.FindOneAndUpdate[model.Cache](context.Background(), coll, bson.M{"key": "123456"}, pipeline, opt) + if err != nil { + panic(err) + } + t.Log(res) +} + +func TestName33333(t *testing.T) { + c, err := NewCacheMgo(Mongodb()) + if err != nil { + panic(err) + } + if err := c.Set(context.Background(), "123456", "123456", time.Hour); err != nil { + panic(err) + } + + if err := c.Set(context.Background(), "123666", "123666", time.Hour); err != nil { + panic(err) + } + + res1, err := c.Get(context.Background(), []string{"123456"}) + if err != nil { + panic(err) + } + t.Log(res1) + + res2, err := c.Prefix(context.Background(), "123") + if err != nil { + panic(err) + } + t.Log(res2) +} + +func TestName1111aa(t *testing.T) { + + c, err := NewCacheMgo(Mongodb()) + if err != nil { + panic(err) + } + var count int + + key := "123456" + + doFunc := func() { + value, err := c.Lock(context.Background(), key, time.Second*30) + if err != nil { + t.Log("Lock error", err) + return + } + tmp := count + tmp++ + count = tmp + t.Log("count", tmp) + if err := c.Unlock(context.Background(), key, value); err != nil { + t.Log("Unlock error", err) + return + } + } + + if _, err := c.Lock(context.Background(), key, time.Second*10); err != nil { + t.Log(err) + return + } + + var wg sync.WaitGroup + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + doFunc() + } + }() + } + + wg.Wait() + +} + +func TestName111111a(t *testing.T) { + arr := strings.SplitN("1:testkakskdask:1111", ":", 2) + t.Log(arr) +} diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index 8e85e302d..2ced0210a 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -132,19 +132,19 @@ func TestName5(t *testing.T) { t.Log(res) } -func TestName6(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) - defer cancel() - cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - - tmp, err := NewMsgMongo(cli.Database("openim_v3")) - if err != nil { - panic(err) - } - msg := tmp.(*MsgMgo) - seq, sendTime, err := msg.findBeforeSendTime(ctx, "si_4924054191_9511766539", 1144) - if err != nil { - panic(err) - } - t.Log(seq, sendTime) -} +//func TestName6(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) +// defer cancel() +// cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) +// +// tmp, err := NewMsgMongo(cli.Database("openim_v3")) +// if err != nil { +// panic(err) +// } +// msg := tmp.(*MsgMgo) +// seq, sendTime, err := msg.findBeforeSendTime(ctx, "si_4924054191_9511766539", 1144) +// if err != nil { +// panic(err) +// } +// t.Log(seq, sendTime) +//} diff --git a/pkg/common/storage/database/mgo/seq_conversation_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go index 42507a693..dd30286e1 100644 --- a/pkg/common/storage/database/mgo/seq_conversation_test.go +++ b/pkg/common/storage/database/mgo/seq_conversation_test.go @@ -2,10 +2,11 @@ package mgo import ( "context" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "testing" "time" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) func Result[V any](val V, err error) V { @@ -19,7 +20,7 @@ func Mongodb() *mongo.Database { return Result( mongo.Connect(context.Background(), options.Client(). - ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100"). + ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100"). SetConnectTimeout(5*time.Second)), ).Database("openim_v3") } diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 9742f933f..4d74c7ddc 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -18,4 +18,5 @@ const ( SeqConversationName = "seq" SeqUserName = "seq_user" StreamMsgName = "stream_msg" + CacheName = "cache" ) diff --git a/pkg/common/storage/model/cache.go b/pkg/common/storage/model/cache.go new file mode 100644 index 000000000..4bbc55e65 --- /dev/null +++ b/pkg/common/storage/model/cache.go @@ -0,0 +1,9 @@ +package model + +import "time" + +type Cache struct { + Key string `bson:"key"` + Value string `bson:"value"` + ExpireAt *time.Time `bson:"expire_at"` +}