From e51de81642ba16be3342381c8ffa401c0c138492 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 6 Feb 2025 16:21:43 +0800 Subject: [PATCH] fix: mq --- cmd/main.go | 12 ++- internal/msgtransfer/init.go | 14 ++- .../msgtransfer/online_history_msg_handler.go | 85 ++++++------------- internal/push/callback.go | 1 + internal/push/offlinepush_handler.go | 30 ++----- internal/push/onlinepusher.go | 6 +- internal/push/push.go | 57 +++++++++++-- internal/push/push_handler.go | 34 ++------ internal/rpc/msg/server.go | 11 ++- pkg/common/storage/controller/msg.go | 29 +++---- pkg/common/storage/controller/msg_transfer.go | 59 +++++++------ pkg/common/storage/controller/push.go | 27 +++--- pkg/common/storage/database/mgo/msg_test.go | 52 ++++++++++-- pkg/mqbuild/builder.go | 60 +++++++++++++ pkg/tools/batcher/batcher.go | 9 +- 15 files changed, 284 insertions(+), 202 deletions(-) create mode 100644 pkg/mqbuild/builder.go diff --git a/cmd/main.go b/cmd/main.go index 75636ec1a..23f325fac 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -35,8 +35,15 @@ import ( func main() { var configPath string - flag.StringVar(&configPath, "c", "/Users/chao/Desktop/code/open-im-server/config", "config path") + flag.StringVar(&configPath, "c", "", "config path") flag.Parse() + if configPath == "" { + if runtime.GOOS == "linux" { + configPath = "/root/dt/open-im-server/config" + } else { + configPath = "/Users/chao/Desktop/code/open-im-server/config" + } + } cmd := newCmds(configPath) putCmd1(cmd, auth.Start) putCmd1(cmd, conversation.Start) @@ -52,8 +59,9 @@ func main() { ctx := context.Background() if err := cmd.run(ctx); err != nil { fmt.Println(err) + return } - fmt.Println("success") + fmt.Println("exit") } func getTypePath(typ reflect.Type) string { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 85f208047..be0e94ba7 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -26,6 +26,7 @@ import ( "syscall" disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/jsonutil" @@ -76,6 +77,8 @@ type Config struct { func Start(ctx context.Context, index int, config *Config) error { runTimeEnv := runtimeenv.PrintRuntimeEnvironment() + builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) + log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) @@ -107,7 +110,14 @@ func Start(ctx context.Context, index int, config *Config) error { }) cm.Watch(ctx) } - + mongoProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToMongoTopic) + if err != nil { + return err + } + pushProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToPushTopic) + if err != nil { + return err + } msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err @@ -123,7 +133,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) - msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) + msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, mongoProducer, pushProducer) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6334c95fd..66bc562dc 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,14 +18,14 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/tools/discovery" - "strconv" - "strings" + "github.com/openimsdk/tools/mq" + "sync" "time" - "github.com/IBM/sarama" "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" @@ -37,7 +37,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" ) @@ -64,9 +63,7 @@ type userHasReadSeq struct { } type OnlineHistoryRedisConsumerHandler struct { - historyConsumerGroup *kafka.MConsumerGroup - - redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] + redisMessageBatches *batcher.Batcher[ConsumerMessage] msgTransferDatabase controller.MsgTransferDatabase conversationUserHasReadChan chan *userHasReadSeq @@ -76,12 +73,13 @@ type OnlineHistoryRedisConsumerHandler struct { conversationClient *rpcli.ConversationClient } -func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { - kafkaConf := config.KafkaConfig - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false) - if err != nil { - return nil, err - } +type ConsumerMessage struct { + Ctx context.Context + Key string + Value []byte +} + +func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase, historyConsumer mq.Consumer) (*OnlineHistoryRedisConsumerHandler, error) { groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) if err != nil { return nil, err @@ -97,7 +95,7 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. och.conversationClient = rpcli.NewConversationClient(conversationConn) och.wg.Add(1) - b := batcher.New[sarama.ConsumerMessage]( + b := batcher.New[ConsumerMessage]( batcher.WithSize(size), batcher.WithWorker(worker), batcher.WithInterval(interval), @@ -109,16 +107,15 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. hashCode := stringutil.GetHashCode(key) return int(hashCode) % och.redisMessageBatches.Worker() } - b.Key = func(consumerMessage *sarama.ConsumerMessage) string { - return string(consumerMessage.Key) + b.Key = func(consumerMessage *ConsumerMessage) string { + return consumerMessage.Key } b.Do = och.do och.redisMessageBatches = b - och.historyConsumerGroup = historyConsumerGroup return &och, nil } -func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) { +func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID()) ctxMessages := och.parseConsumerMessages(ctx, val.Val()) ctx = withAggregationCtx(ctx, ctxMessages) @@ -189,7 +186,7 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, } -func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { +func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*ConsumerMessage) []*ContextMsg { var ctxMessages []*ContextMsg for i := 0; i < len(consumerMessages); i++ { ctxMsg := &ContextMsg{} @@ -199,16 +196,9 @@ func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context. log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) continue } - var arr []string - for i, header := range consumerMessages[i].Headers { - arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value)) - } - log.ZDebug(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), - "header", strings.Join(arr, ", ")) - ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) + ctxMsg.ctx = consumerMessages[i].Ctx ctxMsg.message = msgFromMQ - log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key", - string(consumerMessages[i].Key)) + log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key", consumerMessages[i].Key) ctxMessages = append(ctxMessages, ctxMsg) } return ctxMessages @@ -383,7 +373,9 @@ func (och *OnlineHistoryRedisConsumerHandler) Close() { func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) - _, _, _ = och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) + if err := och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message); err != nil { + log.ZError(ctx, "msg to push topic error", err, "msg", v.message.String()) + } } } @@ -401,35 +393,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte return mcontext.SetOperationID(ctx, allMessageOperationID) } -func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { +func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group + err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value}) + if err != nil { + log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value) + } return nil } - -func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", - claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) { - session.MarkMessage(lastMessage, "") - session.Commit() - } - for { - select { - case msg, ok := <-claim.Messages(): - if !ok { - return nil - } - - if len(msg.Value) == 0 { - continue - } - err := och.redisMessageBatches.Put(context.Background(), msg) - if err != nil { - log.ZWarn(context.Background(), "put msg to error", err, "msg", msg) - } - case <-session.Context().Done(): - return nil - } - } -} diff --git a/internal/push/callback.go b/internal/push/callback.go index f8e17bb8c..2d1b8090d 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -17,6 +17,7 @@ package push import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index 5c69da005..eaf6b8ed8 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -3,7 +3,6 @@ package push import ( "context" - "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -12,40 +11,21 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/protobuf/proto" ) type OfflinePushConsumerHandler struct { - OfflinePushConsumerGroup *kafka.MConsumerGroup - offlinePusher offlinepush.OfflinePusher + offlinePusher offlinepush.OfflinePusher } -func NewOfflinePushConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher) (*OfflinePushConsumerHandler, error) { - var offlinePushConsumerHandler OfflinePushConsumerHandler - var err error - offlinePushConsumerHandler.offlinePusher = offlinePusher - offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID, - []string{config.KafkaConfig.ToOfflinePushTopic}, true) - if err != nil { - return nil, err +func NewOfflinePushConsumerHandler(offlinePusher offlinepush.OfflinePusher) *OfflinePushConsumerHandler { + return &OfflinePushConsumerHandler{ + offlinePusher: offlinePusher, } - return &offlinePushConsumerHandler, nil } -func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } -func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } -func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for msg := range claim.Messages() { - ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg) - o.handleMsg2OfflinePush(ctx, msg.Value) - sess.MarkMessage(msg, "") - } - return nil -} - -func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) { +func (o *OfflinePushConsumerHandler) HandleMsg2OfflinePush(ctx context.Context, msg []byte) { offlinePushMsg := pbpush.PushMsgReq{} if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil { log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg)) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index d02766121..dc0b37089 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -30,13 +30,11 @@ func newEmptyOnlinePusher() *emptyOnlinePusher { return &emptyOnlinePusher{} } -func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, - pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { +func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { log.ZInfo(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil) return nil, nil } -func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, - wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { +func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { log.ZInfo(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil) return nil } diff --git a/internal/push/push.go b/internal/push/push.go index 748463d65..cab910da6 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -2,14 +2,19 @@ package push import ( "context" + "math/rand" + "strconv" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" + "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" ) @@ -19,8 +24,6 @@ type pushServer struct { database controller.PushDatabase disCov discovery.Conn offlinePusher offlinepush.OfflinePusher - pushCh *ConsumerHandler - offlinePushCh *OfflinePushConsumerHandler } type Config struct { @@ -57,30 +60,66 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } + builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) - database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) + offlinePushProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToOfflinePushTopic) + if err != nil { + return err + } + database := controller.NewPushDatabase(cacheModel, offlinePushProducer) - consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client) + pushConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToPushTopic) + if err != nil { + return err + } + offlinePushConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToOfflinePushTopic) if err != nil { return err } - offlinePushConsumer, err := NewOfflinePushConsumerHandler(config, offlinePusher) + pushHandler, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client) if err != nil { return err } + offlineHandler := NewOfflinePushConsumerHandler(offlinePusher) + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ database: database, disCov: client, offlinePusher: offlinePusher, - pushCh: consumer, - offlinePushCh: offlinePushConsumer, }) - go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer) + go func() { + pushHandler.WaitCache() + fn := func(ctx context.Context, key string, value []byte) error { + pushHandler.HandleMs2PsChat(ctx, value) + return nil + } + consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) + log.ZInfo(consumerCtx, "begin consume messages") + for { + if err := pushConsumer.Subscribe(consumerCtx, fn); err != nil { + log.ZError(consumerCtx, "subscribe err", err) + return + } + } + }() - go offlinePushConsumer.OfflinePushConsumerGroup.RegisterHandleAndConsumer(ctx, offlinePushConsumer) + go func() { + fn := func(ctx context.Context, key string, value []byte) error { + offlineHandler.HandleMsg2OfflinePush(ctx, value) + return nil + } + consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) + log.ZInfo(consumerCtx, "begin consume messages") + for { + if err := offlinePushConsumer.Subscribe(consumerCtx, fn); err != nil { + log.ZError(consumerCtx, "subscribe err", err) + return + } + } + }() return nil } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index dad9b53b7..e8240f031 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -3,13 +3,8 @@ package push import ( "context" "encoding/json" - "math/rand" - "strconv" "time" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - - "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -17,6 +12,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" @@ -25,7 +21,6 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" @@ -34,7 +29,7 @@ import ( ) type ConsumerHandler struct { - pushConsumerGroup *kafka.MConsumerGroup + //pushConsumerGroup mq.Consumer offlinePusher offlinepush.OfflinePusher onlinePusher OnlinePusher pushDatabase controller.PushDatabase @@ -49,15 +44,9 @@ type ConsumerHandler struct { conversationClient *rpcli.ConversationClient } -func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, - client discovery.Conn) (*ConsumerHandler, error) { +func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, client discovery.Conn) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler var err error - consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}, true) - if err != nil { - return nil, err - } userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return nil, err @@ -93,7 +82,7 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller return &consumerHandler, nil } -func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { +func (c *ConsumerHandler) HandleMs2PsChat(ctx context.Context, msg []byte) { msgFromMQ := pbpush.PushMsgReq{} if err := proto.Unmarshal(msg, &msgFromMQ); err != nil { log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg)) @@ -127,25 +116,12 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } } -func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } - -func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } - -func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { +func (c *ConsumerHandler) WaitCache() { c.onlineCache.Lock.Lock() for c.onlineCache.CurrentPhase.Load() < rpccache.DoSubscribeOver { c.onlineCache.Cond.Wait() } c.onlineCache.Lock.Unlock() - ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10)) - log.ZInfo(ctx, "begin consume messages") - - for msg := range claim.Messages() { - ctx := c.pushConsumerGroup.GetContextFromMsg(msg) - c.handleMs2PsChat(ctx, msg.Value) - sess.MarkMessage(msg, "") - } - return nil } // Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index a85b88b0b..1025e9f77 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -17,6 +17,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/mqbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -78,6 +79,11 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { + builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig) + redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic) + if err != nil { + return err + } mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err @@ -105,10 +111,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr return err } seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) - if err != nil { - return err - } userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err @@ -126,6 +128,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr return err } conversationClient := rpcli.NewConversationClient(conversationConn) + msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer) s := &msgServer{ MsgDatabase: msgDatabase, StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg), diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index a93d581eb..82841c41c 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -18,7 +18,11 @@ import ( "context" "encoding/json" "errors" + + "github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/utils/jsonutil" + "google.golang.org/protobuf/proto" + "strconv" "strings" "time" @@ -29,7 +33,6 @@ import ( "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" @@ -37,7 +40,6 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" ) @@ -101,22 +103,14 @@ type CommonMsgDatabase interface { GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error) } -func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { - conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) - if err != nil { - return nil, err - } - producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic) - if err != nil { - return nil, err - } +func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, producer mq.Producer) CommonMsgDatabase { return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, - producer: producerToRedis, - }, nil + producer: producer, + } } type commonMsgDatabase struct { @@ -125,12 +119,15 @@ type commonMsgDatabase struct { msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser - producer *kafka.Producer + producer mq.Producer } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { - _, _, err := db.producer.SendMessage(ctx, key, msg2mq) - return err + data, err := proto.Marshal(msg2mq) + if err != nil { + return err + } + return db.producer.SendMessage(ctx, key, data) } func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index f4c0c6270..260daf5c4 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -2,11 +2,13 @@ package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mq" "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/protobuf/proto" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -14,7 +16,6 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" "go.mongodb.org/mongo-driver/mongo" ) @@ -32,30 +33,30 @@ type MsgTransferDatabase interface { SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error // to mq - MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) + MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) error MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error } -func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) { - conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) - if err != nil { - return nil, err - } - producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) - if err != nil { - return nil, err - } - producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic) - if err != nil { - return nil, err - } +func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, mongoProducer, pushProducer mq.Producer) (MsgTransferDatabase, error) { + //conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) + //if err != nil { + // return nil, err + //} + //producerToMongo, err := kafka.NewKafkaProducerV2(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) + //if err != nil { + // return nil, err + //} + //producerToPush, err := kafka.NewKafkaProducerV2(conf, kafkaConf.Address, kafkaConf.ToPushTopic) + //if err != nil { + // return nil, err + //} return &msgTransferDatabase{ msgDocDatabase: msgDocModel, msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, - producerToMongo: producerToMongo, - producerToPush: producerToPush, + producerToMongo: mongoProducer, + producerToPush: pushProducer, }, nil } @@ -65,8 +66,8 @@ type msgTransferDatabase struct { msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser - producerToMongo *kafka.Producer - producerToPush *kafka.Producer + producerToMongo mq.Producer + producerToPush mq.Producer } func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { @@ -281,19 +282,25 @@ func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversati return nil } -func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { - partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) +func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) error { + data, err := proto.Marshal(&pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) if err != nil { - log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) - return 0, 0, err + return err } - return partition, offset, nil + if err := db.producerToPush.SendMessage(ctx, key, data); err != nil { + log.ZError(ctx, "MsgToPushMQ", err, "key", key, "conversationID", conversationID) + return err + } + return nil } func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { if len(messages) > 0 { - _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) + data, err := proto.Marshal(&pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) if err != nil { + return err + } + if err := db.producerToMongo.SendMessage(ctx, key, data); err != nil { log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq) return err } diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index 91ef126fe..ce62a7258 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -17,12 +17,12 @@ package controller import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/mq" + "google.golang.org/protobuf/proto" ) type PushDatabase interface { @@ -32,21 +32,13 @@ type PushDatabase interface { type pushDataBase struct { cache cache.ThirdCache - producerToOfflinePush *kafka.Producer + producerToOfflinePush mq.Producer } -func NewPushDatabase(cache cache.ThirdCache, kafkaConf *config.Kafka) PushDatabase { - conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) - if err != nil { - return nil - } - producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic) - if err != nil { - return nil - } +func NewPushDatabase(cache cache.ThirdCache, offlinePushProducer mq.Producer) PushDatabase { return &pushDataBase{ cache: cache, - producerToOfflinePush: producerToOfflinePush, + producerToOfflinePush: offlinePushProducer, } } @@ -55,7 +47,12 @@ func (p *pushDataBase) DelFcmToken(ctx context.Context, userID string, platformI } func (p *pushDataBase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error { - _, _, err := p.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs}) - log.ZInfo(ctx, "message is push to offlinePush topic", "key", key, "userIDs", userIDs, "msg", msg2mq.String()) + data, err := proto.Marshal(&push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs}) + if err != nil { + return err + } + if err := p.producerToOfflinePush.SendMessage(ctx, key, data); err != nil { + log.ZError(ctx, "message is push to offlinePush topic", err, "key", key, "userIDs", userIDs, "msg", msg2mq.String()) + } return err } diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index 992090552..8e85e302d 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -2,16 +2,17 @@ package mgo import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/mongoutil" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "math" "math/rand" "strconv" "testing" "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) func TestName1(t *testing.T) { @@ -93,7 +94,7 @@ func TestName3(t *testing.T) { func TestName4(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) defer cancel() - cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) msg, err := NewMsgMongo(cli.Database("openim_v3")) if err != nil { @@ -109,6 +110,41 @@ func TestName4(t *testing.T) { } func TestName5(t *testing.T) { - var v time.Time - t.Log(v.UnixMilli()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + tmp, err := NewMsgMongo(cli.Database("openim_v3")) + if err != nil { + panic(err) + } + msg := tmp.(*MsgMgo) + ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli() + t.Log(ts) + var seqs []int64 + for i := 1; i < 256; i++ { + seqs = append(seqs, int64(i)) + } + res, err := msg.FindSeqs(ctx, "si_4924054191_9511766539", seqs) + if err != nil { + panic(err) + } + t.Log(res) +} + +func TestName6(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.135:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + tmp, err := NewMsgMongo(cli.Database("openim_v3")) + if err != nil { + panic(err) + } + msg := tmp.(*MsgMgo) + seq, sendTime, err := msg.findBeforeSendTime(ctx, "si_4924054191_9511766539", 1144) + if err != nil { + panic(err) + } + t.Log(seq, sendTime) } diff --git a/pkg/mqbuild/builder.go b/pkg/mqbuild/builder.go new file mode 100644 index 000000000..6c369f23a --- /dev/null +++ b/pkg/mqbuild/builder.go @@ -0,0 +1,60 @@ +package mqbuild + +import ( + "context" + "fmt" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/mq" + "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/mq/simmq" +) + +type Builder interface { + GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error) + GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) +} + +func NewBuilder(discovery *config.Discovery, kafka *config.Kafka) Builder { + if discovery.Enable == config.Standalone { + return standaloneBuilder{} + } + return &kafkaBuilder{ + addr: kafka.Address, + config: kafka.Build(), + topicGroupID: map[string]string{ + kafka.ToRedisTopic: kafka.ToRedisGroupID, + kafka.ToMongoTopic: kafka.ToMongoGroupID, + kafka.ToPushTopic: kafka.ToPushGroupID, + kafka.ToOfflinePushTopic: kafka.ToOfflineGroupID, + }, + } +} + +type standaloneBuilder struct{} + +func (standaloneBuilder) GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error) { + return simmq.GetTopicProducer(topic), nil +} + +func (standaloneBuilder) GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) { + return simmq.GetTopicConsumer(topic), nil +} + +type kafkaBuilder struct { + addr []string + config *kafka.Config + topicGroupID map[string]string +} + +func (x *kafkaBuilder) GetTopicProducer(ctx context.Context, topic string) (mq.Producer, error) { + return kafka.NewKafkaProducerV2(x.config, x.addr, topic) +} + +func (x *kafkaBuilder) GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error) { + groupID, ok := x.topicGroupID[topic] + if !ok { + return nil, fmt.Errorf("topic %s groupID not found", topic) + } + return kafka.NewMConsumerGroupV2(ctx, x.config, groupID, []string{topic}, true) +} diff --git a/pkg/tools/batcher/batcher.go b/pkg/tools/batcher/batcher.go index 163aeed39..dcf5d07ad 100644 --- a/pkg/tools/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -3,11 +3,12 @@ package batcher import ( "context" "fmt" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/idutil" "strings" "sync" "time" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/idutil" ) var ( @@ -245,7 +246,9 @@ func (b *Batcher[T]) distributeMessage(messages map[string][]*T, totalCount int, if b.config.syncWait { b.counter.Wait() } - b.OnComplete(lastMessage, totalCount) + if b.OnComplete != nil { + b.OnComplete(lastMessage, totalCount) + } } func (b *Batcher[T]) run(channelID int, ch <-chan *Msg[T]) {