From dd7a7d9cdad131053dd68c0bfc19257039dfc170 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 24 Feb 2023 10:47:36 +0800 Subject: [PATCH] config path --- internal/rpc/msg/send_pull.go | 2 +- pkg/common/db/controller/msg.go | 82 ++++++++++++++++----------------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index bfa0a063f..3919b125d 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -287,7 +287,7 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) { resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)} - msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, req.UserID, req.Seqs) + msgs, err := m.MsgInterface.GetMessagesBySeqs(ctx, req.UserID, req.Seqs) if err != nil { return nil, err } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index c7083fbc8..63baac03e 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -24,7 +24,7 @@ import ( "github.com/golang/protobuf/proto" ) -type MsgDatabaseInterface interface { +type MsgDatabase interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error // 刪除redis中消息缓存 @@ -73,11 +73,11 @@ type MsgDatabaseInterface interface { GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) } -func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { - return &MsgDatabase{} +func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase { + return &msgDatabase{} } -type MsgDatabase struct { +type msgDatabase struct { mgo unRelationTb.MsgDocModelInterface cache cache.MsgCache msg unRelationTb.MsgDocModel @@ -85,7 +85,7 @@ type MsgDatabase struct { rdb redis.Client } -func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel { +func (db *msgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel { r := make(map[string]unRelationTb.KeyValueModel) for key, value := range reactionExtensionList { r[key] = unRelationTb.KeyValueModel{ @@ -97,35 +97,35 @@ func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*s return r } -func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { +func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType) } -func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { +func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value) } -func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { +func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration) } -func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { +func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey) } -func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { +func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType) } -func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { +func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey) } -func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) } -func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { +func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime) if err != nil { return nil, err @@ -153,40 +153,40 @@ func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessio }, nil } -func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) } -func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { +func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return db.cache.SetSendMsgStatus(ctx, id, status) } -func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { +func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { return db.cache.GetSendMsgStatus(ctx, id) } -func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { return db.cache.GetUserMaxSeq(ctx, userID) } -func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { return db.cache.GetUserMinSeq(ctx, userID) } -func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { return db.cache.GetGroupMaxSeq(ctx, groupID) } -func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { return db.cache.GetGroupMinSeq(ctx, groupID) } -func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { +func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { //newTime := utils.GetCurrentTimestampByMill() if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { return errors.New("too large") @@ -270,11 +270,11 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, return nil } -func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { return db.cache.DeleteMessageFromCache(ctx, userID, msgs) } -func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { +func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) if int64(lenList) > db.msg.GetSingleGocMsgNum() { @@ -326,7 +326,7 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin return lastMaxSeq, utils.Wrap(err, "") } -func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { sortkeys.Int64s(seqs) docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) lock := sync.Mutex{} @@ -347,7 +347,7 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i return totalUnExistSeqs, nil } -func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err @@ -360,7 +360,7 @@ func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s return unExistSeqs, nil } -func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { +func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { doc, err := db.mgo.FindOneByDocID(ctx, docID) if err != nil { return nil, nil, nil, err @@ -391,7 +391,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s return seqMsgs, indexes, unExistSeqs, nil } -func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID) if err != nil { return nil, err @@ -399,7 +399,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return nil, err @@ -407,7 +407,7 @@ func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} err = proto.Unmarshal(msgInfo.Msg, msgPb) if err != nil { @@ -416,7 +416,7 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb * return msgPb, nil } -func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { var hasSeqs []int64 singleCount := 0 m := db.msg.GetDocIDSeqsMap(sourceID, seqs) @@ -457,7 +457,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ return seqMsg, nil } -func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { @@ -478,7 +478,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i return successMsgs, nil } -func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { @@ -499,7 +499,7 @@ func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin return successMsgs, nil } -func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { +func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) if err != nil { return err @@ -508,7 +508,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error return utils.Wrap(err, "") } -func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -536,7 +536,7 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, return nil } -func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -562,7 +562,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // seq 70 // set minSeq 21 // recursion 删除list并且返回设置的最小seq -func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { +func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index) if err != nil || msgs.DocID == "" { @@ -644,7 +644,7 @@ func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user return } -func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { +func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) if err != nil { return 0, 0, 0, err @@ -656,7 +656,7 @@ func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context return } -func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { +func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return 0, 0, err @@ -678,10 +678,10 @@ func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) ( return } -func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) } -func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return db.cache.SetUserMinSeq(ctx, userID, minSeq) }