mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
add conversationCache
This commit is contained in:
parent
840e08fc8a
commit
ee577cae4c
@ -5,6 +5,7 @@ import (
|
|||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db"
|
"Open_IM/pkg/common/db"
|
||||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||||
|
rocksCache "Open_IM/pkg/common/db/rocks_cache"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||||
pbConversation "Open_IM/pkg/proto/conversation"
|
pbConversation "Open_IM/pkg/proto/conversation"
|
||||||
@ -88,6 +89,7 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notification
|
// notification
|
||||||
if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat {
|
if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat {
|
||||||
//sync peer user conversation if conversation is singleChatType
|
//sync peer user conversation if conversation is singleChatType
|
||||||
@ -96,13 +98,20 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
|
|||||||
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if isSyncConversation {
|
if isSyncConversation {
|
||||||
for _, v := range req.UserIDList {
|
for _, v := range req.UserIDList {
|
||||||
|
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
|
||||||
|
}
|
||||||
chat.ConversationChangeNotification(req.OperationID, v)
|
chat.ConversationChangeNotification(req.OperationID, v)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, v := range req.UserIDList {
|
for _, v := range req.UserIDList {
|
||||||
|
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
|
||||||
|
}
|
||||||
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID)
|
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -132,6 +141,10 @@ func syncPeerUserConversation(conversation *pbConversation.Conversation, operati
|
|||||||
log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
err = rocksCache.DelConversationFromCache(conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType))
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error())
|
||||||
|
}
|
||||||
chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat)
|
chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -147,11 +147,21 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc
|
|||||||
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
if err := imdb.SetConversation(conversation); err != nil {
|
|
||||||
|
isUpdate, err := imdb.SetConversation(conversation)
|
||||||
|
if err != nil {
|
||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
||||||
resp.Failed = append(resp.Failed, v.ConversationID)
|
resp.Failed = append(resp.Failed, v.ConversationID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if isUpdate {
|
||||||
|
err = rocksCache.DelConversationFromCache(v.OwnerUserID, v.ConversationID)
|
||||||
|
} else {
|
||||||
|
err = rocksCache.DelUserConversationIDListFromCache(v.OwnerUserID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), v.ConversationID, v.OwnerUserID)
|
||||||
|
}
|
||||||
resp.Success = append(resp.Success, v.ConversationID)
|
resp.Success = append(resp.Success, v.ConversationID)
|
||||||
// if is set private msg operation,then peer user need to sync and set tips\
|
// if is set private msg operation,then peer user need to sync and set tips\
|
||||||
if v.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification {
|
if v.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification {
|
||||||
@ -169,7 +179,7 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc
|
|||||||
func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAllConversationsReq) (*pbUser.GetAllConversationsResp, error) {
|
func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAllConversationsReq) (*pbUser.GetAllConversationsResp, error) {
|
||||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||||
resp := &pbUser.GetAllConversationsResp{Conversations: []*pbUser.Conversation{}}
|
resp := &pbUser.GetAllConversationsResp{Conversations: []*pbUser.Conversation{}}
|
||||||
conversations, err := imdb.GetUserAllConversations(req.OwnerUserID)
|
conversations, err := rocksCache.GetUserAllConversationList(req.OwnerUserID)
|
||||||
log.NewDebug(req.OperationID, "conversations: ", conversations)
|
log.NewDebug(req.OperationID, "conversations: ", conversations)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error())
|
||||||
@ -187,7 +197,7 @@ func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAll
|
|||||||
func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConversationReq) (*pbUser.GetConversationResp, error) {
|
func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConversationReq) (*pbUser.GetConversationResp, error) {
|
||||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||||
resp := &pbUser.GetConversationResp{Conversation: &pbUser.Conversation{}}
|
resp := &pbUser.GetConversationResp{Conversation: &pbUser.Conversation{}}
|
||||||
conversation, err := imdb.GetConversation(req.OwnerUserID, req.ConversationID)
|
conversation, err := rocksCache.GetConversationFromCache(req.OwnerUserID, req.ConversationID)
|
||||||
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation)
|
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversation error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversation error", err.Error())
|
||||||
@ -205,7 +215,7 @@ func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConvers
|
|||||||
func (s *userServer) GetConversations(ctx context.Context, req *pbUser.GetConversationsReq) (*pbUser.GetConversationsResp, error) {
|
func (s *userServer) GetConversations(ctx context.Context, req *pbUser.GetConversationsReq) (*pbUser.GetConversationsResp, error) {
|
||||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||||
resp := &pbUser.GetConversationsResp{Conversations: []*pbUser.Conversation{}}
|
resp := &pbUser.GetConversationsResp{Conversations: []*pbUser.Conversation{}}
|
||||||
conversations, err := imdb.GetConversations(req.OwnerUserID, req.ConversationIDs)
|
conversations, err := rocksCache.GetConversationsFromCache(req.OwnerUserID, req.ConversationIDs)
|
||||||
log.NewDebug("", utils.GetSelfFuncName(), "conversations", conversations)
|
log.NewDebug("", utils.GetSelfFuncName(), "conversations", conversations)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error())
|
||||||
@ -249,12 +259,21 @@ func (s *userServer) SetConversation(ctx context.Context, req *pbUser.SetConvers
|
|||||||
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
err := imdb.SetConversation(conversation)
|
isUpdate, err := imdb.SetConversation(conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
||||||
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
if isUpdate {
|
||||||
|
err = rocksCache.DelConversationFromCache(req.Conversation.OwnerUserID ,req.Conversation.ConversationID)
|
||||||
|
} else {
|
||||||
|
err = rocksCache.DelUserConversationIDListFromCache(req.Conversation.OwnerUserID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.Conversation.ConversationID, req.Conversation.OwnerUserID)
|
||||||
|
}
|
||||||
|
|
||||||
// notification
|
// notification
|
||||||
if req.Conversation.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification {
|
if req.Conversation.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification {
|
||||||
//sync peer user conversation if conversation is singleChatType
|
//sync peer user conversation if conversation is singleChatType
|
||||||
@ -299,6 +318,8 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp
|
|||||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
|
||||||
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
if err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID); err != nil {
|
||||||
|
log.NewError(req.OperationID, utils.GetSelfFuncName(), conversation.ConversationID, err.Error())
|
||||||
}
|
}
|
||||||
chat.ConversationChangeNotification(req.OperationID, req.OwnerUserID)
|
chat.ConversationChangeNotification(req.OperationID, req.OwnerUserID)
|
||||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
|
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
|
||||||
@ -306,7 +327,7 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *userServer) DeleteUsers(_ context.Context, req *pbUser.DeleteUsersReq) (*pbUser.DeleteUsersResp, error) {
|
func (s *userServer) DeleteUsers(ctx context.Context, req *pbUser.DeleteUsersReq) (*pbUser.DeleteUsersResp, error) {
|
||||||
log.NewInfo(req.OperationID, "DeleteUsers args ", req.String())
|
log.NewInfo(req.OperationID, "DeleteUsers args ", req.String())
|
||||||
if !token_verify.IsManagerUserID(req.OpUserID) {
|
if !token_verify.IsManagerUserID(req.OpUserID) {
|
||||||
log.NewError(req.OperationID, "IsManagerUserID false ", req.OpUserID)
|
log.NewError(req.OperationID, "IsManagerUserID false ", req.OpUserID)
|
||||||
|
@ -6,16 +6,18 @@ import (
|
|||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func SetConversation(conversation db.Conversation) error {
|
func SetConversation(conversation db.Conversation) (bool, error) {
|
||||||
|
var isUpdate bool
|
||||||
newConversation := conversation
|
newConversation := conversation
|
||||||
if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 {
|
if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 {
|
||||||
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create")
|
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create")
|
||||||
return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
|
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
|
||||||
// if exist, then update record
|
// if exist, then update record
|
||||||
} else {
|
} else {
|
||||||
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update")
|
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update")
|
||||||
//force update
|
//force update
|
||||||
return db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
|
isUpdate = true
|
||||||
|
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
|
||||||
Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt, "is_pinned": conversation.IsPinned, "is_private_chat": conversation.IsPrivateChat,
|
Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt, "is_pinned": conversation.IsPinned, "is_private_chat": conversation.IsPrivateChat,
|
||||||
"group_at_type": conversation.GroupAtType, "is_not_in_group": conversation.IsNotInGroup}).Error
|
"group_at_type": conversation.GroupAtType, "is_not_in_group": conversation.IsNotInGroup}).Error
|
||||||
}
|
}
|
||||||
|
@ -465,7 +465,7 @@ func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversat
|
|||||||
bytes, err := json.Marshal(conversation)
|
bytes, err := json.Marshal(conversation)
|
||||||
return string(bytes), utils.Wrap(err, "")
|
return string(bytes), utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
conversationStr, err := db.DB.Rc.Fetch(conversationCache+conversationID, time.Second*30*60, getConversation)
|
conversationStr, err := db.DB.Rc.Fetch(conversationCache+ownerUserID+":"+conversationID, time.Second*30*60, getConversation)
|
||||||
conversation := db.Conversation{}
|
conversation := db.Conversation{}
|
||||||
err = json.Unmarshal([]byte(conversationStr), &conversation)
|
err = json.Unmarshal([]byte(conversationStr), &conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -474,6 +474,18 @@ func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversat
|
|||||||
return &conversation, nil
|
return &conversation, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetConversationsFromCache(ownerUserID string, conversationIDList []string) ([]db.Conversation, error) {
|
||||||
|
var conversationList []db.Conversation
|
||||||
|
for _, conversationID := range conversationIDList {
|
||||||
|
conversation, err := GetConversationFromCache(ownerUserID, conversationID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, utils.Wrap(err, "GetConversationFromCache failed")
|
||||||
|
}
|
||||||
|
conversationList = append(conversationList, *conversation)
|
||||||
|
}
|
||||||
|
return conversationList, nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
|
func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
|
||||||
IDList, err := GetUserConversationIDListFromCache(ownerUserID)
|
IDList, err := GetUserConversationIDListFromCache(ownerUserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -490,6 +502,6 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
|
|||||||
return conversationList, nil
|
return conversationList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func DelConversationFromCache(conversationID string) error {
|
func DelConversationFromCache(ownerUserID, conversationID string) error {
|
||||||
return db.DB.Rc.TagAsDeleted(conversationCache + conversationID)
|
return db.DB.Rc.TagAsDeleted(conversationCache + ownerUserID + ":" + conversationID)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user