diff --git a/cmd/api/main.go b/cmd/api/main.go index 7909333da..5423dfe23 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -27,12 +27,16 @@ func run(port int) error { if port == 0 { port = config.Config.Api.GinPort[0] } + rdb, err := cache.NewRedis() + if err != nil { + return err + } zk, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password) if err != nil { return err } log.NewPrivateLog(constant.LogFileName) - router := api.NewGinRouter(zk) + router := api.NewGinRouter(zk, rdb) var address string if config.Config.Api.ListenIP != "" { address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port)) diff --git a/config/config.yaml b/config/config.yaml index 58159d63a..47d7e5409 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -160,7 +160,7 @@ log: rotationTime: 24 remainRotationCount: 2 #日志数量 #日志级别 6表示全都打印,测试阶段建议设置为6 - remainLogLevel: 6 + remainLogLevel: -1 stderr: true elasticSearchSwitch: false diff --git a/internal/api/a2r/api2rpc.go b/internal/api/a2r/api2rpc.go index a988bd5b3..812f620c1 100644 --- a/internal/api/a2r/api2rpc.go +++ b/internal/api/a2r/api2rpc.go @@ -15,11 +15,13 @@ func Call[A, B, C any]( ) { var req A if err := c.BindJSON(&req); err != nil { + log.ZWarn(c, "gin bind json error", err, "req", req) apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数错误 return } if check, ok := any(&req).(interface{ Check() error }); ok { if err := check.Check(); err != nil { + log.ZWarn(c, "custom check error", err, "req", req) apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败 return } diff --git a/internal/api/route.go b/internal/api/route.go index a79258d79..ce78a3b2c 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -7,17 +7,17 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/gin-gonic/gin" + "github.com/go-redis/redis/v8" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "io" - "os" ) -func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { +func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine { + zk.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) // 默认RPC中间件 gin.SetMode(gin.ReleaseMode) - f, _ := os.Create("../logs/api.log") - gin.DefaultWriter = io.MultiWriter(f) - // gin.SetMode(gin.DebugMode) + //f, _ := os.Create("../logs/api.log") + //gin.DefaultWriter = io.MultiWriter(f) + //gin.SetMode(gin.DebugMode) r := gin.New() log.Info("load config: ", config.Config) r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) @@ -28,17 +28,18 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { r.Use(prome.PrometheusMiddleware) r.GET("/metrics", prome.PrometheusHandler()) } - zk.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) // 默认RPC中间件 userRouterGroup := r.Group("/user") { u := NewUser(zk) - userRouterGroup.POST("/user_register", u.UserRegister) - userRouterGroup.POST("/update_user_info", u.UpdateUserInfo) //1 - userRouterGroup.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt) - userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo) //1 - userRouterGroup.POST("/get_all_users_uid", u.GetAllUsersID) // todo - userRouterGroup.POST("/account_check", u.AccountCheck) // todo - userRouterGroup.POST("/get_users", u.GetUsers) + userRouterGroupChild1 := mw.NewRouterGroup(userRouterGroup, "",) + userRouterGroupChild2 := mw.NewRouterGroup(userRouterGroup, "", mw.WithGinParseToken(rdb)) + userRouterGroupChild1.POST("/user_register", u.UserRegister) + userRouterGroupChild2.POST("/update_user_info", u.UpdateUserInfo) //1 + userRouterGroupChild2.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt) + userRouterGroupChild2.POST("/get_users_info", u.GetUsersPublicInfo) //1 + userRouterGroupChild2.POST("/get_all_users_uid", u.GetAllUsersID) // todo + userRouterGroupChild2.POST("/account_check", u.AccountCheck) // todo + userRouterGroupChild2.POST("/get_users", u.GetUsers) } ////friend routing group friendRouterGroup := r.Group("/friend") @@ -56,7 +57,6 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { friendRouterGroup.POST("/remove_black", f.RemoveBlack) //1 friendRouterGroup.POST("/import_friend", f.ImportFriends) //1 friendRouterGroup.POST("/is_friend", f.IsFriend) //1 - } groupRouterGroup := r.Group("/group") g := NewGroup(zk) @@ -94,10 +94,12 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { { a := NewAuth(zk) u := NewUser(zk) - authRouterGroup.POST("/user_register", u.UserRegister) //1 - authRouterGroup.POST("/user_token", a.UserToken) //1 - authRouterGroup.POST("/parse_token", a.ParseToken) //1 - authRouterGroup.POST("/force_logout", a.ForceLogout) //1 + authRouterGroupChild1 := mw.NewRouterGroup(authRouterGroup, "",) + authRouterGroupChild2 := mw.NewRouterGroup(authRouterGroup, "", mw.WithGinParseToken(rdb)) + authRouterGroupChild1.POST("/user_register", u.UserRegister) //1 + authRouterGroupChild1.POST("/user_token", a.UserToken) //1 + authRouterGroupChild2.POST("/parse_token", a.ParseToken) //1 + authRouterGroupChild2.POST("/force_logout", a.ForceLogout) //1 } ////Third service thirdGroup := r.Group("/third") @@ -113,7 +115,6 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { thirdGroup.POST("/confirm_put", t.ConfirmPut) thirdGroup.GET("/get_url", t.GetURL) thirdGroup.GET("/object", t.GetURL) - } ////Message chatGroup := r.Group("/msg") @@ -137,7 +138,7 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { } ////Conversation conversationGroup := r.Group("/conversation") - { //1 + { c := NewConversation(zk) conversationGroup.POST("/get_all_conversations", c.GetAllConversations) conversationGroup.POST("/get_conversation", c.GetConversation) @@ -149,3 +150,4 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine { } return r } + diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 4d4254429..561b4ab3c 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -53,8 +53,7 @@ func (ws *WsServer) UnRegister(c *Client) { } func (ws *WsServer) Validate(s interface{}) error { - //TODO implement me - panic("implement me") + return nil } func (ws *WsServer) GetUserAllCons(userID string) ([]*Client, bool) { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 9c74a1fef..652096e5e 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -71,7 +71,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) - statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + //statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) return &och } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 494a0d243..ca56d77b5 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -183,7 +183,7 @@ type config struct { StorageLocation string `yaml:"storageLocation"` RotationTime int `yaml:"rotationTime"` RemainRotationCount uint `yaml:"remainRotationCount"` - RemainLogLevel uint `yaml:"remainLogLevel"` + RemainLogLevel int `yaml:"remainLogLevel"` Stderr bool `yaml:"stderr"` ElasticSearchSwitch bool `yaml:"elasticSearchSwitch"` ElasticSearchAddr []string `yaml:"elasticSearchAddr"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index bfb66ce47..e553be29b 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -273,6 +273,8 @@ const ( const OperationID = "operationID" const OpUserID = "opUserID" const ConnID = "connID" +const OpUserIDPlatformID = "platformID" +const Token = "token" const ( UnreliableNotification = 1 diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index bae708c93..bb5acb77a 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -32,7 +32,7 @@ type ConversationDatabase interface { SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error } -func NewConversationDatabase(conversation relation.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { +func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { return &ConversationDataBase{ conversationDB: conversation, cache: cache, @@ -41,7 +41,7 @@ func NewConversationDatabase(conversation relation.Conversation, cache cache.Con } type ConversationDataBase struct { - conversationDB relation.Conversation + conversationDB relationTb.ConversationModelInterface cache cache.ConversationCache tx tx.Tx } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 4bbbd9d4d..f27af4296 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -46,9 +46,8 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel } miss := utils.SliceAnySub(users, result, func(e *relation.UserModel) string { return e.UserID }) if len(miss) > 0 { - u.userDB.Create(ctx, miss) + _ = u.userDB.Create(ctx, miss) } - return nil } diff --git a/pkg/common/db/relation/black_model.go b/pkg/common/db/relation/black_model.go index 884c050bf..c56909288 100644 --- a/pkg/common/db/relation/black_model.go +++ b/pkg/common/db/relation/black_model.go @@ -8,11 +8,11 @@ import ( ) type BlackGorm struct { - DB *gorm.DB + *MetaDB } func NewBlackGorm(db *gorm.DB) relation.BlackModelInterface { - return &BlackGorm{db} + return &BlackGorm{NewMetaDB(db, &relation.BlackModel{})} } func (b *BlackGorm) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { diff --git a/pkg/common/db/relation/chat_log_model.go b/pkg/common/db/relation/chat_log_model.go index 3a2f35b51..ecd8cd2a2 100644 --- a/pkg/common/db/relation/chat_log_model.go +++ b/pkg/common/db/relation/chat_log_model.go @@ -14,11 +14,11 @@ import ( ) type ChatLogGorm struct { - DB *gorm.DB + *MetaDB } func NewChatLogGorm(db *gorm.DB) relation.ChatLogModelInterface { - return &ChatLogGorm{DB: db} + return &ChatLogGorm{NewMetaDB(db, &relation.ChatLogModel{})} } func (c *ChatLogGorm) Create(msg pbMsg.MsgDataToMQ) error { diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 049eebff1..5955bc886 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -8,29 +8,16 @@ import ( "gorm.io/gorm" ) -type Conversation interface { - Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) - Delete(ctx context.Context, groupIDs []string) (err error) - UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) - Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) - Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) - FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error) - FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) - Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) - FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) - FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) - NewTx(tx any) Conversation -} type ConversationGorm struct { - DB *gorm.DB + *MetaDB } -func NewConversationGorm(DB *gorm.DB) Conversation { - return &ConversationGorm{DB: DB} +func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface { + return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})} } -func (c *ConversationGorm) NewTx(tx any) Conversation { - return &ConversationGorm{DB: tx.(*gorm.DB)} +func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface { + return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})} } func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { diff --git a/pkg/common/db/relation/friend_model.go b/pkg/common/db/relation/friend_model.go index f24fdb7c8..d22127dc2 100644 --- a/pkg/common/db/relation/friend_model.go +++ b/pkg/common/db/relation/friend_model.go @@ -8,46 +8,46 @@ import ( ) type FriendGorm struct { - DB *gorm.DB + *MetaDB } func NewFriendGorm(db *gorm.DB) relation.FriendModelInterface { - return &FriendGorm{DB: db} + return &FriendGorm{NewMetaDB(db, &relation.FriendModel{})} } func (f *FriendGorm) NewTx(tx any) relation.FriendModelInterface { - return &FriendGorm{DB: tx.(*gorm.DB)} + return &FriendGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendModel{})} } // 插入多条记录 func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel) (err error) { - return utils.Wrap(f.DB.Create(&friends).Error, "") + return utils.Wrap(f.db(ctx).Create(&friends).Error, "") } // 删除ownerUserID指定的好友 func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { - err = utils.Wrap(f.DB.Where("owner_user_id = ? AND friend_user_id in ( ?)", ownerUserID, friendUserIDs).Delete(&relation.FriendModel{}).Error, "") + err = utils.Wrap(f.db(ctx).Where("owner_user_id = ? AND friend_user_id in ( ?)", ownerUserID, friendUserIDs).Delete(&relation.FriendModel{}).Error, "") return err } // 更新ownerUserID单个好友信息 更新零值 func (f *FriendGorm) UpdateByMap(ctx context.Context, ownerUserID string, friendUserID string, args map[string]interface{}) (err error) { - return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ? AND friend_user_id = ? ", ownerUserID, friendUserID).Updates(args).Error, "") + return utils.Wrap(f.db(ctx).Where("owner_user_id = ? AND friend_user_id = ? ", ownerUserID, friendUserID).Updates(args).Error, "") } // 更新好友信息的非零值 func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel) (err error) { - return utils.Wrap(f.DB.Updates(&friends).Error, "") + return utils.Wrap(f.db(ctx).Updates(&friends).Error, "") } // 更新好友备注(也支持零值 ) func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { if remark != "" { - return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ? and friend_user_id = ?", ownerUserID, friendUserID).Update("remark", remark).Error, "") + return utils.Wrap(f.db(ctx).Where("owner_user_id = ? and friend_user_id = ?", ownerUserID, friendUserID).Update("remark", remark).Error, "") } m := make(map[string]interface{}, 1) m["remark"] = "" - return utils.Wrap(f.DB.Model(&relation.FriendModel{}).Where("owner_user_id = ?", ownerUserID).Updates(m).Error, "") + return utils.Wrap(f.db(ctx).Where("owner_user_id = ?", ownerUserID).Updates(m).Error, "") } diff --git a/pkg/common/db/relation/friend_request_model.go b/pkg/common/db/relation/friend_request_model.go index b259b35a1..21998b7aa 100644 --- a/pkg/common/db/relation/friend_request_model.go +++ b/pkg/common/db/relation/friend_request_model.go @@ -7,16 +7,16 @@ import ( "gorm.io/gorm" ) -func NewFriendRequestGorm(db *gorm.DB) relation.FriendRequestModelInterface { - return &FriendRequestGorm{db} +type FriendRequestGorm struct { + *MetaDB } -type FriendRequestGorm struct { - DB *gorm.DB +func NewFriendRequestGorm(db *gorm.DB) relation.FriendRequestModelInterface { + return &FriendRequestGorm{NewMetaDB(db, &relation.FriendModel{})} } func (f *FriendRequestGorm) NewTx(tx any) relation.FriendRequestModelInterface { - return &FriendRequestGorm{DB: tx.(*gorm.DB)} + return &FriendRequestGorm{NewMetaDB(tx.(*gorm.DB), &relation.FriendModel{})} } // 插入多条记录 diff --git a/pkg/common/db/relation/group_member_model.go b/pkg/common/db/relation/group_member_model.go index 945ea391e..42266c550 100644 --- a/pkg/common/db/relation/group_member_model.go +++ b/pkg/common/db/relation/group_member_model.go @@ -11,15 +11,15 @@ import ( var _ relation.GroupMemberModelInterface = (*GroupMemberGorm)(nil) type GroupMemberGorm struct { - DB *gorm.DB + *MetaDB } func NewGroupMemberDB(db *gorm.DB) relation.GroupMemberModelInterface { - return &GroupMemberGorm{DB: db} + return &GroupMemberGorm{NewMetaDB(db, &relation.GroupMemberModel{})} } func (g *GroupMemberGorm) NewTx(tx any) relation.GroupMemberModelInterface { - return &GroupMemberGorm{DB: tx.(*gorm.DB)} + return &GroupMemberGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupMemberModel{})} } func (g *GroupMemberGorm) Create(ctx context.Context, groupMemberList []*relation.GroupMemberModel) (err error) { diff --git a/pkg/common/db/relation/group_model.go b/pkg/common/db/relation/group_model.go index fe5a2cdee..e4d5d9759 100644 --- a/pkg/common/db/relation/group_model.go +++ b/pkg/common/db/relation/group_model.go @@ -10,15 +10,15 @@ import ( var _ relation.GroupModelInterface = (*GroupGorm)(nil) type GroupGorm struct { - DB *gorm.DB + *MetaDB } func NewGroupDB(db *gorm.DB) relation.GroupModelInterface { - return &GroupGorm{DB: db} + return &GroupGorm{NewMetaDB(db, &relation.GroupModel{})} } func (g *GroupGorm) NewTx(tx any) relation.GroupModelInterface { - return &GroupGorm{DB: tx.(*gorm.DB)} + return &GroupGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupModel{})} } func (g *GroupGorm) Create(ctx context.Context, groups []*relation.GroupModel) (err error) { diff --git a/pkg/common/db/relation/group_request_model.go b/pkg/common/db/relation/group_request_model.go index 4c1f8299e..d5c874bff 100644 --- a/pkg/common/db/relation/group_request_model.go +++ b/pkg/common/db/relation/group_request_model.go @@ -8,21 +8,19 @@ import ( ) type GroupRequestGorm struct { - DB *gorm.DB -} - -func (g *GroupRequestGorm) NewTx(tx any) relation.GroupRequestModelInterface { - return &GroupRequestGorm{ - DB: tx.(*gorm.DB), - } + *MetaDB } func NewGroupRequest(db *gorm.DB) relation.GroupRequestModelInterface { return &GroupRequestGorm{ - DB: db, + NewMetaDB(db, &relation.GroupRequestModel{}), } } +func (g *GroupRequestGorm) NewTx(tx any) relation.GroupRequestModelInterface { + return &GroupRequestGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupRequestModel{})} +} + func (g *GroupRequestGorm) Create(ctx context.Context, groupRequests []*relation.GroupRequestModel) (err error) { return utils.Wrap(g.DB.Create(&groupRequests).Error, utils.GetSelfFuncName()) } diff --git a/pkg/common/db/relation/meta_db.go b/pkg/common/db/relation/meta_db.go new file mode 100644 index 000000000..b758bb863 --- /dev/null +++ b/pkg/common/db/relation/meta_db.go @@ -0,0 +1,22 @@ +package relation + +import ( + "context" + "gorm.io/gorm" +) + +type MetaDB struct { + DB *gorm.DB + table interface{} +} + +func NewMetaDB(db *gorm.DB, table any) *MetaDB { + return &MetaDB{ + DB: db, + table: table, + } +} + +func (g *MetaDB) db(ctx context.Context) *gorm.DB { + return g.DB.WithContext(ctx).Model(g.table) +} diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go index c225f1dca..b365b8b4d 100644 --- a/pkg/common/db/relation/mysql_init.go +++ b/pkg/common/db/relation/mysql_init.go @@ -2,6 +2,7 @@ package relation import ( "context" + "errors" "fmt" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -9,7 +10,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" mysqlDriver "github.com/go-sql-driver/mysql" "gorm.io/driver/mysql" - "strings" + gormUtils "gorm.io/gorm/utils" "time" "gorm.io/gorm" @@ -39,16 +40,9 @@ func newMysqlGormDB() (*gorm.DB, error) { } dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) - newLogger := logger.New( - Writer{}, - logger.Config{ - SlowThreshold: time.Duration(config.Config.Mysql.SlowThreshold) * time.Millisecond, // Slow SQL threshold - LogLevel: logger.LogLevel(config.Config.Mysql.LogLevel), // Log level - IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger - }, - ) + sqlLogger := NewSqlLogger(logger.LogLevel(config.Config.Mysql.LogLevel), true, time.Duration(config.Config.Mysql.SlowThreshold)*time.Millisecond) db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ - Logger: newLogger, + Logger: sqlLogger, }) if err != nil { return nil, err @@ -84,14 +78,65 @@ func IsMysqlDuplicateKey(err error) bool { return false } -type Writer struct{} +type SqlLogger struct { + LogLevel logger.LogLevel + IgnoreRecordNotFoundError bool + SlowThreshold time.Duration +} -func (w Writer) Printf(format string, args ...interface{}) { - s := fmt.Sprintf(format, args...) - l := strings.Split(s, "\n") - if len(l) == 2 { - log.ZDebug(context.Background(), "sql exec detail", "gorm", l[0], "sql", l[1]) - } else { - log.ZDebug(context.Background(), "sql exec detail", "sql", s) +func NewSqlLogger(logLevel logger.LogLevel, ignoreRecordNotFoundError bool, slowThreshold time.Duration) *SqlLogger { + return &SqlLogger{ + LogLevel: logLevel, + IgnoreRecordNotFoundError: ignoreRecordNotFoundError, + SlowThreshold: slowThreshold, + } +} + +func (l *SqlLogger) LogMode(logLevel logger.LogLevel) logger.Interface { + newLogger := *l + newLogger.LogLevel = logLevel + return &newLogger +} + +func (SqlLogger) Info(ctx context.Context, msg string, args ...interface{}) { + log.ZInfo(ctx, msg, args) +} + +func (SqlLogger) Warn(ctx context.Context, msg string, args ...interface{}) { + log.ZWarn(ctx, msg, nil, args) +} + +func (SqlLogger) Error(ctx context.Context, msg string, args ...interface{}) { + log.ZError(ctx, msg, nil, args) +} + +func (l SqlLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) { + if l.LogLevel <= logger.Silent { + return + } + elapsed := time.Since(begin) + switch { + case err != nil && l.LogLevel >= logger.Error && (!errors.Is(err, gorm.ErrRecordNotFound) || !l.IgnoreRecordNotFoundError): + sql, rows := fc() + if rows == -1 { + log.ZError(ctx, "sql exec detail", err, "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql) + } else { + log.ZError(ctx, "sql exec detail", err, "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql) + } + case elapsed > l.SlowThreshold && l.SlowThreshold != 0 && l.LogLevel >= logger.Warn: + sql, rows := fc() + slowLog := fmt.Sprintf("SLOW SQL >= %v", l.SlowThreshold) + if rows == -1 { + log.ZWarn(ctx, "sql exec detail", nil, "gorm", gormUtils.FileWithLineNum(), nil, "slow sql", slowLog, "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql) + } else { + log.ZWarn(ctx, "sql exec detail", nil, "gorm", gormUtils.FileWithLineNum(), nil, "slow sql", slowLog, "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql) + } + case l.LogLevel == logger.Info: + sql, rows := fc() + if rows == -1 { + log.ZDebug(ctx, "sql exec detail", "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "sql", sql) + } else { + log.ZDebug(ctx, "sql exec detail", "gorm", gormUtils.FileWithLineNum(), "time(ms)", float64(elapsed.Nanoseconds())/1e6, "rows", rows, "sql", sql) + } } } diff --git a/pkg/common/db/relation/object_hash_model.go b/pkg/common/db/relation/object_hash_model.go index 9d22a029a..e122d9cec 100644 --- a/pkg/common/db/relation/object_hash_model.go +++ b/pkg/common/db/relation/object_hash_model.go @@ -7,19 +7,19 @@ import ( "gorm.io/gorm" ) -func NewObjectHash(db *gorm.DB) relation.ObjectHashModelInterface { - return &ObjectHashGorm{ - DB: db, - } +type ObjectHashGorm struct { + *MetaDB } -type ObjectHashGorm struct { - DB *gorm.DB +func NewObjectHash(db *gorm.DB) relation.ObjectHashModelInterface { + return &ObjectHashGorm{ + NewMetaDB(db, &relation.ObjectHashModel{}), + } } func (o *ObjectHashGorm) NewTx(tx any) relation.ObjectHashModelInterface { return &ObjectHashGorm{ - DB: tx.(*gorm.DB), + NewMetaDB(tx.(*gorm.DB), &relation.ObjectHashModel{}), } } diff --git a/pkg/common/db/relation/object_info_model.go b/pkg/common/db/relation/object_info_model.go index 69f9f87d5..bb3fce142 100644 --- a/pkg/common/db/relation/object_info_model.go +++ b/pkg/common/db/relation/object_info_model.go @@ -8,19 +8,19 @@ import ( "time" ) -func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface { - return &ObjectInfoGorm{ - DB: db, - } +type ObjectInfoGorm struct { + *MetaDB } -type ObjectInfoGorm struct { - DB *gorm.DB +func NewObjectInfo(db *gorm.DB) relation.ObjectInfoModelInterface { + return &ObjectInfoGorm{ + NewMetaDB(db, &relation.ObjectInfoModel{}), + } } func (o *ObjectInfoGorm) NewTx(tx any) relation.ObjectInfoModelInterface { return &ObjectInfoGorm{ - DB: tx.(*gorm.DB), + NewMetaDB(tx.(*gorm.DB), &relation.ObjectInfoModel{}), } } diff --git a/pkg/common/db/relation/object_put_model.go b/pkg/common/db/relation/object_put_model.go index e8a4bd88a..82f98624c 100644 --- a/pkg/common/db/relation/object_put_model.go +++ b/pkg/common/db/relation/object_put_model.go @@ -8,19 +8,19 @@ import ( "time" ) -func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface { - return &ObjectPutGorm{ - DB: db, - } +type ObjectPutGorm struct { + *MetaDB } -type ObjectPutGorm struct { - DB *gorm.DB +func NewObjectPut(db *gorm.DB) relation.ObjectPutModelInterface { + return &ObjectPutGorm{ + NewMetaDB(db, &relation.ObjectPutModel{}), + } } func (o *ObjectPutGorm) NewTx(tx any) relation.ObjectPutModelInterface { return &ObjectPutGorm{ - DB: tx.(*gorm.DB), + NewMetaDB(tx.(*gorm.DB), &relation.ObjectPutModel{}), } } diff --git a/pkg/common/db/relation/user_model.go b/pkg/common/db/relation/user_model.go index 1b427aac1..4f34288d7 100644 --- a/pkg/common/db/relation/user_model.go +++ b/pkg/common/db/relation/user_model.go @@ -8,53 +8,53 @@ import ( ) type UserGorm struct { - DB *gorm.DB + *MetaDB } -func NewUserGorm(DB *gorm.DB) relation.UserModelInterface { - return &UserGorm{DB: DB.Model(&relation.UserModel{})} +func NewUserGorm(db *gorm.DB) relation.UserModelInterface { + return &UserGorm{NewMetaDB(db, &relation.UserModel{})} } // 插入多条 func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel) (err error) { - return utils.Wrap(u.DB.Create(&users).Error, "") + return utils.Wrap(u.db(ctx).Create(&users).Error, "") } // 更新用户信息 零值 func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { - return utils.Wrap(u.DB.Where("user_id = ?", userID).Updates(args).Error, "") + return utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Updates(args).Error, "") } // 更新多个用户信息 非零值 func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel) (err error) { - return utils.Wrap(u.DB.Updates(&users).Error, "") + return utils.Wrap(u.db(ctx).Updates(&users).Error, "") } // 获取指定用户信息 不存在,也不返回错误 func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { - err = utils.Wrap(u.DB.Debug().Where("user_id in ?", userIDs).Find(&users).Error, "") + err = utils.Wrap(u.db(ctx).Where("user_id in (?)", userIDs).Find(&users).Error, "") return users, err } // 获取某个用户信息 不存在,则返回错误 func (u *UserGorm) Take(ctx context.Context, userID string) (user *relation.UserModel, err error) { user = &relation.UserModel{} - err = utils.Wrap(u.DB.Where("user_id = ?", userID).Take(&user).Error, "") + err = utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Take(&user).Error, "") return user, err } // 获取用户信息 不存在,不返回错误 func (u *UserGorm) Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error) { - err = utils.Wrap(u.DB.Count(&count).Error, "") + err = utils.Wrap(u.db(ctx).Count(&count).Error, "") if err != nil { return } - err = utils.Wrap(u.DB.Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&users).Error, "") + err = utils.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&users).Error, "") return } // 获取所有用户ID func (u *UserGorm) GetAllUserID(ctx context.Context) (userIDs []string, err error) { - err = u.DB.Pluck("user_id", &userIDs).Error + err = u.db(ctx).Pluck("user_id", &userIDs).Error return userIDs, err } diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 454d82b45..00e3a26a6 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -1,5 +1,7 @@ package relation +import "context" + const ( conversationModelTableName = "conversations" ) @@ -28,4 +30,15 @@ func (ConversationModel) TableName() string { } type ConversationModelInterface interface { + Create(ctx context.Context, conversations []*ConversationModel) (err error) + Delete(ctx context.Context, groupIDs []string) (err error) + UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) + Update(ctx context.Context, conversations []*ConversationModel) (err error) + Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*ConversationModel, err error) + FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error) + FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) + Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error) + FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) + FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + NewTx(tx any) ConversationModelInterface } diff --git a/pkg/common/log/zap.go b/pkg/common/log/zap.go index 01826a3e4..dc8a0ed95 100644 --- a/pkg/common/log/zap.go +++ b/pkg/common/log/zap.go @@ -51,11 +51,11 @@ type ZapLogger struct { func NewZapLogger() (*ZapLogger, error) { zapConfig := zap.Config{ - Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), + Level: zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel)), Encoding: "json", EncoderConfig: zap.NewProductionEncoderConfig(), - DisableStacktrace: true, InitialFields: map[string]interface{}{"PID": os.Getegid()}, + DisableStacktrace: true, } if config.Config.Log.Stderr { zapConfig.OutputPaths = append(zapConfig.OutputPaths, "stderr") @@ -70,7 +70,6 @@ func NewZapLogger() (*ZapLogger, error) { return nil, err } zl.zap = l.Sugar() - zl.zap.WithOptions(zap.AddStacktrace(zap.DPanicLevel)) return zl, nil } @@ -82,7 +81,6 @@ func (l *ZapLogger) cores() (zap.Option, error) { c.LevelKey = "level" c.TimeKey = "time" c.CallerKey = "caller" - //c.EncodeLevel = zapcore.LowercaseColorLevelEncoder fileEncoder := zapcore.NewJSONEncoder(c) fileEncoder.AddInt("PID", os.Getpid()) writer, err := l.getWriter() @@ -92,25 +90,17 @@ func (l *ZapLogger) cores() (zap.Option, error) { var cores []zapcore.Core if config.Config.Log.StorageLocation != "" { cores = []zapcore.Core{ - zapcore.NewCore(fileEncoder, writer, zapcore.DebugLevel), + zapcore.NewCore(fileEncoder, writer, zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel))), } } if config.Config.Log.Stderr { - cores = append(cores, zapcore.NewCore(fileEncoder, zapcore.Lock(os.Stdout), zapcore.DebugLevel)) + cores = append(cores, zapcore.NewCore(fileEncoder, zapcore.Lock(os.Stdout), zap.NewAtomicLevelAt(zapcore.Level(config.Config.Log.RemainLogLevel)))) } return zap.WrapCore(func(c zapcore.Core) zapcore.Core { return zapcore.NewTee(cores...) }), nil } -func NewErrStackCore(c zapcore.Core) zapcore.Core { - return &errStackCore{c} -} - -type errStackCore struct { - zapcore.Core -} - func (l *ZapLogger) getWriter() (zapcore.WriteSyncer, error) { logf, err := rotatelogs.New(config.Config.Log.StorageLocation+sp+"OpenIM.log.all"+".%Y-%m-%d", rotatelogs.WithRotationCount(config.Config.Log.RemainRotationCount), @@ -138,7 +128,7 @@ func (l *ZapLogger) Info(ctx context.Context, msg string, keysAndValues ...inter func (l *ZapLogger) Warn(ctx context.Context, msg string, err error, keysAndValues ...interface{}) { if err != nil { - keysAndValues = append(keysAndValues, "error", err) + keysAndValues = append(keysAndValues, "error", err.Error()) } keysAndValues = l.kvAppend(ctx, keysAndValues) l.zap.Warnw(msg, keysAndValues...) @@ -146,7 +136,7 @@ func (l *ZapLogger) Warn(ctx context.Context, msg string, err error, keysAndValu func (l *ZapLogger) Error(ctx context.Context, msg string, err error, keysAndValues ...interface{}) { if err != nil { - keysAndValues = append(keysAndValues, "error", err) + keysAndValues = append(keysAndValues, "error", err.Error()) } keysAndValues = append([]interface{}{constant.OperationID, tracelog.GetOperationID(ctx)}, keysAndValues...) l.zap.Errorw(msg, keysAndValues...) diff --git a/pkg/common/mw/gin.go b/pkg/common/mw/gin.go index 9d825c063..d82dfd733 100644 --- a/pkg/common/mw/gin.go +++ b/pkg/common/mw/gin.go @@ -7,8 +7,43 @@ import ( "github.com/gin-gonic/gin" "io" "net/http" + "github.com/go-redis/redis/v8" ) +type GinMwOptions func( *gin.RouterGroup ) + +func WithRecovery() GinMwOptions { + return func(group *gin.RouterGroup) { + group.Use(gin.Recovery()) + } +} + +func WithCorsHandler() GinMwOptions { + return func(group *gin.RouterGroup) { + group.Use(CorsHandler()) + } +} + +func WithGinParseOperationID() GinMwOptions { + return func(group *gin.RouterGroup) { + group.Use(GinParseOperationID()) + } +} + +func WithGinParseToken(rdb redis.UniversalClient) GinMwOptions { + return func(group *gin.RouterGroup) { + group.Use(GinParseToken(rdb)) + } +} + +func NewRouterGroup(routerGroup *gin.RouterGroup, route string, options ...GinMwOptions) *gin.RouterGroup { + routerGroup = routerGroup.Group(route) + for _, option := range options { + option(routerGroup) + } + return routerGroup +} + func CorsHandler() gin.HandlerFunc { return func(context *gin.Context) { context.Writer.Header().Set("Access-Control-Allow-Origin", "*") @@ -33,7 +68,8 @@ func GinParseOperationID() gin.HandlerFunc { if operationID == "" { body, err := io.ReadAll(c.Request.Body) if err != nil { - c.String(400, "read request body error: "+err.Error()) + log.ZWarn(c, "read request body error", errs.ErrArgs.Wrap("read request body error: "+err.Error())) + apiresp.GinError(c, errs.ErrArgs.Wrap("read request body error: "+err.Error())) c.Abort() return } @@ -41,12 +77,14 @@ func GinParseOperationID() gin.HandlerFunc { OperationID string `json:"operationID"` }{} if err := json.Unmarshal(body, &req); err != nil { - c.String(400, "get operationID error: "+err.Error()) + log.ZWarn(c, "json unmarshal error", errs.ErrArgs.Wrap(err.Error())) + apiresp.GinError(c, errs.ErrArgs.Wrap("json unmarshal error"+err.Error())) c.Abort() return } if req.OperationID == "" { - c.String(400, "operationID empty") + log.ZWarn(c, "header must have operationID", errs.ErrArgs.Wrap(err.Error())) + apiresp.GinError(c, errs.ErrArgs.Wrap("header must have operationID"+err.Error())) c.Abort() return } @@ -61,3 +99,56 @@ func GinParseOperationID() gin.HandlerFunc { c.Next() } } +func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc { + dataBase := controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire) + return func(c *gin.Context) { + switch c.Request.Method { + case http.MethodPost: + token := c.Request.Header.Get(constant.Token) + if token == "" { + log.ZWarn(c, "header get token error", errs.ErrArgs.Wrap("header must have token")) + apiresp.GinError(c, errs.ErrArgs.Wrap("header must have token")) + c.Abort() + return + } + claims, err := tokenverify.GetClaimFromToken(token) + if err != nil { + log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap()) + apiresp.GinError(c, errs.ErrTokenUnknown.Wrap()) + c.Abort() + return + } + m, err := dataBase.GetTokensWithoutError(c, claims.UID, claims.Platform) + if err != nil { + log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap()) + apiresp.GinError(c, errs.ErrTokenNotExist.Wrap()) + c.Abort() + return + } + if len(m) == 0 { + log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap()) + apiresp.GinError(c, errs.ErrTokenNotExist.Wrap()) + c.Abort() + return + } + if v, ok := m[token]; ok { + switch v { + case constant.NormalToken: + case constant.KickedToken: + log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap()) + apiresp.GinError(c, errs.ErrTokenKicked.Wrap()) + c.Abort() + return + default: + log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap()) + apiresp.GinError(c, errs.ErrTokenUnknown.Wrap()) + c.Abort() + return + } + } + c.Set(constant.OpUserIDPlatformID, constant.PlatformNameToID(claims.Platform)) + c.Set(constant.OpUserID, claims.UID) + c.Next() + } + } +} diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 2c178d2b3..700b24746 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -1,6 +1,7 @@ package statistics import ( + "context" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "time" ) @@ -29,7 +30,7 @@ func (s *Statistics) output() { intervalCount = *s.AllCount - sum } timeIntervalNum++ - log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, intervalCount, "total:", *s.AllCount, "intervalNum", timeIntervalNum, "avg", (*s.AllCount)/(timeIntervalNum)/s.SleepTime) + log.ZWarn(context.Background(), " system stat ", nil, "args", s.PrintArgs, "intervalCount", intervalCount, "total:", *s.AllCount, "intervalNum", timeIntervalNum, "avg", (*s.AllCount)/(timeIntervalNum)/s.SleepTime) } }