From 24288aa597198d56ede588be06b17452da314aa0 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 9 Dec 2022 18:03:40 +0800 Subject: [PATCH] modify msg --- .../logic/online_msg_to_mongo_handler.go | 47 +++++++++++++- internal/rpc/msg/extend_msg_callback.go | 6 +- pkg/base_info/msg.go | 8 +-- pkg/common/db/extend_msg_mongo_model.go | 65 +++++++++---------- 4 files changed, 85 insertions(+), 41 deletions(-) diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index 11bc399ad..a18186714 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -61,7 +61,52 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil { log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList) } - //if v.MsgData.ContentType == ? {} + } else if v.MsgData.ContentType == constant.ReactionMessageModifierNotification { + var req pbMsg.ModifyMessageReactionExtensionsReq + if req.IsExternalExtensions { + log.NewInfo(req.OperationID, "msg:", req.String(), "this is external extensions") + continue + } + if !req.IsReact { + // first time to modify + var reactionExtensionList = make(map[string]db.KeyValue) + for k, v := range req.ReactionExtensionList { + reactionExtensionList[k] = db.KeyValue{ + TypeKey: v.TypeKey, + Value: v.Value, + LatestUpdateTime: v.LatestUpdateTime, + } + } + extendMsg := db.ExtendMsg{ + ReactionExtensionList: reactionExtensionList, + ClientMsgID: req.ClientMsgID, + MsgFirstModifyTime: req.MsgFirstModifyTime, + } + if req.AttachedInfo != nil { + extendMsg.AttachedInfo = req.AttachedInfo.Value + } + if req.Ex != nil { + extendMsg.Ex = req.Ex.Value + } + if err := db.DB.InsertExtendMsg(req.SourceID, req.SessionType, &extendMsg); err != nil { + log.NewError(req.OperationID, "MsgFirstModify InsertExtendMsg failed", req.SourceID, req.SessionType, extendMsg, err.Error()) + continue + } + } else { + // is already modify + if err := db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, req.ReactionExtensionList); err != nil { + log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed") + } + } + } else if v.MsgData.ContentType == 2301 { + var req pbMsg.OperateMessageListReactionExtensionsReq + var clientMsgIDList []string + for _, v := range req.MessageReactionKeyList { + clientMsgIDList = append(clientMsgIDList, v.ClientMsgID) + } + if err := db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, clientMsgIDList, req.OpUserID); err != nil { + log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed") + } } } } diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index 956c86c9d..2654ee528 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -1,9 +1,11 @@ package msg -func callbackSetMessageReactionExtensions() { +import "Open_IM/pkg/proto/msg" + +func callbackSetMessageReactionExtensions(req *msg.ModifyMessageReactionExtensionsReq) { } -func callbackDeleteMessageReactionExtensions() { +func callbackDeleteMessageReactionExtensions(req *msg.OperateMessageListReactionExtensionsReq) { } diff --git a/pkg/base_info/msg.go b/pkg/base_info/msg.go index 4927d7969..193ea40ab 100644 --- a/pkg/base_info/msg.go +++ b/pkg/base_info/msg.go @@ -69,8 +69,8 @@ type ModifyMessageReactionExtensionsReq struct { type ModifyMessageReactionExtensionsResp struct { CommResp Data struct { - SuccessList []*sdk_ws.ExtendMsg `json:"successList"` - FailedList []*sdk_ws.ExtendMsg `json:"failedList"` + SuccessList []*msg.ExtendMsgResp `json:"successList"` + FailedList []*msg.ExtendMsgResp `json:"failedList"` } `json:"data"` } @@ -84,8 +84,8 @@ type OperateMessageListReactionExtensionsReq struct { type OperateMessageListReactionExtensionsResp struct { CommResp Data struct { - SuccessList []*sdk_ws.ExtendMsg `json:"successList"` - FailedList []*sdk_ws.ExtendMsg `json:"failedList"` + SuccessList []*msg.ExtendMsgResp `json:"successList"` + FailedList []*msg.ExtendMsgResp `json:"failedList"` } `json:"data"` } diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index d4229ba3a..1c5e3d10b 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -14,10 +14,11 @@ import ( ) const cExtendMsgSet = "extend_msgs" +const MaxNum = 100 type ExtendMsgSet struct { - SourceID string `bson:"id" json:"ID"` - SessionType string `bson:"session_type" json:"sessionType"` + SourceID string `bson:"source_id" json:"ID"` + SessionType int32 `bson:"session_type" json:"sessionType"` ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time @@ -31,11 +32,11 @@ type KeyValue struct { } type ExtendMsg struct { - ReactionExtensionList map[string]KeyValue `bson:"content" json:"content"` + ReactionExtensionList map[string]KeyValue `bson:"reaction_extension_list" json:"reactionExtensionList"` ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` - MsgFirstModifyTime int64 `bson:"create_time" json:"createTime"` // this extendMsg create time - AttachedInfo *string `bson:"attached_info" json:"attachedInfo"` - Ex *string `bson:"ex" json:"ex"` + MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` } func GetExtendMsgSetID(ID string, index int32) string { @@ -79,35 +80,35 @@ type GetExtendMsgSetOpts struct { ExcludeExtendMsgs bool } -func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSetOpts) (*ExtendMsgSet, error) { +// first modify msg +func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - var set ExtendMsgSet - var findOneOpt *options.FindOneOptions - if opts != nil { - if opts.ExcludeExtendMsgs { - findOneOpt = &options.FindOneOptions{} - findOneOpt.SetProjection(bson.M{"extend_msgs": 0}) + result, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) + if err != nil { + return utils.Wrap(err, "") + } + if result.UpsertedCount == 0 { + if err := d.CreateExtendMsgSet(&ExtendMsgSet{ + SourceID: sourceID, + SessionType: sessionType, + ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg}, + ExtendMsgNum: 1, + CreateTime: msg.MsgFirstModifyTime, + MaxMsgUpdateTime: msg.MsgFirstModifyTime, + }); err != nil { + return err } } - err := c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, findOneOpt).Decode(&set) - return &set, err -} - -// first modify msg -func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) - return err + return nil } // insert or update -func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index int32, clientMsgID, userID, value string) error { +func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID, typeKey, value string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) reactionExtendMsgSet := KeyValue{ - TypeKey: userID, + TypeKey: typeKey, Value: value, LatestUpdateTime: utils.GetCurrentTimestampBySecond(), } @@ -115,21 +116,17 @@ func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index int32, c opt := &options.UpdateOptions{ Upsert: &upsert, } - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): reactionExtendMsgSet}, opt) + _, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, typeKey): reactionExtendMsgSet}}, opt) return err } -func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index int32, clientMsgID, userID string) error { +// delete TypeKey +func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID []string, userID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$unset": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): ""}}) + _, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$unset": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): ""}}) return err } -// by index start end -func (d *DataBases) GetExtendMsgList(ID string, index int32, clientMsgID string) (extendMsg *ExtendMsg, err error) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{}}).Decode(&extendMsg) - return extendMsg, err +func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) error { }