refactor: all module update.

This commit is contained in:
Gordon 2024-04-10 10:12:33 +08:00
parent a7d43bb186
commit 92efb51206
2 changed files with 65 additions and 75 deletions

View File

@ -17,27 +17,18 @@ package tools
import (
"context"
"fmt"
"github.com/openimsdk/tools/db/redisutil"
"math"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type MsgTool struct {
@ -64,60 +55,61 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle
}
func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return nil, err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return nil, err
}
discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
if err != nil {
return nil, err
}
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
userDB, err := mgo.NewUserMongo(mgocli.GetDB())
if err != nil {
return nil, err
}
msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config)
if err != nil {
return nil, err
}
userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
userDatabase := controller.NewUserDatabase(
userDB,
cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
mgocli.GetTx(),
userMongoDB,
)
groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
if err != nil {
return nil, err
}
groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
if err != nil {
return nil, err
}
groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
if err != nil {
return nil, err
}
conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
if err != nil {
return nil, err
}
groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil)
conversationDatabase := controller.NewConversationDatabase(
conversationDB,
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
mgocli.GetTx(),
)
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg)
msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
return msgTool, nil
//mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
//if err != nil {
// return nil, err
//}
//rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
//if err != nil {
// return nil, err
//}
//discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
//if err != nil {
// return nil, err
//}
//discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//userDB, err := mgo.NewUserMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
////msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config)
//if err != nil {
// return nil, err
//}
//userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
//userDatabase := controller.NewUserDatabase(
// userDB,
// cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
// mgocli.GetTx(),
// userMongoDB,
//)
//groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil)
//conversationDatabase := controller.NewConversationDatabase(
// conversationDB,
// cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
// mgocli.GetTx(),
//)
//msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg)
//msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
//msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
//return msgTool, nil
return nil, nil
}
// func (c *MsgTool) AllConversationClearMsgAndFixSeq() {

View File

@ -17,13 +17,11 @@ package controller
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
@ -133,16 +131,16 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M
}, nil
}
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) {
msgDocModel, err := mgo.NewMsgMongo(database)
if err != nil {
return nil, err
}
//todo MsgCacheTimeout
msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
seq := cache.NewSeqCache(rdb)
return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig)
}
//func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) {
// msgDocModel, err := mgo.NewMsgMongo(database)
// if err != nil {
// return nil, err
// }
// //todo MsgCacheTimeout
// msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
// seq := cache.NewSeqCache(rdb)
// return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig)
//}
type commonMsgDatabase struct {
msgDocDatabase relation.MsgDocModelInterface