mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
cron add log and fix cron
This commit is contained in:
parent
c42a47b6b9
commit
a23ecb1710
@ -1,6 +1,7 @@
|
|||||||
package tools
|
package tools
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
@ -9,12 +10,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *MsgTool) ConversationsDestructMsgs() {
|
func (c *MsgTool) ConversationsDestructMsgs() {
|
||||||
|
log.ZInfo(context.Background(), "start msg destruct cron task")
|
||||||
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
|
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
|
||||||
conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
|
conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "get conversation id need destruct failed", err)
|
log.ZError(ctx, "get conversation id need destruct failed", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.ZDebug(context.Background(), "nums conversations need destruct", len(conversations))
|
||||||
for _, conversation := range conversations {
|
for _, conversation := range conversations {
|
||||||
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime)
|
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime)
|
||||||
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func StartCronTask() error {
|
func StartCronTask() error {
|
||||||
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.ChatRecordsClearTime)
|
|
||||||
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
|
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
|
||||||
msgTool, err := InitMsgTool()
|
msgTool, err := InitMsgTool()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -20,11 +19,13 @@ func StartCronTask() error {
|
|||||||
c := cron.New()
|
c := cron.New()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
|
||||||
_, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
|
_, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime)
|
fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.ChatRecordsClearTime)
|
||||||
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
|
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
|
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
|
||||||
|
@ -2,7 +2,6 @@ package tools
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
@ -17,9 +16,11 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
@ -32,8 +33,6 @@ type MsgTool struct {
|
|||||||
msgNotificationSender *notification.MsgNotificationSender
|
msgNotificationSender *notification.MsgNotificationSender
|
||||||
}
|
}
|
||||||
|
|
||||||
var errSeq = errors.New("cache max seq and mongo max seq is diff > 10")
|
|
||||||
|
|
||||||
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
|
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
|
||||||
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool {
|
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool {
|
||||||
return &MsgTool{
|
return &MsgTool{
|
||||||
@ -103,12 +102,12 @@ func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
|
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
|
||||||
maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
|
minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
|
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
|
||||||
return errSeq
|
log.ZError(ctx, "cache max seq and mongo max seq is diff > 10", nil, "maxSeqMongo", maxSeqMongo, "minSeqMongo", minSeqMongo, "maxSeqCache", maxSeqCache, "conversationID", conversationID)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -116,6 +115,9 @@ func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID strin
|
|||||||
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
|
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
|
||||||
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
|
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errs.Unwrap(err) == redis.Nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
|
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
|
||||||
|
@ -74,7 +74,7 @@ type CommonMsgDatabase interface {
|
|||||||
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
|
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
|
||||||
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
|
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
|
||||||
|
|
||||||
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
|
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
|
||||||
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
|
||||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||||
@ -843,7 +843,7 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) {
|
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
|
||||||
return db.GetMinMaxSeqMongo(ctx, conversationID)
|
return db.GetMinMaxSeqMongo(ctx, conversationID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user