conversation

This commit is contained in:
withchao 2023-02-22 14:31:30 +08:00
parent cd00d91eee
commit 3bae8edb39
5 changed files with 216 additions and 231 deletions

View File

@ -79,6 +79,9 @@ func (c *Controller) ApplyPut(ctx context.Context, args *FragmentPutArgs) (*PutA
name := args.Name name := args.Name
effective := time.Now().Add(args.EffectiveTime) effective := time.Now().Add(args.EffectiveTime)
prefix := c.Prefix(&args.PutArgs) prefix := c.Prefix(&args.PutArgs)
if minSize := c.i.MinMultipartSize(); args.FragmentSize > 0 && args.FragmentSize < minSize {
args.FragmentSize = minSize
}
var pack int64 var pack int64
if args.FragmentSize <= 0 || args.Size <= args.FragmentSize { if args.FragmentSize <= 0 || args.Size <= args.FragmentSize {
pack = 1 pack = 1

View File

@ -2,23 +2,24 @@ package conversation
import ( import (
"Open_IM/internal/common/check" "Open_IM/internal/common/check"
"Open_IM/internal/common/notification"
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
tableRelation "Open_IM/pkg/common/db/table/relation" tableRelation "Open_IM/pkg/common/db/table/relation"
"github.com/OpenIMSDK/openKeeper"
"Open_IM/internal/common/notification"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"github.com/OpenIMSDK/openKeeper"
"github.com/dtm-labs/rockscache"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
type conversationServer struct { type conversationServer struct {
groupChecker *check.GroupChecker groupChecker *check.GroupChecker
controller.ConversationInterface controller.ConversationDataBaseInterface
notify *notification.Check notify *notification.Check
} }
@ -30,18 +31,25 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil { if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil {
return err return err
} }
redis, err := cache.NewRedis()
if err != nil {
return err
}
pbConversation.RegisterConversationServer(server, &conversationServer{ pbConversation.RegisterConversationServer(server, &conversationServer{
groupChecker: check.NewGroupChecker(client), groupChecker: check.NewGroupChecker(client),
ConversationInterface: controller.NewConversationController(controller.NewConversationDataBase(controller.NewConversationGorm(db), cache.NewConversationRedis(nil))), ConversationDataBaseInterface: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(redis.GetClient(), rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
}), tx.NewGorm(db)),
}) })
controller.NewConversationDataBase()
controller.NewConversationController()
return nil return nil
} }
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -56,7 +64,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.GetUserAllConversation(ctx, req.OwnerUserID) conversations, err := c.ConversationDataBaseInterface.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -68,7 +76,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon
func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) { func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) {
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}} resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) conversations, err := c.ConversationDataBaseInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -84,7 +92,7 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil { if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
return nil, err return nil, err
} }
err := c.ConversationInterface.SetUserConversations(ctx, req.OwnerUserID, conversations) err := c.ConversationDataBaseInterface.SetUserConversations(ctx, req.OwnerUserID, conversations)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -118,14 +126,14 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
return nil, err return nil, err
} }
if req.FieldType == constant.FieldIsPrivateChat { if req.FieldType == constant.FieldIsPrivateChat {
err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation) err := c.ConversationDataBaseInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat) c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil return resp, nil
} }
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID) //haveUserID, err := c.ConversationDataBaseInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
//if err != nil { //if err != nil {
// return nil, err // return nil, err
//} //}
@ -149,7 +157,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
case constant.FieldBurnDuration: case constant.FieldBurnDuration:
filedMap["burn_duration"] = req.Conversation.BurnDuration filedMap["burn_duration"] = req.Conversation.BurnDuration
} }
err = c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) err = c.ConversationDataBaseInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -50,11 +50,50 @@ type ConversationCache interface {
//DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) //DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string)
} }
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options) ConversationCache {
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, opts)}
}
type ConversationRedis struct { type ConversationRedis struct {
rcClient *rockscache.Client rcClient *rockscache.Client
expireTime time.Duration expireTime time.Duration
} }
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) {
//TODO implement me
panic("implement me")
}
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error { func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
panic("implement me") panic("implement me")
} }
@ -63,10 +102,6 @@ func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserI
panic("implement me") panic("implement me")
} }
func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis {
return &ConversationRedis{rcClient: rcClient}
}
func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis { func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis {
return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)} return &ConversationRedis{rcClient: rockscache.NewClient(rdb, options)}
} }
@ -87,31 +122,31 @@ func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string)
return superGroupRecvMsgNotNotifyUserIDsKey + groupID return superGroupRecvMsgNotNotifyUserIDsKey + groupID
} }
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { //func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
//getConversationIDs := func() (string, error) { // //getConversationIDs := func() (string, error) {
// conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) // // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
// if err != nil { // // if err != nil {
// return "", err // // return "", err
// } // // }
// bytes, err := json.Marshal(conversationIDs) // // bytes, err := json.Marshal(conversationIDs)
// if err != nil { // // if err != nil {
// return "", utils.Wrap(err, "") // // return "", utils.Wrap(err, "")
// } // // }
// return string(bytes), nil // // return string(bytes), nil
//} // //}
//defer func() { // //defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) // // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
//}() // //}()
//conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) // //conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
//err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) // //err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
//if err != nil { // //if err != nil {
// return nil, utils.Wrap(err, "") // // return nil, utils.Wrap(err, "")
//} // //}
//return conversationIDs, nil // //return conversationIDs, nil
return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { // return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
panic("implement me") // panic("implement me")
}) // })
} //}
func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
//getConversationIDs := func() (string, error) { //getConversationIDs := func() (string, error) {
@ -171,11 +206,11 @@ func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUse
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err")
} }
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) { //func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) {
return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) { // return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
panic("implement me") // panic("implement me")
}) // })
} //}
func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) {
defer func() { defer func() {
@ -184,69 +219,65 @@ func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, co
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err") return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err")
} }
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) { //func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) {
defer func() { // defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations)
}() // }()
for _, conversationID := range conversationIDs { // for _, conversationID := range conversationIDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) // conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
conversations = append(conversations, *conversation) // conversations = append(conversations, *conversation)
} // }
return conversations, nil // return conversations, nil
} //}
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) { //func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) {
defer func() { // defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations)
}() // }()
IDs, err := c.GetUserConversationIDs(ctx, ownerUserID) // IDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
var conversationIDs []relationTb.ConversationModel // var conversationIDs []relationTb.ConversationModel
for _, conversationID := range IDs { // for _, conversationID := range IDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) // conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
conversationIDs = append(conversationIDs, *conversation) // conversationIDs = append(conversationIDs, *conversation)
} // }
return conversationIDs, nil // return conversationIDs, nil
} //}
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { //func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
//getConversation := func() (string, error) { // //getConversation := func() (string, error) {
// conversation, err := relation.GetConversation(ownerUserID, conversationID) // // conversation, err := relation.GetConversation(ownerUserID, conversationID)
// if err != nil { // // if err != nil {
// return "", err // // return "", err
// } // // }
// return strconv.Itoa(int(conversation.RecvMsgOpt)), nil // // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil
//} // //}
//defer func() { // //defer func() {
// tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) // // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt)
//}() // //}()
//optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) // //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
//if err != nil { // //if err != nil {
// return 0, err // // return 0, err
//} // //}
//return strconv.Atoi(optStr) // //return strconv.Atoi(optStr)
// panic("implement me") // // panic("implement me")
return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) { // return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) {
panic("implement me") // panic("implement me")
}) // })
} //}
func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error {
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed") return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed")
} }
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
panic("implement me")
}
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) {
panic("implement me") panic("implement me")
} }

