mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-30 10:39:47 +08:00
refactoring scheduled tasks
This commit is contained in:
parent
705bc37f99
commit
8807597ffb
@ -1,7 +1,9 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/openimsdk/protocol/user"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -164,6 +166,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
|||||||
authRouterGroup.POST("/get_user_token", a.GetUserToken)
|
authRouterGroup.POST("/get_user_token", a.GetUserToken)
|
||||||
authRouterGroup.POST("/parse_token", a.ParseToken)
|
authRouterGroup.POST("/parse_token", a.ParseToken)
|
||||||
authRouterGroup.POST("/force_logout", a.ForceLogout)
|
authRouterGroup.POST("/force_logout", a.ForceLogout)
|
||||||
|
|
||||||
}
|
}
|
||||||
// Third service
|
// Third service
|
||||||
thirdGroup := r.Group("/third")
|
thirdGroup := r.Group("/third")
|
||||||
@ -297,3 +300,16 @@ var Whitelist = []string{
|
|||||||
"/auth/get_admin_token",
|
"/auth/get_admin_token",
|
||||||
"/auth/parse_token",
|
"/auth/parse_token",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
|
||||||
|
var uc user.UserClient
|
||||||
|
var g *gin.Engine
|
||||||
|
g.POST("/get_admin_token", New(uc, uc.AccountCheck))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func New[A, B, C any](c C, fn func(c C, ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error)) func(c *gin.Context) {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -763,3 +763,32 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *
|
|||||||
}
|
}
|
||||||
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) {
|
||||||
|
conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, conversation := range conversations {
|
||||||
|
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
|
||||||
|
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resp.Seq == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err = c.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{
|
||||||
|
ConversationID: conversation.ConversationID,
|
||||||
|
OwnerUserID: []string{conversation.OwnerUserID},
|
||||||
|
MinSeq: resp.Seq + 1,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pbconversation.ClearUserConversationMsgResp{}, nil
|
||||||
|
}
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
pbconv "github.com/openimsdk/protocol/conversation"
|
pbconv "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/wrapperspb"
|
"github.com/openimsdk/protocol/wrapperspb"
|
||||||
"github.com/openimsdk/tools/errs"
|
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
@ -20,16 +19,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// hard delete in Database.
|
// hard delete in Database.
|
||||||
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
|
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
|
||||||
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if req.Timestamp > time.Now().UnixMilli() {
|
|
||||||
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
|
|
||||||
}
|
|
||||||
if req.Limit <= 0 {
|
|
||||||
return nil, errs.ErrArgs.WrapMsg("request limit error")
|
|
||||||
}
|
|
||||||
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
|
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -66,7 +59,7 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// soft delete for user self
|
// soft delete for user self
|
||||||
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
|
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
|
||||||
temp := convert.ConversationsPb2DB(req.Conversations)
|
temp := convert.ConversationsPb2DB(req.Conversations)
|
||||||
|
|
||||||
batchNum := 100
|
batchNum := 100
|
||||||
@ -128,3 +121,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
|
|||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
|
||||||
|
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil
|
||||||
|
}
|
||||||
|
@ -16,7 +16,7 @@ func (c *cronServer) deleteMsg() {
|
|||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
|
||||||
const (
|
const (
|
||||||
deleteCount = 20
|
deleteCount = 200
|
||||||
deleteLimit = 50
|
deleteLimit = 50
|
||||||
)
|
)
|
||||||
var count int
|
var count int
|
||||||
|
@ -11,25 +11,69 @@ import (
|
|||||||
|
|
||||||
func (c *cronServer) clearS3() {
|
func (c *cronServer) clearS3() {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
executeNum := 10
|
|
||||||
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
|
|
||||||
pageShowNumber := 500
|
|
||||||
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
|
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
|
||||||
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
|
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
|
||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
|
||||||
|
const (
|
||||||
|
deleteCount = 200
|
||||||
|
deleteLimit = 100
|
||||||
|
)
|
||||||
|
|
||||||
var count int
|
var count int
|
||||||
for i := 1; i <= executeNum; i++ {
|
for i := 1; i <= deleteCount; i++ {
|
||||||
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
|
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: deleteLimit})
|
||||||
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "cron deleteoutDatedData failed", err)
|
log.ZError(ctx, "cron deleteoutDatedData failed", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
count += int(resp.Count)
|
count += int(resp.Count)
|
||||||
if resp.Count < int32(pageShowNumber) {
|
if resp.Count < deleteLimit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
|
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// var req *third.DeleteOutdatedDataReq
|
||||||
|
// count1, err := ExtractField(ctx, c.thirdClient.DeleteOutdatedData, req, (*third.DeleteOutdatedDataResp).GetCount)
|
||||||
|
//
|
||||||
|
// c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
|
||||||
|
// msggateway.GetUsersOnlineStatusCaller.Invoke(ctx, &msggateway.GetUsersOnlineStatusReq{})
|
||||||
|
//
|
||||||
|
// var cli ThirdClient
|
||||||
|
//
|
||||||
|
// c111, err := cli.DeleteOutdatedData(ctx, 100)
|
||||||
|
//
|
||||||
|
// cli.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
|
||||||
|
//
|
||||||
|
// cli.AuthSign(ctx, &third.AuthSignReq{})
|
||||||
|
//
|
||||||
|
// cli.SetAppBadge()
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
||||||
|
// resp, err := fn(ctx, req)
|
||||||
|
// if err != nil {
|
||||||
|
// var c C
|
||||||
|
// return c, err
|
||||||
|
// }
|
||||||
|
// return get(resp), nil
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func ignore(_ any, err error) error {
|
||||||
|
// return err
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//type ThirdClient struct {
|
||||||
|
// third.ThirdClient
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (c *ThirdClient) DeleteOutdatedData(ctx context.Context, expireTime int64) (int32, error) {
|
||||||
|
// return extractField(ctx, c.ThirdClient.DeleteOutdatedData, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}, (*third.DeleteOutdatedDataResp).GetCount)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//func (c *ThirdClient) DeleteOutdatedData1(ctx context.Context, expireTime int64) error {
|
||||||
|
// return ignore(c.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}))
|
||||||
|
//}
|
||||||
|
@ -3,7 +3,6 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/msg"
|
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"os"
|
"os"
|
||||||
@ -14,18 +13,22 @@ func (c *cronServer) clearUserMsg() {
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
|
operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
|
||||||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||||||
log.ZDebug(ctx, "clear msg cron start")
|
log.ZDebug(ctx, "clear user msg cron start")
|
||||||
conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
|
const (
|
||||||
if err != nil {
|
deleteCount = 200
|
||||||
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
|
deleteLimit = 100
|
||||||
return
|
)
|
||||||
|
var count int
|
||||||
|
for i := 1; i <= deleteCount; i++ {
|
||||||
|
resp, err := c.conversationClient.ClearUserConversationMsg(ctx, &pbconversation.ClearUserConversationMsgReq{Timestamp: now.UnixMilli(), Limit: deleteLimit})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "ClearUserConversationMsg failed.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count += int(resp.Count)
|
||||||
|
if resp.Count < deleteLimit {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
log.ZDebug(ctx, "clear user msg cron task completed", "cont", time.Since(now), "count", count)
|
||||||
_, err = c.msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "Clear Msg failed.", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
|
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,8 @@ type ConversationDatabase interface {
|
|||||||
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||||
// GetPinnedConversationIDs gets pinned conversationIDs by userID
|
// GetPinnedConversationIDs gets pinned conversationIDs by userID
|
||||||
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
|
||||||
|
// FindRandConversation finds random conversations based on the specified timestamp and limit.
|
||||||
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
|
||||||
@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
|
|||||||
}
|
}
|
||||||
return conversationIDs, nil
|
return conversationIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
|
||||||
|
return c.conversationDB.FindRandConversation(ctx, ts, limit)
|
||||||
|
}
|
||||||
|
@ -103,6 +103,8 @@ type CommonMsgDatabase interface {
|
|||||||
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
|
||||||
|
|
||||||
DeleteDoc(ctx context.Context, docID string) error
|
DeleteDoc(ctx context.Context, docID string) error
|
||||||
|
|
||||||
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
||||||
@ -1016,3 +1018,7 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
|
|||||||
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
|
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
|
||||||
return db.msgDocDatabase.DeleteDoc(ctx, docID)
|
return db.msgDocDatabase.DeleteDoc(ctx, docID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
|
||||||
|
return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
|
||||||
|
}
|
||||||
|
@ -42,4 +42,5 @@ type Conversation interface {
|
|||||||
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
|
||||||
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
|
||||||
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
|
||||||
|
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
|
||||||
}
|
}
|
||||||
|
@ -228,3 +228,35 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co
|
|||||||
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
|
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
|
||||||
return c.version.FindChangeLog(ctx, userID, version, limit)
|
return c.version.FindChangeLog(ctx, userID, version, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) {
|
||||||
|
pipeline := []bson.M{
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"is_msg_destruct": true,
|
||||||
|
"msg_destruct_time": bson.M{"$ne": 0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$addFields": bson.M{
|
||||||
|
"next_msg_destruct_timestamp": bson.M{
|
||||||
|
"$add": []any{
|
||||||
|
bson.M{
|
||||||
|
"$toLong": "$latest_msg_destruct_time",
|
||||||
|
}, "$msg_destruct_time"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$match": bson.M{
|
||||||
|
"next_msg_destruct_timestamp": bson.M{"$lt": ts},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$sample": bson.M{
|
||||||
|
"size": limit,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
|
||||||
|
}
|
||||||
|
@ -1290,7 +1290,9 @@ func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"$sample": limit,
|
"$sample": bson.M{
|
||||||
|
"size": limit,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1307,53 +1309,56 @@ func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int
|
|||||||
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
|
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) {
|
|
||||||
// ts := t.UnixMilli()
|
|
||||||
// var count int64
|
|
||||||
// for {
|
|
||||||
// msgs, err := m.GetBeforeMsg(ctx, ts, 100)
|
|
||||||
// if err != nil {
|
|
||||||
// return count, err
|
|
||||||
// }
|
|
||||||
// if len(msgs) == 0 {
|
|
||||||
// return count, nil
|
|
||||||
// }
|
|
||||||
// for _, msg := range msgs {
|
|
||||||
// num, err := m.deleteOneMsg(ctx, ts, msg)
|
|
||||||
// count += num
|
|
||||||
// if err != nil {
|
|
||||||
// return count, err
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
|
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
|
||||||
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
|
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
|
||||||
}
|
}
|
||||||
|
|
||||||
//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) {
|
func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
|
||||||
// var notNull int
|
pipeline := []bson.M{
|
||||||
// index := make([]int, 0, len(doc.Msg))
|
{
|
||||||
// for i, message := range doc.Msg {
|
"$match": bson.M{
|
||||||
// if message.Msg != nil {
|
"doc_id": bson.M{
|
||||||
// notNull++
|
"$regex": fmt.Sprintf("^%s:", conversationID),
|
||||||
// if message.Msg.SendTime < ts {
|
},
|
||||||
// index = append(index, i)
|
},
|
||||||
// }
|
},
|
||||||
// }
|
{
|
||||||
// }
|
"$match": bson.M{
|
||||||
// if len(index) == 0 {
|
"msgs.msg.send_time": bson.M{
|
||||||
// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID)
|
"msgs.msg.send_time": bson.M{
|
||||||
// }
|
"$lte": time,
|
||||||
// if len(index) == notNull {
|
},
|
||||||
// if err := m.DeleteDoc(ctx, doc.DocID); err != nil {
|
},
|
||||||
// return 0, err
|
},
|
||||||
// }
|
},
|
||||||
// } else {
|
{
|
||||||
// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil {
|
"$sort": bson.M{
|
||||||
// return 0, err
|
"_id": -1,
|
||||||
// }
|
},
|
||||||
// }
|
},
|
||||||
// return int64(len(index)), nil
|
{
|
||||||
//}
|
"$limit": 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$project": bson.M{
|
||||||
|
"_id": 0,
|
||||||
|
"msgs.msg.send_time": 1,
|
||||||
|
"msgs.msg.seq": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if len(res) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
var seq int64
|
||||||
|
for _, v := range res[0].Msg {
|
||||||
|
if v.Msg.SendTime <= time {
|
||||||
|
seq = v.Msg.Seq
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return seq, nil
|
||||||
|
}
|
||||||
|
@ -15,10 +15,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestName1(t *testing.T) {
|
func TestName1(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
|
||||||
defer cancel()
|
//defer cancel()
|
||||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
//cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
//
|
||||||
//v := &MsgMgo{
|
//v := &MsgMgo{
|
||||||
// coll: cli.Database("openim_v3").Collection("msg3"),
|
// coll: cli.Database("openim_v3").Collection("msg3"),
|
||||||
//}
|
//}
|
||||||
@ -44,16 +44,16 @@ func TestName1(t *testing.T) {
|
|||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//t.Log(total)
|
//t.Log(total)
|
||||||
|
//
|
||||||
msg, err := NewMsgMongo(cli.Database("openim_v3"))
|
//msg, err := NewMsgMongo(cli.Database("openim_v3"))
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
panic(err)
|
// panic(err)
|
||||||
}
|
//}
|
||||||
res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000)
|
//res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000)
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
panic(err)
|
// panic(err)
|
||||||
}
|
//}
|
||||||
t.Log(len(res))
|
//t.Log(len(res))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestName10(t *testing.T) {
|
func TestName10(t *testing.T) {
|
||||||
@ -95,13 +95,20 @@ func TestName4(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
|
||||||
msg, err := NewMsgMongo(cli.Database("openim_v3"))
|
msg, err := NewConversationMongo(cli.Database("openim_v3"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000)
|
ts := time.Now().UnixMilli()
|
||||||
|
t.Log(ts)
|
||||||
|
res, err := msg.FindRandConversation(ctx, ts, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
t.Log(len(res))
|
t.Log(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestName5(t *testing.T) {
|
||||||
|
var v time.Time
|
||||||
|
t.Log(v.UnixMilli())
|
||||||
}
|
}
|
||||||
|
@ -48,4 +48,6 @@ type Msg interface {
|
|||||||
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
||||||
|
|
||||||
GetRandDocIDs(ctx context.Context, limit int) ([]string, error)
|
GetRandDocIDs(ctx context.Context, limit int) ([]string, error)
|
||||||
|
|
||||||
|
GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user