From b75507396ba07e47b075bc689ee255c077be83b1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Dec 2022 19:34:54 +0800 Subject: [PATCH] EX MSG --- internal/cron_task/clear_msg.go | 41 +++++---- internal/cron_task/cron_task.go | 4 +- internal/rpc/group/group.go | 4 +- pkg/common/db/extend_msg_mongo_model.go | 111 +++++++++++++++++------ pkg/common/db/mongoModel.go | 9 +- pkg/common/db/rocks_cache/rocks_cache.go | 62 +++++++++++++ 6 files changed, 177 insertions(+), 54 deletions(-) diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 481b3b104..c8885a9ef 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -82,7 +82,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() uint32 { // index 0....19(del) 20...69 // seq 70 // set minSeq 21 -// recursion +// recursion 删除list并且返回设置的最小seq func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMsgRecursionStruct) (uint32, error) { // find from oldest list msgs, err := db.DB.GetUserMsgListByIndex(ID, index) @@ -105,11 +105,13 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if len(msgs.Msg) > db.GetSingleGocMsgNum() { log.NewWarn(operationID, utils.GetSelfFuncName(), "msgs too large", len(msgs.Msg), msgs.UID) } + var hasMsgDoNotNeedDel bool for i, msg := range msgs.Msg { // 找到列表中不需要删除的消息了, 表示为递归到最后一个块 if utils.GetCurrentTimestampByMill() < msg.SendTime+(int64(config.Config.Mongo.DBRetainChatRecords)*24*60*60*1000) { log.NewDebug(operationID, ID, "find uid", msgs.UID) // 删除块失败 递归结束 返回0 + hasMsgDoNotNeedDel = true if err := delMongoMsgsPhysical(delStruct.delUidList); err != nil { return 0, err } @@ -120,7 +122,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs } // 如果不是块中第一个,就把前面比他早插入的全部设置空 seq字段除外。 if i > 0 { - err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) + delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1) if err != nil { log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i) return delStruct.getSetMinSeq(), utils.Wrap(err, "") @@ -128,6 +130,10 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs } // 递归结束 return msgPb.Seq, nil + } else { + if !msgListIsFull(msgs) { + + } } } // 该列表中消息全部为老消息并且列表满了, 加入删除列表继续递归 @@ -141,14 +147,24 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs if msgListIsFull(msgs) { log.NewDebug(operationID, "msg list is full", msgs.UID) delStruct.delUidList = append(delStruct.delUidList, msgs.UID) + } else { + // 列表没有满且没有不需要被删除的消息 代表他是最新的消息块 + if !hasMsgDoNotNeedDel { + delStruct.minSeq, err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, len(msgs.Msg)-1) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, "Index:", len(msgs.Msg)-1) + err = delMongoMsgsPhysical(delStruct.delUidList) + if err != nil { + return delStruct.getSetMinSeq(), err + } + return delStruct.getSetMinSeq(), nil + } + } } - log.NewDebug(operationID, ID, "continue", delStruct) + log.NewDebug(operationID, ID, "continue to", delStruct) // 继续递归 index+1 seq, err := deleteMongoMsg(operationID, ID, index+1, delStruct) - if err != nil { - return seq, utils.Wrap(err, "deleteMongoMsg failed") - } - return seq, nil + return seq, utils.Wrap(err, "deleteMongoMsg failed") } func msgListIsFull(chat *db.UserChat) bool { @@ -164,18 +180,11 @@ func msgListIsFull(chat *db.UserChat) bool { return false } -func CheckGroupUserMinSeq(operationID, groupID, userID string, diffusionType int) error { +func CheckGroupUserMinSeq(operationID, groupID, userID string) error { return nil } -func CheckUserMinSeqWithMongo(operationID, userID string, diffusionType int) error { - //var seqRedis uint64 - //var err error - //if diffusionType == constant.WriteDiffusion { - // seqRedis, err = db.DB.GetUserMinSeq(ID) - //} else { - // seqRedis, err = db.DB.GetGroupUserMinSeq(ID) - //} +func CheckUserMinSeqWithMongo(operationID, userID string) error { return nil } diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index f08cc3db4..9e4bcbfd4 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -77,7 +77,7 @@ func StartClearMsg(operationID string, userIDList []string) { if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } - if err := CheckUserMinSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { + if err := CheckUserMinSeqWithMongo(operationID, userID); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), userID, err) } } @@ -99,7 +99,7 @@ func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } for _, userID := range userIDList { - if err := CheckGroupUserMinSeq(operationID, groupID, userID, constant.ReadDiffusion); err != nil { + if err := CheckGroupUserMinSeq(operationID, groupID, userID); err != nil { log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) } } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 4945bed83..ce83fa14f 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -178,13 +178,13 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR utils.CopyStructFields(&groupMember, us) callbackResp := CallbackBeforeMemberJoinGroup(req.OperationID, &groupMember, groupInfo.Ex) if callbackResp.ErrCode != 0 { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) + log.NewError(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup resp: ", callbackResp) } if callbackResp.ActionCode != constant.ActionAllow { if callbackResp.ErrCode == 0 { callbackResp.ErrCode = 201 } - log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup result", "end rpc and return", callbackResp) return &pbGroup.CreateGroupResp{ ErrCode: int32(callbackResp.ErrCode), ErrMsg: callbackResp.ErrMsg, diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index 1edc5cc25..551009e07 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -1,8 +1,17 @@ package db +import ( + "Open_IM/pkg/common/config" + "context" + "strconv" + "time" +) + +const cExtendMsgSet = "extend_msg_set" + type ExtendMsgSet struct { ID string `bson:"id" json:"ID"` - ExtendMsg []*ExtendMsg `bson:"extend_msg" json:"extendMsg"` + ExtendMsgs []*ExtendMsg `bson:"extend_msg" json:"extendMsg"` LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` AttachedInfo string `bson:"attached_info" json:"attachedInfo"` Ex string `bson:"ex" json:"ex"` @@ -10,40 +19,82 @@ type ExtendMsgSet struct { CreateTime int32 `bson:"create_time" json:"createTime"` } +type ReactionExtendMsgSet struct { + TypeKey string `bson:"type_key" json:"typeKey"` + Value string `bson:"value" json:"value"` +} + type ExtendMsg struct { - SendID string `bson:"send_id" json:"sendID"` - ServerMsgID string `bson:"server_msg_id" json:"serverMsgID"` - Ex string `bson:"ex" json:"ex"` - AttachedInfo string `bson:"attached_info" json:"attachedInfo"` - LikeUserIDList []string `bson:"like_user_id_list" json:"likeUserIDList"` - Content string `bson:"content" json:"content"` - ExtendMsgComments []*ExtendMsgComment `bson:"extend_msg_comments" json:"extendMsgComment"` - Vote *Vote `bson:"vote" json:"vote"` - Urls []string `bson:"urls" json:"urls"` - CreateTime int32 `bson:"create_time" json:"createTime"` + Content []*ReactionExtendMsgSet `bson:"content" json:"content"` + ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` + CreateTime int32 `bson:"create_time" json:"createTime"` } -type Vote struct { - Content string `bson:"content" json:"content"` - AttachedInfo string `bson:"attached_info" json:"attachedInfo"` - Ex string `bson:"ex" json:"ex"` - Options []*Options `bson:"options" json:"options"` +//type Vote struct { +// Content string `bson:"content" json:"content"` +// AttachedInfo string `bson:"attached_info" json:"attachedInfo"` +// Ex string `bson:"ex" json:"ex"` +// Options []*Options `bson:"options" json:"options"` +//} +// +//type Options struct { +// Content string `bson:"content" json:"content"` +// AttachedInfo string `bson:"attached_info" json:"attachedInfo"` +// Ex string `bson:"ex" json:"ex"` +// VoteUserIDList []string `bson:"vote_user_id_list" json:"voteUserIDList"` +//} +// +//type ExtendMsgComment struct { +// UserID string `bson:"user_id" json:"userID"` +// ReplyUserID string `bson:"reply_user_id" json:"replyUserID"` +// ReplyContentID string `bson:"reply_content_id" json:"replyContentID"` +// ContentID string `bson:"content_id" json:"contentID"` +// Content string `bson:"content" json:"content"` +// CreateTime int32 `bson:"create_time" json:"createTime"` +// AttachedInfo string `bson:"attached_info" json:"attachedInfo"` +// Ex string `bson:"ex" json:"ex"` +//} + +func GetExtendMsgSetID(ID string, index int32) string { + return ID + ":" + strconv.Itoa(int(index)) } -type Options struct { - Content string `bson:"content" json:"content"` - AttachedInfo string `bson:"attached_info" json:"attachedInfo"` - Ex string `bson:"ex" json:"ex"` - VoteUserIDList []string `bson:"vote_user_id_list" json:"voteUserIDList"` +func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + _, err := c.InsertOne(ctx, set) + return err } -type ExtendMsgComment struct { - UserID string `bson:"user_id" json:"userID"` - ReplyUserID string `bson:"reply_user_id" json:"replyUserID"` - ReplyContentID string `bson:"reply_content_id" json:"replyContentID"` - ContentID string `bson:"content_id" json:"contentID"` - Content string `bson:"content" json:"content"` - CreateTime int32 `bson:"create_time" json:"createTime"` - AttachedInfo string `bson:"attached_info" json:"attachedInfo"` - Ex string `bson:"ex" json:"ex"` +func (d *DataBases) GetAllExtendMsgSet(ID string) ([]*ExtendMsgSet, error) { + //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + //c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + +} + +type GetExtendMsgSetOpts struct { + IncludeExtendMsgs bool +} + +func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSetOpts) (*ExtendMsgSet, error) { + //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + //c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) +} + +func (d *DataBases) InsertExtendMsg(ID string, msg *ExtendMsg) error { + //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + //c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + return nil +} + +func (d *DataBases) UpdateOneExtendMsgSet(ID string, index, MsgIndex int32, msg *ExtendMsg, msgSet *ExtendMsgSet) error { + //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + //c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + return nil +} + +func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) ([]*ExtendMsgSet, error) { + //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + //c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + return nil, nil } diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 5bd96032e..8ae026522 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -291,13 +291,13 @@ func (d *DataBases) DelMongoMsgs(IDList []string) error { return err } -func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error { +func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) userChat := &UserChat{} - err := c.FindOne(ctx, bson.M{"uid": suffixID}).Decode(&userChat) + err = c.FindOne(ctx, bson.M{"uid": suffixID}).Decode(&userChat) if err != nil { - return err + return 0, err } for i, msg := range userChat.Msg { if i <= index { @@ -312,10 +312,11 @@ func (d *DataBases) ReplaceMsgToBlankByIndex(suffixID string, index int) error { } msg.Msg = bytes msg.SendTime = 0 + replaceMaxSeq = msgPb.Seq } } _, err = c.UpdateOne(ctx, bson.M{"uid": suffixID}, bson.M{"$set": bson.M{"msg": userChat.Msg}}) - return err + return replaceMaxSeq, err } func (d *DataBases) GetNewestMsg(ID string) (msg *open_im_sdk.MsgData, err error) { diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index cda798100..e95557ad3 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -33,6 +33,8 @@ const ( groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:" conversationCache = "CONVERSATION_CACHE:" conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" + extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" + extendMsgCache = "EXTEND_MSG_CACHE:" ) func DelKeys() { @@ -564,3 +566,63 @@ func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) { func DelConversationFromCache(ownerUserID, conversationID string) error { return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") } + +func GetExtendMsgSetFromCache(ID string, index int32) (*db.ExtendMsgSet, error) { + getExtendMsgSet := func() (string, error) { + extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{IncludeExtendMsgs: false}) + if err != nil { + return "", utils.Wrap(err, "GetExtendMsgSet failed") + } + bytes, err := json.Marshal(extendMsgSet) + if err != nil { + return "", utils.Wrap(err, "Marshal failed") + } + return string(bytes), nil + } + extendMsgSetStr, err := db.DB.Rc.Fetch(extendMsgSetCache+db.GetExtendMsgSetID(ID, index), time.Second*30*60, getExtendMsgSet) + if err != nil { + return nil, utils.Wrap(err, "Fetch failed") + } + extendMsgSet := &db.ExtendMsgSet{} + err = json.Unmarshal([]byte(extendMsgSetStr), extendMsgSet) + if err != nil { + return nil, utils.Wrap(err, "Unmarshal failed") + } + return extendMsgSet, nil +} + +func DelExtendMsgSetFromCache(ID string, index int32) error { + return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err") +} + +func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) { + getExtendMsg := func() (string, error) { + extendMsg, err := db.DB.GetExtendMsgList(ID, index, extendMsgIndex, extendMsgIndex+1) + if err != nil { + return "", utils.Wrap(err, "GetExtendMsgList failed") + } + if len(extendMsg) == 0 { + return "", nil + } + bytes, err := json.Marshal(extendMsg[0]) + if err != nil { + return "", utils.Wrap(err, "Marshal failed") + } + return string(bytes), nil + } + + extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex)), time.Second*30*60, getExtendMsg) + if err != nil { + return nil, utils.Wrap(err, "Fetch failed") + } + extendMsg := &db.ExtendMsg{} + err = json.Unmarshal([]byte(extendMsgStr), extendMsg) + if err != nil { + return nil, utils.Wrap(err, "Unmarshal failed") + } + return extendMsg, nil +} + +func DelExtendMsg(ID string, index, extendMsgIndex int32) error { + return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex))), "DelExtendMsg err") +}