mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-17 08:25:10 +08:00
conversation update
This commit is contained in:
parent
b7f0e2e6f3
commit
730be9db47
17
internal/common/check/group.go
Normal file
17
internal/common/check/group.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package check
|
||||||
|
|
||||||
|
import (
|
||||||
|
server_api_params "Open_IM/pkg/proto/sdk_ws"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GroupChecker struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGroupChecker() *GroupChecker {
|
||||||
|
return &GroupChecker{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GroupChecker) GetGroupInfo(groupID string) (*server_api_params.GroupInfo, error) {
|
||||||
|
return nil, errors.New("TODO:GetUserInfo")
|
||||||
|
}
|
@ -6,6 +6,7 @@ import (
|
|||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
|
"context"
|
||||||
"github.com/golang/protobuf/jsonpb"
|
"github.com/golang/protobuf/jsonpb"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
@ -59,7 +60,7 @@ func ConversationSetPrivateNotification(operationID, sendID, recvID string, isPr
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 会话改变
|
// 会话改变
|
||||||
func ConversationChangeNotification(operationID, userID string) {
|
func ConversationChangeNotification(ctx context.Context, userID string) {
|
||||||
log.NewInfo(operationID, utils.GetSelfFuncName())
|
log.NewInfo(operationID, utils.GetSelfFuncName())
|
||||||
ConversationChangedTips := &open_im_sdk.ConversationUpdateTips{
|
ConversationChangedTips := &open_im_sdk.ConversationUpdateTips{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
|
86
pkg/common/db/cache/conversation.go
vendored
Normal file
86
pkg/common/db/cache/conversation.go
vendored
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/pkg/common/db/table"
|
||||||
|
"Open_IM/pkg/utils"
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/dtm-labs/rockscache"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DBFun func() (string, error)
|
||||||
|
|
||||||
|
type ConversationCache interface {
|
||||||
|
GetUserConversationIDListFromCache(userID string, fn DBFun) ([]string, error)
|
||||||
|
DelUserConversationIDListFromCache(userID string) error
|
||||||
|
GetConversationFromCache(ownerUserID, conversationID string, fn DBFun) (*table.ConversationModel, error)
|
||||||
|
GetConversationsFromCache(ownerUserID string, conversationIDList []string, fn DBFun) ([]*table.ConversationModel, error)
|
||||||
|
GetUserAllConversationList(ownerUserID string, fn DBFun) ([]*table.ConversationModel, error)
|
||||||
|
DelConversationFromCache(ownerUserID, conversationID string) error
|
||||||
|
}
|
||||||
|
type ConversationRedis struct {
|
||||||
|
rcClient *rockscache.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis {
|
||||||
|
return &ConversationRedis{rcClient: rcClient}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) GetUserConversationIDListFromCache(userID string, fn DBFun) ([]string, error) {
|
||||||
|
conversationIDListStr, err := c.rcClient.Fetch(conversationIDListCache+userID, time.Second*30*60, fn)
|
||||||
|
var conversationIDList []string
|
||||||
|
err = json.Unmarshal([]byte(conversationIDListStr), &conversationIDList)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
return conversationIDList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) DelUserConversationIDListFromCache(userID string) error {
|
||||||
|
return utils.Wrap(c.rcClient.TagAsDeleted(conversationIDListCache+userID), "DelUserConversationIDListFromCache err")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) GetConversationFromCache(ownerUserID, conversationID string, fn DBFun) (*table.ConversationModel, error) {
|
||||||
|
conversationStr, err := c.rcClient.Fetch(conversationCache+ownerUserID+":"+conversationID, time.Second*30*60, fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "Fetch failed")
|
||||||
|
}
|
||||||
|
conversation := table.ConversationModel{}
|
||||||
|
err = json.Unmarshal([]byte(conversationStr), &conversation)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "Unmarshal failed")
|
||||||
|
}
|
||||||
|
return &conversation, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) GetConversationsFromCache(ownerUserID string, conversationIDList []string, fn DBFun) ([]*table.ConversationModel, error) {
|
||||||
|
var conversationList []*table.ConversationModel
|
||||||
|
for _, conversationID := range conversationIDList {
|
||||||
|
conversation, err := c.GetConversationFromCache(ownerUserID, conversationID, fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "GetConversationFromCache failed")
|
||||||
|
}
|
||||||
|
conversationList = append(conversationList, conversation)
|
||||||
|
}
|
||||||
|
return conversationList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) GetUserAllConversationList(ownerUserID string, fn DBFun) ([]*table.ConversationModel, error) {
|
||||||
|
IDList, err := c.GetUserConversationIDListFromCache(ownerUserID, fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var conversationList []*table.ConversationModel
|
||||||
|
for _, conversationID := range IDList {
|
||||||
|
conversation, err := c.GetConversationFromCache(ownerUserID, conversationID, fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "GetConversationFromCache failed")
|
||||||
|
}
|
||||||
|
conversationList = append(conversationList, conversation)
|
||||||
|
}
|
||||||
|
return conversationList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationRedis) DelConversationFromCache(ownerUserID, conversationID string) error {
|
||||||
|
return utils.Wrap(c.rcClient.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
|
||||||
|
}
|
119
pkg/common/db/controller/conversation.go
Normal file
119
pkg/common/db/controller/conversation.go
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/pkg/common/db/cache"
|
||||||
|
"Open_IM/pkg/common/db/relation"
|
||||||
|
"Open_IM/pkg/common/db/table"
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConversationInterface interface {
|
||||||
|
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表
|
||||||
|
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
|
||||||
|
//UpdateUserConversationFiled 更新用户该会话的属性信息
|
||||||
|
UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error
|
||||||
|
//CreateConversation 创建一批新的会话
|
||||||
|
CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error
|
||||||
|
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
|
||||||
|
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error
|
||||||
|
//FindConversations 根据会话ID获取某个用户的多个会话
|
||||||
|
FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error)
|
||||||
|
//GetUserAllConversation 获取一个用户在服务器上所有的会话
|
||||||
|
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error)
|
||||||
|
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
|
||||||
|
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error
|
||||||
|
}
|
||||||
|
type ConversationController struct {
|
||||||
|
database ConversationDataBaseInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConversationController(database ConversationDataBaseInterface) *ConversationController {
|
||||||
|
return &ConversationController{database: database}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationController) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
|
||||||
|
return c.database.GetUserIDExistConversation(ctx, userIDList, conversationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationController) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ ConversationInterface = (*ConversationController)(nil)
|
||||||
|
|
||||||
|
type ConversationDataBaseInterface interface {
|
||||||
|
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表
|
||||||
|
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
|
||||||
|
//UpdateUserConversationFiled 更新用户该会话的属性信息
|
||||||
|
UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error
|
||||||
|
//CreateConversation 创建一批新的会话
|
||||||
|
CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error
|
||||||
|
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
|
||||||
|
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error
|
||||||
|
//FindConversations 根据会话ID获取某个用户的多个会话
|
||||||
|
FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error)
|
||||||
|
//GetUserAllConversation 获取一个用户在服务器上所有的会话
|
||||||
|
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error)
|
||||||
|
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
|
||||||
|
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error
|
||||||
|
}
|
||||||
|
type ConversationDataBase struct {
|
||||||
|
db relation.Conversation
|
||||||
|
cache cache.ConversationCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase {
|
||||||
|
return &ConversationDataBase{db: db, cache: cache}
|
||||||
|
}
|
||||||
|
|
||||||
|
//func NewConversationController(db *gorm.DB, rdb redis.UniversalClient) ConversationInterface {
|
||||||
|
// groupController := &ConversationController{database: newGroupDatabase(db, rdb, mgoClient)}
|
||||||
|
// return groupController
|
||||||
|
//}
|
@ -6,24 +6,24 @@ import (
|
|||||||
|
|
||||||
var ConversationDB *gorm.DB
|
var ConversationDB *gorm.DB
|
||||||
|
|
||||||
type Conversation struct {
|
//type Conversation struct {
|
||||||
OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
|
// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
|
||||||
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
|
// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
|
||||||
ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
|
// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
|
||||||
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
|
// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
|
||||||
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
|
// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
|
||||||
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
|
// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
|
||||||
UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"`
|
// UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"`
|
||||||
DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"`
|
// DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"`
|
||||||
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
|
// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
|
||||||
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
|
// IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
|
||||||
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
|
// BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
|
||||||
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
|
// GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
|
||||||
IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"`
|
// IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"`
|
||||||
UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"`
|
// UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"`
|
||||||
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
|
// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
|
||||||
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
|
||||||
}
|
//}
|
||||||
|
|
||||||
func (Conversation) TableName() string {
|
func (Conversation) TableName() string {
|
||||||
return "conversations"
|
return "conversations"
|
||||||
|
73
pkg/common/db/relation/conversation_model_g.go
Normal file
73
pkg/common/db/relation/conversation_model_g.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package relation
|
||||||
|
|
||||||
|
import (
|
||||||
|
"Open_IM/pkg/common/db/table"
|
||||||
|
"Open_IM/pkg/common/tracelog"
|
||||||
|
"Open_IM/pkg/utils"
|
||||||
|
"context"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Conversation interface {
|
||||||
|
TableName() string
|
||||||
|
Create(ctx context.Context, conversations []*table.ConversationModel) (err error)
|
||||||
|
Delete(ctx context.Context, groupIDs []string) (err error)
|
||||||
|
UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error)
|
||||||
|
Update(ctx context.Context, groups []*table.ConversationModel) (err error)
|
||||||
|
Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error)
|
||||||
|
Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error)
|
||||||
|
}
|
||||||
|
type ConversationGorm struct {
|
||||||
|
DB *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) TableName() string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConversationGorm(DB *gorm.DB) Conversation {
|
||||||
|
return &ConversationGorm{DB: DB}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) Create(ctx context.Context, conversations []*table.ConversationModel) (err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations)
|
||||||
|
}()
|
||||||
|
return utils.Wrap(getDBConn(g.DB, tx).Create(&conversations).Error, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs)
|
||||||
|
}()
|
||||||
|
return utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Delete(&table.ConversationModel{}).Error, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "args", args)
|
||||||
|
}()
|
||||||
|
return utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Model(g).Updates(args).Error, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) Update(ctx context.Context, groups []*table.ConversationModel) (err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups)
|
||||||
|
}()
|
||||||
|
return utils.Wrap(getDBConn(g.DB, tx).Updates(&groups).Error, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs, "groups", groups)
|
||||||
|
}()
|
||||||
|
return groups, utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Find(&groups).Error, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConversationGorm) Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error) {
|
||||||
|
group = &Group{}
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
|
||||||
|
}()
|
||||||
|
return group, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Take(group).Error, "")
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user