diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index a927e7afc..b4ae38418 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -223,7 +223,7 @@ func callbackMsgModify(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { } func CallbackBeforeExtendMsgModify() cbApi.CommonCallbackResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} + callbackResp := cbApi.CommonCallbackResp{OperationID: ""} return callbackResp } diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index f25731aea..45e97ee25 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -13,7 +13,7 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -const cExtendMsgSet = "extend_msg_set" +const cExtendMsgSet = "extend_msgs" type ExtendMsgSet struct { ID string `bson:"id" json:"ID"` @@ -32,10 +32,10 @@ type ReactionExtendMsgSet struct { } type ExtendMsg struct { - Content []*ReactionExtendMsgSet `bson:"content" json:"content"` - ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` - CreateTime int32 `bson:"create_time" json:"createTime"` - LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` + Content map[string]ReactionExtendMsgSet `bson:"content" json:"content"` + ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` + CreateTime int32 `bson:"create_time" json:"createTime"` + LatestUpdateTime int32 `bson:"latest_update_time" json:"latestUpdateTime"` } func GetExtendMsgSetID(ID string, index int32) string { @@ -94,16 +94,16 @@ func (d *DataBases) GetExtendMsgSet(ID string, index int32, opts *GetExtendMsgSe return &set, err } -func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) (msgIndex int32, err error) { +// first modify msg +func (d *DataBases) InsertExtendMsgAndGetIndex(ID string, index int32, msg *ExtendMsg) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - result := c.FindOneAndUpdate(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, "$push": bson.M{"extend_msgs": msg}}}) - set := &ExtendMsgSet{} - err = result.Decode(set) - return set.ExtendMsgNum, err + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond(), "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) + return err } -func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index, msgIndex int32, userID, value string) error { +// insert or update +func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index int32, clientMsgID, userID, value string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) reactionExtendMsgSet := ReactionExtendMsgSet{ @@ -115,23 +115,21 @@ func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(ID string, index, msgInde opt := &options.UpdateOptions{ Upsert: &upsert, } - //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) - _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, "&push": bson.M{"content": &reactionExtendMsgSet}}, opt) + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$set": bson.M{"latest_update_time": utils.GetCurrentTimestampBySecond()}, fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): reactionExtendMsgSet}, opt) return err } -func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index, msgIndex int32, userID string) error { +func (d *DataBases) DeleteReactionExtendMsgSet(ID string, index int32, clientMsgID, userID string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - //s := fmt.Sprintf("extend_msgs.%d.content", msgIndex) - _, err := c.DeleteOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": msgIndex}}) + _, err := c.UpdateOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index)}, bson.M{"$unset": bson.M{}}) return err } // by index start end -func (d *DataBases) GetExtendMsgList(ID string, index, msgStartIndex, msgEndIndex int32) (extendMsgList []*ExtendMsg, err error) { +func (d *DataBases) GetExtendMsgList(ID string, index int32, clientMsgID string) (extendMsg *ExtendMsg, err error) { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{"$slice": []int32{msgStartIndex, msgEndIndex}}}).Decode(&extendMsgList) - return extendMsgList, err + err = c.FindOne(ctx, bson.M{"uid": GetExtendMsgSetID(ID, index), "extend_msgs": bson.M{}}).Decode(&extendMsg) + return extendMsg, err } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index f5b98869c..3edb9f238 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -595,23 +595,20 @@ func DelExtendMsgSetFromCache(ID string, index int32) error { return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err") } -func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) { +func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, error) { getExtendMsg := func() (string, error) { - extendMsg, err := db.DB.GetExtendMsgList(ID, index, extendMsgIndex, extendMsgIndex+1) + extendMsg, err := db.DB.GetExtendMsgList(ID, index, clientMsgID) if err != nil { return "", utils.Wrap(err, "GetExtendMsgList failed") } - if len(extendMsg) == 0 { - return "", nil - } - bytes, err := json.Marshal(extendMsg[0]) + bytes, err := json.Marshal(extendMsg) if err != nil { return "", utils.Wrap(err, "Marshal failed") } return string(bytes), nil } - extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex)), time.Second*30*60, getExtendMsg) + extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID, time.Second*30*60, getExtendMsg) if err != nil { return nil, utils.Wrap(err, "Fetch failed") } @@ -623,6 +620,6 @@ func GetExtendMsg(ID string, index, extendMsgIndex int32) (*db.ExtendMsg, error) return extendMsg, nil } -func DelExtendMsg(ID string, index, extendMsgIndex int32) error { - return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+strconv.Itoa(int(extendMsgIndex))), "DelExtendMsg err") +func DelExtendMsg(ID string, index int32, clientMsgID string) error { + return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID), "DelExtendMsg err") }