mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-06-29 22:38:27 +08:00
阅后即焚删除本地消息
This commit is contained in:
parent
62ea94f2d2
commit
61a9153567
@ -36,6 +36,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
@ -874,38 +875,40 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco
|
||||
if g.UserID == "" || g.ConversationID == "" || g.MaxSeq <= 0 {
|
||||
continue
|
||||
}
|
||||
newMinSeq := g.MaxSeq + 1
|
||||
//newMinSeq := g.MaxSeq + 1
|
||||
|
||||
// 推进阅读方 min_seq。
|
||||
if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
|
||||
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
|
||||
continue
|
||||
}
|
||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
|
||||
map[string]any{"min_seq": newMinSeq}); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
|
||||
"userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
|
||||
continue
|
||||
}
|
||||
//if err := c.msgClient.SetUserConversationMin(ctx, g.ConversationID, []string{g.UserID}, newMinSeq); err != nil {
|
||||
// log.ZError(ctx, "ClearBurnExpiredMsgs SetUserConversationMin failed", err,
|
||||
// "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
|
||||
// continue
|
||||
//}
|
||||
//if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{g.UserID}, g.ConversationID,
|
||||
// map[string]any{"min_seq": newMinSeq}); err != nil {
|
||||
// log.ZError(ctx, "ClearBurnExpiredMsgs UpdateUsersConversationField failed", err,
|
||||
// "userID", g.UserID, "conversationID", g.ConversationID, "minSeq", newMinSeq)
|
||||
// continue
|
||||
//}
|
||||
// 通知 g.UserID 客户端:会话变更 + 精确删除指定 seqs。
|
||||
// 对端用户在 msg_burn_deadline 中有独立记录,cron 处理其分组时会自行通知,
|
||||
// 无需在此重复推进对端 min_seq 或发送额外通知。
|
||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
|
||||
c.conversationNotificationSender.BurnMsgsDeleteNotification(ctx, g.UserID, g.UserID, g.ConversationID, g.Seqs)
|
||||
//c.conversationNotificationSender.ConversationChangeNotification(ctx, g.UserID, []string{g.ConversationID})
|
||||
|
||||
// 物理删除 msg 存储中的焚毁消息(best-effort,失败不中断流程)。
|
||||
if err := c.msgClient.DeleteMsgPhysicalBySeqs(ctx, g.ConversationID, g.Seqs); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgPhysicalBySeqs failed", err,
|
||||
"conversationID", g.ConversationID, "seqs", g.Seqs)
|
||||
// 删除焚毁消息并同步通知阅读方客户端(best-effort,失败不中断流程)。
|
||||
if err := c.msgClient.DeleteMsgs(ctx, g.UserID, g.ConversationID, g.Seqs, &msg.DeleteSyncOpt{
|
||||
IsSyncSelf: true,
|
||||
}); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgs failed", err,
|
||||
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
|
||||
}
|
||||
|
||||
if err := c.msgBurnDeadlineDB.DeleteByUserConversationSeqs(ctx, g.UserID, g.ConversationID, g.Seqs); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteByUserConversationSeqs failed", err,
|
||||
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "ClearBurnExpiredMsgs advanced min_seq", "userID", g.UserID,
|
||||
"conversationID", g.ConversationID, "minSeq", newMinSeq, "seqs", g.Seqs)
|
||||
"conversationID", g.ConversationID, "seqs", g.Seqs)
|
||||
processed++
|
||||
}
|
||||
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
|
||||
|
||||
@ -114,6 +114,20 @@ func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversation
|
||||
return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req))
|
||||
}
|
||||
|
||||
// DeleteMsgs 按 seq 删除消息,行为与 msg RPC DeleteMsgs 一致。
|
||||
func (x *MsgClient) DeleteMsgs(ctx context.Context, userID, conversationID string, seqs []int64, deleteSyncOpt *msg.DeleteSyncOpt) error {
|
||||
if len(seqs) == 0 {
|
||||
return nil
|
||||
}
|
||||
req := &msg.DeleteMsgsReq{
|
||||
ConversationID: conversationID,
|
||||
UserID: userID,
|
||||
Seqs: seqs,
|
||||
DeleteSyncOpt: deleteSyncOpt,
|
||||
}
|
||||
return ignoreResp(x.MsgClient.DeleteMsgs(ctx, req))
|
||||
}
|
||||
|
||||
// DeleteMsgPhysicalBySeqs 按 seq 物理删除会话内的消息(无鉴权)。
|
||||
// 用于阅后即焚、系统级消息清理等场景。
|
||||
func (x *MsgClient) DeleteMsgPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
|
||||
|
||||
2
protocol
2
protocol
@ -1 +1 @@
|
||||
Subproject commit 9afba46486484563098e1e77b46cc94e0d85c9dd
|
||||
Subproject commit 1d47a639ede5965901dfe0966b8369444c865e24
|
||||
Loading…
x
Reference in New Issue
Block a user