mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-03 01:52:17 +08:00
feat: push to kafka log
This commit is contained in:
parent
0f87561472
commit
2236074fd3
@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
||||||
|
for _, storageMsg := range storageList {
|
||||||
|
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
|
||||||
|
}
|
||||||
|
|
||||||
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
||||||
var storageMessageList []*sdkws.MsgData
|
var storageMessageList []*sdkws.MsgData
|
||||||
for _, msg := range storageList {
|
for _, msg := range storageList {
|
||||||
@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
|
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
|
||||||
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user