diff --git a/pkg/common/db/cache/extend_msg_set.go b/pkg/common/db/cache/extend_msg_set.go index dc5d0e3a5..6a02f0af3 100644 --- a/pkg/common/db/cache/extend_msg_set.go +++ b/pkg/common/db/cache/extend_msg_set.go @@ -9,6 +9,11 @@ import ( "time" ) +const ( + extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" + extendMsgCache = "EXTEND_MSG_CACHE:" +) + type ExtendMsgSetCache struct { expireTime time.Duration rcClient *rockscache.Client diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index c036de5f8..066c7367f 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -1,42 +1,15 @@ package cache import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db/relation" - "Open_IM/pkg/common/log" - "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" - "math/big" - "sort" - "strconv" + "github.com/dtm-labs/rockscache" "time" ) -const ( - //userInfoCache = "USER_INFO_CACHE:" - //friendRelationCache = "FRIEND_RELATION_CACHE:" - blackListCache = "BLACK_LIST_CACHE:" - //groupCache = "GROUP_CACHE:" - //groupInfoCache = "GROUP_INFO_CACHE:" - //groupOwnerIDCache = "GROUP_OWNER_ID:" - //joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" - //groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" - //groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:" - //allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:" - //joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:" - //groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:" - //groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:" - conversationCache = "CONVERSATION_CACHE:" - conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" - - extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" - extendMsgCache = "EXTEND_MSG_CACHE:" -) - const scanCount = 3000 -const RandomExpireAdjustment = 0.2 + func (rc *RcClient) DelKeys() { for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, @@ -68,3 +41,44 @@ func (rc *RcClient) DelKeys() { } } } + + +func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { + var t T + var write bool + v, err := rcClient.Fetch(key, expire, func() (s string, err error) { + t, err = fn(ctx) + if err != nil { + return "", err + } + bs, err := json.Marshal(t) + if err != nil { + return "", utils.Wrap(err, "") + } + write = true + return string(bs), nil + }) + if err != nil { + return t, err + } + if write { + return t, nil + } + err = json.Unmarshal([]byte(v), &t) + if err != nil { + return t, utils.Wrap(err, "") + } + return t, nil +} + +func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) { + rs := make([]T, 0, len(list)) + for _, e := range list { + r, err := fn(ctx, e) + if err != nil { + return nil, err + } + rs = append(rs, r) + } + return rs, nil +} diff --git a/pkg/common/db/cache/utils.go b/pkg/common/db/cache/utils.go deleted file mode 100644 index d007ff58a..000000000 --- a/pkg/common/db/cache/utils.go +++ /dev/null @@ -1,49 +0,0 @@ -package cache - -import ( - "Open_IM/pkg/utils" - "context" - "encoding/json" - "github.com/dtm-labs/rockscache" - "time" -) - -func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { - var t T - var write bool - v, err := rcClient.Fetch(key, expire, func() (s string, err error) { - t, err = fn(ctx) - if err != nil { - return "", err - } - bs, err := json.Marshal(t) - if err != nil { - return "", utils.Wrap(err, "") - } - write = true - return string(bs), nil - }) - if err != nil { - return t, err - } - if write { - return t, nil - } - err = json.Unmarshal([]byte(v), &t) - if err != nil { - return t, utils.Wrap(err, "") - } - return t, nil -} - -func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) { - rs := make([]T, 0, len(list)) - for _, e := range list { - r, err := fn(ctx, e) - if err != nil { - return nil, err - } - rs = append(rs, r) - } - return rs, nil -} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index b0b429f89..776b86b0f 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -1 +1,21 @@ package controller + +import ( + pbMsg "Open_IM/pkg/proto/msg" + "context" +) + +type MsgInterface interface { + BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error + BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) + DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error + DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) + ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error) +} + +type MsgDatabaseInterface interface { + BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error + BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64) + DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) +} diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 980dfa653..0fee6fc26 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -7,9 +7,9 @@ const ( CChat = "msg" ) -type UserChatModel struct { - UID string `bson:"uid"` - Msg []MsgInfoModel `bson:"msg"` +type UserMsgDocModel struct { + DocID string `bson:"uid"` + Msg []MsgInfoModel `bson:"msg"` } type MsgInfoModel struct { @@ -17,20 +17,20 @@ type MsgInfoModel struct { Msg []byte `bson:"msg"` } -func (UserChatModel) TableName() string { +func (UserMsgDocModel) TableName() string { return CChat } -func (UserChatModel) GetSingleGocMsgNum() int { +func (UserMsgDocModel) GetSingleDocMsgNum() int { return singleGocMsgNum } -func (u UserChatModel) getSeqUid(uid string, seq uint32) string { +func (u UserMsgDocModel) getSeqUid(uid string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return u.indexGen(uid, seqSuffix) } -func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string { +func (u UserMsgDocModel) getSeqUserIDList(userID string, maxSeq uint32) []string { seqMaxSuffix := maxSeq / singleGocMsgNum var seqUserIDList []string for i := 0; i <= int(seqMaxSuffix); i++ { @@ -40,16 +40,16 @@ func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string { return seqUserIDList } -func (UserChatModel) getSeqSuperGroupID(groupID string, seq uint32) string { +func (UserMsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string { seqSuffix := seq / singleGocMsgNum return superGroupIndexGen(groupID, seqSuffix) } -func (u UserChatModel) GetSeqUid(uid string, seq uint32) string { +func (u UserMsgDocModel) GetSeqUid(uid string, seq uint32) string { return u.getSeqUid(uid, seq) } -func (UserChatModel) getMsgIndex(seq uint32) int { +func (UserMsgDocModel) getMsgIndex(seq uint32) int { seqSuffix := seq / singleGocMsgNum var index uint32 if seqSuffix == 0 { @@ -60,6 +60,6 @@ func (UserChatModel) getMsgIndex(seq uint32) int { return int(index) } -func (UserChatModel) indexGen(uid string, seqSuffix uint32) string { +func (UserMsgDocModel) indexGen(uid string, seqSuffix uint32) string { return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10) } diff --git a/pkg/common/db/unrelation/batch_insert_chat.go b/pkg/common/db/unrelation/batch_insert_chat.go index fdfe1f3b0..ce78e4725 100644 --- a/pkg/common/db/unrelation/batch_insert_chat.go +++ b/pkg/common/db/unrelation/batch_insert_chat.go @@ -169,117 +169,3 @@ func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.M } return utils.Wrap(err, ""), lastMaxSeq } - -//func (d *DataBases) BatchInsertChatBoth(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) { -// err, lastMaxSeq := d.BatchInsertChat2Cache(userID, msgList, operationID) -// if err != nil { -// log.Error(operationID, "BatchInsertChat2Cache failed ", err.Error(), userID, len(msgList)) -// return err, 0 -// } -// for { -// if runtime.NumGoroutine() > 50000 { -// log.NewWarn(operationID, "too many NumGoroutine ", runtime.NumGoroutine()) -// time.Sleep(10 * time.Millisecond) -// } else { -// break -// } -// } -// return nil, lastMaxSeq -//} -// -//func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error { -// newTime := getCurrentTimestampByMill() -// if len(msgList) > GetSingleGocMsgNum() { -// return errors.New("too large") -// } -// isInit := false -// currentMaxSeq, err := d.GetUserMaxSeq(userID) -// if err == nil { -// -// } else if err == go_redis.Nil { -// isInit = true -// currentMaxSeq = 0 -// } else { -// return utils.Wrap(err, "") -// } -// var remain uint64 -// //if currentMaxSeq < uint64(GetSingleGocMsgNum()) { -// // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum())) -// //} else { -// // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum())) -// //} -// -// blk0 := uint64(GetSingleGocMsgNum() - 1) -// if currentMaxSeq < uint64(GetSingleGocMsgNum()) { -// remain = blk0 - currentMaxSeq -// } else { -// excludeBlk0 := currentMaxSeq - blk0 -// remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum()) -// } -// -// insertCounter := uint64(0) -// msgListToMongo := make([]MsgInfo, 0) -// msgListToMongoNext := make([]MsgInfo, 0) -// seqUid := "" -// seqUidNext := "" -// log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList)) -// //4998 remain ==1 -// //4999 -// for _, m := range msgList { -// log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) -// currentMaxSeq++ -// sMsg := MsgInfo{} -// sMsg.SendTime = m.MsgData.SendTime -// m.MsgData.Seq = uint32(currentMaxSeq) -// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { -// return utils.Wrap(err, "") -// } -// if isInit { -// msgListToMongoNext = append(msgListToMongoNext, sMsg) -// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// continue -// } -// if insertCounter < remain { -// msgListToMongo = append(msgListToMongo, sMsg) -// insertCounter++ -// seqUid = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// } else { -// msgListToMongoNext = append(msgListToMongoNext, sMsg) -// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) -// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) -// } -// } -// // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) -// -// ctx := context.Background() -// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) -// -// if seqUid != "" { -// filter := bson.M{"uid": seqUid} -// log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo) -// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err() -// if err != nil { -// log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) -// return utils.Wrap(err, "") -// } -// } -// if seqUidNext != "" { -// filter := bson.M{"uid": seqUidNext} -// sChat := UserChat{} -// sChat.UID = seqUidNext -// sChat.Msg = msgListToMongoNext -// log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext) -// if _, err = c.InsertOne(ctx, &sChat); err != nil { -// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) -// return utils.Wrap(err, "") -// } -// } -// log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList)) -// return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "") -//} - -//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) { -// -//} diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 37daa9480..a20fb1a65 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -74,8 +74,8 @@ func (m *Mongo) CreateTagIndex() { } func (m *Mongo) CreateMsgIndex() { - if err := m.createMongoIndex(cChat, false, "uid"); err != nil { - fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid") + if err := m.createMongoIndex(unrelation.CChat, false, "uid"); err != nil { + fmt.Println(err.Error() + " index create failed " + unrelation.CChat + " uid, please create index by yourself in field uid") } } diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 9436b06b3..677d00798 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -27,6 +27,16 @@ func NewMsgMongoDriver(mgoDB *mongo.Database) *MsgMongoDriver { return &MsgMongoDriver{mgoDB: mgoDB, MsgCollection: mgoDB.Collection(unrelation.CChat)} } +func (m *MsgMongoDriver) FindOneAndUpdate(ctx context.Context, filter, update, output interface{}, opts ...*options.FindOneAndUpdateOptions) error { + return m.MsgCollection.FindOneAndUpdate(ctx, filter, update, opts...).Decode(output) +} + +func (m *MsgMongoDriver) UpdateOne(ctx context.Context, filter, update interface{}, opts ...*options.UpdateOptions) error { + _, err := m.MsgCollection.UpdateOne(ctx, filter, update, opts...) + return err +} + +// database controller func (m *MsgMongoDriver) DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) { sortkeys.Uint32s(seqList) suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 { @@ -73,6 +83,7 @@ func (m *MsgMongoDriver) DelMsgBySeqListInOneDoc(ctx context.Context, suffixUser return unexistSeqList, nil } +// database func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error { sortkeys.Uint32s(seqList) seqMsgs, err := d.GetMsgBySeqListMongo2(ctx, uid, seqList) @@ -88,6 +99,7 @@ func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList [] return nil } +// model func (m *MsgMongoDriver) ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error { log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)