diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index ae427442a..6198f3a24 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -2,7 +2,9 @@ package controller import ( pbMsg "Open_IM/pkg/proto/msg" + "Open_IM/pkg/proto/sdkws" "context" + "encoding/json" ) type MsgInterface interface { @@ -14,10 +16,61 @@ type MsgInterface interface { DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error) + ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error + // 获取群ID或者UserID最新一条在mongo里面的消息 + GetNewestMsg(ID string) (msg *sdkws.MsgData, err error) + // 获取群ID或者UserID最老一条在mongo里面的消息 + GetOldestMsg(ID string) (msg *sdkws.MsgData, err error) + + GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) + GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) + GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unExistSeqList []uint32, err error) + SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error + + CleanUpUserMsgFromMongo(userID string, operationID string) error +} + +func NewMsgController() MsgDatabaseInterface { + return MsgController +} + +type MsgController struct { } type MsgDatabaseInterface interface { BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) + // logic delete + DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error + DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) + ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error) + ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error + // 获取群ID或者UserID最新一条在mongo里面的消息 + GetNewestMsg(ID string) (msg *sdkws.MsgData, err error) + // 获取群ID或者UserID最老一条在mongo里面的消息 + GetOldestMsg(ID string) (msg *sdkws.MsgData, err error) + + GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) + GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, err error) + GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*sdkws.MsgData, indexList []int, unExistSeqList []uint32, err error) + SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error + // 删除用户所有消息/redis/mongo然后重置seq + CleanUpUserMsgFromMongo(userID string, operationID string) error +} + +func NewMsgDatabase() MsgDatabaseInterface { + return MsgDatabase +} + +type MsgDatabase struct { +} + +func (m *MsgDatabase) BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error { + +} + +func (m *MsgDatabase) CleanUpUserMsgFromMongo(userID string, operationID string) error { + } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 108e78898..b212ed9a7 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -1,6 +1,10 @@ package unrelation -import "strconv" +import ( + "Open_IM/pkg/common/constant" + "Open_IM/pkg/proto/sdkws" + "strconv" +) const ( singleGocMsgNum = 5000 @@ -77,3 +81,23 @@ func (UserMsgDocModel) getMsgIndex(seq uint32) int { func (UserMsgDocModel) indexGen(uid string, seqSuffix uint32) string { return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) } + +func (UserMsgDocModel) genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*sdkws.MsgData) { + for _, v := range seqList { + msg := new(sdkws.MsgData) + msg.Seq = v + exceptionMsg = append(exceptionMsg, msg) + } + return exceptionMsg +} + +func (UserMsgDocModel) genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) { + for _, v := range seqList { + msg := new(sdkws.MsgData) + msg.Seq = v + msg.GroupID = groupID + msg.SessionType = constant.SuperGroupChatType + exceptionMsg = append(exceptionMsg, msg) + } + return exceptionMsg +} diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 96fd280d9..50e85fafd 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -470,26 +470,6 @@ func (d *db.DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, s return seqMsg, indexList, unexistSeqList, nil } -func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*sdkws.MsgData) { - for _, v := range seqList { - msg := new(sdkws.MsgData) - msg.Seq = v - exceptionMsg = append(exceptionMsg, msg) - } - return exceptionMsg -} - -func genExceptionSuperGroupMessageBySeqList(seqList []uint32, groupID string) (exceptionMsg []*sdkws.MsgData) { - for _, v := range seqList { - msg := new(sdkws.MsgData) - msg.Seq = v - msg.GroupID = groupID - msg.SessionType = constant.SuperGroupChatType - exceptionMsg = append(exceptionMsg, msg) - } - return exceptionMsg -} - func (d *db.DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -560,22 +540,6 @@ func (d *db.DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgData return nil } -func (d *db.DataBases) DelUserChatMongo2(uid string) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) - filter := bson.M{"uid": uid} - - delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600 - if _, err := c.UpdateOne(ctx, filter, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil { - return utils.Wrap(err, "") - } - return nil -} - -func (d *db.DataBases) MgoSkipUID(count int) (string, error) { - return "", nil -} - func (d *db.DataBases) CleanUpUserMsgFromMongo(userID string, operationID string) error { ctx := context.Background() c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)