mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-01 00:42:13 +08:00 
			
		
		
		
	* fix: GetUserReqApplicationList error when there is a disbanded group chat
* fix: error when querying some information about disbanded group
* fix: GetUserReqApplicationList dismissed group error
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* fix: the original message referenced by the pull message processing is withdrawn
* merge
* cicd: robot automated Change
* sdkws.MsgData
* user
* interface{} -> any
* user
* third
* group
* group
* group
* group
* group
* group
* conversation
* standalone mysql db model
* tx
* s3
* group
* mongo
* group
* group
* group
* group
* group
* group
* refactor: add openim mysql to mongo refactor
Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
* refactor: add openim mysql to mongo refactor
Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
* remove mysql
* remove mysql
* friend
* friend
* friend
* friend
* friend
* friend
* group
* convert
* index
* index
* all
* all
* mysql2mongo
* data conversion
* up35
* up35
* feat: add format set
Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
* fix: fix scripts
Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
* merge main
* merge main
* Update init-config.sh
* fix: user args check
---------
Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
Co-authored-by: Xinwei Xiong <3293172751@qq.com>
		
	
			
		
			
				
	
	
		
			119 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			119 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright © 2023 OpenIM. All rights reserved.
 | ||
| //
 | ||
| // Licensed under the Apache License, Version 2.0 (the "License");
 | ||
| // you may not use this file except in compliance with the License.
 | ||
| // You may obtain a copy of the License at
 | ||
| //
 | ||
| //     http://www.apache.org/licenses/LICENSE-2.0
 | ||
| //
 | ||
| // Unless required by applicable law or agreed to in writing, software
 | ||
| // distributed under the License is distributed on an "AS IS" BASIS,
 | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | ||
| // See the License for the specific language governing permissions and
 | ||
| // limitations under the License.
 | ||
| 
 | ||
| package msgtransfer
 | ||
| 
 | ||
| import (
 | ||
| 	"errors"
 | ||
| 	"fmt"
 | ||
| 	"log"
 | ||
| 	"net/http"
 | ||
| 	"sync"
 | ||
| 
 | ||
| 	"github.com/OpenIMSDK/tools/mw"
 | ||
| 	"github.com/prometheus/client_golang/prometheus"
 | ||
| 	"github.com/prometheus/client_golang/prometheus/collectors"
 | ||
| 	"github.com/prometheus/client_golang/prometheus/promhttp"
 | ||
| 	"google.golang.org/grpc"
 | ||
| 	"google.golang.org/grpc/credentials/insecure"
 | ||
| 
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
 | ||
| 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 | ||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 | ||
| )
 | ||
| 
 | ||
| type MsgTransfer struct {
 | ||
| 	historyCH      *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
 | ||
| 	historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
 | ||
| 	// modifyCH       *ModifyMsgConsumerHandler          // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
 | ||
| }
 | ||
| 
 | ||
| func StartTransfer(prometheusPort int) error {
 | ||
| 	rdb, err := cache.NewRedis()
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	mongo, err := unrelation.NewMongo()
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	if err := mongo.CreateMsgIndex(); err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
 | ||
| 	/*
 | ||
| 		client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
 | ||
| 			openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
 | ||
| 				config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | ||
| 	msgModel := cache.NewMsgCacheModel(rdb)
 | ||
| 	msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
 | ||
| 	msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
 | ||
| 	conversationRpcClient := rpcclient.NewConversationRpcClient(client)
 | ||
| 	groupRpcClient := rpcclient.NewGroupRpcClient(client)
 | ||
| 	msgTransfer := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
 | ||
| 	return msgTransfer.Start(prometheusPort)
 | ||
| }
 | ||
| 
 | ||
| func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
 | ||
| 	return &MsgTransfer{
 | ||
| 		historyCH:      NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
 | ||
| 		historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase),
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| func (m *MsgTransfer) Start(prometheusPort int) error {
 | ||
| 	var wg sync.WaitGroup
 | ||
| 	wg.Add(1)
 | ||
| 	fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
 | ||
| 	if prometheusPort <= 0 {
 | ||
| 		return errors.New("prometheusPort not correct")
 | ||
| 	}
 | ||
| 	if config.Config.ChatPersistenceMysql {
 | ||
| 		// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
 | ||
| 	} else {
 | ||
| 		fmt.Println("msg transfer not start mysql consumer")
 | ||
| 	}
 | ||
| 	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
 | ||
| 	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
 | ||
| 	// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
 | ||
| 	/*err := prome.StartPrometheusSrv(prometheusPort)
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}*/
 | ||
| 	////////////////////////////
 | ||
| 	if config.Config.Prometheus.Enable {
 | ||
| 		reg := prometheus.NewRegistry()
 | ||
| 		reg.MustRegister(
 | ||
| 			collectors.NewGoCollector(),
 | ||
| 		)
 | ||
| 		reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
 | ||
| 		http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
 | ||
| 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
 | ||
| 	}
 | ||
| 	////////////////////////////////////////
 | ||
| 	wg.Wait()
 | ||
| 	return nil
 | ||
| }
 |