diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index d8f2ac26a..24faab98e 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -26,8 +26,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/mongo" - - "google.golang.org/protobuf/proto" ) type CommonMsgDatabase interface { @@ -148,6 +146,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return nil } num := db.msg.GetSingleGocMsgNum() + num = 100 if msgList[0].Msg != nil { firstSeq = msgList[0].Msg.Seq } @@ -164,13 +163,13 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI err error ) if msg.Msg != nil { - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msgList[0].Msg) + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", msg.Msg) } else if msg.Revoke != nil { - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msgList[0].Revoke) + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke) } else if msg.DelList != nil { - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msgList[0].DelList) + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList) } else if msg.ReadList != nil { - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msgList[0].ReadList) + res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList) } else { return false, errs.ErrArgs.Wrap("msg all field is nil") } @@ -199,12 +198,21 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } var insert int for j := i; j < len(msgList); j++ { + seq = firstSeq + int64(j) if getDocID(seq) != docID { break } insert++ doc.Msg[getIndex(seq)] = *msgList[j] } + for i, model := range doc.Msg { + if model.DelList == nil { + doc.Msg[i].DelList = []string{} + } + if model.ReadList == nil { + doc.Msg[i].ReadList = []string{} + } + } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { i-- @@ -220,91 +228,91 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - num := db.msg.GetSingleGocMsgNum() - currentIndex := currentMaxSeq / num - var blockMsgs []*[]*sdkws.MsgData - for i, data := range msgList { - data.Seq = currentMaxSeq + int64(i+1) - index := data.Seq/num - currentIndex - if i == 0 && index == 1 { - index-- - currentIndex++ - } - var block *[]*sdkws.MsgData - if len(blockMsgs) == int(index) { - var size int64 - if i == 0 { - size = num - data.Seq%num - } else { - temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num - if temp >= num { - size = num - } else { - size = temp % num - } - } - temp := make([]*sdkws.MsgData, 0, size) - block = &temp - blockMsgs = append(blockMsgs, block) - } else { - block = blockMsgs[index] - } - *block = append(*block, msgList[i]) - } - create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) - if !create { - exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) - if err != nil { - return err - } - create = !exist - } - for i, msgs := range blockMsgs { - docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i)) - if create || i != 0 { // 插入 - doc := unRelationTb.MsgDocModel{ - DocID: docID, - Msg: make([]unRelationTb.MsgInfoModel, num), - } - for i := 0; i < len(doc.Msg); i++ { - doc.Msg[i].ReadList = []string{} - doc.Msg[i].DelList = []string{} - } - for _, msg := range *msgs { - data, err := proto.Marshal(msg) - if err != nil { - return err - } - doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{ - SendTime: msg.SendTime, - Msg: data, - ReadList: []string{}, - DelList: []string{}, - } - } - if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return utils.Wrap(err, "") - } - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } else { // 修改 - for _, msg := range *msgs { - data, err := proto.Marshal(msg) - if err != nil { - return err - } - info := unRelationTb.MsgInfoModel{ - SendTime: msg.SendTime, - Msg: data, - } - if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return err - } - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } - } - } + //num := db.msg.GetSingleGocMsgNum() + //currentIndex := currentMaxSeq / num + //var blockMsgs []*[]*sdkws.MsgData + //for i, data := range msgList { + // data.Seq = currentMaxSeq + int64(i+1) + // index := data.Seq/num - currentIndex + // if i == 0 && index == 1 { + // index-- + // currentIndex++ + // } + // var block *[]*sdkws.MsgData + // if len(blockMsgs) == int(index) { + // var size int64 + // if i == 0 { + // size = num - data.Seq%num + // } else { + // temp := int64(len(msgList)-len(*blockMsgs[0])) - int64(len(blockMsgs)-1)*num + // if temp >= num { + // size = num + // } else { + // size = temp % num + // } + // } + // temp := make([]*sdkws.MsgData, 0, size) + // block = &temp + // blockMsgs = append(blockMsgs, block) + // } else { + // block = blockMsgs[index] + // } + // *block = append(*block, msgList[i]) + //} + //create := currentMaxSeq == 0 || ((*blockMsgs[0])[0].Seq%num == 0) + //if !create { + // exist, err := db.msgDocDatabase.IsExistDocID(ctx, db.msg.IndexDocID(conversationID, currentIndex)) + // if err != nil { + // return err + // } + // create = !exist + //} + //for i, msgs := range blockMsgs { + // docID := db.msg.IndexDocID(conversationID, currentIndex+int64(i)) + // if create || i != 0 { // 插入 + // doc := unRelationTb.MsgDocModel{ + // DocID: docID, + // Msg: make([]unRelationTb.MsgInfoModel, num), + // } + // for i := 0; i < len(doc.Msg); i++ { + // doc.Msg[i].ReadList = []string{} + // doc.Msg[i].DelList = []string{} + // } + // for _, msg := range *msgs { + // data, err := proto.Marshal(msg) + // if err != nil { + // return err + // } + // doc.Msg[msg.Seq%num] = unRelationTb.MsgInfoModel{ + // SendTime: msg.SendTime, + // Msg: data, + // ReadList: []string{}, + // DelList: []string{}, + // } + // } + // if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { + // prome.Inc(prome.MsgInsertMongoFailedCounter) + // return utils.Wrap(err, "") + // } + // prome.Inc(prome.MsgInsertMongoSuccessCounter) + // } else { // 修改 + // for _, msg := range *msgs { + // data, err := proto.Marshal(msg) + // if err != nil { + // return err + // } + // info := unRelationTb.MsgInfoModel{ + // SendTime: msg.SendTime, + // Msg: data, + // } + // if err := db.msgDocDatabase.UpdateMsg(ctx, docID, msg.Seq%num, &info); err != nil { + // prome.Inc(prome.MsgInsertMongoFailedCounter) + // return err + // } + // prome.Inc(prome.MsgInsertMongoSuccessCounter) + // } + // } + //} return nil } @@ -408,10 +416,11 @@ func (db *commonMsgDatabase) GetOldestMsg(ctx context.Context, conversationID st func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} - err = proto.Unmarshal(msgInfo.Msg, msgPb) - if err != nil { - return nil, utils.Wrap(err, "") - } + // todo: unmarshal + //err = proto.Unmarshal(msgInfo.Msg, msgPb) + //if err != nil { + // return nil, utils.Wrap(err, "") + //} return msgPb, nil } @@ -644,69 +653,70 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // recursion 删除list并且返回设置的最小seq func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list - msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index) - if err != nil || msgs.DocID == "" { - if err != nil { - if err == unrelation.ErrMsgListNotExist { - log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) - } else { - log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) - } - } - // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 - err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) - if err != nil { - return 0, err - } - return delStruct.getSetMinSeq() + 1, nil - } - log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg)) - if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { - log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) - } - if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { - delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID) - lastMsgPb := &sdkws.MsgData{} - err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - if err != nil { - log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID) - return 0, utils.Wrap(err, "proto.Unmarshal failed") - } - delStruct.minSeq = lastMsgPb.Seq - } else { - var hasMarkDelFlag bool - for i, msg := range msgs.Msg { - if msg.SendTime != 0 { - msgPb := &sdkws.MsgData{} - err = proto.Unmarshal(msg.Msg, msgPb) - if err != nil { - log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID) - return 0, utils.Wrap(err, "proto.Unmarshal failed") - } - if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { - msgPb.Status = constant.MsgDeleted - bytes, _ := proto.Marshal(msgPb) - msg.Msg = bytes - msg.SendTime = 0 - hasMarkDelFlag = true - } else { - // 到本条消息不需要删除, minSeq置为这条消息的seq - if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { - return 0, err - } - if hasMarkDelFlag { - if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { - return delStruct.getSetMinSeq(), err - } - } - return msgPb.Seq, nil - } - } - } - } - // 继续递归 index+1 - seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) - return seq, err + //msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index) + //if err != nil || msgs.DocID == "" { + // if err != nil { + // if err == unrelation.ErrMsgListNotExist { + // log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) + // } else { + // log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) + // } + // } + // // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 + // err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) + // if err != nil { + // return 0, err + // } + // return delStruct.getSetMinSeq() + 1, nil + //} + //log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg)) + //if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { + // log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) + //} + //if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { + // delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID) + // lastMsgPb := &sdkws.MsgData{} + // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) + // if err != nil { + // log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID) + // return 0, utils.Wrap(err, "proto.Unmarshal failed") + // } + // delStruct.minSeq = lastMsgPb.Seq + //} else { + // var hasMarkDelFlag bool + // for i, msg := range msgs.Msg { + // if msg.SendTime != 0 { + // msgPb := &sdkws.MsgData{} + // err = proto.Unmarshal(msg.Msg, msgPb) + // if err != nil { + // log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID) + // return 0, utils.Wrap(err, "proto.Unmarshal failed") + // } + // if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { + // msgPb.Status = constant.MsgDeleted + // bytes, _ := proto.Marshal(msgPb) + // msg.Msg = bytes + // msg.SendTime = 0 + // hasMarkDelFlag = true + // } else { + // // 到本条消息不需要删除, minSeq置为这条消息的seq + // if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { + // return 0, err + // } + // if hasMarkDelFlag { + // if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { + // return delStruct.getSetMinSeq(), err + // } + // } + // return msgPb.Seq, nil + // } + // } + // } + //} + //// 继续递归 index+1 + //seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) + //return seq, err + return 0, nil } func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 00d2ac2ea..e557176a3 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -2,13 +2,12 @@ package controller import ( "context" - "encoding/json" "fmt" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "go.mongodb.org/mongo-driver/bson" "math/rand" - "sort" "strconv" "sync" "testing" @@ -126,523 +125,75 @@ func Test_BatchInsertChat2DB(t *testing.T) { } -func TestName(t *testing.T) { - s := ` [ - 189, - 498, - 310, - 163, - 313, - 335, - 327, - 342, - 123, - 97, - 4, - 362, - 210, - 298, - 436, - 9, - 369, - 432, - 132, - 69, - 248, - 93, - 91, - 112, - 145, - 194, - 84, - 443, - 179, - 241, - 257, - 237, - 169, - 460, - 33, - 441, - 126, - 187, - 390, - 402, - 51, - 35, - 455, - 175, - 389, - 61, - 309, - 467, - 492, - 453, - 159, - 276, - 165, - 417, - 173, - 157, - 12, - 209, - 269, - 36, - 226, - 356, - 92, - 267, - 482, - 318, - 219, - 119, - 176, - 245, - 74, - 13, - 450, - 196, - 215, - 28, - 167, - 366, - 442, - 201, - 341, - 68, - 2, - 484, - 328, - 44, - 423, - 403, - 105, - 109, - 480, - 271, - 134, - 336, - 299, - 148, - 365, - 135, - 277, - 87, - 244, - 301, - 218, - 59, - 280, - 283, - 55, - 499, - 133, - 316, - 407, - 146, - 56, - 394, - 386, - 297, - 285, - 137, - 58, - 214, - 142, - 6, - 124, - 48, - 60, - 212, - 75, - 50, - 412, - 458, - 127, - 45, - 266, - 202, - 368, - 138, - 260, - 41, - 193, - 88, - 114, - 410, - 95, - 382, - 416, - 281, - 434, - 359, - 98, - 462, - 300, - 352, - 230, - 247, - 117, - 64, - 287, - 405, - 224, - 19, - 259, - 305, - 220, - 150, - 477, - 111, - 448, - 78, - 103, - 7, - 385, - 151, - 429, - 325, - 273, - 317, - 470, - 454, - 170, - 223, - 5, - 307, - 396, - 315, - 53, - 154, - 446, - 24, - 255, - 227, - 76, - 456, - 250, - 321, - 330, - 391, - 355, - 49, - 479, - 387, - 216, - 39, - 251, - 312, - 217, - 136, - 262, - 322, - 344, - 466, - 242, - 100, - 388, - 38, - 323, - 376, - 379, - 279, - 239, - 85, - 306, - 181, - 485, - 120, - 333, - 334, - 17, - 395, - 81, - 374, - 147, - 139, - 185, - 42, - 1, - 424, - 199, - 225, - 113, - 438, - 128, - 338, - 156, - 493, - 46, - 160, - 11, - 3, - 171, - 464, - 62, - 238, - 431, - 440, - 302, - 65, - 308, - 348, - 125, - 174, - 195, - 77, - 392, - 249, - 82, - 350, - 444, - 232, - 186, - 494, - 384, - 275, - 129, - 294, - 246, - 357, - 102, - 96, - 73, - 15, - 263, - 296, - 236, - 29, - 340, - 152, - 149, - 143, - 437, - 172, - 190, - 34, - 158, - 254, - 295, - 483, - 397, - 337, - 72, - 343, - 178, - 404, - 270, - 346, - 205, - 377, - 486, - 497, - 370, - 414, - 240, - 360, - 490, - 94, - 256, - 8, - 54, - 398, - 183, - 228, - 162, - 399, - 289, - 83, - 86, - 197, - 243, - 57, - 25, - 288, - 488, - 372, - 168, - 206, - 188, - 491, - 452, - 353, - 478, - 421, - 221, - 430, - 184, - 204, - 26, - 211, - 140, - 155, - 468, - 161, - 420, - 303, - 30, - 449, - 131, - 500, - 20, - 71, - 79, - 445, - 425, - 293, - 411, - 400, - 320, - 474, - 272, - 413, - 329, - 177, - 122, - 21, - 347, - 314, - 451, - 101, - 367, - 311, - 40, - 476, - 415, - 418, - 363, - 282, - 469, - 89, - 274, - 481, - 475, - 203, - 268, - 393, - 261, - 200, - 121, - 164, - 472, - 10, - 284, - 14, - 358, - 153, - 383, - 67, - 473, - 373, - 191, - 144, - 16, - 345, - 361, - 433, - 116, - 331, - 489, - 66, - 106, - 487, - 426, - 99, - 27, - 141, - 264, - 439, - 371, - 213, - 18, - 253, - 292, - 130, - 409, - 278, - 419, - 90, - 496, - 447, - 465, - 461, - 339, - 80, - 31, - 70, - 233, - 326, - 37, - 265, - 252, - 222, - 118, - 198, - 406, - 286, - 380, - 104, - 304, - 351, - 408, - 180, - 22, - 364, - 381, - 401, - 234, - 375, - 459, - 319, - 229, - 207, - 291, - 52, - 463, - 427, - 23, - 235, - 32, - 208, - 192, - 349, - 231, - 354, - 435, - 182, - 428, - 332, - 378, - 290, - 108, - 258, - 471, - 115, - 47, - 457, - 166, - 43, - 495, - 63, - 110, - 107, - 422, - 324 - ]` +func GetDB() *commonMsgDatabase { + config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} + config.Config.Mongo.DBTimeout = 60 + config.Config.Mongo.DBDatabase = "openIM" + config.Config.Mongo.DBSource = "admin" + config.Config.Mongo.DBUserName = "root" + config.Config.Mongo.DBPassword = "openIM123" + config.Config.Mongo.DBMaxPoolSize = 100 + config.Config.Mongo.DBRetainChatRecords = 3650 + config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" - var arr []int - - if err := json.Unmarshal([]byte(s), &arr); err != nil { + mongo, err := unrelation.NewMongo() + if err != nil { panic(err) } - - sort.Ints(arr) - - for i, v := range arr { - fmt.Println(i, v, v == i+1) - if v != i+1 { - panic(fmt.Sprintf("expected %d, got %d", i+1, v)) - } + err = mongo.GetDatabase().Client().Ping(context.Background(), nil) + if err != nil { + panic(err) + } + return &commonMsgDatabase{ + msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), + } +} + +func Test_Insert(t *testing.T) { + db := GetDB() + ctx := context.Background() + var arr []*unRelationTb.MsgInfoModel + for i := 0; i < 345; i++ { + arr = append(arr, &unRelationTb.MsgInfoModel{ + Msg: &unRelationTb.MsgDataModel{ + Seq: int64(i), + Content: fmt.Sprintf("test-%d", i), + }, + }) + } + if err := db.BatchInsertBlock(ctx, "test", arr, 0); err != nil { + t.Fatal(err) + } +} + +func Test_Revoke(t *testing.T) { + db := GetDB() + ctx := context.Background() + var arr []*unRelationTb.MsgInfoModel + for i := 0; i < 456; i++ { + arr = append(arr, &unRelationTb.MsgInfoModel{ + Revoke: &unRelationTb.RevokeModel{ + UserID: "uid_" + strconv.Itoa(i), + Nickname: "uname_" + strconv.Itoa(i), + Time: time.Now().UnixMilli(), + }, + }) + } + if err := db.BatchInsertBlock(ctx, "test", arr, 123); err != nil { + t.Fatal(err) + } +} + +func Test_Delete(t *testing.T) { + db := GetDB() + ctx := context.Background() + var arr []*unRelationTb.MsgInfoModel + for i := 0; i < 123; i++ { + arr = append(arr, &unRelationTb.MsgInfoModel{ + DelList: []string{"uid_1", "uid_2"}, + }) + } + if err := db.BatchInsertBlock(ctx, "test", arr, 210); err != nil { + t.Fatal(err) } - } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 8ea183b44..139350441 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -46,7 +46,7 @@ type MsgDataModel struct { SessionType int32 `bson:"session_type"` MsgFrom int32 `bson:"msg_from"` ContentType int32 `bson:"content_type"` - Content []byte `bson:"content"` + Content string `bson:"content"` Seq int64 `bson:"seq"` SendTime int64 `bson:"send_time"` CreateTime int64 `bson:"create_time"` @@ -92,7 +92,8 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 { } func (m *MsgDocModel) IsFull() bool { - return m.Msg[len(m.Msg)-1].SendTime != 0 + //return m.Msg[len(m.Msg)-1].SendTime != 0 + return false } func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index e8edfe156..6a290d582 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -6,7 +6,6 @@ import ( "fmt" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -26,7 +25,16 @@ type MsgMongoDriver struct { } func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface { - return &MsgMongoDriver{MsgCollection: database.Collection(table.MsgDocModel{}.TableName())} + collection := database.Collection(table.MsgDocModel{}.TableName()) + indexModel := mongo.IndexModel{ + Keys: bson.M{"doc_id": 1}, + Options: options.Index().SetUnique(true), + } + _, err := collection.Indexes().CreateOne(context.Background(), indexModel) + if err != nil { + panic(err) + } + return &MsgMongoDriver{MsgCollection: collection} } func (m *MsgMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.MsgInfoModel) error { @@ -103,33 +111,33 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab } func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - doc, err := m.FindOneByDocID(ctx, docID) - if err != nil { - return nil, nil, nil, err - } - singleCount := 0 - var hasSeqList []int64 - for i := 0; i < len(doc.Msg); i++ { - var msg sdkws.MsgData - if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { - return nil, nil, nil, err - } - if utils.Contain(msg.Seq, seqs...) { - indexes = append(indexes, i) - seqMsgs = append(seqMsgs, &msg) - hasSeqList = append(hasSeqList, msg.Seq) - singleCount++ - if singleCount == len(seqs) { - break - } - } - } - for _, i := range seqs { - if utils.Contain(i, hasSeqList...) { - continue - } - unExistSeqs = append(unExistSeqs, i) - } + //doc, err := m.FindOneByDocID(ctx, docID) + //if err != nil { + // return nil, nil, nil, err + //} + //singleCount := 0 + //var hasSeqList []int64 + //for i := 0; i < len(doc.Msg); i++ { + // var msg sdkws.MsgData + // if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { + // return nil, nil, nil, err + // } + // if utils.Contain(msg.Seq, seqs...) { + // indexes = append(indexes, i) + // seqMsgs = append(seqMsgs, &msg) + // hasSeqList = append(hasSeqList, msg.Seq) + // singleCount++ + // if singleCount == len(seqs) { + // break + // } + // } + //} + //for _, i := range seqs { + // if utils.Contain(i, hasSeqList...) { + // continue + // } + // unExistSeqs = append(unExistSeqs, i) + //} return seqMsgs, indexes, unExistSeqs, nil } @@ -170,30 +178,30 @@ func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string } func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { - var msgDocs []table.MsgDocModel - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1})) - if err != nil { - return nil, err - } - err = cursor.All(ctx, &msgDocs) - if err != nil { - return nil, utils.Wrap(err, "") - } - var oldestMsg table.MsgInfoModel - if len(msgDocs) > 0 { - for _, v := range msgDocs[0].Msg { - if v.SendTime != 0 { - oldestMsg = v - break - } - } - if len(oldestMsg.Msg) == 0 { - if len(msgDocs[0].Msg) > 0 { - oldestMsg = msgDocs[0].Msg[0] - } - } - return &oldestMsg, nil - } + //var msgDocs []table.MsgDocModel + //cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1})) + //if err != nil { + // return nil, err + //} + //err = cursor.All(ctx, &msgDocs) + //if err != nil { + // return nil, utils.Wrap(err, "") + //} + //var oldestMsg table.MsgInfoModel + //if len(msgDocs) > 0 { + // for _, v := range msgDocs[0].Msg { + // if v.SendTime != 0 { + // oldestMsg = v + // break + // } + // } + // if len(oldestMsg.Msg) == 0 { + // if len(msgDocs[0].Msg) > 0 { + // oldestMsg = msgDocs[0].Msg[0] + // } + // } + // return &oldestMsg, nil + //} return nil, ErrMsgNotFound } @@ -211,50 +219,50 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode } func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) { - beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) - beginIndex := m.msg.GetMsgIndex(beginSeq) - num := endSeq - beginSeq + 1 - pipeline := bson.A{ - bson.M{ - "$match": bson.M{"doc_id": docID}, - }, - bson.M{ - "$project": bson.M{ - "msgs": bson.M{ - "$slice": bson.A{"$msgs", beginIndex, num}, - }, - }, - }, - } - cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) - if err != nil { - return nil, errs.Wrap(err) - } - defer cursor.Close(ctx) - var doc table.MsgDocModel - i := 0 - for cursor.Next(ctx) { - err := cursor.Decode(&doc) - if err != nil { - return nil, err - } - if i == 0 { - break - } - } - log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) - for _, v := range doc.Msg { - var msg sdkws.MsgData - if err := proto.Unmarshal(v.Msg, &msg); err != nil { - return nil, err - } - if msg.Seq >= beginSeq && msg.Seq <= endSeq { - log.ZDebug(ctx, "find msg", "msg", &msg) - msgs = append(msgs, &msg) - } else { - log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg) - } - } + //beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) + //beginIndex := m.msg.GetMsgIndex(beginSeq) + //num := endSeq - beginSeq + 1 + //pipeline := bson.A{ + // bson.M{ + // "$match": bson.M{"doc_id": docID}, + // }, + // bson.M{ + // "$project": bson.M{ + // "msgs": bson.M{ + // "$slice": bson.A{"$msgs", beginIndex, num}, + // }, + // }, + // }, + //} + //cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) + //if err != nil { + // return nil, errs.Wrap(err) + //} + //defer cursor.Close(ctx) + //var doc table.MsgDocModel + //i := 0 + //for cursor.Next(ctx) { + // err := cursor.Decode(&doc) + // if err != nil { + // return nil, err + // } + // if i == 0 { + // break + // } + //} + //log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) + //for _, v := range doc.Msg { + // var msg sdkws.MsgData + // if err := proto.Unmarshal(v.Msg, &msg); err != nil { + // return nil, err + // } + // if msg.Seq >= beginSeq && msg.Seq <= endSeq { + // log.ZDebug(ctx, "find msg", "msg", &msg) + // msgs = append(msgs, &msg) + // } else { + // log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg) + // } + //} return msgs, nil }