Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	cmd/api/main.go
#	internal/api/a2r/api2rpc.go
#	internal/apiresp/resp.go
#	internal/msgtransfer/online_history_msg_handler.go
#	internal/push/consumer_init.go
#	pkg/common/db/controller/conversation.go
#	pkg/common/mw/gin.go
#	pkg/statistics/statistics.go
This commit is contained in:
withchao 2023-03-16 10:51:00 +08:00
commit 1680b96b65
28 changed files with 306 additions and 151 deletions

View File

@ -27,12 +27,16 @@ func run(port int) error {
if port == 0 {
port = config.Config.Api.GinPort[0]
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
zk, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password)
if err != nil {
return err
}
log.NewPrivateLog(constant.LogFileName)
router := api.NewGinRouter(zk)
router := api.NewGinRouter(zk, rdb)
var address string
if config.Config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port))

View File

@ -160,7 +160,7 @@ log:
rotationTime: 24
remainRotationCount: 2 #日志数量
#日志级别 6表示全都打印测试阶段建议设置为6
remainLogLevel: 6
remainLogLevel: -1
stderr: true
elasticSearchSwitch: false

View File

@ -15,11 +15,13 @@ func Call[A, B, C any](
) {
var req A
if err := c.BindJSON(&req); err != nil {
log.ZWarn(c, "gin bind json error", err, "req", req)
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数错误
return
}
if check, ok := any(&req).(interface{ Check() error }); ok {
if err := check.Check(); err != nil {
log.ZWarn(c, "custom check error", err, "req", req)
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败
return
}

View File

@ -7,17 +7,17 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"os"
)
func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine {
zk.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) // 默认RPC中间件
gin.SetMode(gin.ReleaseMode)
f, _ := os.Create("../logs/api.log")
gin.DefaultWriter = io.MultiWriter(f)
// gin.SetMode(gin.DebugMode)
//f, _ := os.Create("../logs/api.log")
//gin.DefaultWriter = io.MultiWriter(f)
//gin.SetMode(gin.DebugMode)
r := gin.New()
log.Info("load config: ", config.Config)
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
@ -28,17 +28,18 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
r.Use(prome.PrometheusMiddleware)
r.GET("/metrics", prome.PrometheusHandler())
}
zk.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) // 默认RPC中间件
userRouterGroup := r.Group("/user")
{
u := NewUser(zk)
userRouterGroup.POST("/user_register", u.UserRegister)
userRouterGroup.POST("/update_user_info", u.UpdateUserInfo) //1
userRouterGroup.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt)
userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo) //1
userRouterGroup.POST("/get_all_users_uid", u.GetAllUsersID) // todo
userRouterGroup.POST("/account_check", u.AccountCheck) // todo
userRouterGroup.POST("/get_users", u.GetUsers)
userRouterGroupChild1 := mw.NewRouterGroup(userRouterGroup, "",)
userRouterGroupChild2 := mw.NewRouterGroup(userRouterGroup, "", mw.WithGinParseToken(rdb))
userRouterGroupChild1.POST("/user_register", u.UserRegister)
userRouterGroupChild2.POST("/update_user_info", u.UpdateUserInfo) //1
userRouterGroupChild2.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt)
userRouterGroupChild2.POST("/get_users_info", u.GetUsersPublicInfo) //1
userRouterGroupChild2.POST("/get_all_users_uid", u.GetAllUsersID) // todo
userRouterGroupChild2.POST("/account_check", u.AccountCheck) // todo
userRouterGroupChild2.POST("/get_users", u.GetUsers)
}
////friend routing group
friendRouterGroup := r.Group("/friend")
@ -56,7 +57,6 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
friendRouterGroup.POST("/remove_black", f.RemoveBlack) //1
friendRouterGroup.POST("/import_friend", f.ImportFriends) //1
friendRouterGroup.POST("/is_friend", f.IsFriend) //1
}
groupRouterGroup := r.Group("/group")
g := NewGroup(zk)
@ -94,10 +94,12 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
{
a := NewAuth(zk)
u := NewUser(zk)
authRouterGroup.POST("/user_register", u.UserRegister) //1
authRouterGroup.POST("/user_token", a.UserToken) //1
authRouterGroup.POST("/parse_token", a.ParseToken) //1
authRouterGroup.POST("/force_logout", a.ForceLogout) //1
authRouterGroupChild1 := mw.NewRouterGroup(authRouterGroup, "",)
authRouterGroupChild2 := mw.NewRouterGroup(authRouterGroup, "", mw.WithGinParseToken(rdb))
authRouterGroupChild1.POST("/user_register", u.UserRegister) //1
authRouterGroupChild1.POST("/user_token", a.UserToken) //1
authRouterGroupChild2.POST("/parse_token", a.ParseToken) //1
authRouterGroupChild2.POST("/force_logout", a.ForceLogout) //1
}
////Third service
thirdGroup := r.Group("/third")
@ -113,7 +115,6 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
thirdGroup.POST("/confirm_put", t.ConfirmPut)
thirdGroup.GET("/get_url", t.GetURL)
thirdGroup.GET("/object", t.GetURL)
}
////Message
chatGroup := r.Group("/msg")
@ -137,7 +138,7 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
}
////Conversation
conversationGroup := r.Group("/conversation")
{ //1
{
c := NewConversation(zk)
conversationGroup.POST("/get_all_conversations", c.GetAllConversations)
conversationGroup.POST("/get_conversation", c.GetConversation)
@ -149,3 +150,4 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
}
return r
}

