Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-05-18 16:42:19 +08:00
commit c53daa76f2
10 changed files with 82 additions and 174 deletions

View File

@ -136,7 +136,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
return nil, err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConvetstionID(conversationID))
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)

View File

@ -2,13 +2,13 @@ package tools
import (
"fmt"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/robfig/cron/v3"
"sync"
)
const cronTaskOperationID = "cronTaskOperationID-"
const moduleName = "cron"
func StartCronTask() error {
@ -22,7 +22,7 @@ func StartCronTask() error {
c := cron.New()
var wg sync.WaitGroup
wg.Add(1)
_, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllUserClearMsgAndFixSeq)
_, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
return err

View File

@ -7,7 +7,6 @@ import (
"math"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
@ -16,11 +15,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
)
type MsgTool struct {
msgDatabase controller.CommonMsgDatabase
conversationDatabase controller.ConversationDataBase
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
}
@ -56,185 +55,76 @@ func InitMsgTool() (*MsgTool, error) {
return msgTool, nil
}
func (c *MsgTool) AllUserClearMsgAndFixSeq() {
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
ctx := mcontext.NewCtx(utils.GetSelfFuncName())
log.ZInfo(ctx, "============================ start del cron task ============================")
var err error
userIDs, err := c.userDatabase.GetAllUserID(ctx)
if err == nil {
c.ClearUsersMsg(ctx, userIDs)
} else {
log.ZError(ctx, "ClearUsersMsg failed", err)
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
return
}
// working group msg clear
superGroupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err == nil {
c.ClearSuperGroupMsg(ctx, superGroupIDs)
} else {
log.ZError(ctx, "ClearSuperGroupMsg failed", err)
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
c.ClearConversationsMsg(ctx, conversationIDs)
log.ZInfo(ctx, "============================ start del cron finished ============================")
}
func (c *MsgTool) ClearUsersMsg(ctx context.Context, userIDs []string) {
for _, userID := range userIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserMsgsAndSetMinSeq failed", err, "userID", userID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
}
maxSeqCache, maxSeqMongo, err := c.GetAndFixUserSeqs(ctx, userID)
if err != nil {
continue
if err := c.fixAndCheckSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo)
}
}
func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string) {
for _, groupID := range superGroupIDs {
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID)
continue
}
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
}
if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "fixGroupSeq failed", err, "groupID", groupID, "userID", userIDs)
}
}
}
func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error {
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
if err != nil {
return err
}
return c.fixGroupSeq(ctx, groupID, userIDs)
}
func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error {
_, maxSeqMongo, _, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, groupID)
if err != nil {
if err == unrelation.ErrMsgNotFound {
return nil
}
return err
}
for _, userID := range userIDs {
if _, err := c.GetAndFixGroupUserSeq(ctx, userID, groupID, maxSeqCache); err != nil {
continue
}
}
if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo); err != nil {
log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion)
}
return nil
}
func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) {
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, userID)
if err != nil {
if err != unrelation.ErrMsgNotFound {
log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID)
}
return 0, 0, err
}
log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetMinSeq(ctx, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else {
log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
}
}
return maxSeqCache, maxSeqMongo, nil
}
func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) {
minSeqCache, err = c.msgDatabase.GetMinSeq(ctx, groupID)
if err != nil {
log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID)
return 0, err
}
if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetConversationUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else {
log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
}
}
return minSeqCache, nil
}
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache, maxSeqMongo int64) error {
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
return errSeq
}
return nil
}
func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) {
func (c *MsgTool) fixAndCheckSeq(ctx context.Context, conversationID string) error {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
if err != nil {
return err
}
func (c *MsgTool) ShowSuperGroupSeqs(ctx context.Context, groupID string) {
minSeq, err := c.msgDatabase.GetMinSeq(ctx, conversationID)
if err != nil {
return err
}
func (c *MsgTool) ShowSuperGroupUserSeqs(ctx context.Context, groupID, userID string) {
if minSeq > maxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, conversationID, maxSeq); err != nil {
return err
}
}
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
return err
}
return nil
}
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
userIDs, err := c.userDatabase.GetAllUserID(ctx)
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
return err
}
for _, userID := range userIDs {
userCurrentMinSeq, err := c.msgDatabase.GetMinSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
userCurrentMaxSeq, err := c.msgDatabase.GetMaxSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
if userCurrentMinSeq > userCurrentMaxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
}
fmt.Println("fix", userID, userCurrentMaxSeq)
}
}
fmt.Println("fix users seq success")
groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err != nil {
return err
}
for _, groupID := range groupIDs {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, groupID)
if err != nil {
fmt.Println("GetGroupMaxSeq failed", groupID)
continue
}
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
fmt.Println("get groupID", groupID, "failed, try again later")
continue
}
for _, userID := range userIDs {
userMinSeq, err := c.msgDatabase.GetMinSeq(ctx, groupID)
if err != nil && err != redis.Nil {
fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
continue
}
if userMinSeq > maxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, groupID, maxSeq); err != nil {
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
}
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
for _, conversationID := range conversationIDs {
if err := c.fixAndCheckSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
fmt.Println("fix all seq finished")

View File

@ -90,13 +90,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("init failed")
return
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -133,13 +133,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", testUID1)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -165,13 +165,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -211,7 +211,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
@ -221,7 +221,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -252,7 +252,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
@ -262,7 +262,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -312,7 +312,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {

View File

@ -137,7 +137,10 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st
if seq.Err() != nil && seq.Err() != redis.Nil {
return nil, errs.Wrap(v.Err())
}
m[items[i]] = utils.StringToInt64(seq.Val())
val := utils.StringToInt64(seq.Val())
if val != 0 {
m[items[i]] = val
}
}
return m, nil
}

View File

@ -31,6 +31,7 @@ type ConversationDatabase interface {
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
GetAllConversationIDs(ctx context.Context) ([]string, error)
}
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -245,3 +246,7 @@ func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID st
func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return c.cache.GetUserConversationIDsHash(ctx, ownerUserID)
}
func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) {
return c.conversationDB.GetAllConversationIDs(ctx)
}

View File

@ -56,6 +56,7 @@ type CommonMsgDatabase interface {
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
@ -678,6 +679,10 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
return
}
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) {
return db.GetMinMaxSeqMongo(ctx, conversationID)
}
func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {

View File

@ -76,3 +76,7 @@ func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, c
var conversation relation.ConversationModel
return int(conversation.RecvMsgOpt), utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID).Select("recv_msg_opt").Find(&conversation).Error, "")
}
func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) {
return conversationIDs, utils.Wrap(c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, "")
}

View File

@ -44,5 +44,6 @@ type ConversationModelInterface interface {
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetAllConversationIDs(ctx context.Context) ([]string, error)
NewTx(tx any) ConversationModelInterface
}

View File

@ -276,9 +276,9 @@ func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_")
}
func GetNotificationConvetstionID(conversationID string) string {
func GetNotificationConversationIDByConversationID(conversationID string) string {
l := strings.Split(conversationID, "_")
if len(l) > 2 {
if len(l) > 1 {
l[0] = "n"
return strings.Join(l, "_")
}