mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-24 12:02:14 +08:00
feat: implement DeleteConversations interface.
This commit is contained in:
parent
349a8cd9af
commit
c4a1649aae
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.17
|
||||
github.com/openimsdk/tools v0.0.50-alpha.97
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
|
4
go.sum
4
go.sum
@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
|
||||
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
|
@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
|
||||
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
|
||||
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
|
||||
}
|
||||
|
||||
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
|
||||
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
|
||||
}
|
||||
|
@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
|
||||
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
|
||||
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
|
||||
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
|
||||
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -280,6 +280,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
||||
}
|
||||
|
||||
if isNewConversation {
|
||||
// 需要每一条消息 都去检查了 有没有会话
|
||||
|
||||
switch msg.SessionType {
|
||||
case constant.ReadGroupChatType:
|
||||
log.ZDebug(ctx, "group chat first create conversation", "conversationID",
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
|
||||
}
|
||||
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
|
||||
for i, conversation := range conversations {
|
||||
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
|
||||
if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
|
||||
continue
|
||||
}
|
||||
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
|
||||
@ -835,3 +836,69 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
|
||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
|
||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req.NeedDeleteTime == 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("need_delete_time need be set")
|
||||
}
|
||||
|
||||
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
|
||||
|
||||
var conversationIDs []string
|
||||
if len(req.ConversationIDs) == 0 {
|
||||
conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
conversationIDs = req.ConversationIDs
|
||||
}
|
||||
|
||||
// Check Conversation have a specific status
|
||||
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, conversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(conversations) == 0 {
|
||||
return nil, errs.ErrRecordNotFound.Wrap()
|
||||
}
|
||||
|
||||
needCheckConversationIDs := make([]string, 0, len(conversations))
|
||||
|
||||
for _, conversation := range conversations {
|
||||
if conversation.IsPinned {
|
||||
continue
|
||||
}
|
||||
needCheckConversationIDs = append(needCheckConversationIDs, conversation.ConversationID)
|
||||
}
|
||||
|
||||
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
||||
UserID: req.OwnerUserID,
|
||||
ConversationIDs: needCheckConversationIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var needDeleteConversationIDs []string
|
||||
for conversationID, msg := range latestMsgs.Msgs {
|
||||
if msg.SendTime < deleteTimeThreshold {
|
||||
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(needDeleteConversationIDs) == 0 {
|
||||
return &pbconversation.DeleteConversationsResp{}, nil
|
||||
}
|
||||
|
||||
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
|
||||
|
||||
return &pbconversation.DeleteConversationsResp{}, nil
|
||||
}
|
||||
|
@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
|
||||
|
||||
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
|
||||
}
|
||||
|
||||
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
|
||||
tips := &sdkws.ConversationDeleteTips{
|
||||
UserID: userID,
|
||||
ConversationIDs: conversationIDs,
|
||||
}
|
||||
|
||||
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
|
||||
}
|
||||
|
@ -78,6 +78,8 @@ type ConversationDatabase interface {
|
||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||
// FindRandConversation finds random conversations based on the specified timestamp and limit.
|
||||
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
|
||||
|
||||
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
|
||||
}
|
||||
|
||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||
@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
|
||||
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
|
||||
return c.conversationDB.FindRandConversation(ctx, ts, limit)
|
||||
}
|
||||
|
||||
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
|
||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cache := c.cache.CloneConversationCache()
|
||||
cache = cache.DelConversations(userID, conversationIDs...).
|
||||
DelConversationVersionUserIDs(userID).
|
||||
DelConversationIDs(userID).
|
||||
DelUserConversationIDsHash(userID).
|
||||
DelConversationNotNotifyMessageUserIDs(userID).
|
||||
DelConversationPinnedMessageUserIDs(userID)
|
||||
|
||||
return cache.ChainExecDel(ctx)
|
||||
})
|
||||
}
|
||||
|
@ -44,4 +44,5 @@ type Conversation interface {
|
||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
|
||||
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
|
||||
}
|
||||
|
@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
|
||||
}
|
||||
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
|
||||
}
|
||||
|
||||
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
|
||||
if len(conversationIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mongoutil.IncrVersion(func() error {
|
||||
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
|
||||
return err
|
||||
}, func() error {
|
||||
for _, conversationID := range conversationIDs {
|
||||
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user