From eb946ad181b1b412e9c3811ec7fe38b751705a51 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 13 Mar 2023 15:39:47 +0800 Subject: [PATCH] log --- cmd/api/main.go | 11 ++-- cmd/cmdutils/main.go | 5 +- cmd/crontask/main.go | 5 +- cmd/msggateway/main.go | 5 +- cmd/msgtransfer/main.go | 5 +- cmd/push/main.go | 8 +-- cmd/rpc/auth/main.go | 8 +-- cmd/rpc/conversation/main.go | 8 +-- cmd/rpc/friend/main.go | 8 +-- cmd/rpc/group/main.go | 12 ++--- cmd/rpc/msg/main.go | 8 +-- cmd/rpc/third/main.go | 8 +-- cmd/rpc/user/main.go | 8 +-- config/config.yaml | 2 +- .../msgtransfer/online_history_msg_handler.go | 53 +++++-------------- internal/rpc/conversation/conversaion.go | 13 ++--- internal/startrpc/start.go | 4 -- pkg/common/db/controller/msg.go | 39 ++++++++++++-- 18 files changed, 82 insertions(+), 128 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index 4108c4138..4d8e521fb 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -9,7 +9,7 @@ import ( "context" "fmt" "github.com/OpenIMSDK/openKeeper" - "os" + "net" "strconv" "OpenIM/pkg/common/constant" @@ -20,8 +20,7 @@ func main() { apiCmd.AddPortFlag() apiCmd.AddApi(run) if err := apiCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + panic(err.Error()) } } @@ -36,9 +35,11 @@ func run(port int) error { log.NewPrivateLog(constant.LogFileName) zk.AddOption(mw.GrpcClient()) router := api.NewGinRouter(zk) - address := constant.LocalHost + ":" + strconv.Itoa(port) + var address string if config.Config.Api.ListenIP != "" { - address = config.Config.Api.ListenIP + ":" + strconv.Itoa(port) + address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port)) + } else { + address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) } fmt.Println("start api server, address: ", address, ", OpenIM version: ", config.Version) log.ZInfo(context.Background(), "start server success", "address", address, "version", config.Version) diff --git a/cmd/cmdutils/main.go b/cmd/cmdutils/main.go index 577eda1c6..3c1884c24 100644 --- a/cmd/cmdutils/main.go +++ b/cmd/cmdutils/main.go @@ -2,8 +2,6 @@ package main import ( "OpenIM/pkg/common/cmd" - "fmt" - "os" ) func main() { @@ -17,7 +15,6 @@ func main() { cmd.GetCmd.AddCommand(msgCmd.Command) msgUtilsCmd.AddCommand(cmd.GetCmd, cmd.FixCmd, cmd.ClearCmd) if err := msgUtilsCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/crontask/main.go b/cmd/crontask/main.go index 940af2361..0c8dfddb5 100644 --- a/cmd/crontask/main.go +++ b/cmd/crontask/main.go @@ -3,14 +3,11 @@ package main import ( "OpenIM/internal/tools" "OpenIM/pkg/common/cmd" - "fmt" - "os" ) func main() { cronTaskCmd := cmd.NewCronTaskCmd() if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil { - fmt.Println(err) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/msggateway/main.go b/cmd/msggateway/main.go index 60722813c..d69ba47c5 100644 --- a/cmd/msggateway/main.go +++ b/cmd/msggateway/main.go @@ -2,8 +2,6 @@ package main import ( "OpenIM/pkg/common/cmd" - "fmt" - "os" ) func main() { @@ -12,7 +10,6 @@ func main() { msgGatewayCmd.AddPortFlag() msgGatewayCmd.AddPrometheusPortFlag() if err := msgGatewayCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/msgtransfer/main.go b/cmd/msgtransfer/main.go index 1c1ffded6..8a373dfd3 100644 --- a/cmd/msgtransfer/main.go +++ b/cmd/msgtransfer/main.go @@ -2,15 +2,12 @@ package main import ( "OpenIM/pkg/common/cmd" - "fmt" - "os" ) func main() { msgTransferCmd := cmd.NewMsgTransferCmd() msgTransferCmd.AddPrometheusPortFlag() if err := msgTransferCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/push/main.go b/cmd/push/main.go index cbe68d616..61ed722b3 100644 --- a/cmd/push/main.go +++ b/cmd/push/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/push" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { pushCmd.AddPortFlag() pushCmd.AddPrometheusPortFlag() if err := pushCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := pushCmd.StartSvr(config.Config.RpcRegisterName.OpenImPushName, push.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/auth/main.go b/cmd/rpc/auth/main.go index 99e8f5128..3b91a4648 100644 --- a/cmd/rpc/auth/main.go +++ b/cmd/rpc/auth/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/auth" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { authCmd.AddPortFlag() authCmd.AddPrometheusPortFlag() if err := authCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/conversation/main.go b/cmd/rpc/conversation/main.go index 087e6e4f8..1a860ee76 100644 --- a/cmd/rpc/conversation/main.go +++ b/cmd/rpc/conversation/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/conversation" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImConversationName, conversation.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/friend/main.go b/cmd/rpc/friend/main.go index 76384f6e5..b9c837435 100644 --- a/cmd/rpc/friend/main.go +++ b/cmd/rpc/friend/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/friend" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImFriendName, friend.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/group/main.go b/cmd/rpc/group/main.go index c1633e2fc..fa48dcc4c 100644 --- a/cmd/rpc/group/main.go +++ b/cmd/rpc/group/main.go @@ -1,11 +1,9 @@ package main import ( - "OpenIM/internal/rpc/friend" + "OpenIM/internal/rpc/group" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } - if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, friend.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, group.Start); err != nil { + panic(err.Error()) } } diff --git a/cmd/rpc/msg/main.go b/cmd/rpc/msg/main.go index e88a8f528..26a29bf79 100644 --- a/cmd/rpc/msg/main.go +++ b/cmd/rpc/msg/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/msg" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImMsgName, msg.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/third/main.go b/cmd/rpc/third/main.go index 060697561..f1210de63 100644 --- a/cmd/rpc/third/main.go +++ b/cmd/rpc/third/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/third" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImThirdName, third.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/cmd/rpc/user/main.go b/cmd/rpc/user/main.go index b4b16685d..7e905062b 100644 --- a/cmd/rpc/user/main.go +++ b/cmd/rpc/user/main.go @@ -4,8 +4,6 @@ import ( "OpenIM/internal/rpc/user" "OpenIM/pkg/common/cmd" "OpenIM/pkg/common/config" - "fmt" - "os" ) func main() { @@ -13,11 +11,9 @@ func main() { rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil { - fmt.Println(err.Error()) - os.Exit(1) + panic(err.Error()) } } diff --git a/config/config.yaml b/config/config.yaml index feebde24d..8b659fc2d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -19,7 +19,7 @@ mysql: dbMaxOpenConns: 100 dbMaxIdleConns: 10 dbMaxLifeTime: 5 - logLevel: 1 #1=slient 2=error 3=warn 4=info + logLevel: 4 #1=slient 2=error 3=warn 4=info slowThreshold: 500 mongo: diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index dbf2a2590..292519105 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -10,7 +10,6 @@ import ( pbMsg "OpenIM/pkg/proto/msg" "OpenIM/pkg/statistics" "OpenIM/pkg/utils" - "context" "fmt" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" @@ -50,9 +49,9 @@ type OnlineHistoryRedisConsumerHandler struct { singleMsgSuccessCountMutex sync.Mutex singleMsgFailedCountMutex sync.Mutex - producerToPush *kafka.Producer - producerToModify *kafka.Producer - producerToMongo *kafka.Producer + //producerToPush *kafka.Producer + //producerToModify *kafka.Producer + //producerToMongo *kafka.Producer msgDatabase controller.MsgDatabase } @@ -66,9 +65,9 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } - och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) - och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic) - och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic) + //och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) + //och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic) + //och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic) och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) @@ -108,7 +107,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } } if len(modifyMsgList) > 0 { - och.sendMessageToModifyMQ(ctx, msgChannelValue.aggregationID, triggerID, modifyMsgList) + och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, triggerID, modifyMsgList) } log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList)) if len(storageMsgList) > 0 { @@ -122,17 +121,18 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { och.singleMsgSuccessCountMutex.Lock() och.singleMsgSuccessCount += uint64(len(storageMsgList)) och.singleMsgSuccessCountMutex.Unlock() - och.SendMessageToMongoCH(ctx, msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) + och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) for _, v := range storageMsgList { - och.sendMessageToPushMQ(ctx, v, msgChannelValue.aggregationID) + och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) } - for _, x := range notStoragePushMsgList { - och.sendMessageToPushMQ(ctx, x, msgChannelValue.aggregationID) + for _, v := range notStoragePushMsgList { + och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) } } } else { for _, v := range notStoragePushMsgList { - och.sendMessageToPushMQ(ctx, v, msgChannelValue.aggregationID) + och.msgDatabase.MsgToPushMQ(ctx, msgChannelValue.aggregationID, v) + } } } @@ -239,30 +239,3 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG } return nil } - -func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) { - mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID} - pid, offset, err := och.producerToPush.SendMessage(ctx, mqPushMsg.SourceID, &mqPushMsg) - if err != nil { - log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) - } - return -} - -func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) { - if len(messages) > 0 { - pid, offset, err := och.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) - if err != nil { - log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) - } - } -} - -func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) { - if len(messages) > 0 { - pid, offset, err := och.producerToMongo.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) - if err != nil { - log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) - } - } -} diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index b13e327ff..06f697481 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -35,15 +35,12 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } - + opts := rockscache.NewDefaultOptions() + opts.RandomExpireAdjustment = 0.2 + opts.StrongConsistency = true pbConversation.RegisterConversationServer(server, &conversationServer{ - groupChecker: check.NewGroupChecker(client), - ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, rockscache.Options{ - RandomExpireAdjustment: 0.2, - DisableCacheRead: false, - DisableCacheDelete: false, - StrongConsistency: true, - }), tx.NewGorm(db)), + groupChecker: check.NewGroupChecker(client), + ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, opts), tx.NewGorm(db)), }) return nil } diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go index d47734b9c..c1e442e4f 100644 --- a/internal/startrpc/start.go +++ b/internal/startrpc/start.go @@ -70,7 +70,3 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c } return nil } - -//func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { -// return start(rpcPort, rpcRegisterName, prometheusPort, rpcFn, options) -//} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 0da2f18a3..91d03b444 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -70,18 +70,25 @@ type MsgDatabase interface { DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) - MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error GetUserMaxSeq(ctx context.Context, userID string) (int64, error) GetUserMinSeq(ctx context.Context, userID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + + MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error + MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error + MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) error + MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error } func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { return &msgDatabase{ - msgDocDatabase: msgDocModel, - cache: cacheModel, - producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic), + msgDocDatabase: msgDocModel, + cache: cacheModel, + producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic), + producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic), + producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic), + producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic), } } @@ -97,6 +104,9 @@ type msgDatabase struct { extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface cache cache.Model producer *kafka.Producer + producerToMongo *kafka.Producer + producerToModify *kafka.Producer + producerToPush *kafka.Producer // model msg unRelationTb.MsgDocModel extendMsgSetModel unRelationTb.ExtendMsgSetModel @@ -175,6 +185,27 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms return err } +func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error { + if len(messages) > 0 { + _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) + return err + } + return nil +} + +func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error { + mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq.MsgData, SourceID: key} + _, _, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg) + return err +} + +func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { + if len(messages) > 0 { + _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) + return err + } + return nil +} func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { return db.cache.GetUserMaxSeq(ctx, userID) }