diff --git a/internal/objstorage/controller.go b/internal/objstorage/controller.go index 621e79aa6..cb8ed08ae 100644 --- a/internal/objstorage/controller.go +++ b/internal/objstorage/controller.go @@ -79,6 +79,9 @@ func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutA name := args.Name effective := time.Now().Add(args.EffectiveTime) prefix := c.Prefix(&args.PutArgs) + if minSize := c.i.MinMultipartSize(); args.FragmentSize > 0 && args.FragmentSize < minSize { + args.FragmentSize = minSize + } var pack int64 if args.FragmentSize <= 0 || args.Size <= args.FragmentSize { pack = 1 diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 304230d00..0441dc5ed 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -2,23 +2,24 @@ package conversation import ( "Open_IM/internal/common/check" + "Open_IM/internal/common/notification" + "Open_IM/internal/tx" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" tableRelation "Open_IM/pkg/common/db/table/relation" - "github.com/OpenIMSDK/openKeeper" - - "Open_IM/internal/common/notification" pbConversation "Open_IM/pkg/proto/conversation" "Open_IM/pkg/utils" "context" + "github.com/OpenIMSDK/openKeeper" + "github.com/dtm-labs/rockscache" "google.golang.org/grpc" ) type conversationServer struct { groupChecker *check.GroupChecker - controller.ConversationInterface + controller.ConversationDataBaseInterface notify *notification.Check } @@ -30,18 +31,25 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error { if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil { return err } + redis, err := cache.NewRedis() + if err != nil { + return err + } pbConversation.RegisterConversationServer(server, &conversationServer{ - groupChecker: check.NewGroupChecker(client), - ConversationInterface: controller.NewConversationController(controller.NewConversationDataBase(controller.NewConversationGorm(db), cache.NewConversationRedis(nil))), + groupChecker: check.NewGroupChecker(client), + ConversationDataBaseInterface: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(redis.GetClient(), rockscache.Options{ + RandomExpireAdjustment: 0.2, + DisableCacheRead: false, + DisableCacheDelete: false, + StrongConsistency: true, + }), tx.NewGorm(db)), }) - controller.NewConversationDataBase() - controller.NewConversationController() return nil } func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} - conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) + conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) if err != nil { return nil, err } @@ -56,7 +64,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} - conversations, err := c.ConversationInterface.GetUserAllConversation(ctx, req.OwnerUserID) + conversations, err := c.ConversationDataBaseInterface.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { return nil, err } @@ -68,7 +76,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) { resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}} - conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) + conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) if err != nil { return nil, err } @@ -84,7 +92,7 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil { return nil, err } - err := c.ConversationInterface.SetUserConversations(ctx, req.OwnerUserID, conversations) + err := c.ConversationDataBaseInterface.SetUserConversations(ctx, req.OwnerUserID, conversations) if err != nil { return nil, err } @@ -118,14 +126,14 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p return nil, err } if req.FieldType == constant.FieldIsPrivateChat { - err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation) + err := c.ConversationDataBaseInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation) if err != nil { return nil, err } c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat) return resp, nil } - //haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID) + //haveUserID, err := c.ConversationDataBaseInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID) //if err != nil { // return nil, err //} @@ -149,7 +157,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p case constant.FieldBurnDuration: filedMap["burn_duration"] = req.Conversation.BurnDuration } - err = c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) + err = c.ConversationDataBaseInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) if err != nil { return nil, err } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 6d19bddb1..742fac240 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -50,11 +50,50 @@ type ConversationCache interface { //DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) } +func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options) ConversationCache { + return &ConversationRedis{rcClient: rockscache.NewClient(rdb, opts)} +} + type ConversationRedis struct { rcClient *rockscache.Client expireTime time.Duration } +func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) { + //TODO implement me + panic("implement me") +} + +func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) { + //TODO implement me + panic("implement me") +} + func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error { panic("implement me") } @@ -63,10 +102,6 @@ func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserI panic("implement me") } -func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis { - return &ConversationRedis{rcClient: rcClient} -} - func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis { return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)} } @@ -87,31 +122,31 @@ func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) return superGroupRecvMsgNotNotifyUserIDsKey + groupID } -func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { - //getConversationIDs := func() (string, error) { - // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) - // if err != nil { - // return "", err - // } - // bytes, err := json.Marshal(conversationIDs) - // if err != nil { - // return "", utils.Wrap(err, "") - // } - // return string(bytes), nil - //} - //defer func() { - // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) - //}() - //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) - //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) - //if err != nil { - // return nil, utils.Wrap(err, "") - //} - //return conversationIDs, nil - return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { - panic("implement me") - }) -} +//func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { +// //getConversationIDs := func() (string, error) { +// // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) +// // if err != nil { +// // return "", err +// // } +// // bytes, err := json.Marshal(conversationIDs) +// // if err != nil { +// // return "", utils.Wrap(err, "") +// // } +// // return string(bytes), nil +// //} +// //defer func() { +// // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) +// //}() +// //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) +// //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) +// //if err != nil { +// // return nil, utils.Wrap(err, "") +// //} +// //return conversationIDs, nil +// return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { +// panic("implement me") +// }) +//} func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { //getConversationIDs := func() (string, error) { @@ -171,11 +206,11 @@ func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUse return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") } -func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) { - return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { - panic("implement me") - }) -} +//func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) { +// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { +// panic("implement me") +// }) +//} func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { defer func() { @@ -184,69 +219,65 @@ func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, co return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err") } -func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) { - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) - }() - for _, conversationID := range conversationIDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - return nil, err - } - conversations = append(conversations, *conversation) - } - return conversations, nil -} +//func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) { +// defer func() { +// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) +// }() +// for _, conversationID := range conversationIDs { +// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) +// if err != nil { +// return nil, err +// } +// conversations = append(conversations, *conversation) +// } +// return conversations, nil +//} -func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) { - defer func() { - tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) - }() - IDs, err := c.GetUserConversationIDs(ctx, ownerUserID) - if err != nil { - return nil, err - } - var conversationIDs []relationTb.ConversationModel - for _, conversationID := range IDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - return nil, err - } - conversationIDs = append(conversationIDs, *conversation) - } - return conversationIDs, nil -} +//func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) { +// defer func() { +// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) +// }() +// IDs, err := c.GetUserConversationIDs(ctx, ownerUserID) +// if err != nil { +// return nil, err +// } +// var conversationIDs []relationTb.ConversationModel +// for _, conversationID := range IDs { +// conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) +// if err != nil { +// return nil, err +// } +// conversationIDs = append(conversationIDs, *conversation) +// } +// return conversationIDs, nil +//} -func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { - //getConversation := func() (string, error) { - // conversation, err := relation.GetConversation(ownerUserID, conversationID) - // if err != nil { - // return "", err - // } - // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil - //} - //defer func() { - // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) - //}() - //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) - //if err != nil { - // return 0, err - //} - //return strconv.Atoi(optStr) - // panic("implement me") - return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) { - panic("implement me") - }) -} +//func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { +// //getConversation := func() (string, error) { +// // conversation, err := relation.GetConversation(ownerUserID, conversationID) +// // if err != nil { +// // return "", err +// // } +// // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil +// //} +// //defer func() { +// // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) +// //}() +// //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) +// //if err != nil { +// // return 0, err +// //} +// //return strconv.Atoi(optStr) +// // panic("implement me") +// return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) { +// panic("implement me") +// }) +//} func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed") } -func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { - panic("implement me") -} - func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { panic("implement me") } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 3bf6c4d78..d1bf65996 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -1,6 +1,7 @@ package controller import ( + "Open_IM/internal/tx" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/relation" @@ -8,68 +9,8 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "gorm.io/gorm" ) -type ConversationInterface interface { - //GetUserIDExistConversation 获取拥有该会话的的用户ID列表 - GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) - //UpdateUserConversationFiled 更新用户该会话的属性信息 - UpdateUsersConversationFiled(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) error - //CreateConversation 创建一批新的会话 - CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error - //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 - SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error - //FindConversations 根据会话ID获取某个用户的多个会话 - FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) - //GetUserAllConversation 获取一个用户在服务器上所有的会话 - GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) - //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 - SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error - //SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 - SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error -} -type ConversationController struct { - database ConversationDataBaseInterface -} - -func (c *ConversationController) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { - return c.database.SetUsersConversationFiledTx(ctx, userIDList, conversation, filedMap) -} - -func NewConversationController(database ConversationDataBaseInterface) *ConversationController { - return &ConversationController{database: database} -} - -func (c *ConversationController) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { - return c.database.GetUserIDExistConversation(ctx, userIDList, conversationID) -} - -func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { - return c.database.UpdateUsersConversationFiled(ctx, UserIDList, conversationID, args) -} - -func (c ConversationController) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { - return c.database.CreateConversation(ctx, conversations) -} - -func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { - return c.database.SyncPeerUserPrivateConversationTx(ctx, conversation) -} - -func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { - return c.database.FindConversations(ctx, ownerUserID, conversationIDs) -} - -func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { - return c.database.GetUserAllConversation(ctx, ownerUserID) -} -func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { - return c.database.SetUserConversations(ctx, ownerUserID, conversations) -} - -var _ ConversationInterface = (*ConversationController)(nil) - type ConversationDataBaseInterface interface { //GetUserIDExistConversation 获取拥有该会话的的用户ID列表 GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) @@ -89,22 +30,29 @@ type ConversationDataBaseInterface interface { SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error } -var _ ConversationDataBaseInterface = (*ConversationDataBase)(nil) +func NewConversationDatabase(conversation relation.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDataBaseInterface { + return &ConversationDataBase{ + conversationDB: conversation, + cache: cache, + tx: tx, + } +} type ConversationDataBase struct { conversationDB relation.Conversation cache cache.ConversationCache + tx tx.Tx } -func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { - fn := func(tx any) error { - temp := c.conversationDB.NewTx(tx) - haveUserID, err := temp.FindUserID(ctx, userIDList, conversation.ConversationID, tx) +func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { + return c.tx.Transaction(func(tx any) error { + conversationTx := c.conversationDB.NewTx(tx) + haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID) if err != nil { return err } if len(haveUserID) > 0 { - err = temp.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx) + err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap) if err != nil { return err } @@ -119,7 +67,7 @@ func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, u temp.OwnerUserID = v cList = append(cList, temp) } - err = temp.Create(ctx, cList) + err = conversationTx.Create(ctx, cList) if err != nil { return err } @@ -134,42 +82,42 @@ func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, u return err } return nil - } - - return c.conversationDB.Transaction(fn) + }) } -func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase { - return &ConversationDataBase{conversationDB: db, cache: cache} -} - -func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { +func (c *ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { panic("implement me") } -func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { +func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { panic("implement me") } -func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { - panic("implement me") +func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { + return c.tx.Transaction(func(tx any) error { + if err := c.conversationDB.NewTx(tx).Create(ctx, conversations); err != nil { + return err + } + // clear cache + return nil + }) } -func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { - return c.db.Transaction(func(tx *gorm.DB) error { +func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { + return c.tx.Transaction(func(tx any) error { userIDList := []string{conversation.OwnerUserID, conversation.UserID} - haveUserID, err := c.conversationDB.FindUserID(ctx, userIDList, conversation.ConversationID, tx) + conversationTx := c.conversationDB.NewTx(tx) + haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID) if err != nil { return err } filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat} if len(haveUserID) > 0 { - err = c.conversationDB.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx) + err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap) if err != nil { return err } } - NotUserID := utils.DifferenceString(haveUserID, userIDList) var cList []*relationTb.ConversationModel for _, v := range NotUserID { @@ -206,7 +154,7 @@ func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Cont }) } -func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { +func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { getConversation := func() (string, error) { conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs) if err != nil { @@ -221,7 +169,7 @@ func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation) } -func (c ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { +func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { getConversation := func() (string, error) { conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID) if err != nil { @@ -236,7 +184,7 @@ func (c ConversationDataBase) GetConversation(ctx context.Context, ownerUserID s return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation) } -func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { +func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { getConversationIDList := func() (string, error) { conversationIDList, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) if err != nil { @@ -254,27 +202,28 @@ func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerU } var conversations []*relationTb.ConversationModel for _, conversationID := range conversationIDList { - conversation, tErr := c.GetConversation(ctx, ownerUserID, conversationID) - if tErr != nil { - return nil, utils.Wrap(tErr, "GetConversation failed") + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + return nil, utils.Wrap(err, "GetConversation failed") } conversations = append(conversations, conversation) } return conversations, nil } -func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { - return c.db.Transaction(func(tx *gorm.DB) error { +func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { + return c.tx.Transaction(func(tx any) error { var conversationIDList []string for _, conversation := range conversations { conversationIDList = append(conversationIDList, conversation.ConversationID) } - haveConversations, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDList, tx) + conversationTx := c.conversationDB.NewTx(tx) + haveConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDList) if err != nil { return err } if len(haveConversations) > 0 { - err = c.conversationDB.Update(ctx, conversations, tx) + err = conversationTx.Update(ctx, conversations) if err != nil { return err } diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index a975928e5..facbe3cd4 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -9,16 +9,15 @@ import ( ) type Conversation interface { - Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) - Delete(ctx context.Context, groupIDs []string, tx ...any) (err error) - UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error) - Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) - Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error) - FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) ([]string, error) - FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) ([]string, error) - Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error) - FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error) - Transaction(func(tx any) error) error + Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) + Delete(ctx context.Context, groupIDs []string) (err error) + UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) + Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) + Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) + FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error) + FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) + Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) + FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) NewTx(tx any) Conversation } type ConversationGorm struct { @@ -28,74 +27,69 @@ type ConversationGorm struct { func NewConversationGorm(DB *gorm.DB) Conversation { return &ConversationGorm{DB: DB} } -func (c *ConversationGorm) Transaction(fn func(tx any) error) error { - return c.DB.Transaction(func(tx *gorm.DB) error { - return fn(tx) - }) -} func (c *ConversationGorm) NewTx(tx any) Conversation { return &ConversationGorm{DB: tx.(*gorm.DB)} } -func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) { +func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) }() - return utils.Wrap(getDBConn(c.DB, tx).Create(&conversations).Error, "") + return utils.Wrap(c.DB.Create(&conversations).Error, "") } -func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string, tx ...any) (err error) { +func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) }() - return utils.Wrap(getDBConn(c.DB, tx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") + return utils.Wrap(c.DB.Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") } -func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error) { +func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userIDList", userIDList, "conversationID", conversationID) }() - return utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "") + return utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "") } -func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) { +func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) }() - return utils.Wrap(getDBConn(c.DB, tx).Updates(&conversations).Error, "") + return utils.Wrap(c.DB.Updates(&conversations).Error, "") } -func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error) { +func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "groups", conversations) }() - err = utils.Wrap(getDBConn(c.DB, tx).Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "") + err = utils.Wrap(c.DB.Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "") return conversations, err } -func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error) { +func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) { cc := &relation.ConversationModel{} defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversation", *conversation) }() - return cc, utils.Wrap(getDBConn(c.DB, tx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "") + return cc, utils.Wrap(c.DB.Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "") } -func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) (existUserID []string, err error) { +func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string) (existUserID []string, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userIDList, "existUserID", existUserID) }() - return existUserID, utils.Wrap(getDBConn(c.DB, tx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "") + return existUserID, utils.Wrap(c.DB.Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "") } -func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error) { +func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "existConversationIDList", existConversationID) }() - return existConversationID, utils.Wrap(getDBConn(c.DB, tx).Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "") + return existConversationID, utils.Wrap(c.DB.Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "") } -func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) (conversationIDList []string, err error) { +func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string) (conversationIDList []string, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversationIDList", conversationIDList) }() - return conversationIDList, utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "") + return conversationIDList, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "") }