diff --git a/go.mod b/go.mod index c06451aaa..775765706 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.73-alpha.12 - github.com/openimsdk/tools v0.0.50-alpha.96 + github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index d29eb0f3f..329a916ec 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.96 h1:U44Fq2jHiEvGi9zuYAnTRNx3Xd9T7P/kBAZLHvQ8xg4= -github.com/openimsdk/tools v0.0.50-alpha.96/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= +github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bbec3f9a2..35026c79a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) + historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config) msgTransfer := &MsgTransfer{ historyConsumer: historyConsumer, @@ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error { }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value) + fn := func(msg mq.Message) error { + m.historyMongoHandler.HandleChatWs2Mongo(msg) return nil } for { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 05775a1e6..8b212774a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" "time" @@ -77,6 +78,7 @@ type ConsumerMessage struct { Ctx context.Context Key string Value []byte + Raw mq.Message } func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { @@ -113,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. b.Do = och.do och.redisMessageBatches = b + och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) { + lastMessage.Raw.Mark() + lastMessage.Raw.Commit() + } + return &och, nil } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { @@ -388,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte return mcontext.SetOperationID(ctx, allMessageOperationID) } -func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group - err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value}) +func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group + err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg}) if err != nil { - log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value) + log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value()) } return nil } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 6c1498f82..8611af7ea 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -15,12 +15,12 @@ package msgtransfer import ( - "context" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mq" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" @@ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas } } -func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) { +func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) { + ctx := val.Context() + key := val.Key() + msg := val.Value() msgFromMQ := pbmsg.MsgDataToMongoByMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { @@ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont prommetrics.MsgInsertMongoFailedCounter.Inc() } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() + val.Mark() } for _, msgData := range msgFromMQ.MsgData { diff --git a/internal/push/push.go b/internal/push/push.go index 1d6f8cb30..bf95b6acc 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -2,6 +2,7 @@ package push import ( "context" + "github.com/openimsdk/tools/mq" "math/rand" "strconv" @@ -106,8 +107,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg go func() { pushHandler.WaitCache() - fn := func(ctx context.Context, key string, value []byte) error { - pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value) + fn := func(msg mq.Message) error { + pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(msg.Context()), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) @@ -121,8 +122,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg }() go func() { - fn := func(ctx context.Context, key string, value []byte) error { - offlineHandler.HandleMsg2OfflinePush(ctx, value) + fn := func(msg mq.Message) error { + offlineHandler.HandleMsg2OfflinePush(msg.Context(), msg.Value()) return nil } consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32())))