diff --git a/internal/tools/msg.go b/internal/tools/msg.go index d1183417a..112d14ef5 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,27 +17,18 @@ package tools import ( "context" "fmt" - "github.com/openimsdk/tools/db/redisutil" "math" "math/rand" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type MsgTool struct { @@ -64,60 +55,61 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle } func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) - if err != nil { - return nil, err - } - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) - if err != nil { - return nil, err - } - discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) - if err != nil { - return nil, err - } - discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - userDB, err := mgo.NewUserMongo(mgocli.GetDB()) - if err != nil { - return nil, err - } - msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config) - if err != nil { - return nil, err - } - userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) - userDatabase := controller.NewUserDatabase( - userDB, - cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()), - mgocli.GetTx(), - userMongoDB, - ) - groupDB, err := mgo.NewGroupMongo(mgocli.GetDB()) - if err != nil { - return nil, err - } - groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB()) - if err != nil { - return nil, err - } - groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB()) - if err != nil { - return nil, err - } - conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB()) - if err != nil { - return nil, err - } - groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil) - conversationDatabase := controller.NewConversationDatabase( - conversationDB, - cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), - mgocli.GetTx(), - ) - msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg) - msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) - msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config) - return msgTool, nil + //mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) + //if err != nil { + // return nil, err + //} + //rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + //if err != nil { + // return nil, err + //} + //discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + //if err != nil { + // return nil, err + //} + //discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + //userDB, err := mgo.NewUserMongo(mgocli.GetDB()) + //if err != nil { + // return nil, err + //} + ////msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config) + //if err != nil { + // return nil, err + //} + //userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) + //userDatabase := controller.NewUserDatabase( + // userDB, + // cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()), + // mgocli.GetTx(), + // userMongoDB, + //) + //groupDB, err := mgo.NewGroupMongo(mgocli.GetDB()) + //if err != nil { + // return nil, err + //} + //groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB()) + //if err != nil { + // return nil, err + //} + //groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB()) + //if err != nil { + // return nil, err + //} + //conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB()) + //if err != nil { + // return nil, err + //} + //groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil) + //conversationDatabase := controller.NewConversationDatabase( + // conversationDB, + // cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), + // mgocli.GetTx(), + //) + //msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg) + //msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) + //msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config) + //return msgTool, nil + return nil, nil } // func (c *MsgTool) AllConversationClearMsgAndFixSeq() { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 28914a21f..130e35d20 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -17,13 +17,11 @@ package controller import ( "context" "encoding/json" - "github.com/openimsdk/open-im-server/v3/internal/tools" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/protocol/constant" @@ -133,16 +131,16 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M }, nil } -func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) { - msgDocModel, err := mgo.NewMsgMongo(database) - if err != nil { - return nil, err - } - //todo MsgCacheTimeout - msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) - seq := cache.NewSeqCache(rdb) - return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig) -} +//func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) { +// msgDocModel, err := mgo.NewMsgMongo(database) +// if err != nil { +// return nil, err +// } +// //todo MsgCacheTimeout +// msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) +// seq := cache.NewSeqCache(rdb) +// return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig) +//} type commonMsgDatabase struct { msgDocDatabase relation.MsgDocModelInterface