mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-06-24 02:28:11 +08:00
群阅后即焚
This commit is contained in:
parent
89d65f0d2b
commit
35152a2832
@ -298,8 +298,8 @@ func (o *GroupApi) GetEditSetting(c *gin.Context) {
|
||||
// SetMsgBurnDuration 设置群消息阅后即焚时长(秒);burnDuration=0 表示关闭。
|
||||
func (o *GroupApi) SetMsgBurnDuration(c *gin.Context) {
|
||||
var req struct {
|
||||
GroupID string `json:"groupID"`
|
||||
BurnDuration int32 `json:"burnDuration"`
|
||||
GroupID string `json:"groupID"`
|
||||
BurnDuration int32 `json:"burnDuration"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
apiresp.GinError(c, errs.ErrArgs.WrapMsg(err.Error()))
|
||||
@ -324,6 +324,40 @@ func (o *GroupApi) SetMsgBurnDuration(c *gin.Context) {
|
||||
apiresp.GinSuccess(c, resp)
|
||||
}
|
||||
|
||||
// GetMsgBurnDuration 返回当前群消息阅后即焚时长(秒);0 表示关闭(与 get_groups_info 中字段一致)。
|
||||
func (o *GroupApi) GetMsgBurnDuration(c *gin.Context) {
|
||||
var req struct {
|
||||
GroupID string `json:"groupID"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
apiresp.GinError(c, errs.ErrArgs.WrapMsg(err.Error()))
|
||||
return
|
||||
}
|
||||
if req.GroupID == "" {
|
||||
apiresp.GinError(c, errs.ErrArgs.WrapMsg("groupID is empty"))
|
||||
return
|
||||
}
|
||||
resp, err := o.Client.GetGroupsInfo(c.Request.Context(), &group.GetGroupsInfoReq{
|
||||
GroupIDs: []string{req.GroupID},
|
||||
})
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
if len(resp.GroupInfos) == 0 {
|
||||
apiresp.GinError(c, errs.ErrRecordNotFound.WrapMsg("group not found", "groupID", req.GroupID))
|
||||
return
|
||||
}
|
||||
gi := resp.GroupInfos[0]
|
||||
apiresp.GinSuccess(c, struct {
|
||||
GroupID string `json:"groupID"`
|
||||
BurnDuration int32 `json:"burnDuration"`
|
||||
}{
|
||||
GroupID: gi.GroupID,
|
||||
BurnDuration: gi.GetMsgBurnDuration(),
|
||||
})
|
||||
}
|
||||
|
||||
func (o *GroupApi) JoinGroup(c *gin.Context) {
|
||||
a2r.Call(c, group.GroupClient.JoinGroup, o.Client)
|
||||
}
|
||||
|
||||
@ -242,6 +242,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co
|
||||
groupRouterGroup.POST("/set_edit_setting", g.SetEditSetting)
|
||||
groupRouterGroup.POST("/get_edit_setting", g.GetEditSetting)
|
||||
groupRouterGroup.POST("/set_msg_burn_duration", g.SetMsgBurnDuration)
|
||||
groupRouterGroup.POST("/get_msg_burn_duration", g.GetMsgBurnDuration)
|
||||
groupRouterGroup.POST("/join_group", g.JoinGroup)
|
||||
groupRouterGroup.POST("/quit_group", g.QuitGroup)
|
||||
groupRouterGroup.POST("/group_application_response", g.ApplicationGroupResponse)
|
||||
|
||||
@ -902,7 +902,7 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco
|
||||
|
||||
// 删除焚毁消息并同步通知阅读方客户端(best-effort,失败不中断流程)。
|
||||
if err := c.msgClient.DeleteMsgs(ctx, g.UserID, g.ConversationID, g.Seqs, &msg.DeleteSyncOpt{
|
||||
IsSyncSelf: true,
|
||||
IsSyncOther: true,
|
||||
}); err != nil {
|
||||
log.ZError(ctx, "ClearBurnExpiredMsgs DeleteMsgs failed", err,
|
||||
"userID", g.UserID, "conversationID", g.ConversationID, "seqs", g.Seqs)
|
||||
@ -920,12 +920,10 @@ func (c *conversationServer) ClearBurnExpiredMsgs(ctx context.Context, req *pbco
|
||||
return &pbconversation.ClearBurnExpiredMsgsResp{Count: processed}, nil
|
||||
}
|
||||
|
||||
|
||||
// ClearGroupBurnExpiredMsgs 处理群消息「阅后即焚」到期记录:
|
||||
// 1. 查询满足 read_count >= member_count 且 burn_end_time 过期的记录(按 group_id 聚合)。
|
||||
// 2. 对每个群,获取所有成员 ID,批量推进他们在群会话上的 min_seq。
|
||||
// 3. 更新每个成员的会话 min_seq 并下发 ConversationChangeNotification。
|
||||
// 4. 删除已处理的 group_msg_burn_record 记录。
|
||||
// 2. 对每个群调用 msg.DeleteMsgs(IsSyncOther:物理删除群会话消息并下发 DeleteMsgsNotification)。
|
||||
// 3. 删除已处理的 group_msg_burn_record 记录。
|
||||
func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req *pbconversation.ClearGroupBurnExpiredMsgsReq) (*pbconversation.ClearGroupBurnExpiredMsgsResp, error) {
|
||||
if c.groupMsgBurnRecordDB == nil {
|
||||
return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: 0}, nil
|
||||
@ -944,35 +942,20 @@ func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req
|
||||
continue
|
||||
}
|
||||
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, g.GroupID)
|
||||
newMinSeq := g.MaxSeq + 1
|
||||
|
||||
// 获取群所有成员 ID
|
||||
memberIDs, err := c.groupClient.GetGroupMemberUserIDs(ctx, g.GroupID)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "ClearGroupBurnExpiredMsgs GetGroupMemberUserIDs failed", err,
|
||||
"groupID", g.GroupID)
|
||||
continue
|
||||
// 与 ClearBurnExpiredMsgs 一致:物理删除并同步客户端(best-effort)。
|
||||
var deleteAsUserID string
|
||||
if len(c.config.Share.IMAdminUserID) > 0 {
|
||||
deleteAsUserID = c.config.Share.IMAdminUserID[0]
|
||||
}
|
||||
if len(memberIDs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 批量推进所有成员的 min_seq(seq 层)
|
||||
if err := c.msgClient.SetUserConversationMin(ctx, conversationID, memberIDs, newMinSeq); err != nil {
|
||||
log.ZError(ctx, "ClearGroupBurnExpiredMsgs SetUserConversationMin failed", err,
|
||||
"groupID", g.GroupID, "conversationID", conversationID, "minSeq", newMinSeq)
|
||||
continue
|
||||
}
|
||||
|
||||
// 更新每个成员会话文档中的 min_seq 并发送通知
|
||||
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, memberIDs, conversationID,
|
||||
map[string]any{"min_seq": newMinSeq}); err != nil {
|
||||
log.ZError(ctx, "ClearGroupBurnExpiredMsgs UpdateUsersConversationField failed", err,
|
||||
"groupID", g.GroupID, "conversationID", conversationID, "minSeq", newMinSeq)
|
||||
continue
|
||||
}
|
||||
for _, memberID := range memberIDs {
|
||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, memberID, []string{conversationID})
|
||||
if deleteAsUserID == "" {
|
||||
log.ZWarn(ctx, "ClearGroupBurnExpiredMsgs: IMAdminUserID empty, skip DeleteMsgs", nil,
|
||||
"groupID", g.GroupID, "conversationID", conversationID, "seqs", g.Seqs)
|
||||
} else if err := c.msgClient.DeleteMsgs(ctx, deleteAsUserID, conversationID, g.Seqs, &msg.DeleteSyncOpt{
|
||||
IsSyncOther: true,
|
||||
}); err != nil {
|
||||
log.ZError(ctx, "ClearGroupBurnExpiredMsgs DeleteMsgs failed", err,
|
||||
"groupID", g.GroupID, "conversationID", conversationID, "seqs", g.Seqs)
|
||||
}
|
||||
|
||||
// 删除已处理记录
|
||||
@ -980,9 +963,9 @@ func (c *conversationServer) ClearGroupBurnExpiredMsgs(ctx context.Context, req
|
||||
log.ZError(ctx, "ClearGroupBurnExpiredMsgs DeleteByGroupSeqs failed", err,
|
||||
"groupID", g.GroupID, "seqs", g.Seqs)
|
||||
}
|
||||
log.ZDebug(ctx, "ClearGroupBurnExpiredMsgs advanced min_seq for group",
|
||||
log.ZDebug(ctx, "ClearGroupBurnExpiredMsgs processed group burn batch",
|
||||
"groupID", g.GroupID, "conversationID", conversationID,
|
||||
"minSeq", newMinSeq, "memberCount", len(memberIDs), "seqs", g.Seqs)
|
||||
"seqs", g.Seqs)
|
||||
processed++
|
||||
}
|
||||
return &pbconversation.ClearGroupBurnExpiredMsgsResp{Count: processed}, nil
|
||||
|
||||
@ -266,16 +266,7 @@ func (m *msgServer) recordBurnDeadlines(ctx context.Context, conv *conversation.
|
||||
PeerID: peerID,
|
||||
DeadlineMs: deadline,
|
||||
CreateTime: now,
|
||||
},
|
||||
&model.MsgBurnDeadline{
|
||||
UserID: peerID,
|
||||
ConversationID: conv.ConversationID,
|
||||
Seq: seq,
|
||||
PeerID: readerUserID,
|
||||
DeadlineMs: deadline,
|
||||
CreateTime: now,
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
if err := m.msgBurnDeadlineDB.UpsertIfAbsent(ctx, items); err != nil {
|
||||
log.ZError(ctx, "recordBurnDeadlines UpsertIfAbsent failed", err,
|
||||
|
||||
2
protocol
2
protocol
@ -1 +1 @@
|
||||
Subproject commit 7bd25d8024043c6f8ebd4c6d473a89e7034e639d
|
||||
Subproject commit c5adb72b8d02244a5d3d7d814e607e9b6256fdd6
|
||||
Loading…
x
Reference in New Issue
Block a user