mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
feat: Optimize Scheduled Task (#2985)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks
This commit is contained in:
parent
248cb5c107
commit
b26b0a422c
4
go.mod
4
go.mod
@ -14,8 +14,8 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.63
|
github.com/openimsdk/protocol v0.0.72-alpha.67
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.52
|
github.com/openimsdk/tools v0.0.50-alpha.58
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
|
12
go.sum
12
go.sum
@ -317,12 +317,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
|||||||
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
||||||
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
|
github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.52 h1:SAxnn6xgHPcEBHTebNLFgvUqmxd4d2XpBBh9jHpUEvs=
|
github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.52/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
|
github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
@ -173,6 +173,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
|||||||
authRouterGroup.POST("/get_user_token", a.GetUserToken)
|
authRouterGroup.POST("/get_user_token", a.GetUserToken)
|
||||||
authRouterGroup.POST("/parse_token", a.ParseToken)
|
authRouterGroup.POST("/parse_token", a.ParseToken)
|
||||||
authRouterGroup.POST("/force_logout", a.ForceLogout)
|
authRouterGroup.POST("/force_logout", a.ForceLogout)
|
||||||
|
|
||||||
}
|
}
|
||||||
// Third service
|
// Third service
|
||||||
thirdGroup := r.Group("/third")
|
thirdGroup := r.Group("/third")
|
||||||
|
@ -751,3 +751,53 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *
|
|||||||
}
|
}
|
||||||
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) {
|
||||||
|
conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
|
||||||
|
for i, conversation := range conversations {
|
||||||
|
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
|
||||||
|
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resp.Seq <= 0 {
|
||||||
|
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
|
||||||
|
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp.Seq++
|
||||||
|
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
|
||||||
|
}
|
||||||
|
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error {
|
||||||
|
update := map[string]any{
|
||||||
|
"latest_msg_destruct_time": latestMsgDestructTime,
|
||||||
|
}
|
||||||
|
if minSeq >= 0 {
|
||||||
|
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
|
||||||
|
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
update["min_seq"] = minSeq
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
@ -9,7 +10,6 @@ import (
|
|||||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/wrapperspb"
|
"github.com/openimsdk/protocol/wrapperspb"
|
||||||
"github.com/openimsdk/tools/errs"
|
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
@ -19,63 +19,50 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// hard delete in Database.
|
// hard delete in Database.
|
||||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
|
||||||
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if req.Timestamp > time.Now().UnixMilli() {
|
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
|
||||||
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
|
|
||||||
}
|
|
||||||
var (
|
|
||||||
docNum int
|
|
||||||
msgNum int
|
|
||||||
start = time.Now()
|
|
||||||
getLimit = 5000
|
|
||||||
)
|
|
||||||
|
|
||||||
destructMsg := func(ctx context.Context) (bool, error) {
|
|
||||||
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if len(msgs) == 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, msg := range msgs {
|
|
||||||
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if len(index) == 0 {
|
|
||||||
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
docNum++
|
|
||||||
msgNum += len(index)
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = destructMsg(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
for i, doc := range docs {
|
||||||
log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
|
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
|
||||||
|
return nil, err
|
||||||
return &msg.DestructMsgsResp{}, nil
|
}
|
||||||
|
log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID)
|
||||||
|
index := strings.LastIndex(doc.DocID, ":")
|
||||||
|
if index < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var minSeq int64
|
||||||
|
for _, model := range doc.Msg {
|
||||||
|
if model.Msg == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if model.Msg.Seq > minSeq {
|
||||||
|
minSeq = model.Msg.Seq
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if minSeq <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
conversationID := doc.DocID[:index]
|
||||||
|
if conversationID == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
minSeq++
|
||||||
|
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq)
|
||||||
|
}
|
||||||
|
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// soft delete for user self
|
// soft delete for user self
|
||||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
|
||||||
temp := convert.ConversationsPb2DB(req.Conversations)
|
temp := convert.ConversationsPb2DB(req.Conversations)
|
||||||
|
|
||||||
batchNum := 100
|
batchNum := 100
|
||||||
@ -134,3 +121,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
|
||||||
|
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil
|
||||||
|
}
|
||||||
|
@ -19,17 +19,14 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/protocol/third"
|
"github.com/openimsdk/protocol/third"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
@ -288,87 +285,35 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
|
||||||
var conf config.Third
|
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
engine := t.config.RpcConfig.Object.Enable
|
||||||
expireTime := time.UnixMilli(req.ExpireTime)
|
expireTime := time.UnixMilli(req.ExpireTime)
|
||||||
|
|
||||||
findPagination := &sdkws.RequestPagination{
|
|
||||||
PageNumber: 1,
|
|
||||||
ShowNumber: 500,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find all expired data in S3 database
|
// Find all expired data in S3 database
|
||||||
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
|
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit))
|
||||||
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
for i, obj := range models {
|
||||||
if total == 0 {
|
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil {
|
||||||
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
|
|
||||||
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
needDelObjectKeys := make([]string, len(models))
|
|
||||||
for _, model := range models {
|
|
||||||
needDelObjectKeys = append(needDelObjectKeys, model.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove duplicate keys, have the same key use in different models
|
|
||||||
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
|
|
||||||
|
|
||||||
for _, key := range needDelObjectKeys {
|
|
||||||
// Find all models by key
|
|
||||||
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
|
|
||||||
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
|
|
||||||
return nil, errs.Wrap(err)
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil {
|
||||||
// check keyModels, if all keyModels.
|
return nil, err
|
||||||
needDelKey := true // Default can delete
|
|
||||||
for _, keymodel := range keyModels {
|
|
||||||
// If group is empty or CreateTime is after expireTime, can't delete this key
|
|
||||||
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
|
|
||||||
needDelKey = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
|
||||||
// If this object is not referenced by not expire data, delete it
|
|
||||||
if needDelKey && t.minio != nil {
|
|
||||||
// If have a thumbnail, delete it
|
|
||||||
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
|
|
||||||
if thumbnailKey != "" {
|
|
||||||
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
|
|
||||||
if err != nil {
|
|
||||||
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete object
|
|
||||||
err = t.s3dataBase.DeleteObject(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete cache key
|
|
||||||
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
|
|
||||||
if err != nil {
|
|
||||||
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle delete data in S3 database
|
|
||||||
for _, model := range models {
|
|
||||||
// Delete all expired data row in S3 database
|
|
||||||
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.Wrap(err)
|
return nil, err
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count)
|
||||||
|
if count == 0 {
|
||||||
|
if err := t.s3.DeleteObject(ctx, obj.Key); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return &third.DeleteOutdatedDataResp{Count: int32(len(models))}, nil
|
||||||
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)
|
|
||||||
|
|
||||||
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type FormDataMate struct {
|
type FormDataMate struct {
|
||||||
|
@ -44,7 +44,7 @@ type thirdServer struct {
|
|||||||
userRpcClient rpcclient.UserRpcClient
|
userRpcClient rpcclient.UserRpcClient
|
||||||
defaultExpire time.Duration
|
defaultExpire time.Duration
|
||||||
config *Config
|
config *Config
|
||||||
minio *minio.Minio
|
s3 s3.Interface
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -79,13 +79,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
// Select the oss method according to the profile policy
|
// Select the oss method according to the profile policy
|
||||||
enable := config.RpcConfig.Object.Enable
|
enable := config.RpcConfig.Object.Enable
|
||||||
var (
|
var (
|
||||||
o s3.Interface
|
o s3.Interface
|
||||||
minioCli *minio.Minio
|
|
||||||
)
|
)
|
||||||
switch enable {
|
switch enable {
|
||||||
case "minio":
|
case "minio":
|
||||||
minioCli, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
|
o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
|
||||||
o = minioCli
|
|
||||||
case "cos":
|
case "cos":
|
||||||
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
|
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
|
||||||
case "oss":
|
case "oss":
|
||||||
@ -105,15 +103,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
s3dataBase: controller.NewS3Database(rdb, o, s3db),
|
s3dataBase: controller.NewS3Database(rdb, o, s3db),
|
||||||
defaultExpire: time.Hour * 24 * 7,
|
defaultExpire: time.Hour * 24 * 7,
|
||||||
config: config,
|
config: config,
|
||||||
minio: minioCli,
|
s3: o,
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *thirdServer) getMinioImageThumbnailKey(ctx context.Context, name string) (string, error) {
|
|
||||||
return t.minio.GetImageThumbnailKey(ctx, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
|
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
|
||||||
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
|
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,10 +16,6 @@ package tools
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
@ -69,87 +65,58 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
msgClient := msg.NewMsgClient(msgConn)
|
srv := &cronServer{
|
||||||
conversationClient := pbconversation.NewConversationClient(conversationConn)
|
ctx: ctx,
|
||||||
thirdClient := third.NewThirdClient(thirdConn)
|
config: config,
|
||||||
|
cron: cron.New(),
|
||||||
crontab := cron.New()
|
msgClient: msg.NewMsgClient(msgConn),
|
||||||
|
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||||
// scheduled hard delete outdated Msgs in specific time.
|
thirdClient: third.NewThirdClient(thirdConn),
|
||||||
destructMsgsFunc := func() {
|
|
||||||
now := time.Now()
|
|
||||||
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
|
|
||||||
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
|
|
||||||
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
|
||||||
|
|
||||||
if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
|
|
||||||
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
|
|
||||||
}
|
|
||||||
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
|
if err := srv.registerClearS3(); err != nil {
|
||||||
clearMsgFunc := func() {
|
return err
|
||||||
now := time.Now()
|
|
||||||
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
|
|
||||||
log.ZDebug(ctx, "clear msg cron start", "now", now)
|
|
||||||
|
|
||||||
conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "Clear Msg failed.", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
|
|
||||||
}
|
}
|
||||||
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
|
if err := srv.registerDeleteMsg(); err != nil {
|
||||||
return errs.Wrap(err)
|
return err
|
||||||
}
|
}
|
||||||
|
if err := srv.registerClearUserMsg(); err != nil {
|
||||||
// scheduled delete outdated file Objects and their datas in specific time.
|
return err
|
||||||
deleteObjectFunc := func() {
|
|
||||||
now := time.Now()
|
|
||||||
executeNum := 5
|
|
||||||
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
|
|
||||||
pageShowNumber := 500
|
|
||||||
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
|
|
||||||
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
|
|
||||||
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
|
||||||
|
|
||||||
if len(config.CronTask.DeleteObjectType) == 0 {
|
|
||||||
log.ZDebug(ctx, "cron deleteoutDatedData not type need delete", "deletetime", deleteTime, "DeleteObjectType", config.CronTask.DeleteObjectType, "cont", time.Since(now))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < executeNum; i++ {
|
|
||||||
resp, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: config.CronTask.DeleteObjectType})
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if resp.Count == 0 || resp.Count < int32(pageShowNumber) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
|
|
||||||
}
|
}
|
||||||
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
|
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
|
||||||
crontab.Start()
|
srv.cron.Start()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type cronServer struct {
|
||||||
|
ctx context.Context
|
||||||
|
config *CronTaskConfig
|
||||||
|
cron *cron.Cron
|
||||||
|
msgClient msg.MsgClient
|
||||||
|
conversationClient pbconversation.ConversationClient
|
||||||
|
thirdClient third.ThirdClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cronServer) registerClearS3() error {
|
||||||
|
if c.config.CronTask.FileExpireTime <= 0 || len(c.config.CronTask.DeleteObjectType) == 0 {
|
||||||
|
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
|
||||||
|
return errs.WrapMsg(err, "failed to register clear s3 cron task")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cronServer) registerDeleteMsg() error {
|
||||||
|
if c.config.CronTask.RetainChatRecords <= 0 {
|
||||||
|
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
|
||||||
|
return errs.WrapMsg(err, "failed to register delete msg cron task")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cronServer) registerClearUserMsg() error {
|
||||||
|
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
|
||||||
|
return errs.WrapMsg(err, "failed to register clear user msg cron task")
|
||||||
|
}
|
||||||
|
63
internal/tools/cron_test.go
Normal file
63
internal/tools/cron_test.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/protocol/third"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"github.com/openimsdk/tools/mw"
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestName(t *testing.T) {
|
||||||
|
conf := &config.Discovery{
|
||||||
|
Enable: config.ETCD,
|
||||||
|
Etcd: config.Etcd{
|
||||||
|
RootDirectory: "openim",
|
||||||
|
Address: []string{"localhost:12379"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
client, err := kdisc.NewDiscoveryRegister(conf, "source")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
ctx := mcontext.SetOpUserID(context.Background(), "imAdmin")
|
||||||
|
msgConn, err := client.GetConn(ctx, "msg-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
thirdConn, err := client.GetConn(ctx, "third-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conversationConn, err := client.GetConn(ctx, "conversation-rpc-service")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := &cronServer{
|
||||||
|
ctx: ctx,
|
||||||
|
config: &CronTaskConfig{
|
||||||
|
CronTask: config.CronTask{
|
||||||
|
RetainChatRecords: 1,
|
||||||
|
FileExpireTime: 1,
|
||||||
|
DeleteObjectType: []string{"msg-picture", "msg-file", "msg-voice", "msg-video", "msg-video-snapshot", "sdklog", ""},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
cron: cron.New(),
|
||||||
|
msgClient: msg.NewMsgClient(msgConn),
|
||||||
|
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||||
|
thirdClient: third.NewThirdClient(thirdConn),
|
||||||
|
}
|
||||||
|
srv.deleteMsg()
|
||||||
|
//srv.clearS3()
|
||||||
|
//srv.clearUserMsg()
|
||||||
|
}
|
36
internal/tools/msg.go
Normal file
36
internal/tools/msg.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *cronServer) deleteMsg() {
|
||||||
|
now := time.Now()
|
||||||
|
deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords))
|
||||||
|
operationID := fmt.Sprintf("cron_msg_%d_%d", os.Getpid(), deltime.UnixMilli())
|
||||||
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
|
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
||||||
|
const (
|
||||||
|
deleteCount = 10000
|
||||||
|
deleteLimit = 50
|
||||||
|
)
|
||||||
|
var count int
|
||||||
|
for i := 1; i <= deleteCount; i++ {
|
||||||
|
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
|
||||||
|
resp, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli(), Limit: deleteLimit})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "cron destruct chat records failed", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
count += int(resp.Count)
|
||||||
|
if resp.Count < deleteLimit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "count", count)
|
||||||
|
}
|
79
internal/tools/s3.go
Normal file
79
internal/tools/s3.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/openimsdk/protocol/third"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *cronServer) clearS3() {
|
||||||
|
start := time.Now()
|
||||||
|
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
|
||||||
|
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
|
||||||
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
|
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
||||||
|
const (
|
||||||
|
deleteCount = 10000
|
||||||
|
deleteLimit = 100
|
||||||
|
)
|
||||||
|
|
||||||
|
var count int
|
||||||
|
for i := 1; i <= deleteCount; i++ {
|
||||||
|
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: deleteLimit})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "cron deleteoutDatedData failed", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count += int(resp.Count)
|
||||||
|
if resp.Count < deleteLimit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// var req *third.DeleteOutdatedDataReq
|
||||||
|
// count1, err := ExtractField(ctx, c.thirdClient.DeleteOutdatedData, req, (*third.DeleteOutdatedDataResp).GetCount)
|
||||||
|
//
|
||||||
|
// c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
|
||||||
|
// msggateway.GetUsersOnlineStatusCaller.Invoke(ctx, &msggateway.GetUsersOnlineStatusReq{})
|
||||||
|
//
|
||||||
|
// var cli ThirdClient
|
||||||
|
//
|
||||||
|
// c111, err := cli.DeleteOutdatedData(ctx, 100)
|
||||||
|
//
|
||||||
|
// cli.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
|
||||||
|
//
|
||||||
|
// cli.AuthSign(ctx, &third.AuthSignReq{})
|
||||||
|
//
|
||||||
|
// cli.SetAppBadge()
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
||||||
|
// resp, err := fn(ctx, req)
|
||||||
|
// if err != nil {
|
||||||
|
// var c C
|
||||||
|
// return c, err
|
||||||
|
// }
|
||||||
|
// return get(resp), nil
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func ignore(_ any, err error) error {
|
||||||
|
// return err
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//type ThirdClient struct {
|
||||||
|
// third.ThirdClient
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (c *ThirdClient) DeleteOutdatedData(ctx context.Context, expireTime int64) (int32, error) {
|
||||||
|
// return extractField(ctx, c.ThirdClient.DeleteOutdatedData, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}, (*third.DeleteOutdatedDataResp).GetCount)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (c *ThirdClient) DeleteOutdatedData1(ctx context.Context, expireTime int64) error {
|
||||||
|
// return ignore(c.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}))
|
||||||
|
//}
|
34
internal/tools/user_msg.go
Normal file
34
internal/tools/user_msg.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *cronServer) clearUserMsg() {
|
||||||
|
now := time.Now()
|
||||||
|
operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
|
||||||
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
|
log.ZDebug(ctx, "clear user msg cron start")
|
||||||
|
const (
|
||||||
|
deleteCount = 10000
|
||||||
|
deleteLimit = 100
|
||||||
|
)
|
||||||
|
var count int
|
||||||
|
for i := 1; i <= deleteCount; i++ {
|
||||||
|
resp, err := c.conversationClient.ClearUserConversationMsg(ctx, &pbconversation.ClearUserConversationMsgReq{Timestamp: now.UnixMilli(), Limit: deleteLimit})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "ClearUserConversationMsg failed.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count += int(resp.Count)
|
||||||
|
if resp.Count < deleteLimit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.ZDebug(ctx, "clear user msg cron task completed", "cont", time.Since(now), "count", count)
|
||||||
|
}
|
@ -74,6 +74,8 @@ type ConversationDatabase interface {
|
|||||||
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||||
// GetPinnedConversationIDs gets pinned conversationIDs by userID
|
// GetPinnedConversationIDs gets pinned conversationIDs by userID
|
||||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||||
|
// FindRandConversation finds random conversations based on the specified timestamp and limit.
|
||||||
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||||
@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
|
|||||||
}
|
}
|
||||||
return conversationIDs, nil
|
return conversationIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
|
||||||
|
return c.conversationDB.FindRandConversation(ctx, ts, limit)
|
||||||
|
}
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
@ -69,6 +68,7 @@ type CommonMsgDatabase interface {
|
|||||||
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
|
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
|
||||||
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
|
||||||
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
|
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
|
||||||
|
SetMinSeq(ctx context.Context, conversationID string, seq int64) error
|
||||||
|
|
||||||
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
|
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
|
||||||
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||||||
@ -95,13 +95,16 @@ type CommonMsgDatabase interface {
|
|||||||
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
||||||
|
|
||||||
// get Msg when destruct msg before
|
// get Msg when destruct msg before
|
||||||
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
|
//DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
|
||||||
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
|
|
||||||
|
|
||||||
GetDocIDs(ctx context.Context) ([]string, error)
|
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
||||||
|
|
||||||
SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
|
||||||
|
DeleteDoc(ctx context.Context, docID string) error
|
||||||
|
|
||||||
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
||||||
@ -806,9 +809,10 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin
|
|||||||
return db.seqConversation.GetMaxSeq(ctx, conversationID)
|
return db.seqConversation.GetMaxSeq(ctx, conversationID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
//
|
||||||
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
|
//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
||||||
}
|
// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
|
||||||
|
//}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
|
func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
|
||||||
return db.seqConversation.SetMinSeqs(ctx, seqs)
|
return db.seqConversation.SetMinSeqs(ctx, seqs)
|
||||||
@ -947,56 +951,40 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation
|
|||||||
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
|
func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
|
||||||
var msgs []*model.MsgDocModel
|
return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit)
|
||||||
for i := 0; i < len(docIDs); i += 1000 {
|
|
||||||
end := i + 1000
|
|
||||||
if end > len(docIDs) {
|
|
||||||
end = len(docIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
msgs = append(msgs, res...)
|
|
||||||
|
|
||||||
if len(msgs) >= limit {
|
|
||||||
return msgs[:limit], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return msgs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
|
//
|
||||||
var notNull int
|
//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
|
||||||
index := make([]int, 0, len(doc.Msg))
|
// var notNull int
|
||||||
for i, message := range doc.Msg {
|
// index := make([]int, 0, len(doc.Msg))
|
||||||
if message.Msg != nil {
|
// for i, message := range doc.Msg {
|
||||||
notNull++
|
// if message.Msg != nil {
|
||||||
if message.Msg.SendTime < ts {
|
// notNull++
|
||||||
index = append(index, i)
|
// if message.Msg.SendTime < ts {
|
||||||
}
|
// index = append(index, i)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
if len(index) == 0 {
|
// }
|
||||||
return index, nil
|
// if len(index) == 0 {
|
||||||
}
|
// return index, nil
|
||||||
maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
|
// }
|
||||||
conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
|
// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
|
||||||
if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil {
|
// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
|
||||||
return index, err
|
// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
|
||||||
}
|
// return index, err
|
||||||
if len(index) == notNull {
|
// }
|
||||||
log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
|
// if len(index) == notNull {
|
||||||
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
|
// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
|
||||||
} else {
|
// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
|
||||||
log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
|
// } else {
|
||||||
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
|
// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
|
||||||
}
|
// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
|
||||||
}
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
|
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
|
||||||
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
|
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(errs.Unwrap(err), redis.Nil) {
|
if errors.Is(errs.Unwrap(err), redis.Nil) {
|
||||||
@ -1010,8 +998,8 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
|
|||||||
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
|
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
|
func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
|
||||||
return db.msgDocDatabase.GetDocIDs(ctx)
|
return db.msgDocDatabase.GetRandDocIDs(ctx, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
|
||||||
@ -1026,3 +1014,11 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
|
|||||||
// todo: only the time in the redis cache will be taken, not the message time
|
// todo: only the time in the redis cache will be taken, not the message time
|
||||||
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
|
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
|
||||||
|
return db.msgDocDatabase.DeleteDoc(ctx, docID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
|
||||||
|
return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
|
||||||
|
}
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
"github.com/openimsdk/tools/db/pagination"
|
|
||||||
"github.com/openimsdk/tools/s3"
|
"github.com/openimsdk/tools/s3"
|
||||||
"github.com/openimsdk/tools/s3/cont"
|
"github.com/openimsdk/tools/s3/cont"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@ -40,11 +39,10 @@ type S3Database interface {
|
|||||||
SetObject(ctx context.Context, info *model.Object) error
|
SetObject(ctx context.Context, info *model.Object) error
|
||||||
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
|
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
|
||||||
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
|
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
|
||||||
FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
|
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
||||||
DeleteObject(ctx context.Context, name string) error
|
DeleteSpecifiedData(ctx context.Context, engine string, name []string) error
|
||||||
DeleteSpecifiedData(ctx context.Context, engine string, name string) error
|
|
||||||
FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
|
|
||||||
DelS3Key(ctx context.Context, engine string, keys ...string) error
|
DelS3Key(ctx context.Context, engine string, keys ...string) error
|
||||||
|
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database {
|
||||||
@ -120,21 +118,19 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
|
|||||||
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
|
||||||
return s.s3.FormData(ctx, name, size, contentType, duration)
|
return s.s3.FormData(ctx, name, size, contentType, duration)
|
||||||
}
|
}
|
||||||
func (s *s3Database) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
|
|
||||||
return s.db.FindNeedDeleteObjectByDB(ctx, duration, needDelType, pagination)
|
func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) {
|
||||||
|
return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
|
func (s *s3Database) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
|
||||||
return s.s3.DeleteObject(ctx, name)
|
return s.db.GetKeyCount(ctx, engine, key)
|
||||||
}
|
}
|
||||||
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error {
|
|
||||||
|
func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error {
|
||||||
return s.db.Delete(ctx, engine, name)
|
return s.db.Delete(ctx, engine, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
|
|
||||||
return s.db.FindModelsByKey(ctx, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
|
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {
|
||||||
return s.s3cache.DelS3Key(ctx, engine, keys...)
|
return s.s3cache.DelS3Key(ctx, engine, keys...)
|
||||||
}
|
}
|
||||||
|
@ -42,4 +42,5 @@ type Conversation interface {
|
|||||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||||
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
|
||||||
}
|
}
|
||||||
|
@ -228,3 +228,35 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co
|
|||||||
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
|
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
|
||||||
return c.version.FindChangeLog(ctx, userID, version, limit)
|
return c.version.FindChangeLog(ctx, userID, version, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) {
|
||||||
|
pipeline := []bson.M{
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"is_msg_destruct": true,
|
||||||
|
"msg_destruct_time": bson.M{"$ne": 0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$addFields": bson.M{
|
||||||
|
"next_msg_destruct_timestamp": bson.M{
|
||||||
|
"$add": []any{
|
||||||
|
bson.M{
|
||||||
|
"$toLong": "$latest_msg_destruct_time",
|
||||||
|
}, "$msg_destruct_time"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"next_msg_destruct_timestamp": bson.M{"$lt": ts},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$sample": bson.M{
|
||||||
|
"size": limit,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
|
||||||
|
}
|
||||||
|
@ -1227,8 +1227,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) {
|
func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
|
||||||
limit := 5000
|
|
||||||
var skip int
|
var skip int
|
||||||
var docIDs []string
|
var docIDs []string
|
||||||
var offset int
|
var offset int
|
||||||
@ -1267,15 +1266,18 @@ func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) {
|
|||||||
return docIDs, errs.Wrap(err)
|
return docIDs, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
|
func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
|
||||||
return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
|
return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
|
||||||
{
|
{
|
||||||
"$match": bson.M{
|
"$match": bson.M{
|
||||||
"doc_id": bson.M{
|
"msgs": bson.M{
|
||||||
"$in": docIDs,
|
"$not": bson.M{
|
||||||
},
|
"$elemMatch": bson.M{
|
||||||
"msgs.msg.send_time": bson.M{
|
"msg.send_time": bson.M{
|
||||||
"$lt": ts,
|
"$gt": ts,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1288,7 +1290,9 @@ func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, li
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"$limit": limit,
|
"$sample": bson.M{
|
||||||
|
"size": limit,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1305,53 +1309,58 @@ func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int
|
|||||||
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
|
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) {
|
|
||||||
// ts := t.UnixMilli()
|
|
||||||
// var count int64
|
|
||||||
// for {
|
|
||||||
// msgs, err := m.GetBeforeMsg(ctx, ts, 100)
|
|
||||||
// if err != nil {
|
|
||||||
// return count, err
|
|
||||||
// }
|
|
||||||
// if len(msgs) == 0 {
|
|
||||||
// return count, nil
|
|
||||||
// }
|
|
||||||
// for _, msg := range msgs {
|
|
||||||
// num, err := m.deleteOneMsg(ctx, ts, msg)
|
|
||||||
// count += num
|
|
||||||
// if err != nil {
|
|
||||||
// return count, err
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
|
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
|
||||||
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
|
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
|
||||||
}
|
}
|
||||||
|
|
||||||
//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) {
|
func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
|
||||||
// var notNull int
|
pipeline := []bson.M{
|
||||||
// index := make([]int, 0, len(doc.Msg))
|
{
|
||||||
// for i, message := range doc.Msg {
|
"$match": bson.M{
|
||||||
// if message.Msg != nil {
|
"doc_id": bson.M{
|
||||||
// notNull++
|
"$regex": fmt.Sprintf("^%s", conversationID),
|
||||||
// if message.Msg.SendTime < ts {
|
},
|
||||||
// index = append(index, i)
|
},
|
||||||
// }
|
},
|
||||||
// }
|
{
|
||||||
// }
|
"$match": bson.M{
|
||||||
// if len(index) == 0 {
|
"msgs.msg.send_time": bson.M{
|
||||||
// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID)
|
"$lte": time,
|
||||||
// }
|
},
|
||||||
// if len(index) == notNull {
|
},
|
||||||
// if err := m.DeleteDoc(ctx, doc.DocID); err != nil {
|
},
|
||||||
// return 0, err
|
{
|
||||||
// }
|
"$sort": bson.M{
|
||||||
// } else {
|
"_id": -1,
|
||||||
// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil {
|
},
|
||||||
// return 0, err
|
},
|
||||||
// }
|
{
|
||||||
// }
|
"$limit": 1,
|
||||||
// return int64(len(index)), nil
|
},
|
||||||
//}
|
{
|
||||||
|
"$project": bson.M{
|
||||||
|
"_id": 0,
|
||||||
|
"doc_id": 1,
|
||||||
|
"msgs.msg.send_time": 1,
|
||||||
|
"msgs.msg.seq": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if len(res) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
var seq int64
|
||||||
|
for _, v := range res[0].Msg {
|
||||||
|
if v.Msg == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v.Msg.SendTime <= time {
|
||||||
|
seq = v.Msg.Seq
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return seq, nil
|
||||||
|
}
|
||||||
|
@ -3,12 +3,11 @@ package mgo
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/protocol/msg"
|
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
@ -16,35 +15,45 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestName1(t *testing.T) {
|
func TestName1(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
||||||
defer cancel()
|
//defer cancel()
|
||||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
//cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
//
|
||||||
v := &MsgMgo{
|
//v := &MsgMgo{
|
||||||
coll: cli.Database("openim_v3").Collection("msg3"),
|
// coll: cli.Database("openim_v3").Collection("msg3"),
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
req := &msg.SearchMessageReq{
|
//req := &msg.SearchMessageReq{
|
||||||
//RecvID: "3187706596",
|
// //RecvID: "3187706596",
|
||||||
//SendID: "7009965934",
|
// //SendID: "7009965934",
|
||||||
ContentType: 101,
|
// ContentType: 101,
|
||||||
//SendTime: "2024-05-06",
|
// //SendTime: "2024-05-06",
|
||||||
//SessionType: 3,
|
// //SessionType: 3,
|
||||||
Pagination: &sdkws.RequestPagination{
|
// Pagination: &sdkws.RequestPagination{
|
||||||
PageNumber: 1,
|
// PageNumber: 1,
|
||||||
ShowNumber: 10,
|
// ShowNumber: 10,
|
||||||
},
|
// },
|
||||||
}
|
//}
|
||||||
total, res, err := v.SearchMessage(ctx, req)
|
//total, res, err := v.SearchMessage(ctx, req)
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
panic(err)
|
// panic(err)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
for i, re := range res {
|
//for i, re := range res {
|
||||||
t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content)
|
// t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content)
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
t.Log(total)
|
//t.Log(total)
|
||||||
|
//
|
||||||
|
//msg, err := NewMsgMongo(cli.Database("openim_v3"))
|
||||||
|
//if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
//}
|
||||||
|
//res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000)
|
||||||
|
//if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
//}
|
||||||
|
//t.Log(len(res))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestName10(t *testing.T) {
|
func TestName10(t *testing.T) {
|
||||||
@ -73,3 +82,33 @@ func TestName10(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestName3(t *testing.T) {
|
||||||
|
t.Log(uint64(math.MaxUint64))
|
||||||
|
t.Log(int64(math.MaxInt64))
|
||||||
|
|
||||||
|
t.Log(int64(math.MinInt64))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestName4(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
||||||
|
defer cancel()
|
||||||
|
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
|
||||||
|
msg, err := NewMsgMongo(cli.Database("openim_v3"))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli()
|
||||||
|
t.Log(ts)
|
||||||
|
res, err := msg.GetLastMessageSeqByTime(ctx, "sg_1523453548", ts)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
t.Log(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestName5(t *testing.T) {
|
||||||
|
var v time.Time
|
||||||
|
t.Log(v.UnixMilli())
|
||||||
|
}
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
|
||||||
"github.com/openimsdk/tools/db/mongoutil"
|
"github.com/openimsdk/tools/db/mongoutil"
|
||||||
"github.com/openimsdk/tools/db/pagination"
|
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
@ -91,21 +90,25 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
|
|||||||
return mongoutil.FindOne[*model.Object](ctx, o.coll, bson.M{"name": name, "engine": engine})
|
return mongoutil.FindOne[*model.Object](ctx, o.coll, bson.M{"name": name, "engine": engine})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
|
func (o *S3Mongo) Delete(ctx context.Context, engine string, name []string) error {
|
||||||
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
|
if len(name) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"engine": engine, "name": bson.M{"$in": name}})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find Expires object
|
func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) {
|
||||||
func (o *S3Mongo) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
|
opt := options.Find()
|
||||||
return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
|
if count > 0 {
|
||||||
"create_time": bson.M{"$lt": duration},
|
opt.SetLimit(count)
|
||||||
"group": bson.M{"$in": needDelType},
|
}
|
||||||
}, pagination)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find object by key
|
|
||||||
func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
|
|
||||||
return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
|
return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
|
||||||
"key": key,
|
"engine": engine,
|
||||||
})
|
"create_time": bson.M{"$lt": expiration},
|
||||||
|
"group": bson.M{"$in": needDelType},
|
||||||
|
}, opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
|
||||||
|
return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key})
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,9 @@ type Msg interface {
|
|||||||
|
|
||||||
DeleteDoc(ctx context.Context, docID string) error
|
DeleteDoc(ctx context.Context, docID string) error
|
||||||
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
|
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
|
||||||
GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error)
|
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
||||||
|
|
||||||
GetDocIDs(ctx context.Context) ([]string, error)
|
GetRandDocIDs(ctx context.Context, limit int) ([]string, error)
|
||||||
|
|
||||||
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
}
|
}
|
||||||
|
@ -19,13 +19,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/tools/db/pagination"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ObjectInfo interface {
|
type ObjectInfo interface {
|
||||||
SetObject(ctx context.Context, obj *model.Object) error
|
SetObject(ctx context.Context, obj *model.Object) error
|
||||||
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
Take(ctx context.Context, engine string, name string) (*model.Object, error)
|
||||||
Delete(ctx context.Context, engine string, name string) error
|
Delete(ctx context.Context, engine string, name []string) error
|
||||||
FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
|
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
|
||||||
FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
|
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user