diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index fc9702ebc..5fcab9eb4 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" @@ -88,6 +89,7 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo return resp, nil } } + // notification if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat { //sync peer user conversation if conversation is singleChatType @@ -96,13 +98,20 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } + } else { if isSyncConversation { for _, v := range req.UserIDList { + if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) + } chat.ConversationChangeNotification(req.OperationID, v) } } else { for _, v := range req.UserIDList { + if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) + } chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID) } } @@ -132,6 +141,10 @@ func syncPeerUserConversation(conversation *pbConversation.Conversation, operati log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) return err } + err = rocksCache.DelConversationFromCache(conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType)) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error()) + } chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat) return nil } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 890b8e90d..4c86c36f8 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -147,11 +147,21 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } - if err := imdb.SetConversation(conversation); err != nil { + + isUpdate, err := imdb.SetConversation(conversation) + if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) resp.Failed = append(resp.Failed, v.ConversationID) continue } + if isUpdate { + err = rocksCache.DelConversationFromCache(v.OwnerUserID, v.ConversationID) + } else { + err = rocksCache.DelUserConversationIDListFromCache(v.OwnerUserID) + } + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), v.ConversationID, v.OwnerUserID) + } resp.Success = append(resp.Success, v.ConversationID) // if is set private msg operation,then peer user need to sync and set tips\ if v.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification { @@ -169,7 +179,7 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAllConversationsReq) (*pbUser.GetAllConversationsResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp := &pbUser.GetAllConversationsResp{Conversations: []*pbUser.Conversation{}} - conversations, err := imdb.GetUserAllConversations(req.OwnerUserID) + conversations, err := rocksCache.GetUserAllConversationList(req.OwnerUserID) log.NewDebug(req.OperationID, "conversations: ", conversations) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error()) @@ -187,7 +197,7 @@ func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAll func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConversationReq) (*pbUser.GetConversationResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp := &pbUser.GetConversationResp{Conversation: &pbUser.Conversation{}} - conversation, err := imdb.GetConversation(req.OwnerUserID, req.ConversationID) + conversation, err := rocksCache.GetConversationFromCache(req.OwnerUserID, req.ConversationID) log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversation error", err.Error()) @@ -205,7 +215,7 @@ func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConvers func (s *userServer) GetConversations(ctx context.Context, req *pbUser.GetConversationsReq) (*pbUser.GetConversationsResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp := &pbUser.GetConversationsResp{Conversations: []*pbUser.Conversation{}} - conversations, err := imdb.GetConversations(req.OwnerUserID, req.ConversationIDs) + conversations, err := rocksCache.GetConversationsFromCache(req.OwnerUserID, req.ConversationIDs) log.NewDebug("", utils.GetSelfFuncName(), "conversations", conversations) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetConversations error", err.Error()) @@ -249,12 +259,21 @@ func (s *userServer) SetConversation(ctx context.Context, req *pbUser.SetConvers resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } - err := imdb.SetConversation(conversation) + isUpdate, err := imdb.SetConversation(conversation) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil } + if isUpdate { + err = rocksCache.DelConversationFromCache(req.Conversation.OwnerUserID ,req.Conversation.ConversationID) + } else { + err = rocksCache.DelUserConversationIDListFromCache(req.Conversation.OwnerUserID) + } + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.Conversation.ConversationID, req.Conversation.OwnerUserID) + } + // notification if req.Conversation.ConversationType == constant.SingleChatType && req.NotificationType == constant.ConversationPrivateChatNotification { //sync peer user conversation if conversation is singleChatType @@ -299,6 +318,8 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} return resp, nil + if err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID); err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), conversation.ConversationID, err.Error()) } chat.ConversationChangeNotification(req.OperationID, req.OwnerUserID) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) @@ -306,7 +327,7 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp return resp, nil } -func (s *userServer) DeleteUsers(_ context.Context, req *pbUser.DeleteUsersReq) (*pbUser.DeleteUsersResp, error) { +func (s *userServer) DeleteUsers(ctx context.Context, req *pbUser.DeleteUsersReq) (*pbUser.DeleteUsersResp, error) { log.NewInfo(req.OperationID, "DeleteUsers args ", req.String()) if !token_verify.IsManagerUserID(req.OpUserID) { log.NewError(req.OperationID, "IsManagerUserID false ", req.OpUserID) diff --git a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go b/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go index 98a97653e..0f1b7374a 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/conversation_model.go @@ -6,16 +6,18 @@ import ( "Open_IM/pkg/utils" ) -func SetConversation(conversation db.Conversation) error { +func SetConversation(conversation db.Conversation) (bool, error) { + var isUpdate bool newConversation := conversation if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 { log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create") - return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error + return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error // if exist, then update record } else { log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update") //force update - return db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). + isUpdate = true + return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt, "is_pinned": conversation.IsPinned, "is_private_chat": conversation.IsPrivateChat, "group_at_type": conversation.GroupAtType, "is_not_in_group": conversation.IsNotInGroup}).Error } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index af6e75e21..103bd51f7 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -465,7 +465,7 @@ func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversat bytes, err := json.Marshal(conversation) return string(bytes), utils.Wrap(err, "") } - conversationStr, err := db.DB.Rc.Fetch(conversationCache+conversationID, time.Second*30*60, getConversation) + conversationStr, err := db.DB.Rc.Fetch(conversationCache+ownerUserID+":"+conversationID, time.Second*30*60, getConversation) conversation := db.Conversation{} err = json.Unmarshal([]byte(conversationStr), &conversation) if err != nil { @@ -474,6 +474,18 @@ func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversat return &conversation, nil } +func GetConversationsFromCache(ownerUserID string, conversationIDList []string) ([]db.Conversation, error) { + var conversationList []db.Conversation + for _, conversationID := range conversationIDList { + conversation, err := GetConversationFromCache(ownerUserID, conversationID) + if err != nil { + return nil, utils.Wrap(err, "GetConversationFromCache failed") + } + conversationList = append(conversationList, *conversation) + } + return conversationList, nil +} + func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) { IDList, err := GetUserConversationIDListFromCache(ownerUserID) if err != nil { @@ -490,6 +502,6 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) { return conversationList, nil } -func DelConversationFromCache(conversationID string) error { - return db.DB.Rc.TagAsDeleted(conversationCache + conversationID) +func DelConversationFromCache(ownerUserID, conversationID string) error { + return db.DB.Rc.TagAsDeleted(conversationCache + ownerUserID + ":" + conversationID) }