View File

@ -53,8 +53,7 @@ func (ws *WsServer) UnRegister(c *Client) {
}
func (ws *WsServer) Validate(s interface{}) error {
//TODO implement me
panic("implement me")
return nil
}
func (ws *WsServer) GetUserAllCons(userID string) ([]*Client, bool) {

View File

@ -71,7 +71,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och
}

View File

@ -183,7 +183,7 @@ type config struct {
StorageLocation string `yaml:"storageLocation"`
RotationTime int `yaml:"rotationTime"`
RemainRotationCount uint `yaml:"remainRotationCount"`
RemainLogLevel uint `yaml:"remainLogLevel"`
RemainLogLevel int `yaml:"remainLogLevel"`
Stderr bool `yaml:"stderr"`
ElasticSearchSwitch bool `yaml:"elasticSearchSwitch"`
ElasticSearchAddr []string `yaml:"elasticSearchAddr"`

View File

@ -273,6 +273,8 @@ const (
const OperationID = "operationID"
const OpUserID = "opUserID"
const ConnID = "connID"
const OpUserIDPlatformID = "platformID"
const Token = "token"
const (
UnreliableNotification = 1

View File

@ -32,7 +32,7 @@ type ConversationDatabase interface {
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
}
func NewConversationDatabase(conversation relation.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
return &ConversationDataBase{
conversationDB: conversation,
cache: cache,
@ -41,7 +41,7 @@ func NewConversationDatabase(conversation relation.Conversation, cache cache.Con
}
type ConversationDataBase struct {
conversationDB relation.Conversation
conversationDB relationTb.ConversationModelInterface
cache cache.ConversationCache
tx tx.Tx
}

View File

@ -46,9 +46,8 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel
}
miss := utils.SliceAnySub(users, result, func(e *relation.UserModel) string { return e.UserID })
if len(miss) > 0 {
u.userDB.Create(ctx, miss)
_ = u.userDB.Create(ctx, miss)
}
return nil
}

View File

@ -8,11 +8,11 @@ import (
)
type BlackGorm struct {
DB *gorm.DB
*MetaDB
}
func NewBlackGorm(db *gorm.DB) relation.BlackModelInterface {
return &BlackGorm{db}
return &BlackGorm{NewMetaDB(db, &relation.BlackModel{})}
}
func (b *BlackGorm) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {

View File

@ -14,11 +14,11 @@ import (
)
type ChatLogGorm struct {
DB *gorm.DB
*MetaDB
}
func NewChatLogGorm(db *gorm.DB) relation.ChatLogModelInterface {
return &ChatLogGorm{DB: db}
return &ChatLogGorm{NewMetaDB(db, &relation.ChatLogModel{})}
}
func (c *ChatLogGorm) Create(msg pbMsg.MsgDataToMQ) error {

View File

@ -8,29 +8,16 @@ import (
"gorm.io/gorm"
)
type Conversation interface {
Create(ctx context.Context, conversations []*relation.ConversationModel) (err error)
Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error)
Update(ctx context.Context, conversations []*relation.ConversationModel) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error)
FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error)
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
NewTx(tx any) Conversation
}
type ConversationGorm struct {
DB *gorm.DB
*MetaDB
}
func NewConversationGorm(DB *gorm.DB) Conversation {
return &ConversationGorm{DB: DB}
func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface {
return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})}
}
func (c *ConversationGorm) NewTx(tx any) Conversation {
return &ConversationGorm{DB: tx.(*gorm.DB)}
func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface {
return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})}
}
func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) {

View File

@ -8,46 +8,46 @@ import (
)
type FriendGorm struct {
DB *gorm.DB
*MetaDB
}
func NewFriendGorm(db *gorm.DB) relation.FriendModelInterface {
return &FriendGorm{DB: db}
return &FriendGorm{NewMetaDB(db, &relation.FriendModel{})}
}
func (f *FriendGorm) NewTx(tx any) relation.FriendModelInterface {
return &FriendGorm{DB: tx.(*gorm.DB)}
return &FriendGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendModel{})}
}
// 插入多条记录
func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel) (err error) {
return utils.Wrap(f.DB.Create(&friends).Error, "")
return utils.Wrap(f.db(ctx).Create(&friends).Error, "")
}
// 删除ownerUserID指定的好友
func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
err = utils.Wrap(f.DB.Where("owner_user_id = ? AND friend_user_id in ( ?)", ownerUserID, friendUserIDs).Delete(&relation.FriendModel{}).Error, "")
err = utils.Wrap(f.db(ctx).Where("owner_user_id = ? AND friend_user_id in ( ?)", ownerUserID, friendUserIDs).Delete(&relation.FriendModel{}).Error, "")
return err
}
// 更新ownerUserID单个好友信息 更新零值
func (f *FriendGorm) UpdateByMap(ctx context.Context, ownerUserID string, friendUserID string, args map[string]interface{}) (err error) {
return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ? AND friend_user_id = ? ", ownerUserID, friendUserID).Updates(args).Error, "")
return utils.Wrap(f.db(ctx).Where("owner_user_id = ? AND friend_user_id = ? ", ownerUserID, friendUserID).Updates(args).Error, "")
}
// 更新好友信息的非零值
func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel) (err error) {
return utils.Wrap(f.DB.Updates(&friends).Error, "")
return utils.Wrap(f.db(ctx).Updates(&friends).Error, "")
}
// 更新好友备注(也支持零值
func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
if remark != "" {
return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ? and friend_user_id = ?", ownerUserID, friendUserID).Update("remark", remark).Error, "")
return utils.Wrap(f.db(ctx).Where("owner_user_id = ? and friend_user_id = ?", ownerUserID, friendUserID).Update("remark", remark).Error, "")
}
m := make(map[string]interface{}, 1)
m["remark"] = ""
return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ?", ownerUserID).Updates(m).Error, "")
return utils.Wrap(f.db(ctx).Where("owner_user_id = ?", ownerUserID).Updates(m).Error, "")
}

