diff --git a/go.mod b/go.mod index 43251a4cf..7e3af7de2 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.1 - github.com/OpenIMSDK/protocol v0.0.18 + github.com/OpenIMSDK/protocol v0.0.21 github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index e94b507ae..5b180156e 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.1 h1:B4/TdHce/8Ipza+qrLIeNJ9D1AOxZVp/3uDv6H/dp2M= github.com/IBM/sarama v1.41.1/go.mod h1:JFCPURVskaipJdKRFkiE/OZqQHw7jqliaJmRwXCmSSw= -github.com/OpenIMSDK/protocol v0.0.18 h1:hXukFiDMLZx7s+hDCQePIK9ABiHyNlobNL4MppvOuMY= -github.com/OpenIMSDK/protocol v0.0.18/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.21 h1:5H6H+hJ9d/VgRqttvxD/zfK9Asd+4M8Eknk5swSbUVY= +github.com/OpenIMSDK/protocol v0.0.21/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 4e068b775..e422de677 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -44,3 +44,7 @@ func (o *ConversationApi) GetConversations(c *gin.Context) { func (o *ConversationApi) SetConversations(c *gin.Context) { a2r.Call(conversation.ConversationClient.SetConversations, o.Client, c) } + +func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c) +} diff --git a/internal/api/route.go b/internal/api/route.go index e1722523b..221e180a1 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -198,6 +198,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive conversationGroup.POST("/get_conversation", c.GetConversation) conversationGroup.POST("/get_conversations", c.GetConversations) conversationGroup.POST("/set_conversations", c.SetConversations) + conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs) } statisticsGroup := r.Group("/statistics", ParseToken) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index edd03ce98..66b003eaa 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -19,6 +19,8 @@ import ( "encoding/json" "errors" + "github.com/OpenIMSDK/protocol/conversation" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/OpenIMSDK/protocol/constant" @@ -117,7 +119,6 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg if err != nil { return err } - break } } } @@ -234,15 +235,23 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } - err = p.offlinePushMsg(ctx, groupID, msg, offlinePushUserIDs) + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) - if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) - return err + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) + return err + } } } } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 9f97c3c67..4b5b43fe8 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -300,3 +300,30 @@ func (c *conversationServer) GetConversationsByConversationID( } return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil } + +func (c *conversationServer) GetConversationOfflinePushUserIDs( + ctx context.Context, + req *pbconversation.GetConversationOfflinePushUserIDsReq, +) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) { + if req.ConversationID == "" { + return nil, errs.ErrArgs.Wrap("conversationID is empty") + } + if len(req.UserIDs) == 0 { + return &pbconversation.GetConversationOfflinePushUserIDsResp{}, nil + } + userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID) + if err != nil { + return nil, err + } + if len(userIDs) == 0 { + return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: req.UserIDs}, nil + } + userIDSet := make(map[string]struct{}) + for _, userID := range req.UserIDs { + userIDSet[userID] = struct{}{} + } + for _, userID := range userIDs { + delete(userIDSet, userID) + } + return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil +} diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index d02b021e3..083cf6d0b 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -38,6 +38,7 @@ const ( recvMsgOptKey = "RECV_MSG_OPT:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" conversationExpireTime = time.Second * 60 * 60 * 12 ) @@ -83,6 +84,8 @@ type ConversationCache interface { conversationIDs []string, ) ([]*relationtb.ConversationModel, error) DelConversationByConversationID(conversationIDs ...string) ConversationCache + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) + DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache } func NewConversationRedis( @@ -153,6 +156,10 @@ func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conve return conversationHasReadSeqKey + ownerUserID + ":" + conversationID } +func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string { + return conversationNotReceiveMessageUserIDsKey + conversationID +} + func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { return getCache( ctx, @@ -432,3 +439,23 @@ func (c *ConversationRedisCache) GetConversationsByConversationID( func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache { panic("implement me") } + +func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + return getCache( + ctx, + c.rcClient, + c.getConversationNotReceiveMessageUserIDsKey(conversationID), + c.expireTime, + func(ctx context.Context) ([]string, error) { + return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) + }, + ) +} + +func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache { + cache := c.NewCache() + for _, conversationID := range conversationIDs { + cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) + } + return cache +} diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 806e09331..c3dd6980e 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -53,6 +53,7 @@ type ConversationDatabase interface { GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) } func NewConversationDatabase(conversation relationtb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -88,6 +89,9 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID) } } + if _, ok := filedMap["recv_msg_opt"]; ok { + cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) + } } NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs) log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) @@ -121,7 +125,12 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, if err != nil { return err } - return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx) + cache := c.cache.NewCache() + cache = cache.DelUsersConversation(conversationID, userIDs...) + if _, ok := args["recv_msg_opt"]; ok { + cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) + } + return cache.ExecDel(ctx) } func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationtb.ConversationModel) error { @@ -132,6 +141,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat cache := c.cache.NewCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) + cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) } return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) @@ -224,7 +234,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs if err != nil { return err } - cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) + cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID).DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...) } return nil }); err != nil { @@ -250,7 +260,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, for _, v := range notExistUserIDs { conversation := relationtb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) - cache = cache.DelConversations(v, conversationID) + cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) } cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) if len(conversations) > 0 { @@ -296,3 +306,7 @@ func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Cont func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) { return c.conversationDB.GetConversationIDsNeedDestruct(ctx) } + +func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) +} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index dcc18be17..d5ca92ec2 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -17,6 +17,8 @@ package relation import ( "context" + "github.com/OpenIMSDK/tools/errs" + "gorm.io/gorm" "github.com/OpenIMSDK/protocol/constant" @@ -214,3 +216,24 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct( "", ) } + +func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) { + var recvMsgOpt int32 + return recvMsgOpt, errs.Wrap( + c.db(ctx). + Model(&relation.ConversationModel{}). + Where("conversation_id = ? and owner_user_id in ?", conversationID, userID). + Pluck("recv_msg_opt", &recvMsgOpt). + Error, + ) +} + +func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + var userIDs []string + return userIDs, errs.Wrap( + c.db(ctx). + Model(&relation.ConversationModel{}). + Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage). + Pluck("owner_user_id", &userIDs).Error, + ) +} diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 99c01e5b7..7e6c6bdf8 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -66,5 +66,6 @@ type ConversationModelInterface interface { GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) NewTx(tx any) ConversationModelInterface }