connID to md5

This commit is contained in:
wangchuxiao 2023-03-13 17:57:52 +08:00
parent 217765be6e
commit bd4c6b1a97
6 changed files with 69 additions and 59 deletions

View File

@ -7,6 +7,7 @@ import (
"OpenIM/pkg/common/log" "OpenIM/pkg/common/log"
"OpenIM/pkg/common/mw" "OpenIM/pkg/common/mw"
"context" "context"
"errors"
"fmt" "fmt"
"github.com/OpenIMSDK/openKeeper" "github.com/OpenIMSDK/openKeeper"
"net" "net"
@ -43,7 +44,9 @@ func run(port int) error {
} }
fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version) fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version)
log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version) log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version)
log.Info("s", "start server") log.ZDebug(context.Background(), "start server success", "address", address, "version", config.Version)
log.ZError(context.Background(), "start server success", errors.New("ss"), "address", address)
log.ZWarn(context.Background(), "start server success", errors.New("ss"), "address", address)
err = router.Run(address) err = router.Run(address)
if err != nil { if err != nil {
log.Error("", "api run failed ", address, err.Error()) log.Error("", "api run failed ", address, err.Error())

View File

@ -13,7 +13,6 @@ import (
pbConversation "OpenIM/pkg/proto/conversation" pbConversation "OpenIM/pkg/proto/conversation"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"github.com/dtm-labs/rockscache"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -35,12 +34,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil { if err != nil {
return err return err
} }
opts := rockscache.NewDefaultOptions()
opts.RandomExpireAdjustment = 0.2
opts.StrongConsistency = true
pbConversation.RegisterConversationServer(server, &conversationServer{ pbConversation.RegisterConversationServer(server, &conversationServer{
groupChecker: check.NewGroupChecker(client), groupChecker: check.NewGroupChecker(client),
ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, opts), tx.NewGorm(db)), ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt()), tx.NewGorm(db)),
}) })
return nil return nil
} }

View File

@ -3,6 +3,7 @@ package cache
import ( import (
"OpenIM/pkg/common/config" "OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog" "OpenIM/pkg/common/tracelog"
pbMsg "OpenIM/pkg/proto/msg" pbMsg "OpenIM/pkg/proto/msg"
"OpenIM/pkg/proto/sdkws" "OpenIM/pkg/proto/sdkws"
@ -93,6 +94,38 @@ type cache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
} }
// 兼容老版本调用
func (c *cache) DelKeys() {
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:",
"GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} {
fName := utils.GetSelfFuncName()
var cursor uint64
var n int
for {
var keys []string
var err error
keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
if err != nil {
panic(err.Error())
}
n += len(keys)
// for each for redis cluster
for _, key := range keys {
if err = c.rdb.Del(context.Background(), key).Err(); err != nil {
log.NewError("", fName, key, err.Error())
err = c.rdb.Del(context.Background(), key).Err()
if err != nil {
panic(err.Error())
}
}
}
if cursor == 0 {
break
}
}
}
}
func (c *cache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { func (c *cache) IncrUserSeq(ctx context.Context, userID string) (int64, error) {
return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64())
} }

View File