View File

@ -7,16 +7,16 @@ import (
"gorm.io/gorm"
)
func NewFriendRequestGorm(db *gorm.DB) relation.FriendRequestModelInterface {
return &FriendRequestGorm{db}
type FriendRequestGorm struct {
*MetaDB
}
type FriendRequestGorm struct {
DB *gorm.DB
func NewFriendRequestGorm(db *gorm.DB) relation.FriendRequestModelInterface {
return &FriendRequestGorm{NewMetaDB(db, &relation.FriendModel{})}
}
func (f *FriendRequestGorm) NewTx(tx any) relation.FriendRequestModelInterface {
return &FriendRequestGorm{DB: tx.(*gorm.DB)}
return &FriendRequestGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendModel{})}
}
// 插入多条记录

View File

@ -11,15 +11,15 @@ import (
var _ relation.GroupMemberModelInterface = (*GroupMemberGorm)(nil)
type GroupMemberGorm struct {
DB *gorm.DB
*MetaDB
}
func NewGroupMemberDB(db *gorm.DB) relation.GroupMemberModelInterface {
return &GroupMemberGorm{DB: db}
return &GroupMemberGorm{NewMetaDB(db, &relation.GroupMemberModel{})}
}
func (g *GroupMemberGorm) NewTx(tx any) relation.GroupMemberModelInterface {
return &GroupMemberGorm{DB: tx.(*gorm.DB)}
return &GroupMemberGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupMemberModel{})}
}
func (g *GroupMemberGorm) Create(ctx context.Context, groupMemberList []*relation.GroupMemberModel) (err error) {

View File

@ -10,15 +10,15 @@ import (
var _ relation.GroupModelInterface = (*GroupGorm)(nil)
type GroupGorm struct {
DB *gorm.DB
*MetaDB
}
func NewGroupDB(db *gorm.DB) relation.GroupModelInterface {
return &GroupGorm{DB: db}
return &GroupGorm{NewMetaDB(db, &relation.GroupModel{})}
}
func (g *GroupGorm) NewTx(tx any) relation.GroupModelInterface {
return &GroupGorm{DB: tx.(*gorm.DB)}
return &GroupGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupModel{})}
}
func (g *GroupGorm) Create(ctx context.Context, groups []*relation.GroupModel) (err error) {

View File

@ -8,21 +8,19 @@ import (
)
type GroupRequestGorm struct {
DB *gorm.DB
}
func (g *GroupRequestGorm) NewTx(tx any) relation.GroupRequestModelInterface {
return &GroupRequestGorm{
DB: tx.(*gorm.DB),
}
*MetaDB
}
func NewGroupRequest(db *gorm.DB) relation.GroupRequestModelInterface {
return &GroupRequestGorm{
DB: db,
NewMetaDB(db, &relation.GroupRequestModel{}),
}
}
func (g *GroupRequestGorm) NewTx(tx any) relation.GroupRequestModelInterface {
return &GroupRequestGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupRequestModel{})}
}
func (g *GroupRequestGorm) Create(ctx context.Context, groupRequests []*relation.GroupRequestModel) (err error) {
return utils.Wrap(g.DB.Create(&groupRequests).Error, utils.GetSelfFuncName())
}

