mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 03:13:15 +08:00 
			
		
		
		
	* optimization: change the configuration file from being read globally to being read independently. * optimization: change the configuration file from being read globally to being read independently. * optimization: change the configuration file from being read globally to being read independently. * optimization: config file changed to dependency injection. * fix: replace global config with dependency injection * fix: replace global config with dependency injection * fix: import the enough param * fix: import the enough param * fix: import the enough param * fix: fix the component check of path * fix: fix the kafka of tls is nil problem * fix: fix the TLS.CACrt is nil error * fix: fix the valiable shadows problem * fix: fix the comflect * optimization: message remove options. * fix: fix the param pass error * fix: find error * fix: find error * fix: find eror * fix: find error * fix: find error * fix: del the undifined func * fix: find error * fix: fix the error * fix: pass config * fix: find error * fix: find error * fix: find error * fix: find error * fix: find error * fix: fix the config * fix: fix the error * fix: fix the config pass error * fix: fix the eror * fix: fix the error * fix: fix the error * fix: fix the error * fix: find error * fix: fix the error * fix: fix the config * fix: add return err * fix: fix the err2 * fix: err * fix: fix the func * fix: del the chinese comment * fix: fix the func * fix: fix the gateway_test logic * fix: s3 * test * test * fix: not found --------- Co-authored-by: luhaoling <2198702716@qq.com> Co-authored-by: withchao <993506633@qq.com>
		
			
				
	
	
		
			233 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			233 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright © 2023 OpenIM. All rights reserved.
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package tools
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
	"math/rand"
 | 
						|
 | 
						|
	"github.com/OpenIMSDK/protocol/sdkws"
 | 
						|
	"github.com/OpenIMSDK/tools/errs"
 | 
						|
	"github.com/OpenIMSDK/tools/log"
 | 
						|
	"github.com/OpenIMSDK/tools/mcontext"
 | 
						|
	"github.com/OpenIMSDK/tools/mw"
 | 
						|
	"github.com/OpenIMSDK/tools/tx"
 | 
						|
	"github.com/OpenIMSDK/tools/utils"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
 | 
						|
	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
 | 
						|
	"github.com/redis/go-redis/v9"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/credentials/insecure"
 | 
						|
)
 | 
						|
 | 
						|
type MsgTool struct {
 | 
						|
	msgDatabase           controller.CommonMsgDatabase
 | 
						|
	conversationDatabase  controller.ConversationDatabase
 | 
						|
	userDatabase          controller.UserDatabase
 | 
						|
	groupDatabase         controller.GroupDatabase
 | 
						|
	msgNotificationSender *notification.MsgNotificationSender
 | 
						|
	Config                *config.GlobalConfig
 | 
						|
}
 | 
						|
 | 
						|
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
 | 
						|
	groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase,
 | 
						|
	msgNotificationSender *notification.MsgNotificationSender, config *config.GlobalConfig,
 | 
						|
) *MsgTool {
 | 
						|
	return &MsgTool{
 | 
						|
		msgDatabase:           msgDatabase,
 | 
						|
		userDatabase:          userDatabase,
 | 
						|
		groupDatabase:         groupDatabase,
 | 
						|
		conversationDatabase:  conversationDatabase,
 | 
						|
		msgNotificationSender: msgNotificationSender,
 | 
						|
		Config:                config,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) {
 | 
						|
	rdb, err := cache.NewRedis(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	mongo, err := unrelation.NewMongo(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	discov, err := kdisc.NewDiscoveryRegister(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
 | 
						|
	userDB, err := mgo.NewUserMongo(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase(config.Mongo.Database), config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	ctxTx := tx.NewMongo(mongo.GetClient())
 | 
						|
	userDatabase := controller.NewUserDatabase(
 | 
						|
		userDB,
 | 
						|
		cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
 | 
						|
		ctxTx,
 | 
						|
		userMongoDB,
 | 
						|
	)
 | 
						|
	groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, ctxTx, nil)
 | 
						|
	conversationDatabase := controller.NewConversationDatabase(
 | 
						|
		conversationDB,
 | 
						|
		cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
 | 
						|
		ctxTx,
 | 
						|
	)
 | 
						|
	msgRpcClient := rpcclient.NewMessageRpcClient(discov, config)
 | 
						|
	msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
 | 
						|
	msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
 | 
						|
	return msgTool, nil
 | 
						|
}
 | 
						|
 | 
						|
//func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
 | 
						|
//	ctx := mcontext.NewCtx(utils.GetSelfFuncName())
 | 
						|
//	log.ZInfo(ctx, "============================ start del cron task ============================")
 | 
						|
//	conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
 | 
						|
//	if err != nil {
 | 
						|
//		log.ZError(ctx, "GetAllConversationIDs failed", err)
 | 
						|
//		return
 | 
						|
//	}
 | 
						|
//	for _, conversationID := range conversationIDs {
 | 
						|
//		conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
 | 
						|
//	}
 | 
						|
//	c.ClearConversationsMsg(ctx, conversationIDs)
 | 
						|
//	log.ZInfo(ctx, "============================ start del cron finished ============================")
 | 
						|
//}
 | 
						|
 | 
						|
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
 | 
						|
	ctx := mcontext.NewCtx(utils.GetSelfFuncName())
 | 
						|
	log.ZInfo(ctx, "============================ start del cron task ============================")
 | 
						|
	num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
 | 
						|
	if err != nil {
 | 
						|
		log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	const batchNum = 50
 | 
						|
	log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
 | 
						|
	if num == 0 {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	count := int(num/batchNum + num/batchNum/2)
 | 
						|
	if count < 1 {
 | 
						|
		count = 1
 | 
						|
	}
 | 
						|
	maxPage := 1 + num/batchNum
 | 
						|
	if num%batchNum != 0 {
 | 
						|
		maxPage++
 | 
						|
	}
 | 
						|
	for i := 0; i < count; i++ {
 | 
						|
		pageNumber := rand.Int63() % maxPage
 | 
						|
		pagination := &sdkws.RequestPagination{
 | 
						|
			PageNumber: int32(pageNumber),
 | 
						|
			ShowNumber: batchNum,
 | 
						|
		}
 | 
						|
		conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
 | 
						|
		if err != nil {
 | 
						|
			log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
 | 
						|
		if len(conversationIDs) == 0 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		c.ClearConversationsMsg(ctx, conversationIDs)
 | 
						|
	}
 | 
						|
	log.ZInfo(ctx, "============================ start del cron finished ============================")
 | 
						|
}
 | 
						|
 | 
						|
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.Config.RetainChatRecords*24*60*60)); err != nil {
 | 
						|
			log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", c.Config.RetainChatRecords)
 | 
						|
		}
 | 
						|
		if err := c.checkMaxSeq(ctx, conversationID); err != nil {
 | 
						|
			log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
 | 
						|
	minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
 | 
						|
		err = fmt.Errorf("cache max seq and mongo max seq is diff > 10,  maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID)
 | 
						|
		return errs.Wrap(err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
 | 
						|
	maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
 | 
						|
	if err != nil {
 | 
						|
		if errs.Unwrap(err) == redis.Nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
 | 
						|
	conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
 | 
						|
	}
 | 
						|
	for _, conversationID := range conversationIDs {
 | 
						|
		if err := c.checkMaxSeq(ctx, conversationID); err != nil {
 | 
						|
			log.ZWarn(ctx, "fixSeq failed", err, "conversationID", conversationID)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	fmt.Println("fix all seq finished")
 | 
						|
	return nil
 | 
						|
}
 |