diff --git a/internal/api/msg.go b/internal/api/msg.go index 235e1e176..2b29371b1 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -123,6 +123,26 @@ func (m *Message) RevokeMsg(c *gin.Context) { a2r.Call(msg.MsgClient.RevokeMsg, m.client, c) } +func (m *Message) ClearConversationsMsg(c *gin.Context) { + a2r.Call(msg.MsgClient.ClearConversationsMsg, m.client, c) +} + +func (m *Message) UserClearAllMsg(c *gin.Context) { + a2r.Call(msg.MsgClient.UserClearAllMsg, m.client, c) +} + +func (m *Message) DeleteMsgs(c *gin.Context) { + a2r.Call(msg.MsgClient.DeleteMsgs, m.client, c) +} + +func (m *Message) DeleteMsgPhysicalBySeq(c *gin.Context) { + a2r.Call(msg.MsgClient.DeleteMsgPhysicalBySeq, m.client, c) +} + +func (m *Message) DeleteMsgPhysical(c *gin.Context) { + a2r.Call(msg.MsgClient.DeleteMsgPhysical, m.client, c) +} + func (m *Message) SetMessageReactionExtensions(c *gin.Context) { a2r.Call(msg.MsgClient.SetMessageReactionExtensions, m.client, c) } diff --git a/internal/api/route.go b/internal/api/route.go index 420b1e9fe..9fbcd5575 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -139,9 +139,14 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry, rdb redis.Universal msgGroup.POST("/newest_seq", m.GetSeq) msgGroup.POST("/send_msg", m.SendMessage) msgGroup.POST("/pull_msg_by_seq", m.PullMsgBySeqs) - // todo del msg route msgGroup.POST("/revoke_msg", m.RevokeMsg) + msgGroup.POST("/clear_conversation_msg", m.ClearConversationsMsg) + msgGroup.POST("/user_clear_all_msg", m.UserClearAllMsg) + msgGroup.POST("/delete_msgs", m.DeleteMsgs) + msgGroup.POST("/delete_msg_phsical_by_seq", m.DeleteMsgPhysicalBySeq) + msgGroup.POST("/delete_msg_physical", m.DeleteMsgPhysical) + msgGroup.POST("/batch_send_msg", m.ManagementBatchSendMsg) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 6eb936ab0..438997e29 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -2,6 +2,7 @@ package msggateway import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" @@ -145,7 +146,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data Req) ([]byte, erro if err := proto.Unmarshal(data.Data, &req); err != nil { return nil, err } - resp, err := g.pushClient.DelUserPushToken(context, req) + resp, err := g.pushClient.DelUserPushToken(context, &req) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index ef082d763..e2b2557b4 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -3,10 +3,7 @@ package msgtransfer import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -51,51 +48,6 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont if err != nil { log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) } - for _, v := range msgFromMQ.MsgData { - switch v.ContentType { - case constant.DeleteMessageNotification: - deleteMessageTips := sdkws.DeleteMessageTips{} - err := proto.Unmarshal(v.Content, &deleteMessageTips) - if err != nil { - log.ZError(ctx, "tips unmarshal err:", err, "msg", msg) - continue - } - if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { - log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) - continue - } - //case constant.MsgRevokeNotification: - // var elem sdkws.NotificationElem - // if err := json.Unmarshal(v.Content, &elem); err != nil { - // log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content)) - // continue - // } - // var tips sdkws.RevokeMsgTips - // if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - // log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content)) - // continue - // } - // msgs, err := mc.msgDatabase.GetMsgBySeqs(ctx, tips.ConversationID, []int64{tips.Seq}) - // if err != nil { - // log.ZError(ctx, "GetMsgBySeqs", err, "conversationID", tips.ConversationID, "seq", tips.Seq) - // continue - // } - // if len(msgs) == 0 { - // log.ZError(ctx, "GetMsgBySeqs empty", errors.New("seq not found"), "conversationID", tips.ConversationID, "seq", tips.Seq) - // continue - // } - // msgs[0].Content = []byte(elem.Detail) - // data, err := proto.Marshal(msgs[0]) - // if err != nil { - // log.ZError(ctx, "proto.Marshal MsgData", err) - // continue - // } - // if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil { - // log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq) - // continue - // } - } - } } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 084320583..8220e94a4 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -52,14 +52,24 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } + if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil { + return nil, err + } return &msg.DeleteMsgsResp{}, nil } func (m *msgServer) DeleteMsgPhysicalBySeq(ctx context.Context, req *msg.DeleteMsgPhysicalBySeqReq) (*msg.DeleteMsgPhysicalBySeqResp, error) { + err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs) + if err != nil { + return nil, err + } return &msg.DeleteMsgPhysicalBySeqResp{}, nil } func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhysicalReq) (*msg.DeleteMsgPhysicalResp, error) { + if err := tokenverify.CheckAdmin(ctx); err != nil { + return nil, err + } for _, conversationID := range req.ConversationIDs { if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, req.RemainTime); err != nil { log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err) diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 6d9064f05..f118789b9 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -3,6 +3,8 @@ package msg import ( "context" "encoding/json" + "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" @@ -11,7 +13,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "time" ) func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { @@ -43,7 +44,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. sessionType = constant.SuperGroupChatType conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID) } - msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, conversationID, []int64{req.Seq}) + msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.RecvID, conversationID, []int64{req.Seq}) if err != nil { return nil, err } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index eb243cb47..4728e3bba 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -3,11 +3,9 @@ package controller import ( "fmt" "sort" - "strconv" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" @@ -30,8 +28,6 @@ import ( const ( updateKeyMsg = iota updateKeyRevoke - updateKeyDel - updateKeyRead ) type CommonMsgDatabase interface { @@ -48,9 +44,12 @@ type CommonMsgDatabase interface { GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) // 通过seqList获取大群在 mongo里面的消息 GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - // 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error + // 用户根据seq删除消息 + DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error + // 物理删除消息置空 + DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) @@ -63,7 +62,6 @@ type CommonMsgDatabase interface { SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) - GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error @@ -164,10 +162,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } case updateKeyRevoke: _, ok = field.(*unRelationTb.RevokeModel) - case updateKeyDel: - _, ok = field.([]string) - case updateKeyRead: - _, ok = field.([]string) default: return errs.ErrInternalServer.Wrap("key is invalid") } @@ -175,30 +169,20 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return errs.ErrInternalServer.Wrap("field type is invalid") } } - getDocID := func(seq int64) string { - return conversationID + ":" + strconv.FormatInt(seq/num, 10) - } - getIndex := func(seq int64) int64 { - return seq % num - } // 返回值为true表示数据库存在该文档,false表示数据库不存在该文档 updateMsgModel := func(seq int64, i int) (bool, error) { var ( res *mongo.UpdateResult err error ) - docID := getDocID(seq) - index := getIndex(seq) + docID := db.msg.GetDocID(conversationID, seq) + index := db.msg.GetMsgIndex(seq) field := fields[i] switch key { case updateKeyMsg: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) case updateKeyRevoke: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) - case updateKeyDel: - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", field) - case updateKeyRead: - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", field) } if err != nil { return false, err @@ -218,33 +202,25 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } } doc := unRelationTb.MsgDocModel{ - DocID: getDocID(seq), + DocID: db.msg.GetDocID(conversationID, seq), Msg: make([]*unRelationTb.MsgInfoModel, num), } var insert int // 插入的数量 for j := i; j < len(fields); j++ { seq = firstSeq + int64(j) - if getDocID(seq) != doc.DocID { + if db.msg.GetDocID(conversationID, seq) != doc.DocID { break } insert++ switch key { case updateKeyMsg: - doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{ Msg: fields[j].(*unRelationTb.MsgDataModel), } case updateKeyRevoke: - doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ + doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{ Revoke: fields[j].(*unRelationTb.RevokeModel), } - case updateKeyDel: - doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ - DelList: fields[j].([]string), - } - case updateKeyRead: - doc.Msg[getIndex(seq)] = &unRelationTb.MsgInfoModel{ - ReadList: fields[j].([]string), - } } } for i, model := range doc.Msg { @@ -255,9 +231,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI if model.DelList == nil { doc.Msg[i].DelList = []string{} } - // if model.ReadList == nil { - // doc.Msg[i].ReadList = []string{} - // } } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { @@ -381,43 +354,43 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st return totalMsgs, nil } -func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) { - var reFetchSeqs []int64 - if delNums > 0 { - newBeginSeq := rangeBegin - delNums - if newBeginSeq >= begin { - newEndSeq := rangeBegin - 1 - for i := newBeginSeq; i <= newEndSeq; i++ { - reFetchSeqs = append(reFetchSeqs, i) - } - } - } - if len(reFetchSeqs) == 0 { - return - } - if len(reFetchSeqs) > 0 { - // m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs) - // for docID, seqs := range m { - // msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs) - // if err != nil { - // return nil, err - // } - // for _, msg := range msgs { - // if msg.Status != constant.MsgDeleted { - // seqMsgs = append(seqMsgs, msg) - // } - // } - // } - } - if len(seqMsgs) < int(delNums) { - seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin) - if err != nil { - return seqMsgs, err - } - seqMsgs = append(seqMsgs, seqMsgs2...) - } - return seqMsgs, nil -} +// func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) { +// var reFetchSeqs []int64 +// if delNums > 0 { +// newBeginSeq := rangeBegin - delNums +// if newBeginSeq >= begin { +// newEndSeq := rangeBegin - 1 +// for i := newBeginSeq; i <= newEndSeq; i++ { +// reFetchSeqs = append(reFetchSeqs, i) +// } +// } +// } +// if len(reFetchSeqs) == 0 { +// return +// } +// if len(reFetchSeqs) > 0 { +// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs) +// for docID, seqs := range m { +// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs) +// if err != nil { +// return nil, err +// } +// for _, msg := range msgs { +// if msg.Status != constant.MsgDeleted { +// seqMsgs = append(seqMsgs, msg) +// } +// } +// } +// } +// if len(seqMsgs) < int(delNums) { +// seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin) +// if err != nil { +// return seqMsgs, err +// } +// seqMsgs = append(seqMsgs, seqMsgs2...) +// } +// return seqMsgs, nil +// } func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) @@ -463,23 +436,9 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin } totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...) } - log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs) + log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs, "del seqs", delSeqs) + // 补未找到的消息 seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...) - for _, msg := range seqMsgs { - if msg.Status == constant.MsgDeleted { - delSeqs = append(delSeqs, msg.Seq) - } - } - if len(delSeqs) > 0 { - // msgs, err := db.refetchDelSeqsMsgs(ctx, conversationID, int64(len(delSeqs)), allSeqs[0], begin) - // if err != nil { - // log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin) - // } - // for _, msg := range msgs { - // seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg)) - // } - } - // sort by seq if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 { sort.Sort(utils.MsgBySeq(seqMsgs)) } @@ -644,6 +603,30 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio return seq, err } +func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { + var indexes []int + for _, seq := range seqs { + indexes = append(indexes, int(db.msg.GetMsgIndex(seq))) + } + if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil { + return err + } + } + return nil +} + +func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { + for _, seq := range seqs { + if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msg.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { + return err + } + } + } + return nil +} + func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) { for _, conversationID := range conversationIDs { maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) @@ -709,7 +692,6 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context if err != nil { return } - // from cache minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID) if err != nil { return diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 6de3b4031..38e0872c5 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -3,15 +3,16 @@ package controller import ( "context" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" - "go.mongodb.org/mongo-driver/bson" "math/rand" "strconv" "sync" "testing" "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" + "go.mongodb.org/mongo-driver/bson" ) func Test_BatchInsertChat2DB(t *testing.T) { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index f733a1212..323e4a7fc 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -3,7 +3,6 @@ package unrelation import ( "context" "strconv" - "strings" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "go.mongodb.org/mongo-driver/mongo" @@ -94,31 +93,10 @@ func (m *MsgDocModel) IsFull() bool { } func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { - seqSuffix := seq / singleGocMsgNum + seqSuffix := (seq - 1) / singleGocMsgNum return m.indexGen(conversationID, seqSuffix) } -func (m MsgDocModel) IndexDocID(conversationID string, index int64) string { - return m.indexGen(conversationID, index) -} - -func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { - seqMaxSuffix := maxSeq / singleGocMsgNum - var seqUserIDs []string - for i := 0; i <= int(seqMaxSuffix); i++ { - seqUserID := m.indexGen(userID, int64(i)) - seqUserIDs = append(seqUserIDs, seqUserID) - } - return seqUserIDs -} - -func (m MsgDocModel) ToNextDoc(docID string) string { - l := strings.Split(docID, ":") - index, _ := strconv.Atoi(l[len(l)-1]) - index++ - return strings.Split(docID, ":")[0] + ":" + strconv.Itoa(index) -} - func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { t := make(map[string][]int64) for i := 0; i < len(seqs); i++ { @@ -134,14 +112,7 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st } func (m MsgDocModel) GetMsgIndex(seq int64) int64 { - seqSuffix := seq / singleGocMsgNum - var index int64 - if seqSuffix == 0 { - index = (seq - seqSuffix*singleGocMsgNum) - 1 - } else { - index = seq - seqSuffix*singleGocMsgNum - } - return index + return (seq - 1) % singleGocMsgNum } func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go index 9904c8d2d..53cc2bee8 100644 --- a/pkg/rpcclient/push.go +++ b/pkg/rpcclient/push.go @@ -2,6 +2,7 @@ package rpcclient import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" @@ -20,12 +21,12 @@ func NewPushClient(client discoveryregistry.SvcDiscoveryRegistry) *PushClient { } } -func (p *PushClient) DelUserPushToken(ctx context.Context, req push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { +func (p *PushClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { cc, err := p.getConn(ctx) if err != nil { return nil, err } - resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, &req) + resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, req) if err != nil { return nil, err }