mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-25 04:32:10 +08:00
fix: the source message of the reference is withdrawn, and the referenced message is deleted (#3137) (#3140)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted Co-authored-by: chao <48119764+withchao@users.noreply.github.com>
This commit is contained in:
parent
ab32167b19
commit
7a2c1e48aa
@ -62,3 +62,22 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
|
||||
// msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
|
||||
//}
|
||||
}
|
||||
|
||||
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
|
||||
func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
|
||||
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
|
||||
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
|
||||
for msg := range claim.Messages() {
|
||||
ctx := mc.historyConsumerGroup.GetContextFromMsg(msg)
|
||||
if len(msg.Value) != 0 {
|
||||
mc.handleChatWs2Mongo(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.ZError(ctx, "mongo msg get from kafka but is nil", nil, "conversationID", msg.Key)
|
||||
}
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user