mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-25 19:22:46 +08:00
cron
This commit is contained in:
parent
02f8003b4d
commit
219d57528b
@ -584,29 +584,31 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
|
|||||||
} else {
|
} else {
|
||||||
var hasMarkDelFlag bool
|
var hasMarkDelFlag bool
|
||||||
for i, msg := range msgs.Msg {
|
for i, msg := range msgs.Msg {
|
||||||
msgPb := &sdkws.MsgData{}
|
if msg.SendTime != 0 {
|
||||||
err = proto.Unmarshal(msg.Msg, msgPb)
|
msgPb := &sdkws.MsgData{}
|
||||||
if err != nil {
|
err = proto.Unmarshal(msg.Msg, msgPb)
|
||||||
log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
|
if err != nil {
|
||||||
return 0, utils.Wrap(err, "proto.Unmarshal failed")
|
log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID)
|
||||||
}
|
return 0, utils.Wrap(err, "proto.Unmarshal failed")
|
||||||
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
|
|
||||||
msgPb.Status = constant.MsgDeleted
|
|
||||||
bytes, _ := proto.Marshal(msgPb)
|
|
||||||
msg.Msg = bytes
|
|
||||||
msg.SendTime = 0
|
|
||||||
hasMarkDelFlag = true
|
|
||||||
} else {
|
|
||||||
// 到本条消息不需要删除, minSeq置为这条消息的seq
|
|
||||||
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
if hasMarkDelFlag {
|
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
|
||||||
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
|
msgPb.Status = constant.MsgDeleted
|
||||||
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
|
bytes, _ := proto.Marshal(msgPb)
|
||||||
|
msg.Msg = bytes
|
||||||
|
msg.SendTime = 0
|
||||||
|
hasMarkDelFlag = true
|
||||||
|
} else {
|
||||||
|
// 到本条消息不需要删除, minSeq置为这条消息的seq
|
||||||
|
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
|
||||||
|
return 0, err
|
||||||
}
|
}
|
||||||
|
if hasMarkDelFlag {
|
||||||
|
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
|
||||||
|
return delStruct.getSetMinSeq(), err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msgPb.Seq, nil
|
||||||
}
|
}
|
||||||
return msgPb.Seq, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,17 +50,7 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgDocModel) IsFull() bool {
|
func (m *MsgDocModel) IsFull() bool {
|
||||||
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
|
return m.Msg[len(m.Msg)-1].SendTime != 0
|
||||||
if index == 0 {
|
|
||||||
if len(m.Msg) >= singleGocMsgNum-1 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(m.Msg) >= singleGocMsgNum {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
||||||
@ -127,14 +116,3 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkw
|
|||||||
}
|
}
|
||||||
return exceptionMsg
|
return exceptionMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
|
|
||||||
for _, v := range seqs {
|
|
||||||
msg := new(sdkws.MsgData)
|
|
||||||
msg.Seq = v
|
|
||||||
msg.GroupID = groupID
|
|
||||||
msg.SessionType = constant.SuperGroupChatType
|
|
||||||
exceptionMsg = append(exceptionMsg, msg)
|
|
||||||
}
|
|
||||||
return exceptionMsg
|
|
||||||
}
|
|
||||||
|
@ -1,134 +0,0 @@
|
|||||||
package unrelation
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
singleGocNotificationNum = 5000
|
|
||||||
Notification = "notification"
|
|
||||||
//OldestList = 0
|
|
||||||
//NewestList = -1
|
|
||||||
)
|
|
||||||
|
|
||||||
type NotificationDocModel struct {
|
|
||||||
DocID string `bson:"uid"`
|
|
||||||
Msg []NotificationInfoModel `bson:"msg"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type NotificationInfoModel struct {
|
|
||||||
SendTime int64 `bson:"sendtime"`
|
|
||||||
Msg []byte `bson:"msg"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type NotificationDocModelInterface interface {
|
|
||||||
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []NotificationInfoModel) error
|
|
||||||
Create(ctx context.Context, model *NotificationDocModel) error
|
|
||||||
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
|
|
||||||
FindOneByDocID(ctx context.Context, docID string) (*NotificationDocModel, error)
|
|
||||||
GetNewestMsg(ctx context.Context, conversationID string) (*NotificationInfoModel, error)
|
|
||||||
GetOldestMsg(ctx context.Context, conversationID string) (*NotificationInfoModel, error)
|
|
||||||
Delete(ctx context.Context, docIDs []string) error
|
|
||||||
GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*NotificationDocModel, error)
|
|
||||||
UpdateOneDoc(ctx context.Context, msg *NotificationDocModel) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (NotificationDocModel) TableName() string {
|
|
||||||
return Notification
|
|
||||||
}
|
|
||||||
|
|
||||||
func (NotificationDocModel) GetsingleGocNotificationNum() int64 {
|
|
||||||
return singleGocNotificationNum
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *NotificationDocModel) IsFull() bool {
|
|
||||||
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
|
|
||||||
if index == 0 {
|
|
||||||
if len(m.Msg) >= singleGocNotificationNum-1 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(m.Msg) >= singleGocNotificationNum {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) GetDocID(conversationID string, seq int64) string {
|
|
||||||
seqSuffix := seq / singleGocNotificationNum
|
|
||||||
return m.indexGen(conversationID, seqSuffix)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
|
|
||||||
seqMaxSuffix := maxSeq / singleGocNotificationNum
|
|
||||||
var seqUserIDs []string
|
|
||||||
for i := 0; i <= int(seqMaxSuffix); i++ {
|
|
||||||
seqUserID := m.indexGen(userID, int64(i))
|
|
||||||
seqUserIDs = append(seqUserIDs, seqUserID)
|
|
||||||
}
|
|
||||||
return seqUserIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
|
|
||||||
seqSuffix := seq / singleGocNotificationNum
|
|
||||||
return m.superGroupIndexGen(groupID, seqSuffix)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
|
|
||||||
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
|
||||||
t := make(map[string][]int64)
|
|
||||||
for i := 0; i < len(seqs); i++ {
|
|
||||||
docID := m.GetDocID(conversationID, seqs[i])
|
|
||||||
if value, ok := t[docID]; !ok {
|
|
||||||
var temp []int64
|
|
||||||
t[docID] = append(temp, seqs[i])
|
|
||||||
} else {
|
|
||||||
t[docID] = append(value, seqs[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) getMsgIndex(seq uint32) int {
|
|
||||||
seqSuffix := seq / singleGocNotificationNum
|
|
||||||
var index uint32
|
|
||||||
if seqSuffix == 0 {
|
|
||||||
index = (seq - seqSuffix*singleGocNotificationNum) - 1
|
|
||||||
} else {
|
|
||||||
index = seq - seqSuffix*singleGocNotificationNum
|
|
||||||
}
|
|
||||||
return int(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m NotificationDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
|
||||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (NotificationDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
|
|
||||||
for _, v := range seqs {
|
|
||||||
msg := new(sdkws.MsgData)
|
|
||||||
msg.Seq = v
|
|
||||||
exceptionMsg = append(exceptionMsg, msg)
|
|
||||||
}
|
|
||||||
return exceptionMsg
|
|
||||||
}
|
|
||||||
|
|
||||||
func (NotificationDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
|
|
||||||
for _, v := range seqs {
|
|
||||||
msg := new(sdkws.MsgData)
|
|
||||||
msg.Seq = v
|
|
||||||
msg.GroupID = groupID
|
|
||||||
msg.SessionType = constant.SuperGroupChatType
|
|
||||||
exceptionMsg = append(exceptionMsg, msg)
|
|
||||||
}
|
|
||||||
return exceptionMsg
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user