mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-25 04:32:10 +08:00
update logic and rename method.
This commit is contained in:
parent
cf8d373a8c
commit
bcee8dff5a
@ -841,64 +841,48 @@ func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbcon
|
|||||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if req.NeedDeleteTime == 0 {
|
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
|
||||||
return nil, errs.ErrArgs.WrapMsg("need_delete_time need be set")
|
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
|
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
|
||||||
|
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
|
||||||
var conversationIDs []string
|
|
||||||
if len(req.ConversationIDs) == 0 {
|
|
||||||
conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
conversationIDs = req.ConversationIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check Conversation have a specific status
|
|
||||||
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, conversationIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(conversations) == 0 {
|
|
||||||
return nil, errs.ErrRecordNotFound.Wrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
needCheckConversationIDs := make([]string, 0, len(conversations))
|
|
||||||
|
|
||||||
for _, conversation := range conversations {
|
|
||||||
if conversation.IsPinned {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
needCheckConversationIDs = append(needCheckConversationIDs, conversation.ConversationID)
|
|
||||||
}
|
|
||||||
|
|
||||||
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
|
||||||
UserID: req.OwnerUserID,
|
|
||||||
ConversationIDs: needCheckConversationIDs,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var needDeleteConversationIDs []string
|
var needDeleteConversationIDs []string
|
||||||
for conversationID, msg := range latestMsgs.Msgs {
|
|
||||||
if msg.SendTime < deleteTimeThreshold {
|
|
||||||
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(needDeleteConversationIDs) == 0 {
|
if len(req.ConversationIDs) == 0 {
|
||||||
return &pbconversation.DeleteConversationsResp{}, nil
|
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
|
||||||
|
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
||||||
|
UserID: req.OwnerUserID,
|
||||||
|
ConversationIDs: conversationIDs,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for conversationID, msg := range latestMsgs.Msgs {
|
||||||
|
if msg.SendTime < deleteTimeThreshold {
|
||||||
|
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(needDeleteConversationIDs) == 0 {
|
||||||
|
return &pbconversation.DeleteConversationsResp{}, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
needDeleteConversationIDs = req.ConversationIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
|
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
|
|
||||||
|
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
|
||||||
|
|
||||||
return &pbconversation.DeleteConversationsResp{}, nil
|
return &pbconversation.DeleteConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
3
pkg/common/storage/cache/conversation.go
vendored
3
pkg/common/storage/cache/conversation.go
vendored
@ -16,6 +16,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -57,7 +58,7 @@ type ConversationCache interface {
|
|||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
||||||
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
|
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
|
||||||
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
|
DelUserPinnedConversations(userIDs ...string) ConversationCache
|
||||||
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
||||||
|
|
||||||
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||||
|
|||||||
@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
|
|||||||
return cache
|
return cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
|
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
|
||||||
cache := c.CloneConversationCache()
|
cache := c.CloneConversationCache()
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
|
cache.AddKeys(c.getPinnedConversationIDsKey(userID))
|
||||||
|
|||||||
@ -122,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
|
|||||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||||
}
|
}
|
||||||
if _, ok := fieldMap["is_pinned"]; ok {
|
if _, ok := fieldMap["is_pinned"]; ok {
|
||||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
cache = cache.DelUserPinnedConversations(userIDs...)
|
||||||
}
|
}
|
||||||
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
||||||
}
|
}
|
||||||
@ -174,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
|
|||||||
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
|
||||||
}
|
}
|
||||||
if _, ok := args["is_pinned"]; ok {
|
if _, ok := args["is_pinned"]; ok {
|
||||||
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
|
cache = cache.DelUserPinnedConversations(userIDs...)
|
||||||
}
|
}
|
||||||
return cache.ChainExecDel(ctx)
|
return cache.ChainExecDel(ctx)
|
||||||
}
|
}
|
||||||
@ -205,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
|
|||||||
DelUserConversationIDsHash(userIDs...).
|
DelUserConversationIDsHash(userIDs...).
|
||||||
DelConversationVersionUserIDs(userIDs...).
|
DelConversationVersionUserIDs(userIDs...).
|
||||||
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
|
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
|
||||||
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
|
DelUserPinnedConversations(pinnedUserIDs...).
|
||||||
ChainExecDel(ctx)
|
ChainExecDel(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
|
|||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
cache = cache.DelConversationVersionUserIDs(ownerUserID).
|
cache = cache.DelConversationVersionUserIDs(ownerUserID).
|
||||||
DelConversationNotNotifyMessageUserIDs(ownerUserID).
|
DelConversationNotNotifyMessageUserIDs(ownerUserID).
|
||||||
DelConversationPinnedMessageUserIDs(ownerUserID)
|
DelUserPinnedConversations(ownerUserID)
|
||||||
|
|
||||||
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
||||||
return e.GroupID, e.GroupID != ""
|
return e.GroupID, e.GroupID != ""
|
||||||
@ -444,7 +444,7 @@ func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, use
|
|||||||
DelConversationIDs(userID).
|
DelConversationIDs(userID).
|
||||||
DelUserConversationIDsHash(userID).
|
DelUserConversationIDsHash(userID).
|
||||||
DelConversationNotNotifyMessageUserIDs(userID).
|
DelConversationNotNotifyMessageUserIDs(userID).
|
||||||
DelConversationPinnedMessageUserIDs(userID)
|
DelUserPinnedConversations(userID)
|
||||||
|
|
||||||
return cache.ChainExecDel(ctx)
|
return cache.ChainExecDel(ctx)
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user