mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 05:02:11 +08:00 
			
		
		
		
	fix: performance issues with Kafka caused by encapsulating the MQ interface
This commit is contained in:
		
							parent
							
								
									4c9fdf70db
								
							
						
					
					
						commit
						eebff88a29
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -13,7 +13,7 @@ require ( | |||||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||||
| 	github.com/mitchellh/mapstructure v1.5.0 | 	github.com/mitchellh/mapstructure v1.5.0 | ||||||
| 	github.com/openimsdk/protocol v0.0.73-alpha.12 | 	github.com/openimsdk/protocol v0.0.73-alpha.12 | ||||||
| 	github.com/openimsdk/tools v0.0.50-alpha.96 | 	github.com/openimsdk/tools v0.0.50-alpha.97 | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.18.0 | 	github.com/prometheus/client_golang v1.18.0 | ||||||
| 	github.com/stretchr/testify v1.9.0 | 	github.com/stretchr/testify v1.9.0 | ||||||
|  | |||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FA | |||||||
| github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||||
| github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= | github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= | ||||||
| github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.96 h1:U44Fq2jHiEvGi9zuYAnTRNx3Xd9T7P/kBAZLHvQ8xg4= | github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= | ||||||
| github.com/openimsdk/tools v0.0.50-alpha.96/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||||
| github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= | github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= | ||||||
| github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | ||||||
|  | |||||||
| @ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config) | 	historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config) | ||||||
| 
 | 
 | ||||||
| 	msgTransfer := &MsgTransfer{ | 	msgTransfer := &MsgTransfer{ | ||||||
| 		historyConsumer:      historyConsumer, | 		historyConsumer:      historyConsumer, | ||||||
| @ -161,8 +161,8 @@ func (m *MsgTransfer) Start(ctx context.Context) error { | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	go func() { | 	go func() { | ||||||
| 		fn := func(ctx context.Context, key string, value []byte) error { | 		fn := func(msg mq.Message) error { | ||||||
| 			m.historyMongoHandler.HandleChatWs2Mongo(ctx, key, value) | 			m.historyMongoHandler.HandleChatWs2Mongo(msg) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		for { | 		for { | ||||||
|  | |||||||
| @ -18,6 +18,7 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"github.com/openimsdk/tools/mq" | ||||||
| 
 | 
 | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| @ -77,6 +78,7 @@ type ConsumerMessage struct { | |||||||
| 	Ctx   context.Context | 	Ctx   context.Context | ||||||
| 	Key   string | 	Key   string | ||||||
| 	Value []byte | 	Value []byte | ||||||
|  | 	Raw   mq.Message | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { | func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { | ||||||
| @ -113,6 +115,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. | |||||||
| 	b.Do = och.do | 	b.Do = och.do | ||||||
| 	och.redisMessageBatches = b | 	och.redisMessageBatches = b | ||||||
| 
 | 
 | ||||||
|  | 	och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) { | ||||||
|  | 		lastMessage.Raw.Mark() | ||||||
|  | 		lastMessage.Raw.Commit() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return &och, nil | 	return &och, nil | ||||||
| } | } | ||||||
| func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { | func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) { | ||||||
| @ -388,10 +395,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte | |||||||
| 	return mcontext.SetOperationID(ctx, allMessageOperationID) | 	return mcontext.SetOperationID(ctx, allMessageOperationID) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group | func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group | ||||||
| 	err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value}) | 	err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.ZWarn(ctx, "put msg to  error", err, "key", key, "value", value) | 		log.ZWarn(msg.Context(), "put msg to  error", err, "key", msg.Key(), "value", msg.Value()) | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | |||||||
| @ -15,12 +15,12 @@ | |||||||
| package msgtransfer | package msgtransfer | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/tools/mq" | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||||
| 	"github.com/openimsdk/protocol/constant" |  | ||||||
| 	pbmsg "github.com/openimsdk/protocol/msg" | 	pbmsg "github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"google.golang.org/protobuf/proto" | 	"google.golang.org/protobuf/proto" | ||||||
| @ -40,7 +40,10 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabas | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Context, key string, msg []byte) { | func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) { | ||||||
|  | 	ctx := val.Context() | ||||||
|  | 	key := val.Key() | ||||||
|  | 	msg := val.Value() | ||||||
| 	msgFromMQ := pbmsg.MsgDataToMongoByMQ{} | 	msgFromMQ := pbmsg.MsgDataToMongoByMQ{} | ||||||
| 	err := proto.Unmarshal(msg, &msgFromMQ) | 	err := proto.Unmarshal(msg, &msgFromMQ) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -58,6 +61,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont | |||||||
| 		prommetrics.MsgInsertMongoFailedCounter.Inc() | 		prommetrics.MsgInsertMongoFailedCounter.Inc() | ||||||
| 	} else { | 	} else { | ||||||
| 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | 		prommetrics.MsgInsertMongoSuccessCounter.Inc() | ||||||
|  | 		val.Mark() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, msgData := range msgFromMQ.MsgData { | 	for _, msgData := range msgFromMQ.MsgData { | ||||||
|  | |||||||
| @ -2,6 +2,7 @@ package push | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"github.com/openimsdk/tools/mq" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 
 | 
 | ||||||
| @ -106,8 +107,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 
 | 
 | ||||||
| 	go func() { | 	go func() { | ||||||
| 		pushHandler.WaitCache() | 		pushHandler.WaitCache() | ||||||
| 		fn := func(ctx context.Context, key string, value []byte) error { | 		fn := func(msg mq.Message) error { | ||||||
| 			pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(ctx), value) | 			pushHandler.HandleMs2PsChat(authverify.WithTempAdmin(msg.Context()), msg.Value()) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) | 		consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) | ||||||
| @ -121,8 +122,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	go func() { | 	go func() { | ||||||
| 		fn := func(ctx context.Context, key string, value []byte) error { | 		fn := func(msg mq.Message) error { | ||||||
| 			offlineHandler.HandleMsg2OfflinePush(ctx, value) | 			offlineHandler.HandleMsg2OfflinePush(msg.Context(), msg.Value()) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) | 		consumerCtx := mcontext.SetOperationID(context.Background(), "push_"+strconv.Itoa(int(rand.Uint32()))) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user