Compare commits

..

1 Commits

9 changed files with 101 additions and 24 deletions

View File

@ -513,6 +513,14 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
}
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
}
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
if req.ConversationID == "" {
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
@ -709,6 +717,56 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
}, nil
}
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return nil, err
}
const batchNum = 100
if num == 0 {
return nil, errs.New("Need Destruct Msg is nil").Wrap()
}
maxPage := (num + batchNum - 1) / batchNum
temp := make([]*dbModel.Conversation, 0, maxPage*batchNum)
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
// log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
for _, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
conversation.LatestMsgDestructTime.IsZero()) {
temp = append(temp, conversation)
}
}
}
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err

View File

@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
@ -73,7 +74,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
return nil, err
}
conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID)
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
if err != nil {
return nil, err
}
@ -115,12 +116,14 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
}
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID)
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
if err != nil {
return err
}
var existConversations []*conversation.Conversation
var existConversationIDs []string
for _, conversation := range conversations {
existConversations = append(existConversations, conversation)
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
}
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
@ -149,7 +152,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
return err
}
for _, conversation := range conversations {
for _, conversation := range existConversations {
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
}

View File

@ -70,7 +70,6 @@ const (
BlockedByPeer = 1302 // Blocked by the peer
NotPeersFriend = 1303 // Not the peer's friend
RelationshipAlreadyError = 1304 // Already in a friend relationship
FriendRequestHandled = 1305 // Friend request has already been handled
// Message error codes.
MessageHasReadDisable = 1401

View File

@ -51,11 +51,10 @@ var (
ErrMessageHasReadDisable = errs.NewCodeError(MessageHasReadDisable, "MessageHasReadDisable")
ErrCanNotAddYourself = errs.NewCodeError(CanNotAddYourselfError, "CanNotAddYourselfError")
ErrBlockedByPeer = errs.NewCodeError(BlockedByPeer, "BlockedByPeer")
ErrNotPeersFriend = errs.NewCodeError(NotPeersFriend, "NotPeersFriend")
ErrRelationshipAlready = errs.NewCodeError(RelationshipAlreadyError, "RelationshipAlreadyError")
ErrFriendRequestHandled = errs.NewCodeError(FriendRequestHandled, "FriendRequestHandled")
ErrCanNotAddYourself = errs.NewCodeError(CanNotAddYourselfError, "CanNotAddYourselfError")
ErrBlockedByPeer = errs.NewCodeError(BlockedByPeer, "BlockedByPeer")
ErrNotPeersFriend = errs.NewCodeError(NotPeersFriend, "NotPeersFriend")
ErrRelationshipAlready = errs.NewCodeError(RelationshipAlreadyError, "RelationshipAlreadyError")
ErrMutedInGroup = errs.NewCodeError(MutedInGroup, "MutedInGroup")
ErrMutedGroup = errs.NewCodeError(MutedGroup, "MutedGroup")

View File

@ -61,6 +61,8 @@ type ConversationDatabase interface {
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
// GetConversationsByConversationID retrieves conversations by their IDs.
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error)
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
@ -373,6 +375,10 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati
return c.conversationDB.PageConversationIDs(ctx, pagination)
}
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
}
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
}

View File

@ -16,9 +16,9 @@ package controller
import (
"context"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -109,13 +109,15 @@ func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (
// Retrieve friend IDs of userID1 from the cache
userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
if err != nil {
return false, false, err
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID1, err)
return
}
// Retrieve friend IDs of userID2 from the cache
userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
if err != nil {
return false, false, err
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID2, err)
return
}
// Check if userID2 is in userID1's friend list and vice versa
@ -212,12 +214,12 @@ func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest
// Attempt to retrieve the friend request from the database.
fr, err := f.friendRequest.Take(ctx, friendRequest.FromUserID, friendRequest.ToUserID)
if err != nil {
return err
return fmt.Errorf("failed to retrieve friend request from %s to %s: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
}
// Check if the friend request has already been handled.
if fr.HandleResult != 0 {
return servererrs.ErrFriendRequestHandled.WrapMsg("friend request has already been processed", "from", friendRequest.FromUserID, "to", friendRequest.ToUserID)
return fmt.Errorf("friend request from %s to %s has already been processed", friendRequest.FromUserID, friendRequest.ToUserID)
}
// Log the action of refusing the friend request for debugging and auditing purposes.
@ -230,7 +232,7 @@ func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest
friendRequest.HandleResult = constant.FriendResponseRefuse
friendRequest.HandleTime = time.Now()
if err := f.friendRequest.Update(ctx, friendRequest); err != nil {
return err
return fmt.Errorf("failed to update friend request from %s to %s as refused: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
}
return nil
@ -348,9 +350,9 @@ func (f *friendDatabase) PageFriendRequestToMe(ctx context.Context, userID strin
func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*model.Friend, err error) {
friends, err = f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return nil, err
return
}
return friends, nil
return
}
func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) {

View File

@ -39,6 +39,7 @@ type Conversation interface {
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)

View File

@ -47,12 +47,6 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
},
Options: options.Index(),
},
{
Keys: bson.D{
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
},
})
if err != nil {
return nil, errs.Wrap(err)
@ -238,6 +232,10 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
}
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{

View File

@ -2,7 +2,6 @@ package rpcli
import (
"context"
"github.com/openimsdk/protocol/conversation"
"google.golang.org/grpc"
)
@ -31,6 +30,18 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
}
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
if len(conversationIDs) == 0 {
return nil, nil
}
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
}
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
}
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
if len(ownerUserIDs) == 0 {
return nil