View File

@ -0,0 +1,22 @@
package relation
import (
"context"
"gorm.io/gorm"
)
type MetaDB struct {
DB *gorm.DB
table interface{}
}
func NewMetaDB(db *gorm.DB, table any) *MetaDB {
return &MetaDB{
DB: db,
table: table,
}
}
func (g *MetaDB) db(ctx context.Context) *gorm.DB {
return g.DB.WithContext(ctx).Model(g.table)
}

View File

@ -2,6 +2,7 @@ package relation
import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
@ -9,7 +10,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
mysqlDriver "github.com/go-sql-driver/mysql"
"gorm.io/driver/mysql"
"strings"
gormUtils "gorm.io/gorm/utils"
"time"
"gorm.io/gorm"
@ -39,16 +40,9 @@ func newMysqlGormDB() (*gorm.DB, error) {
}
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
newLogger := logger.New(
Writer{},
logger.Config{
SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold
LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
},
)
sqlLogger := NewSqlLogger(logger.LogLevel(config.Config.Mysql.LogLevel), true, time.Duration(config.Config.Mysql.SlowThreshold)*time.Millisecond)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
Logger: sqlLogger,
})
if err != nil {
return nil, err
@ -84,14 +78,65 @@ func IsMysqlDuplicateKey(err error) bool {
return false
}
type Writer struct{}
type SqlLogger struct {
LogLevel logger.LogLevel
IgnoreRecordNotFoundError bool
SlowThreshold time.Duration
}
func (w Writer) Printf(format string, args ...interface{}) {
s := fmt.Sprintf(format, args...)
l := strings.Split(s, "\n")
if len(l) == 2 {
log.ZDebug(context.Background(), "sql exec detail", "gorm", l[0], "sql", l[1])
} else {
log.ZDebug(context.Background(), "sql exec detail", "sql", s)
func NewSqlLogger(logLevel logger.LogLevel, ignoreRecordNotFoundError bool, slowThreshold time.Duration) *SqlLogger {
return &SqlLogger{
LogLevel: logLevel,
IgnoreRecordNotFoundError: ignoreRecordNotFoundError,
SlowThreshold: slowThreshold,
}
}
func (l *SqlLogger) LogMode(logLevel logger.LogLevel) logger.Interface {
newLogger := *l
newLogger.LogLevel = logLevel
return &newLogger
}
func (SqlLogger) Info(ctx context.Context, msg string, args ...interface{}) {
log.ZInfo(ctx, msg, args)
}
func (SqlLogger) Warn(ctx context.Context, msg string, args ...interface{}) {
log.ZWarn(ctx, msg, nil, args)
}
func (SqlLogger) Error(ctx context.Context, msg string, args ...interface{}) {
log.ZError(ctx, msg, nil, args)
}
func (l SqlLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
if l.LogLevel <= logger.Silent {
return
}
elapsed := time.Since(begin)
switch {
case err != nil && l.LogLevel >= logger.Error && (!errors.Is(err, gorm.ErrRecordNotFound) || !l.IgnoreRecordNotFoundError):
sql, rows := fc()
if rows == -1 {
log.ZError(ctx, "sql exec detail", err, "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql)
} else {
log.ZError(ctx, "sql exec detail", err, "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql)
}
case elapsed > l.SlowThreshold && l.SlowThreshold != 0 && l.LogLevel >= logger.Warn:
sql, rows := fc()
slowLog := fmt.Sprintf("SLOW SQL >= %v", l.SlowThreshold)
if rows == -1 {
log.ZWarn(ctx, "sql exec detail", nil, "gorm", gormUtils.FileWithLineNum(), nil, "slow sql", slowLog, "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql)
} else {
log.ZWarn(ctx, "sql exec detail", nil, "gorm", gormUtils.FileWithLineNum(), nil, "slow sql", slowLog, "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql)
}
case l.LogLevel == logger.Info:
sql, rows := fc()
if rows == -1 {
log.ZDebug(ctx, "sql exec detail", "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql)
} else {
log.ZDebug(ctx, "sql exec detail", "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql)
}
}
}

View File

@ -7,19 +7,19 @@ import (
"gorm.io/gorm"
)
func NewObjectHash(db *gorm.DB) relation.ObjectHashModelInterface {
return &ObjectHashGorm{
DB: db,
}
type ObjectHashGorm struct {
*MetaDB
}
type ObjectHashGorm struct {
DB *gorm.DB
func NewObjectHash(db *gorm.DB) relation.ObjectHashModelInterface {
return &ObjectHashGorm{
NewMetaDB(db, &relation.ObjectHashModel{}),
}
}
func (o *ObjectHashGorm) NewTx(tx any) relation.ObjectHashModelInterface {
return &ObjectHashGorm{
DB: tx.(*gorm.DB),
NewMetaDB(tx.(*gorm.DB), &relation.ObjectHashModel{}),
}
}

View File

@ -8,19 +8,19 @@ import (
"time"
)
func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface {
return &ObjectInfoGorm{
DB: db,
}
type ObjectInfoGorm struct {
*MetaDB
}
type ObjectInfoGorm struct {
DB *gorm.DB
func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface {
return &ObjectInfoGorm{
NewMetaDB(db, &relation.ObjectInfoModel{}),
}
}
func (o *ObjectInfoGorm) NewTx(tx any) relation.ObjectInfoModelInterface {
return &ObjectInfoGorm{
DB: tx.(*gorm.DB),
NewMetaDB(tx.(*gorm.DB), &relation.ObjectInfoModel{}),
}
}

View File

@ -8,19 +8,19 @@ import (
"time"
)
func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface {
return &ObjectPutGorm{
DB: db,
}
type ObjectPutGorm struct {
*MetaDB
}
type ObjectPutGorm struct {
DB *gorm.DB
func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface {
return &ObjectPutGorm{
NewMetaDB(db, &relation.ObjectPutModel{}),
}
}
func (o *ObjectPutGorm) NewTx(tx any) relation.ObjectPutModelInterface {
return &ObjectPutGorm{
DB: tx.(*gorm.DB),
NewMetaDB(tx.(*gorm.DB), &relation.ObjectPutModel{}),
}
}

View File

@ -8,53 +8,53 @@ import (
)
type UserGorm struct {
DB *gorm.DB
*MetaDB
}
func NewUserGorm(DB *gorm.DB) relation.UserModelInterface {
return &UserGorm{DB: DB.Model(&relation.UserModel{})}
func NewUserGorm(db *gorm.DB) relation.UserModelInterface {
return &UserGorm{NewMetaDB(db, &relation.UserModel{})}
}
// 插入多条
func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return utils.Wrap(u.DB.Create(&users).Error, "")
return utils.Wrap(u.db(ctx).Create(&users).Error, "")
}
// 更新用户信息 零值
func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
return utils.Wrap(u.DB.Where("user_id = ?", userID).Updates(args).Error, "")
return utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Updates(args).Error, "")
}
// 更新多个用户信息 非零值
func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel) (err error) {
return utils.Wrap(u.DB.Updates(&users).Error, "")
return utils.Wrap(u.db(ctx).Updates(&users).Error, "")
}
// 获取指定用户信息 不存在,也不返回错误
func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
err = utils.Wrap(u.DB.Debug().Where("user_id in ?", userIDs).Find(&users).Error, "")
err = utils.Wrap(u.db(ctx).Where("user_id in (?)", userIDs).Find(&users).Error, "")
return users, err
}
// 获取某个用户信息 不存在,则返回错误
func (u *UserGorm) Take(ctx context.Context, userID string) (user *relation.UserModel, err error) {
user = &relation.UserModel{}
err = utils.Wrap(u.DB.Where("user_id = ?", userID).Take(&user).Error, "")
err = utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Take(&user).Error, "")
return user, err
}
// 获取用户信息 不存在,不返回错误
func (u *UserGorm) Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error) {
err = utils.Wrap(u.DB.Count(&count).Error, "")
err = utils.Wrap(u.db(ctx).Count(&count).Error, "")
if err != nil {
return
}
err = utils.Wrap(u.DB.Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&users).Error, "")
err = utils.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&users).Error, "")
return
}
// 获取所有用户ID
func (u *UserGorm) GetAllUserID(ctx context.Context) (userIDs []string, err error) {
err = u.DB.Pluck("user_id", &userIDs).Error
err = u.db(ctx).Pluck("user_id", &userIDs).Error
return userIDs, err
}

View File

@ -1,5 +1,7 @@
package relation
import "context"
const (
conversationModelTableName = "conversations"
)
@ -28,4 +30,15 @@ func (ConversationModel) TableName() string {
}
type ConversationModelInterface interface {
Create(ctx context.Context, conversations []*ConversationModel) (err error)
Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error)
Update(ctx context.Context, conversations []*ConversationModel) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*ConversationModel, err error)
FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error)
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
NewTx(tx any) ConversationModelInterface
}

View File

@ -51,11 +51,11 @@ type ZapLogger struct {
func NewZapLogger() (*ZapLogger, error) {
zapConfig := zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Level: zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel)),
Encoding: "json",
EncoderConfig: zap.NewProductionEncoderConfig(),
DisableStacktrace: true,
InitialFields: map[string]interface{}{"PID": os.Getegid()},
DisableStacktrace: true,
}
if config.Config.Log.Stderr {
zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stderr")
@ -70,7 +70,6 @@ func NewZapLogger() (*ZapLogger, error) {
return nil, err
}
zl.zap = l.Sugar()
zl.zap.WithOptions(zap.AddStacktrace(zap.DPanicLevel))
return zl, nil
}
@ -82,7 +81,6 @@ func (l *ZapLogger) cores() (zap.Option, error) {
c.LevelKey = "level"
c.TimeKey = "time"
c.CallerKey = "caller"
//c.EncodeLevel = zapcore.LowercaseColorLevelEncoder
fileEncoder := zapcore.NewJSONEncoder(c)
fileEncoder.AddInt("PID", os.Getpid())
writer, err := l.getWriter()
@ -92,25 +90,17 @@ func (l *ZapLogger) cores() (zap.Option, error) {
var cores []zapcore.Core
if config.Config.Log.StorageLocation != "" {
cores = []zapcore.Core{
zapcore.NewCore(fileEncoder, writer, zapcore.DebugLevel),
zapcore.NewCore(fileEncoder, writer, zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel))),
}
}
if config.Config.Log.Stderr {
cores = append(cores, zapcore.NewCore(fileEncoder, zapcore.Lock(os.Stdout), zapcore.DebugLevel))
cores = append(cores, zapcore.NewCore(fileEncoder, zapcore.Lock(os.Stdout), zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel))))
}
return zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewTee(cores...)
}), nil
}
func NewErrStackCore(c zapcore.Core) zapcore.Core {
return &errStackCore{c}
}
type errStackCore struct {
zapcore.Core
}
func (l *ZapLogger) getWriter() (zapcore.WriteSyncer, error) {
logf, err := rotatelogs.New(config.Config.Log.StorageLocation+sp+"OpenIM.log.all"+".%Y-%m-%d",
rotatelogs.WithRotationCount(config.Config.Log.RemainRotationCount),
@ -138,7 +128,7 @@ func (l *ZapLogger) Info(ctx context.Context, msg string, keysAndValues ...inter
func (l *ZapLogger) Warn(ctx context.Context, msg string, err error, keysAndValues ...interface{}) {
if err != nil {
keysAndValues = append(keysAndValues, "error", err)
keysAndValues = append(keysAndValues, "error", err.Error())
}
keysAndValues = l.kvAppend(ctx, keysAndValues)
l.zap.Warnw(msg, keysAndValues...)
@ -146,7 +136,7 @@ func (l *ZapLogger) Warn(ctx context.Context, msg string, err error, keysAndValu
func (l *ZapLogger) Error(ctx context.Context, msg string, err error, keysAndValues ...interface{}) {
if err != nil {
keysAndValues = append(keysAndValues, "error", err)
keysAndValues = append(keysAndValues, "error", err.Error())
}
keysAndValues = append([]interface{}{constant.OperationID, tracelog.GetOperationID(ctx)}, keysAndValues...)
l.zap.Errorw(msg, keysAndValues...)

View File

@ -7,8 +7,43 @@ import (
"github.com/gin-gonic/gin"
"io"
"net/http"
"github.com/go-redis/redis/v8"
)
type GinMwOptions func( *gin.RouterGroup )
func WithRecovery() GinMwOptions {
return func(group *gin.RouterGroup) {
group.Use(gin.Recovery())
}
}
func WithCorsHandler() GinMwOptions {
return func(group *gin.RouterGroup) {
group.Use(CorsHandler())
}
}
func WithGinParseOperationID() GinMwOptions {
return func(group *gin.RouterGroup) {
group.Use(GinParseOperationID())
}
}
func WithGinParseToken(rdb redis.UniversalClient) GinMwOptions {
return func(group *gin.RouterGroup) {
group.Use(GinParseToken(rdb))
}
}
func NewRouterGroup(routerGroup *gin.RouterGroup, route string, options ...GinMwOptions) *gin.RouterGroup {
routerGroup = routerGroup.Group(route)
for _, option := range options {
option(routerGroup)
}
return routerGroup
}
func CorsHandler() gin.HandlerFunc {
return func(context *gin.Context) {
context.Writer.Header().Set("Access-Control-Allow-Origin", "*")
@ -33,7 +68,8 @@ func GinParseOperationID() gin.HandlerFunc {
if operationID == "" {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
c.String(400, "read request body error: "+err.Error())
log.ZWarn(c, "read request body error", errs.ErrArgs.Wrap("read request body error: "+err.Error()))
apiresp.GinError(c, errs.ErrArgs.Wrap("read request body error: "+err.Error()))
c.Abort()
return
}
@ -41,12 +77,14 @@ func GinParseOperationID() gin.HandlerFunc {
OperationID string `json:"operationID"`
}{}
if err := json.Unmarshal(body, &req); err != nil {
c.String(400, "get operationID error: "+err.Error())
log.ZWarn(c, "json unmarshal error", errs.ErrArgs.Wrap(err.Error()))
apiresp.GinError(c, errs.ErrArgs.Wrap("json unmarshal error"+err.Error()))
c.Abort()
return
}
if req.OperationID == "" {
c.String(400, "operationID empty")
log.ZWarn(c, "header must have operationID", errs.ErrArgs.Wrap(err.Error()))
apiresp.GinError(c, errs.ErrArgs.Wrap("header must have operationID"+err.Error()))
c.Abort()
return
}
@ -61,3 +99,56 @@ func GinParseOperationID() gin.HandlerFunc {
c.Next()
}
}
func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
dataBase := controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire)
return func(c *gin.Context) {
switch c.Request.Method {
case http.MethodPost:
token := c.Request.Header.Get(constant.Token)
if token == "" {
log.ZWarn(c, "header get token error", errs.ErrArgs.Wrap("header must have token"))
apiresp.GinError(c, errs.ErrArgs.Wrap("header must have token"))
c.Abort()
return
}
claims, err := tokenverify.GetClaimFromToken(token)
if err != nil {
log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
c.Abort()
return
}
m, err := dataBase.GetTokensWithoutError(c, claims.UID, claims.Platform)
if err != nil {
log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return
}
if len(m) == 0 {
log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return
}
if v, ok := m[token]; ok {
switch v {
case constant.NormalToken:
case constant.KickedToken:
log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap())
apiresp.GinError(c, errs.ErrTokenKicked.Wrap())
c.Abort()
return
default:
log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
c.Abort()
return
}
}
c.Set(constant.OpUserIDPlatformID, constant.PlatformNameToID(claims.Platform))
c.Set(constant.OpUserID, claims.UID)
c.Next()
}
}
}

View File

@ -1,6 +1,7 @@
package statistics
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"time"
)
@ -29,7 +30,7 @@ func (s *Statistics) output() {
intervalCount = *s.AllCount - sum
}
timeIntervalNum++
log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, intervalCount, "total:", *s.AllCount, "intervalNum", timeIntervalNum, "avg", (*s.AllCount)/(timeIntervalNum)/s.SleepTime)
log.ZWarn(context.Background(), " system stat ", nil, "args", s.PrintArgs, "intervalCount", intervalCount, "total:", *s.AllCount, "intervalNum", timeIntervalNum, "avg", (*s.AllCount)/(timeIntervalNum)/s.SleepTime)
}
}