View File

@ -1,6 +1,7 @@
package controller package controller
import ( import (
"Open_IM/internal/tx"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
@ -8,68 +9,8 @@ import (
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"encoding/json" "encoding/json"
"gorm.io/gorm"
) )
type ConversationInterface interface {
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
//UpdateUserConversationFiled 更新用户该会话的属性信息
UpdateUsersConversationFiled(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) error
//CreateConversation 创建一批新的会话
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error
//FindConversations 根据会话ID获取某个用户的多个会话
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
//GetUserAllConversation 获取一个用户在服务器上所有的会话
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
}
type ConversationController struct {
database ConversationDataBaseInterface
}
func (c *ConversationController) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.database.SetUsersConversationFiledTx(ctx, userIDList, conversation, filedMap)
}
func NewConversationController(database ConversationDataBaseInterface) *ConversationController {
return &ConversationController{database: database}
}
func (c *ConversationController) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
return c.database.GetUserIDExistConversation(ctx, userIDList, conversationID)
}
func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
return c.database.UpdateUsersConversationFiled(ctx, UserIDList, conversationID, args)
}
func (c ConversationController) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
return c.database.CreateConversation(ctx, conversations)
}
func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
return c.database.SyncPeerUserPrivateConversationTx(ctx, conversation)
}
func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
return c.database.FindConversations(ctx, ownerUserID, conversationIDs)
}
func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
return c.database.GetUserAllConversation(ctx, ownerUserID)
}
func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
return c.database.SetUserConversations(ctx, ownerUserID, conversations)
}
var _ ConversationInterface = (*ConversationController)(nil)
type ConversationDataBaseInterface interface { type ConversationDataBaseInterface interface {
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表 //GetUserIDExistConversation 获取拥有该会话的的用户ID列表
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
@ -89,22 +30,29 @@ type ConversationDataBaseInterface interface {
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
} }
var _ ConversationDataBaseInterface = (*ConversationDataBase)(nil) func NewConversationDatabase(conversation relation.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDataBaseInterface {
return &ConversationDataBase{
conversationDB: conversation,
cache: cache,
tx: tx,
}
}
type ConversationDataBase struct { type ConversationDataBase struct {
conversationDB relation.Conversation conversationDB relation.Conversation
cache cache.ConversationCache cache cache.ConversationCache
tx tx.Tx
} }
func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
fn := func(tx any) error { return c.tx.Transaction(func(tx any) error {
temp := c.conversationDB.NewTx(tx) conversationTx := c.conversationDB.NewTx(tx)
haveUserID, err := temp.FindUserID(ctx, userIDList, conversation.ConversationID, tx) haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
if err != nil { if err != nil {
return err return err
} }
if len(haveUserID) > 0 { if len(haveUserID) > 0 {
err = temp.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx) err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap)
if err != nil { if err != nil {
return err return err
} }
@ -119,7 +67,7 @@ func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, u
temp.OwnerUserID = v temp.OwnerUserID = v
cList = append(cList, temp) cList = append(cList, temp)
} }
err = temp.Create(ctx, cList) err = conversationTx.Create(ctx, cList)
if err != nil { if err != nil {
return err return err
} }
@ -134,42 +82,42 @@ func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, u
return err return err
} }
return nil return nil
} })
return c.conversationDB.Transaction(fn)
} }
func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase { func (c *ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
return &ConversationDataBase{conversationDB: db, cache: cache}
}
func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
panic("implement me") panic("implement me")
} }
func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
panic("implement me") panic("implement me")
} }
func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
panic("implement me") return c.tx.Transaction(func(tx any) error {
if err := c.conversationDB.NewTx(tx).Create(ctx, conversations); err != nil {
return err
}
// clear cache
return nil
})
} }
func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
return c.db.Transaction(func(tx *gorm.DB) error { return c.tx.Transaction(func(tx any) error {
userIDList := []string{conversation.OwnerUserID, conversation.UserID} userIDList := []string{conversation.OwnerUserID, conversation.UserID}
haveUserID, err := c.conversationDB.FindUserID(ctx, userIDList, conversation.ConversationID, tx) conversationTx := c.conversationDB.NewTx(tx)
haveUserID, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
if err != nil { if err != nil {
return err return err
} }
filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat} filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}
if len(haveUserID) > 0 { if len(haveUserID) > 0 {
err = c.conversationDB.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx) err = conversationTx.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap)
if err != nil { if err != nil {
return err return err
} }
} }
NotUserID := utils.DifferenceString(haveUserID, userIDList) NotUserID := utils.DifferenceString(haveUserID, userIDList)
var cList []*relationTb.ConversationModel var cList []*relationTb.ConversationModel
for _, v := range NotUserID { for _, v := range NotUserID {
@ -206,7 +154,7 @@ func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Cont
}) })
} }
func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
getConversation := func() (string, error) { getConversation := func() (string, error) {
conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs) conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
if err != nil { if err != nil {
@ -221,7 +169,7 @@ func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation) return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation)
} }
func (c ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
getConversation := func() (string, error) { getConversation := func() (string, error) {
conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID) conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID)
if err != nil { if err != nil {
@ -236,7 +184,7 @@ func (c ConversationDataBase) GetConversation(ctx context.Context, ownerUserID s
return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation) return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation)
} }
func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
getConversationIDList := func() (string, error) { getConversationIDList := func() (string, error) {
conversationIDList, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) conversationIDList, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
if err != nil { if err != nil {
@ -254,27 +202,28 @@ func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerU
} }
var conversations []*relationTb.ConversationModel var conversations []*relationTb.ConversationModel
for _, conversationID := range conversationIDList { for _, conversationID := range conversationIDList {
conversation, tErr := c.GetConversation(ctx, ownerUserID, conversationID) conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if tErr != nil { if err != nil {
return nil, utils.Wrap(tErr, "GetConversation failed") return nil, utils.Wrap(err, "GetConversation failed")
} }
conversations = append(conversations, conversation) conversations = append(conversations, conversation)
} }
return conversations, nil return conversations, nil
} }
func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
return c.db.Transaction(func(tx *gorm.DB) error { return c.tx.Transaction(func(tx any) error {
var conversationIDList []string var conversationIDList []string
for _, conversation := range conversations { for _, conversation := range conversations {
conversationIDList = append(conversationIDList, conversation.ConversationID) conversationIDList = append(conversationIDList, conversation.ConversationID)
} }
haveConversations, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDList, tx) conversationTx := c.conversationDB.NewTx(tx)
haveConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDList)
if err != nil { if err != nil {
return err return err
} }
if len(haveConversations) > 0 { if len(haveConversations) > 0 {
err = c.conversationDB.Update(ctx, conversations, tx) err = conversationTx.Update(ctx, conversations)
if err != nil { if err != nil {
return err return err
} }

View File

@ -9,16 +9,15 @@ import (
) )
type Conversation interface { type Conversation interface {
Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error)
Delete(ctx context.Context, groupIDs []string, tx ...any) (err error) Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error)
Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error)
FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) ([]string, error) FindUserID(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) ([]string, error) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error)
Transaction(func(tx any) error) error
NewTx(tx any) Conversation NewTx(tx any) Conversation
} }
type ConversationGorm struct { type ConversationGorm struct {
@ -28,74 +27,69 @@ type ConversationGorm struct {
func NewConversationGorm(DB *gorm.DB) Conversation { func NewConversationGorm(DB *gorm.DB) Conversation {
return &ConversationGorm{DB: DB} return &ConversationGorm{DB: DB}
} }
func (c *ConversationGorm) Transaction(fn func(tx any) error) error {
return c.DB.Transaction(func(tx *gorm.DB) error {
return fn(tx)
})
}
func (c *ConversationGorm) NewTx(tx any) Conversation { func (c *ConversationGorm) NewTx(tx any) Conversation {
return &ConversationGorm{DB: tx.(*gorm.DB)} return &ConversationGorm{DB: tx.(*gorm.DB)}
} }
func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) { func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations)
}() }()
return utils.Wrap(getDBConn(c.DB, tx).Create(&conversations).Error, "") return utils.Wrap(c.DB.Create(&conversations).Error, "")
} }
func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string, tx ...any) (err error) { func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs)
}() }()
return utils.Wrap(getDBConn(c.DB, tx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") return utils.Wrap(c.DB.Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
} }
func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error) { func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userIDList", userIDList, "conversationID", conversationID) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userIDList", userIDList, "conversationID", conversationID)
}() }()
return utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "") return utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "")
} }
func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) { func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations)
}() }()
return utils.Wrap(getDBConn(c.DB, tx).Updates(&conversations).Error, "") return utils.Wrap(c.DB.Updates(&conversations).Error, "")
} }
func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error) { func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "groups", conversations) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "groups", conversations)
}() }()
err = utils.Wrap(getDBConn(c.DB, tx).Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "") err = utils.Wrap(c.DB.Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&conversations).Error, "")
return conversations, err return conversations, err
} }
func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error) { func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) {
cc := &relation.ConversationModel{} cc := &relation.ConversationModel{}
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversation", *conversation) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversation", *conversation)
}() }()
return cc, utils.Wrap(getDBConn(c.DB, tx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "") return cc, utils.Wrap(c.DB.Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "")
} }
func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) (existUserID []string, err error) { func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string) (existUserID []string, err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userIDList, "existUserID", existUserID) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userIDList, "existUserID", existUserID)
}() }()
return existUserID, utils.Wrap(getDBConn(c.DB, tx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "") return existUserID, utils.Wrap(c.DB.Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "")
} }
func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error) { func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "existConversationIDList", existConversationID) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "existConversationIDList", existConversationID)
}() }()
return existConversationID, utils.Wrap(getDBConn(c.DB, tx).Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "") return existConversationID, utils.Wrap(c.DB.Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "")
} }
func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) (conversationIDList []string, err error) { func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string) (conversationIDList []string, err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversationIDList", conversationIDList) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversationIDList", conversationIDList)
}() }()
return conversationIDList, utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "") return conversationIDList, utils.Wrap(c.DB.Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "")
} }