From 23c24308f7ba2571671bfc35d0183e098be25635 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 6 Mar 2023 16:23:16 +0800 Subject: [PATCH] makefile --- internal/common/notification/extend_msg.go | 4 +- internal/msgtransfer/modify_msg_handler.go | 18 ++---- .../msgtransfer/persistent_msg_handler.go | 2 +- internal/rpc/msg/extend_msg.go | 24 ++++---- pkg/apistruct/msg.go | 28 ++++----- pkg/common/db/controller/msg.go | 59 ++++++++----------- .../db/table/unrelation/extend_msg_set.go | 13 ++++ 7 files changed, 70 insertions(+), 78 deletions(-) diff --git a/internal/common/notification/extend_msg.go b/internal/common/notification/extend_msg.go index a2df13434..f4a9ce471 100644 --- a/internal/common/notification/extend_msg.go +++ b/internal/common/notification/extend_msg.go @@ -25,7 +25,7 @@ func (c *Check) ExtendMessageUpdatedNotification(ctx context.Context, sendID str if len(keyMap) == 0 { return } - m.SuccessReactionExtensionList = keyMap + m.SuccessReactionExtensions = keyMap m.ClientMsgID = req.ClientMsgID m.IsReact = resp.IsReact m.IsExternalExtensions = req.IsExternalExtensions @@ -47,7 +47,7 @@ func (c *Check) ExtendMessageDeleteNotification(ctx context.Context, sendID stri if len(keyMap) == 0 { return } - m.SuccessReactionExtensionList = keyMap + m.SuccessReactionExtensions = keyMap m.ClientMsgID = req.ClientMsgID m.MsgFirstModifyTime = req.MsgFirstModifyTime diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 4a63fe434..b5d01510e 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -10,7 +10,6 @@ import ( "OpenIM/pkg/common/log" "OpenIM/pkg/common/tracelog" pbMsg "OpenIM/pkg/proto/msg" - sdkws "OpenIM/pkg/proto/sdkws" "OpenIM/pkg/utils" "context" "encoding/json" @@ -23,6 +22,7 @@ type ModifyMsgConsumerHandler struct { modifyMsgConsumerGroup *kfk.MConsumerGroup extendMsgDatabase controller.ExtendMsgDatabase + extendSetMsgModel unRelationTb.ExtendMsgSetModel } func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler { @@ -66,7 +66,6 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama if !isReactionFromCache { continue } - ctx := context.Background() tracelog.SetOperationID(ctx, operationID) if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier { notification := &apistruct.ReactionMessageModifierNotification{} @@ -85,7 +84,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama ClientMsgID: notification.ClientMsgID, MsgFirstModifyTime: notification.MsgFirstModifyTime, } - for _, v := range notification.SuccessReactionExtensionList { + for _, v := range notification.SuccessReactionExtensions { reactionExtensionList[v.TypeKey] = unRelationTb.KeyValueModel{ TypeKey: v.TypeKey, Value: v.Value, @@ -98,16 +97,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama continue } } else { - var reactionExtensionList = make(map[string]*sdkws.KeyValue) - for _, v := range notification.SuccessReactionExtensionList { - reactionExtensionList[v.TypeKey] = &sdkws.KeyValue{ - TypeKey: v.TypeKey, - Value: v.Value, - LatestUpdateTime: v.LatestUpdateTime, - } - } - // is already modify - if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, reactionExtensionList); err != nil { + if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil { log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed") } } @@ -116,7 +106,7 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil { continue } - if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, notification.SuccessReactionExtensionList); err != nil { + if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.SourceID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil { log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed") } } diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index f72ed4450..481b3f505 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -65,7 +65,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs } if tag { log.NewInfo(operationID, "msg_transfer msg persisting", string(msg)) - if err = pc.chatLogInterface.CreateChatLog(msgFromMQ); err != nil { + if err = pc.chatLogDatabase.CreateChatLog(msgFromMQ); err != nil { log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) return } diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index 25d223ff6..00998b9a8 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -93,7 +93,7 @@ func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.S //} else { // log.Debug(req.OperationID, "redis handle secondly", req.String()) // - // for k, v := range req.ReactionExtensionList { + // for k, v := range req.Pb2Model { // err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) // if err != nil { // setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v) @@ -179,7 +179,7 @@ func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg. // utils.JsonStringToStruct(v, temp) // keyMap[k] = temp // } - // oneMessage.ReactionExtensionList = keyMap + // oneMessage.Pb2Model = keyMap // // } else { // mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime) @@ -191,14 +191,14 @@ func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg. // } // keyMap := make(map[string]*sdkws.KeyValue) // - // for k, v := range mongoValue.ReactionExtensionList { + // for k, v := range mongoValue.Pb2Model { // temp := new(sdkws.KeyValue) // temp.TypeKey = v.TypeKey // temp.Value = v.Value // temp.LatestUpdateTime = v.LatestUpdateTime // keyMap[k] = temp // } - // oneMessage.ReactionExtensionList = keyMap + // oneMessage.Pb2Model = keyMap // } // rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) //} @@ -218,7 +218,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms //if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 { // rResp.ErrCode = int32(callbackResp.ErrCode) // rResp.ErrMsg = callbackResp.ErrMsg - // for _, value := range req.ReactionExtensionList { + // for _, value := range req.Pb2Model { // temp := new(msg.KeyValueResp) // temp.KeyValue = value // temp.ErrMsg = callbackResp.ErrMsg @@ -242,7 +242,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms // *req = append((*req)[:i], (*req)[i+1:]...) // } // } - // }(&req.ReactionExtensionList, v.KeyValue.TypeKey) + // }(&req.Pb2Model, v.KeyValue.TypeKey) // rResp.Result = append(rResp.Result, v) // } //} @@ -250,7 +250,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms //if err != nil { // rResp.ErrCode = 100 // rResp.ErrMsg = err.Error() - // for _, value := range req.ReactionExtensionList { + // for _, value := range req.Pb2Model { // temp := new(msg.KeyValueResp) // temp.KeyValue = value // temp.ErrMsg = err.Error() @@ -262,7 +262,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms // //if isExists { // log.Debug(req.OperationID, "redis handle this delete", req.String()) - // for _, v := range req.ReactionExtensionList { + // for _, v := range req.Pb2Model { // err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey) // if err != nil { // setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) @@ -293,7 +293,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms // if err != nil { // rResp.ErrCode = 100 // rResp.ErrMsg = err.Error() - // for _, value := range req.ReactionExtensionList { + // for _, value := range req.Pb2Model { // temp := new(msg.KeyValueResp) // temp.KeyValue = value // temp.ErrMsg = err.Error() @@ -306,7 +306,7 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms // if err != nil { // rResp.ErrCode = 200 // rResp.ErrMsg = err.Error() - // for _, value := range req.ReactionExtensionList { + // for _, value := range req.Pb2Model { // temp := new(msg.KeyValueResp) // temp.KeyValue = value // temp.ErrMsg = err.Error() @@ -316,10 +316,10 @@ func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *ms // return &rResp, nil // } // setValue := make(map[string]*sdkws.KeyValue) - // for _, v := range req.ReactionExtensionList { + // for _, v := range req.Pb2Model { // // temp := new(sdkws.KeyValue) - // if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok { + // if vv, ok := mongoValue.Pb2Model[v.TypeKey]; ok { // utils.CopyStructFields(temp, &vv) // if v.LatestUpdateTime != vv.LatestUpdateTime { // setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index e7d926d19..0d5927350 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -121,21 +121,21 @@ type DeleteMessageReactionExtensionsResp struct { } type ReactionMessageModifierNotification struct { - SourceID string `json:"sourceID" binding:"required"` - OpUserID string `json:"opUserID" binding:"required"` - SessionType int32 `json:"sessionType" binding:"required"` - SuccessReactionExtensionList map[string]*sdkws.KeyValue `json:"reactionExtensionList,omitempty" binding:"required"` - ClientMsgID string `json:"clientMsgID" binding:"required"` - IsReact bool `json:"isReact"` - IsExternalExtensions bool `json:"isExternalExtensions"` - MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` + SourceID string `json:"sourceID" binding:"required"` + OpUserID string `json:"opUserID" binding:"required"` + SessionType int32 `json:"sessionType" binding:"required"` + SuccessReactionExtensions map[string]*sdkws.KeyValue `json:"reactionExtensionList,omitempty" binding:"required"` + ClientMsgID string `json:"clientMsgID" binding:"required"` + IsReact bool `json:"isReact"` + IsExternalExtensions bool `json:"isExternalExtensions"` + MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } type ReactionMessageDeleteNotification struct { - SourceID string `json:"sourceID" binding:"required"` - OpUserID string `json:"opUserID" binding:"required"` - SessionType int32 `json:"sessionType" binding:"required"` - SuccessReactionExtensionList map[string]*sdkws.KeyValue `json:"reactionExtensionList,omitempty" binding:"required"` - ClientMsgID string `json:"clientMsgID" binding:"required"` - MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` + SourceID string `json:"sourceID" binding:"required"` + OpUserID string `json:"opUserID" binding:"required"` + SessionType int32 `json:"sessionType" binding:"required"` + SuccessReactionExtensions map[string]*sdkws.KeyValue `json:"reactionExtensionList,omitempty" binding:"required"` + ClientMsgID string `json:"clientMsgID" binding:"required"` + MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 57f75cdf9..e2488428e 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -77,8 +77,8 @@ type MsgDatabase interface { func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { return &msgDatabase{ - msgDocModel: msgDocModel, - cache: cacheModel, + msgDocDatabase: msgDocModel, + cache: cacheModel, } } @@ -90,22 +90,11 @@ func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDat } type msgDatabase struct { - msgDocModel unRelationTb.MsgDocModelInterface - cache cache.Model - msg unRelationTb.MsgDocModel - extendMsgModel unRelationTb.ExtendMsgSetModelInterface -} - -func (db *msgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]*unRelationTb.KeyValueModel { - r := make(map[string]*unRelationTb.KeyValueModel) - for key, value := range reactionExtensionList { - r[key] = &unRelationTb.KeyValueModel{ - TypeKey: value.TypeKey, - Value: value.Value, - LatestUpdateTime: value.LatestUpdateTime, - } - } - return r + msgDocDatabase unRelationTb.MsgDocModelInterface + extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface + cache cache.Model + msg unRelationTb.MsgDocModel + extendMsgSetModel unRelationTb.ExtendMsgSetModel } func (db *msgDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { @@ -133,11 +122,11 @@ func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID stri } func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error { - return db.extendMsgModel.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensions)) + return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions)) } func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { - extendMsgSet, err := db.extendMsgModel.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime) + extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime) if err != nil { return nil, err } @@ -165,7 +154,7 @@ func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessio } func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error { - return db.extendMsgModel.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensions)) + return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions)) } func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { @@ -244,13 +233,13 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, //filter := bson.M{"uid": seqUid} //log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID) //err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err() - err = db.msgDocModel.PushMsgsToDoc(ctx, docID, msgsToMongo) + err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo) if err != nil { if err == mongo.ErrNoDocuments { doc := &unRelationTb.MsgDocModel{} doc.DocID = docID doc.Msg = msgsToMongo - if err = db.msgDocModel.Create(ctx, doc); err != nil { + if err = db.msgDocDatabase.Create(ctx, doc); err != nil { prome.Inc(prome.MsgInsertMongoFailedCounter) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") @@ -270,7 +259,7 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, nextDoc.DocID = docIDNext nextDoc.Msg = msgsToMongoNext //log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) - if err = db.msgDocModel.Create(ctx, nextDoc); err != nil { + if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil { prome.Inc(prome.MsgInsertMongoFailedCounter) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") @@ -364,7 +353,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s return nil, err } for i, v := range seqMsgs { - if err = db.msgDocModel.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil { + if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil { return nil, err } } @@ -372,7 +361,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s } func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - doc, err := db.msgDocModel.FindOneByDocID(ctx, docID) + doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { return nil, nil, nil, err } @@ -403,7 +392,7 @@ func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s } func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocModel.GetNewestMsg(ctx, sourceID) + msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID) if err != nil { return nil, err } @@ -411,7 +400,7 @@ func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb } func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocModel.GetOldestMsg(ctx, sourceID) + msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID) if err != nil { return nil, err } @@ -432,7 +421,7 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ singleCount := 0 m := db.msg.GetDocIDSeqsMap(sourceID, seqs) for docID, value := range m { - doc, err := db.msgDocModel.FindOneByDocID(ctx, docID) + doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { //log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error()) continue @@ -575,7 +564,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // recursion 删除list并且返回设置的最小seq func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list - msgs, err := db.msgDocModel.GetMsgsByIndex(ctx, sourceID, index) + msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, sourceID, index) if err != nil || msgs.DocID == "" { if err != nil { if err == unrelation.ErrMsgListNotExist { @@ -585,7 +574,7 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, } } // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList) - err = db.msgDocModel.Delete(ctx, delStruct.delDocIDs) + err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) if err != nil { return 0, err } @@ -620,11 +609,11 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, msg.SendTime = 0 hasMarkDelFlag = true } else { - if err := db.msgDocModel.Delete(ctx, delStruct.delDocIDs); err != nil { + if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { return 0, err } if hasMarkDelFlag { - if err := db.msgDocModel.UpdateOneDoc(ctx, msgs); err != nil { + if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { return delStruct.getSetMinSeq(), utils.Wrap(err, "") } } @@ -668,7 +657,7 @@ func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context } func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { - oldestMsgMongo, err := db.msgDocModel.GetOldestMsg(ctx, sourceID) + oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, sourceID) if err != nil { return 0, 0, err } @@ -677,7 +666,7 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) ( return 0, 0, err } minSeqMongo = msgPb.Seq - newestMsgMongo, err := db.msgDocModel.GetNewestMsg(ctx, sourceID) + newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, sourceID) if err != nil { return 0, 0, err } diff --git a/pkg/common/db/table/unrelation/extend_msg_set.go b/pkg/common/db/table/unrelation/extend_msg_set.go index 12a834fc0..0afe9d9da 100644 --- a/pkg/common/db/table/unrelation/extend_msg_set.go +++ b/pkg/common/db/table/unrelation/extend_msg_set.go @@ -1,6 +1,7 @@ package unrelation import ( + "OpenIM/pkg/proto/sdkws" "context" "strconv" "strings" @@ -66,3 +67,15 @@ func (e *ExtendMsgSetModel) SplitSourceIDAndGetIndex() int32 { type GetAllExtendMsgSetOpts struct { ExcludeExtendMsgs bool } + +func (ExtendMsgSetModel) Pb2Model(reactionExtensionList map[string]*sdkws.KeyValue) map[string]*KeyValueModel { + r := make(map[string]*KeyValueModel) + for key, value := range reactionExtensionList { + r[key] = &KeyValueModel{ + TypeKey: value.TypeKey, + Value: value.Value, + LatestUpdateTime: value.LatestUpdateTime, + } + } + return r +}