config path

This commit is contained in:
wangchuxiao 2023-02-24 10:47:36 +08:00
parent 3106f671af
commit dd7a7d9cda
2 changed files with 42 additions and 42 deletions

View File

@ -287,7 +287,7 @@ func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMin
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
msgs, err := m.MsgInterface.GetMessagesBySeq(ctx, req.UserID, req.Seqs)
msgs, err := m.MsgInterface.GetMessagesBySeqs(ctx, req.UserID, req.Seqs)
if err != nil {
return nil, err
}

View File

@ -24,7 +24,7 @@ import (
"github.com/golang/protobuf/proto"
)
type MsgDatabaseInterface interface {
type MsgDatabase interface {
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
// 刪除redis中消息缓存
@ -73,11 +73,11 @@ type MsgDatabaseInterface interface {
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface {
return &MsgDatabase{}
func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase {
return &msgDatabase{}
}
type MsgDatabase struct {
type msgDatabase struct {
mgo unRelationTb.MsgDocModelInterface
cache cache.MsgCache
msg unRelationTb.MsgDocModel
@ -85,7 +85,7 @@ type MsgDatabase struct {
rdb redis.Client
}
func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
func (db *msgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel {
r := make(map[string]unRelationTb.KeyValueModel)
for key, value := range reactionExtensionList {
r[key] = unRelationTb.KeyValueModel{
@ -97,35 +97,35 @@ func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*s
return r
}
func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
}
func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
}
func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
}
func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
}
func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
}
func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime)
if err != nil {
return nil, err
@ -153,40 +153,40 @@ func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessio
}, nil
}
func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error {
return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList))
}
func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status)
}
func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.cache.GetSendMsgStatus(ctx, id)
}
func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
//TODO implement me
panic("implement me")
}
func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMaxSeq(ctx, userID)
}
func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
return db.cache.GetUserMinSeq(ctx, userID)
}
func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMaxSeq(ctx, groupID)
}
func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
return db.cache.GetGroupMinSeq(ctx, groupID)
}
func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
//newTime := utils.GetCurrentTimestampByMill()
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
return errors.New("too large")
@ -270,11 +270,11 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string,
return nil
}
func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
}
func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
@ -326,7 +326,7 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin
return lastMaxSeq, utils.Wrap(err, "")
}
func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
sortkeys.Int64s(seqs)
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{}
@ -347,7 +347,7 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i
return totalUnExistSeqs, nil
}
func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
@ -360,7 +360,7 @@ func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s
return unExistSeqs, nil
}
func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.mgo.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
@ -391,7 +391,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s
return seqMsgs, indexes, unExistSeqs, nil
}
func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID)
if err != nil {
return nil, err
@ -399,7 +399,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb
return db.unmarshalMsg(msgInfo)
}
func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return nil, err
@ -407,7 +407,7 @@ func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb
return db.unmarshalMsg(msgInfo)
}
func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb)
if err != nil {
@ -416,7 +416,7 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *
return msgPb, nil
}
func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) {
var hasSeqs []int64
singleCount := 0
m := db.msg.GetDocIDSeqsMap(sourceID, seqs)
@ -457,7 +457,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [
return seqMsg, nil
}
func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
if err != nil {
if err != redis.Nil {
@ -478,7 +478,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i
return successMsgs, nil
}
func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) {
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
if err != nil {
if err != redis.Nil {
@ -499,7 +499,7 @@ func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin
return successMsgs, nil
}
func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
if err != nil {
return err
@ -508,7 +508,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error
return utils.Wrap(err, "")
}
func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
@ -536,7 +536,7 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context,
return nil
}
func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
@ -562,7 +562,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index)
if err != nil || msgs.DocID == "" {
@ -644,7 +644,7 @@ func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user
return
}
func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
@ -656,7 +656,7 @@ func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context
return
}
func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID)
if err != nil {
return 0, 0, err
@ -678,10 +678,10 @@ func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (
return
}
func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}