@ -1,6 +1,7 @@
package cache package cache
import ( import (
"OpenIM/pkg/common/log"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"encoding/json" "encoding/json"
@ -10,36 +11,13 @@ import (
const scanCount = 3000 const scanCount = 3000
//func (rc *RcClient) DelKeys() {
// for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, func GetDefaultOpt() rockscache.Options {
// groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { opts := rockscache.NewDefaultOptions()
// fName := utils.GetSelfFuncName() opts.StrongConsistency = true
// var cursor uint64 opts.RandomExpireAdjustment = 0.2
// var n int return opts
// for { }
// var keys []string
// var err error
// keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
// if err != nil {
// panic(err.Error())
// }
// n += len(keys)
// // for each for redis cluster
// for _, key := range keys {
// if err = rc.rdb.Del(context.Background(), key).Err(); err != nil {
// log.NewError("", fName, key, err.Error())
// err = rc.rdb.Del(context.Background(), key).Err()
// if err != nil {
// panic(err.Error())
// }
// }
// }
// if cursor == 0 {
// break
// }
// }
// }
//}
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
var t T var t T

View File

@ -107,18 +107,6 @@ func InfoKv(ctx context.Context, msg string, keysAndValues ...interface{}) {
}).Infoln(keysAndValues) }).Infoln(keysAndValues)
} }
func DebugKv(ctx context.Context, msg string, keysAndValues ...interface{}) {
}
func ErrorKv(ctx context.Context, msg string, err error, keysAndValues ...interface{}) {
}
func WarnKv(ctx context.Context, msg string, err error, keysAndValues ...interface{}) {
}
func Info(OperationID string, args ...interface{}) { func Info(OperationID string, args ...interface{}) {
logger.WithFields(logrus.Fields{ logger.WithFields(logrus.Fields{
"OperationID": OperationID, "OperationID": OperationID,

View File

@ -63,7 +63,14 @@ func NewZapLogger() (*ZapLogger, error) {
if config.Config.Log.Stderr { if config.Config.Log.Stderr {
zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stderr") zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stderr")
} }
l, err := zapConfig.Build(zl.cores()) zapConfig.EncoderConfig.EncodeTime = zl.timeEncoder
zapConfig.EncoderConfig.EncodeDuration = zapcore.SecondsDurationEncoder
zapConfig.EncoderConfig.EncodeLevel = zapcore.LowercaseColorLevelEncoder
opts, err := zl.cores()
if err != nil {
return nil, err
}
l, err := zapConfig.Build(opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -71,13 +78,16 @@ func NewZapLogger() (*ZapLogger, error) {
return zl, nil return zl, nil
} }
func timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { func (l *ZapLogger) timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05")) enc.AppendString(t.Format("2006-01-02 15:04:05"))
} }
func (l *ZapLogger) cores() zap.Option { func (l *ZapLogger) cores() (zap.Option, error) {
fileEncoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) fileEncoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
writer := l.getWriter() writer, err := l.getWriter()
if err != nil {
return nil, err
}
var cores []zapcore.Core var cores []zapcore.Core
if config.Config.Log.StorageLocation != "" { if config.Config.Log.StorageLocation != "" {
cores = []zapcore.Core{ cores = []zapcore.Core{
@ -86,16 +96,18 @@ func (l *ZapLogger) cores() zap.Option {
} }
return zap.WrapCore(func(c zapcore.Core) zapcore.Core { return zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewTee(cores...) return zapcore.NewTee(cores...)
}) }), nil
} }
func (l *ZapLogger) getWriter() zapcore.WriteSyncer { func (l *ZapLogger) getWriter() (zapcore.WriteSyncer, error) {
logf, _ := rotatelogs.New(config.Config.Log.StorageLocation+sp+"openIM"+"-"+".%Y_%m%d_%H", logf, err := rotatelogs.New(config.Config.Log.StorageLocation+sp+"OpenIM.log.all"+".%Y-%m-%d",
rotatelogs.WithLinkName(config.Config.Log.StorageLocation+sp+"openIM"+"-"), rotatelogs.WithRotationCount(config.Config.Log.RemainRotationCount),
rotatelogs.WithMaxAge(2*24*time.Hour), rotatelogs.WithRotationTime(time.Duration(config.Config.Log.RotationTime)*time.Hour),
rotatelogs.WithRotationTime(time.Minute),
) )
return zapcore.AddSync(logf) if err != nil {
return nil, err
}
return zapcore.AddSync(logf), nil
} }
func (l *ZapLogger) ToZap() *zap.SugaredLogger { func (l *ZapLogger) ToZap() *zap.SugaredLogger {