mongo operation

This commit is contained in:
wangchuxiao 2022-12-09 19:54:49 +08:00
parent 24288aa597
commit 6d45f0730c
3 changed files with 101 additions and 59 deletions

View File

@ -100,12 +100,10 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
} }
} else if v.MsgData.ContentType == 2301 { } else if v.MsgData.ContentType == 2301 {
var req pbMsg.OperateMessageListReactionExtensionsReq var req pbMsg.OperateMessageListReactionExtensionsReq
var clientMsgIDList []string
for _, v := range req.MessageReactionKeyList { for _, v := range req.MessageReactionKeyList {
clientMsgIDList = append(clientMsgIDList, v.ClientMsgID) if err := db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, v.ClientMsgID, v.MsgFirstModifyTime, v.ReactionExtensionList); err != nil {
} log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
if err := db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, clientMsgIDList, req.OpUserID); err != nil { }
log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed")
} }
} }
} }

View File

@ -2,12 +2,15 @@ package db
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"errors"
"fmt" "fmt"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"strconv" "strconv"
"strings"
"time" "time"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
@ -17,12 +20,12 @@ const cExtendMsgSet = "extend_msgs"
const MaxNum = 100 const MaxNum = 100
type ExtendMsgSet struct { type ExtendMsgSet struct {
SourceID string `bson:"source_id" json:"ID"` SourceID string `bson:"source_id" json:"sourceID"`
SessionType int32 `bson:"session_type" json:"sessionType"` SessionType int32 `bson:"session_type" json:"sessionType"`
ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"`
ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"`
CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time
MaxMsgUpdateTime int64 `bson:"max_msg_update_time"` // index find msg MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg
} }
type KeyValue struct { type KeyValue struct {
@ -39,10 +42,20 @@ type ExtendMsg struct {
Ex string `bson:"ex" json:"ex"` Ex string `bson:"ex" json:"ex"`
} }
func GetExtendMsgSetID(ID string, index int32) string { func GetExtendMsgMaxNum() int32 {
return MaxNum
}
func GetExtendMsgSourceID(ID string, index int32) string {
return ID + ":" + strconv.Itoa(int(index)) return ID + ":" + strconv.Itoa(int(index))
} }
func SplitSourceIDAndGetIndex(sourceID string) int32 {
l := strings.Split(sourceID, ":")
index, _ := strconv.Atoi(l[len(l)-1])
return int32(index)
}
func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
@ -84,49 +97,108 @@ type GetExtendMsgSetOpts struct {
func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
result, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) regex := fmt.Sprintf("^%s", sourceID)
var err error
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
// update newest
result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType}, findOpts)
if err != nil { if err != nil {
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
if result.UpsertedCount == 0 { var setList []ExtendMsgSet
if err := d.CreateExtendMsgSet(&ExtendMsgSet{ if err := result.All(ctx, &setList); err != nil {
SourceID: sourceID, return utils.Wrap(err, "")
}
if len(setList) == 0 || setList[0].ExtendMsgNum >= GetExtendMsgMaxNum() {
var index int32
if len(setList) > 0 {
index = SplitSourceIDAndGetIndex(setList[0].SourceID)
}
err = d.CreateExtendMsgSet(&ExtendMsgSet{
SourceID: GetExtendMsgSourceID(sourceID, index),
SessionType: sessionType, SessionType: sessionType,
ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg}, ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg},
ExtendMsgNum: 1, ExtendMsgNum: 1,
CreateTime: msg.MsgFirstModifyTime, CreateTime: msg.MsgFirstModifyTime,
MaxMsgUpdateTime: msg.MsgFirstModifyTime, MaxMsgUpdateTime: msg.MsgFirstModifyTime,
}); err != nil { })
return err } else {
} _, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}})
} }
return nil return utils.Wrap(err, "")
} }
// insert or update // insert or update
func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID, typeKey, value string) error { func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
reactionExtendMsgSet := KeyValue{ var updateBson = bson.M{}
TypeKey: typeKey, for _, v := range reactionExtensionList {
Value: value, updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = v
LatestUpdateTime: utils.GetCurrentTimestampBySecond(),
} }
upsert := true upsert := true
opt := &options.UpdateOptions{ opt := &options.UpdateOptions{
Upsert: &upsert, Upsert: &upsert,
} }
_, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, typeKey): reactionExtendMsgSet}}, opt) findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
return err regex := fmt.Sprintf("^%s", sourceID)
result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": msgFirstModifyTime}}, findOpts)
if err != nil {
return utils.Wrap(err, "")
}
var setList []ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return utils.Wrap(err, "")
}
if len(setList) == 0 {
return utils.Wrap(errors.New("InsertOrUpdateReactionExtendMsgSet failed, len(setList) == 0"), "")
}
_, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$set": updateBson}, opt)
return utils.Wrap(err, "")
} }
// delete TypeKey // delete TypeKey
func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID []string, userID string) error { func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList []*server_api_params.KeyValue) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
_, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$unset": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): ""}}) var updateBson = bson.M{}
for _, v := range reactionExtensionList {
updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = ""
}
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0})
regex := fmt.Sprintf("^%s", sourceID)
result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": msgFirstModifyTime}}, findOpts)
if err != nil {
return utils.Wrap(err, "")
}
var setList []ExtendMsgSet
if err := result.All(ctx, &setList); err != nil {
return utils.Wrap(err, "")
}
if len(setList) == 0 {
return utils.Wrap(errors.New("InsertOrUpdateReactionExtendMsgSet failed, len(setList) == 0"), "")
}
_, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$unset": updateBson})
return err return err
} }
func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) error { func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet)
findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1})
regex := fmt.Sprintf("^%s", sourceID)
result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "msgFirstModifyTime": bson.M{"$lte": maxMsgUpdateTime}}, findOpts)
if err != nil {
return nil, utils.Wrap(err, "")
}
var extendMsgList []ExtendMsg
if err := result.All(ctx, &extendMsgList); err != nil {
return nil, utils.Wrap(err, "")
}
if len(extendMsgList) == 0 {
return nil, utils.Wrap(errors.New("GetExtendMsg failed, len(setList) == 0"), "")
}
return &extendMsgList[0], nil
} }

