diff --git a/internal/common/check/group.go b/internal/common/check/group.go new file mode 100644 index 000000000..56df95e76 --- /dev/null +++ b/internal/common/check/group.go @@ -0,0 +1,17 @@ +package check + +import ( + server_api_params "Open_IM/pkg/proto/sdk_ws" + "errors" +) + +type GroupChecker struct { +} + +func NewGroupChecker() *GroupChecker { + return &GroupChecker{} +} + +func (g *GroupChecker) GetGroupInfo(groupID string) (*server_api_params.GroupInfo, error) { + return nil, errors.New("TODO:GetUserInfo") +} diff --git a/internal/rpc/msg/conversation_notification.go b/internal/rpc/msg/conversation_notification.go index 22e1dedc7..a62d48d4e 100644 --- a/internal/rpc/msg/conversation_notification.go +++ b/internal/rpc/msg/conversation_notification.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/log" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" + "context" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" ) @@ -59,7 +60,7 @@ func ConversationSetPrivateNotification(operationID, sendID, recvID string, isPr } // 会话改变 -func ConversationChangeNotification(operationID, userID string) { +func ConversationChangeNotification(ctx context.Context, userID string) { log.NewInfo(operationID, utils.GetSelfFuncName()) ConversationChangedTips := &open_im_sdk.ConversationUpdateTips{ UserID: userID, diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go new file mode 100644 index 000000000..d7757397e --- /dev/null +++ b/pkg/common/db/cache/conversation.go @@ -0,0 +1,86 @@ +package cache + +import ( + "Open_IM/pkg/common/db/table" + "Open_IM/pkg/utils" + "encoding/json" + "github.com/dtm-labs/rockscache" + "time" +) + +type DBFun func() (string, error) + +type ConversationCache interface { + GetUserConversationIDListFromCache(userID string, fn DBFun) ([]string, error) + DelUserConversationIDListFromCache(userID string) error + GetConversationFromCache(ownerUserID, conversationID string, fn DBFun) (*table.ConversationModel, error) + GetConversationsFromCache(ownerUserID string, conversationIDList []string, fn DBFun) ([]*table.ConversationModel, error) + GetUserAllConversationList(ownerUserID string, fn DBFun) ([]*table.ConversationModel, error) + DelConversationFromCache(ownerUserID, conversationID string) error +} +type ConversationRedis struct { + rcClient *rockscache.Client +} + +func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis { + return &ConversationRedis{rcClient: rcClient} +} + +func (c *ConversationRedis) GetUserConversationIDListFromCache(userID string, fn DBFun) ([]string, error) { + conversationIDListStr, err := c.rcClient.Fetch(conversationIDListCache+userID, time.Second*30*60, fn) + var conversationIDList []string + err = json.Unmarshal([]byte(conversationIDListStr), &conversationIDList) + if err != nil { + return nil, utils.Wrap(err, "") + } + return conversationIDList, nil +} + +func (c *ConversationRedis) DelUserConversationIDListFromCache(userID string) error { + return utils.Wrap(c.rcClient.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err") +} + +func (c *ConversationRedis) GetConversationFromCache(ownerUserID, conversationID string, fn DBFun) (*table.ConversationModel, error) { + conversationStr, err := c.rcClient.Fetch(conversationCache+ownerUserID+":"+conversationID, time.Second*30*60, fn) + if err != nil { + return nil, utils.Wrap(err, "Fetch failed") + } + conversation := table.ConversationModel{} + err = json.Unmarshal([]byte(conversationStr), &conversation) + if err != nil { + return nil, utils.Wrap(err, "Unmarshal failed") + } + return &conversation, nil +} + +func (c *ConversationRedis) GetConversationsFromCache(ownerUserID string, conversationIDList []string, fn DBFun) ([]*table.ConversationModel, error) { + var conversationList []*table.ConversationModel + for _, conversationID := range conversationIDList { + conversation, err := c.GetConversationFromCache(ownerUserID, conversationID, fn) + if err != nil { + return nil, utils.Wrap(err, "GetConversationFromCache failed") + } + conversationList = append(conversationList, conversation) + } + return conversationList, nil +} + +func (c *ConversationRedis) GetUserAllConversationList(ownerUserID string, fn DBFun) ([]*table.ConversationModel, error) { + IDList, err := c.GetUserConversationIDListFromCache(ownerUserID, fn) + if err != nil { + return nil, err + } + var conversationList []*table.ConversationModel + for _, conversationID := range IDList { + conversation, err := c.GetConversationFromCache(ownerUserID, conversationID, fn) + if err != nil { + return nil, utils.Wrap(err, "GetConversationFromCache failed") + } + conversationList = append(conversationList, conversation) + } + return conversationList, nil +} + +func (c *ConversationRedis) DelConversationFromCache(ownerUserID, conversationID string) error { + return utils.Wrap(c.rcClient.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") +} diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go new file mode 100644 index 000000000..3ed221c45 --- /dev/null +++ b/pkg/common/db/controller/conversation.go @@ -0,0 +1,119 @@ +package controller + +import ( + "Open_IM/pkg/common/db/cache" + "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" + "context" +) + +type ConversationInterface interface { + //GetUserIDExistConversation 获取拥有该会话的的用户ID列表 + GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) + //UpdateUserConversationFiled 更新用户该会话的属性信息 + UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error + //CreateConversation 创建一批新的会话 + CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error + //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 + SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error + //FindConversations 根据会话ID获取某个用户的多个会话 + FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) + //GetUserAllConversation 获取一个用户在服务器上所有的会话 + GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) + //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 + SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error +} +type ConversationController struct { + database ConversationDataBaseInterface +} + +func NewConversationController(database ConversationDataBaseInterface) *ConversationController { + return &ConversationController{database: database} +} + +func (c *ConversationController) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { + return c.database.GetUserIDExistConversation(ctx, userIDList, conversationID) +} + +func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { + panic("implement me") +} + +func (c ConversationController) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error { + panic("implement me") +} + +func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error { + panic("implement me") +} + +func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) { + panic("implement me") +} + +func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) { + panic("implement me") +} +func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error { + panic("implement me") +} + +var _ ConversationInterface = (*ConversationController)(nil) + +type ConversationDataBaseInterface interface { + //GetUserIDExistConversation 获取拥有该会话的的用户ID列表 + GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) + //UpdateUserConversationFiled 更新用户该会话的属性信息 + UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error + //CreateConversation 创建一批新的会话 + CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error + //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 + SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error + //FindConversations 根据会话ID获取某个用户的多个会话 + FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) + //GetUserAllConversation 获取一个用户在服务器上所有的会话 + GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) + //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 + SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error +} +type ConversationDataBase struct { + db relation.Conversation + cache cache.ConversationCache +} + +func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { + panic("implement me") +} + +func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { + panic("implement me") +} + +func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error { + panic("implement me") +} + +func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error { + panic("implement me") +} + +func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) { + panic("implement me") +} + +func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) { + panic("implement me") +} + +func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error { + panic("implement me") +} + +func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase { + return &ConversationDataBase{db: db, cache: cache} +} + +//func NewConversationController(db *gorm.DB, rdb redis.UniversalClient) ConversationInterface { +// groupController := &ConversationController{database: newGroupDatabase(db, rdb, mgoClient)} +// return groupController +//} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index de2e92663..1071e26af 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -6,24 +6,24 @@ import ( var ConversationDB *gorm.DB -type Conversation struct { - OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` - ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` - ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` - UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` - GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` - RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` - UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"` - DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"` - IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` - IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` - BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` - GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` - IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"` - UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"` - AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` - Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` -} +//type Conversation struct { +// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` +// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` +// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` +// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` +// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` +// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` +// UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"` +// DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"` +// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` +// IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` +// BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` +// GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` +// IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"` +// UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"` +// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` +// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` +//} func (Conversation) TableName() string { return "conversations" diff --git a/pkg/common/db/relation/conversation_model_g.go b/pkg/common/db/relation/conversation_model_g.go new file mode 100644 index 000000000..37882940d --- /dev/null +++ b/pkg/common/db/relation/conversation_model_g.go @@ -0,0 +1,73 @@ +package relation + +import ( + "Open_IM/pkg/common/db/table" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils" + "context" + "gorm.io/gorm" +) + +type Conversation interface { + TableName() string + Create(ctx context.Context, conversations []*table.ConversationModel) (err error) + Delete(ctx context.Context, groupIDs []string) (err error) + UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) + Update(ctx context.Context, groups []*table.ConversationModel) (err error) + Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error) + Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error) +} +type ConversationGorm struct { + DB *gorm.DB +} + +func (c *ConversationGorm) TableName() string { + panic("implement me") +} + +func NewConversationGorm(DB *gorm.DB) Conversation { + return &ConversationGorm{DB: DB} +} + +func (c *ConversationGorm) Create(ctx context.Context, conversations []*table.ConversationModel) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) + }() + return utils.Wrap(getDBConn(g.DB, tx).Create(&conversations).Error, "") +} + +func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) + }() + return utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Delete(&table.ConversationModel{}).Error, "") +} + +func (c *ConversationGorm) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "args", args) + }() + return utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Model(g).Updates(args).Error, "") +} + +func (c *ConversationGorm) Update(ctx context.Context, groups []*table.ConversationModel) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups) + }() + return utils.Wrap(getDBConn(g.DB, tx).Updates(&groups).Error, "") +} + +func (c *ConversationGorm) Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs, "groups", groups) + }() + return groups, utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Find(&groups).Error, "") +} + +func (c *ConversationGorm) Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error) { + group = &Group{} + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) + }() + return group, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Take(group).Error, "") +}