mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-04 03:13:15 +08:00
conversation version incremental
This commit is contained in:
parent
df9bbeb313
commit
e17f0c95aa
2
go.mod
2
go.mod
@ -12,7 +12,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17
|
github.com/openimsdk/protocol v0.0.69-alpha.27
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.45
|
github.com/openimsdk/tools v0.0.49-alpha.45
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@ -262,8 +262,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
|
github.com/openimsdk/protocol v0.0.69-alpha.27 h1:0Ctpu9VBXVCkKno6vVNBgUTyo9W9bG7SZuAhQr/4H8Y=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.69-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0=
|
github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8=
|
github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
|
|||||||
@ -50,3 +50,11 @@ func (o *ConversationApi) SetConversations(c *gin.Context) {
|
|||||||
func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
|
func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) {
|
||||||
a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c)
|
a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) {
|
||||||
|
a2r.Call(conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) {
|
||||||
|
a2r.Call(conversation.ConversationClient.GetIncrementalConversation, o.Client, c)
|
||||||
|
}
|
||||||
|
|||||||
@ -192,6 +192,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
|||||||
conversationGroup.POST("/get_conversations", c.GetConversations)
|
conversationGroup.POST("/get_conversations", c.GetConversations)
|
||||||
conversationGroup.POST("/set_conversations", c.SetConversations)
|
conversationGroup.POST("/set_conversations", c.SetConversations)
|
||||||
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
|
conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs)
|
||||||
|
conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs)
|
||||||
|
conversationGroup.POST("/get_incremental_conversation", c.GetIncrementalConversation)
|
||||||
}
|
}
|
||||||
|
|
||||||
statisticsGroup := r.Group("/statistics")
|
statisticsGroup := r.Group("/statistics")
|
||||||
|
|||||||
@ -184,13 +184,23 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
|
func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
|
||||||
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
|
conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pbconversation.GetConversationsResp{
|
||||||
|
Conversations: conversations,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) getConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
|
||||||
|
conversations, err := c.conversationDatabase.FindConversations(ctx, ownerUserID, conversationIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}}
|
resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}}
|
||||||
resp.Conversations = convert.ConversationsDB2Pb(conversations)
|
resp.Conversations = convert.ConversationsDB2Pb(conversations)
|
||||||
return resp, nil
|
return convert.ConversationsDB2Pb(conversations), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
||||||
|
|||||||
56
internal/rpc/conversation/sync.go
Normal file
56
internal/rpc/conversation/sync.go
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
package conversation
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
|
||||||
|
"github.com/openimsdk/protocol/conversation"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, req *conversation.GetFullOwnerConversationIDsReq) (*conversation.GetFullOwnerConversationIDsResp, error) {
|
||||||
|
vl, err := c.conversationDatabase.FindMaxConversationUserVersionCache(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
idHash := hashutil.IdHash(conversationIDs)
|
||||||
|
if req.IdHash == idHash {
|
||||||
|
conversationIDs = nil
|
||||||
|
}
|
||||||
|
return &conversation.GetFullOwnerConversationIDsResp{
|
||||||
|
Version: idHash,
|
||||||
|
VersionID: vl.ID.Hex(),
|
||||||
|
Equal: req.IdHash == idHash,
|
||||||
|
ConversationIDs: conversationIDs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) {
|
||||||
|
opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{
|
||||||
|
Ctx: ctx,
|
||||||
|
VersionKey: req.UserID,
|
||||||
|
VersionID: req.VersionID,
|
||||||
|
VersionNumber: req.Version,
|
||||||
|
Version: c.conversationDatabase.FindConversationUserVersion,
|
||||||
|
CacheMaxVersion: c.conversationDatabase.FindMaxConversationUserVersionCache,
|
||||||
|
Find: func(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
|
||||||
|
return c.getConversations(ctx, req.UserID, conversationIDs)
|
||||||
|
},
|
||||||
|
ID: func(elem *conversation.Conversation) string { return elem.GroupID },
|
||||||
|
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*conversation.Conversation, full bool) *conversation.GetIncrementalConversationResp {
|
||||||
|
return &conversation.GetIncrementalConversationResp{
|
||||||
|
VersionID: version.ID.Hex(),
|
||||||
|
Version: uint64(version.Version),
|
||||||
|
Full: full,
|
||||||
|
Delete: delIDs,
|
||||||
|
Insert: insertList,
|
||||||
|
Update: updateList,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return opt.Build()
|
||||||
|
}
|
||||||
@ -290,3 +290,8 @@ type FormDataMate struct {
|
|||||||
Group string `json:"group"`
|
Group string `json:"group"`
|
||||||
Key string `json:"key"`
|
Key string `json:"key"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|||||||
@ -43,6 +43,7 @@ type thirdServer struct {
|
|||||||
defaultExpire time.Duration
|
defaultExpire time.Duration
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
RpcConfig config.Third
|
RpcConfig config.Third
|
||||||
RedisConfig config.Redis
|
RedisConfig config.Redis
|
||||||
|
|||||||
@ -736,3 +736,8 @@ func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*
|
|||||||
}
|
}
|
||||||
return &pbuser.SortQueryResp{Users: convert.UsersDB2Pb(users)}, nil
|
return &pbuser.SortQueryResp{Users: convert.UsersDB2Pb(users)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|||||||
@ -23,6 +23,7 @@ const (
|
|||||||
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
|
||||||
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
|
||||||
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
|
ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
|
||||||
|
ConversationUserMaxKey = "CONVERSATION_USER_MAX:"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetConversationKey(ownerUserID, conversationID string) string {
|
func GetConversationKey(ownerUserID, conversationID string) string {
|
||||||
@ -56,3 +57,7 @@ func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
|
|||||||
func GetUserConversationIDsHashKey(ownerUserID string) string {
|
func GetUserConversationIDsHashKey(ownerUserID string) string {
|
||||||
return ConversationIDsHashKey + ownerUserID
|
return ConversationIDsHashKey + ownerUserID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetConversationUserMaxVersionKey(userID string) string {
|
||||||
|
return ConversationUserMaxKey + userID
|
||||||
|
}
|
||||||
|
|||||||
4
pkg/common/storage/cache/conversation.go
vendored
4
pkg/common/storage/cache/conversation.go
vendored
@ -54,4 +54,8 @@ type ConversationCache interface {
|
|||||||
|
|
||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
|
||||||
|
|
||||||
|
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
|
||||||
|
|
||||||
|
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||||
}
|
}
|
||||||
|
|||||||
19
pkg/common/storage/cache/redis/conversation.go
vendored
19
pkg/common/storage/cache/redis/conversation.go
vendored
@ -95,6 +95,10 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin
|
|||||||
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
|
return cachekey.GetUserConversationIDsHashKey(ownerUserID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID string) string {
|
||||||
|
return cachekey.GetConversationUserMaxVersionKey(ownerUserID)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
||||||
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) {
|
||||||
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
|
return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
|
||||||
@ -233,6 +237,19 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers
|
|||||||
for _, conversationID := range conversationIDs {
|
for _, conversationID := range conversationIDs {
|
||||||
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
|
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
|
||||||
}
|
}
|
||||||
|
|
||||||
return cache
|
return cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache {
|
||||||
|
cache := c.CloneConversationCache()
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
cache.AddKeys(c.getConversationUserMaxVersionKey(userID))
|
||||||
|
}
|
||||||
|
return cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedisCache) FindMaxConversationUserVersion(ctx context.Context, userID string) (*model.VersionLog, error) {
|
||||||
|
return getCache(ctx, c.rcClient, c.getConversationUserMaxVersionKey(userID), c.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
|
||||||
|
return c.conversationDB.FindConversationUserVersion(ctx, userID, 0, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@ -66,6 +66,8 @@ type ConversationDatabase interface {
|
|||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
|
||||||
// FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
// FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
|
||||||
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error)
|
||||||
|
FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||||
@ -106,6 +108,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
|
|||||||
if _, ok := fieldMap["recv_msg_opt"]; ok {
|
if _, ok := fieldMap["recv_msg_opt"]; ok {
|
||||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
||||||
}
|
}
|
||||||
|
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
|
||||||
}
|
}
|
||||||
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
|
NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs)
|
||||||
log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
|
log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
|
||||||
@ -137,7 +140,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
cache = cache.DelUsersConversation(conversationID, userIDs...)
|
cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...)
|
||||||
if _, ok := args["recv_msg_opt"]; ok {
|
if _, ok := args["recv_msg_opt"]; ok {
|
||||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
|
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
|
||||||
}
|
}
|
||||||
@ -155,13 +158,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
|
|||||||
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
|
||||||
userIDs = append(userIDs, conversation.OwnerUserID)
|
userIDs = append(userIDs, conversation.OwnerUserID)
|
||||||
}
|
}
|
||||||
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ChainExecDel(ctx)
|
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
|
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error {
|
||||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
for _, conversation := range conversations {
|
for _, conversation := range conversations {
|
||||||
|
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID)
|
||||||
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
|
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
|
||||||
ownerUserID := v[0]
|
ownerUserID := v[0]
|
||||||
userID := v[1]
|
userID := v[1]
|
||||||
@ -207,6 +211,7 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner
|
|||||||
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
|
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error {
|
||||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
|
cache = cache.DelConversationVersionUserIDs(ownerUserID)
|
||||||
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
|
||||||
return e.GroupID, e.GroupID != ""
|
return e.GroupID, e.GroupID != ""
|
||||||
}))
|
}))
|
||||||
@ -322,3 +327,11 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex
|
|||||||
func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
|
func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
|
||||||
return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
|
return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationDatabase) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) {
|
||||||
|
return c.conversationDB.FindConversationUserVersion(ctx, userID, version, limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationDatabase) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) {
|
||||||
|
return c.cache.FindMaxConversationUserVersion(ctx, userID)
|
||||||
|
}
|
||||||
|
|||||||
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
type Conversation interface {
|
type Conversation interface {
|
||||||
Create(ctx context.Context, conversations []*model.Conversation) (err error)
|
Create(ctx context.Context, conversations []*model.Conversation) (err error)
|
||||||
Delete(ctx context.Context, groupIDs []string) (err error)
|
|
||||||
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
|
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
|
||||||
Update(ctx context.Context, conversation *model.Conversation) (err error)
|
Update(ctx context.Context, conversation *model.Conversation) (err error)
|
||||||
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
|
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
|
||||||
@ -39,4 +38,5 @@ type Conversation interface {
|
|||||||
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
|
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
|
||||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -41,40 +41,71 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
return &ConversationMgo{coll: coll}, nil
|
version, err := NewVersionLog(db.Collection(database.ConversationVersionName))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &ConversationMgo{version: version, coll: coll}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConversationMgo struct {
|
type ConversationMgo struct {
|
||||||
coll *mongo.Collection
|
version database.VersionLog
|
||||||
|
coll *mongo.Collection
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationMgo) Create(ctx context.Context, conversations []*model.Conversation) (err error) {
|
func (c *ConversationMgo) Create(ctx context.Context, conversations []*model.Conversation) (err error) {
|
||||||
return mongoutil.InsertMany(ctx, c.coll, conversations)
|
return mongoutil.IncrVersion(func() error {
|
||||||
|
return mongoutil.InsertMany(ctx, c.coll, conversations)
|
||||||
|
}, func() error {
|
||||||
|
userConversation := make(map[string][]string)
|
||||||
|
for _, conversation := range conversations {
|
||||||
|
userConversation[conversation.OwnerUserID] = append(userConversation[conversation.OwnerUserID], conversation.ConversationID)
|
||||||
|
}
|
||||||
|
for userID, conversationIDs := range userConversation {
|
||||||
|
if err := c.version.IncrVersion(ctx, userID, conversationIDs, model.VersionStateInsert); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err error) {
|
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (int64, error) {
|
||||||
return mongoutil.DeleteMany(ctx, c.coll, bson.M{"group_id": bson.M{"$in": groupIDs}})
|
if len(args) == 0 || len(userIDs) == 0 {
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) {
|
|
||||||
if len(args) == 0 {
|
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
filter := bson.M{
|
filter := bson.M{
|
||||||
"conversation_id": conversationID,
|
"conversation_id": conversationID,
|
||||||
|
"owner_user_id": bson.M{"$in": userIDs},
|
||||||
}
|
}
|
||||||
if len(userIDs) > 0 {
|
var rows int64
|
||||||
filter["owner_user_id"] = bson.M{"$in": userIDs}
|
err := mongoutil.IncrVersion(func() error {
|
||||||
}
|
res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
|
||||||
res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rows = res.ModifiedCount
|
||||||
|
return nil
|
||||||
|
}, func() error {
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateUpdate); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return res.ModifiedCount, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
|
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
|
||||||
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)
|
return mongoutil.IncrVersion(func() error {
|
||||||
|
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)
|
||||||
|
}, func() error {
|
||||||
|
return c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) {
|
func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) {
|
||||||
@ -178,3 +209,7 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co
|
|||||||
options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}),
|
options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
|
||||||
|
return c.version.FindChangeLog(ctx, userID, version, limit)
|
||||||
|
}
|
||||||
|
|||||||
@ -1,17 +1,18 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
const (
|
const (
|
||||||
BlackName = "black"
|
BlackName = "black"
|
||||||
ConversationName = "conversation"
|
ConversationName = "conversation"
|
||||||
FriendName = "friend"
|
FriendName = "friend"
|
||||||
FriendVersionName = "friend_version"
|
FriendVersionName = "friend_version"
|
||||||
FriendRequestName = "friend_request"
|
FriendRequestName = "friend_request"
|
||||||
GroupName = "group"
|
GroupName = "group"
|
||||||
GroupMemberName = "group_member"
|
GroupMemberName = "group_member"
|
||||||
GroupMemberVersionName = "group_member_version"
|
GroupMemberVersionName = "group_member_version"
|
||||||
GroupJoinVersionName = "group_join_version"
|
GroupJoinVersionName = "group_join_version"
|
||||||
GroupRequestName = "group_request"
|
ConversationVersionName = "conversation_version"
|
||||||
LogName = "log"
|
GroupRequestName = "group_request"
|
||||||
ObjectName = "s3"
|
LogName = "log"
|
||||||
UserName = "user"
|
ObjectName = "s3"
|
||||||
|
UserName = "user"
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user