View File

@ -567,37 +567,9 @@ func DelConversationFromCache(ownerUserID, conversationID string) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err")
} }
func GetExtendMsgSetFromCache(ID string, index int32) (*db.ExtendMsgSet, error) { func GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*db.ExtendMsg, error) {
getExtendMsgSet := func() (string, error) {
extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{ExcludeExtendMsgs: false})
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgSet failed")
}
bytes, err := json.Marshal(extendMsgSet)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
extendMsgSetStr, err := db.DB.Rc.Fetch(extendMsgSetCache+db.GetExtendMsgSetID(ID, index), time.Second*30*60, getExtendMsgSet)
if err != nil {
return nil, utils.Wrap(err, "Fetch failed")
}
extendMsgSet := &db.ExtendMsgSet{}
err = json.Unmarshal([]byte(extendMsgSetStr), extendMsgSet)
if err != nil {
return nil, utils.Wrap(err, "Unmarshal failed")
}
return extendMsgSet, nil
}
func DelExtendMsgSetFromCache(ID string, index int32) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err")
}
func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, error) {
getExtendMsg := func() (string, error) { getExtendMsg := func() (string, error) {
extendMsg, err := db.DB.GetExtendMsgList(ID, index, clientMsgID) extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, maxMsgUpdateTime)
if err != nil { if err != nil {
return "", utils.Wrap(err, "GetExtendMsgList failed") return "", utils.Wrap(err, "GetExtendMsgList failed")
} }
@ -608,7 +580,7 @@ func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, er
return string(bytes), nil return string(bytes), nil
} }
extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID, time.Second*30*60, getExtendMsg) extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "Fetch failed") return nil, utils.Wrap(err, "Fetch failed")
} }
@ -621,5 +593,5 @@ func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, er
} }
func DelExtendMsg(ID string, index int32, clientMsgID string) error { func DelExtendMsg(ID string, index int32, clientMsgID string) error {
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID), "DelExtendMsg err") return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err")
} }