This commit is contained in:
withchao 2025-02-11 16:31:53 +08:00
parent ec6c840165
commit 04bac6ddb7
26 changed files with 303 additions and 207 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net"
@ -29,16 +30,23 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/standalone"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
func init() {
config.SetStandalone()
prommetrics.RegistryAll()
}
func main() {
var configPath string
flag.StringVar(&configPath, "c", "", "config path")
@ -62,6 +70,7 @@ func main() {
putCmd(cmd, true, msggateway.Start)
putCmd(cmd, true, msgtransfer.Start)
putCmd(cmd, true, api.Start)
putCmd(cmd, true, tools.Start)
ctx := context.Background()
if err := cmd.run(ctx); err != nil {
fmt.Println(err)
@ -91,7 +100,7 @@ func (x *cmds) getTypePath(typ reflect.Type) string {
}
func (x *cmds) initDiscovery() {
x.config.Discovery.Enable = config.Standalone
x.config.Discovery.Enable = "standalone"
vof := reflect.ValueOf(&x.config.Discovery.RpcService).Elem()
tof := reflect.TypeOf(&x.config.Discovery.RpcService).Elem()
num := tof.NumField()
@ -143,6 +152,9 @@ func (x *cmds) initAllConfig() error {
}
}
x.initDiscovery()
x.config.Redis.Disable = true
x.config.LocalCache = config.LocalCache{}
config.InitNotification(&x.config.Notification)
return nil
}
@ -209,13 +221,23 @@ func (x *cmds) run(ctx context.Context) error {
return err
}
}
ip, err := network.GetLocalIP()
if err != nil {
return err
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return fmt.Errorf("prometheus listen %d error %w", port, err)
}
defer listener.Close()
prommetrics.RegistryAll()
log.ZDebug(ctx, "prometheus start", "addr", listener.Addr())
target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port))
if err != nil {
return err
}
if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
return err
}
go func() {
err := prommetrics.Start(listener)
if err == nil {
@ -278,7 +300,6 @@ func (x *cmds) run(ctx context.Context) error {
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Second * 20):
log.ZError(ctx, "server exit timeout", nil)

View File

@ -17,17 +17,15 @@ package api
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
@ -60,7 +58,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g
Handler: router,
Addr: net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)),
}
log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort)
go func() {
defer close(done)
<-ctx.Done()
@ -69,6 +66,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g
log.ZWarn(ctx, "api server shutdown err", err)
}
}()
log.CInfo(ctx, "api server is init", "runtimeEnv", runtimeenv.RuntimeEnvironment(), "address", httpServer.Addr, "apiPort", apiPort)
err := httpServer.ListenAndServe()
if err == nil {
err = errors.New("api done")
@ -76,19 +74,18 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, service g
cancel(err)
}()
//if config.Discovery.Enable == conf.ETCD {
// cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
// cm.Watch(ctx)
//}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case val := <-sigs:
log.ZDebug(ctx, "recv exit", "signal", val.String())
cancel(fmt.Errorf("signal %s", val.String()))
case <-ctx.Done():
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
//sigs := make(chan os.Signal, 1)
//signal.Notify(sigs, syscall.SIGTERM)
//select {
//case val := <-sigs:
// log.ZDebug(ctx, "recv exit", "signal", val.String())
// cancel(fmt.Errorf("signal %s", val.String()))
//case <-ctx.Done():
//}
exitCause := context.Cause(ctx)
log.ZWarn(ctx, "api server exit", exitCause)
timer := time.NewTimer(time.Second * 15)

View File

@ -19,8 +19,8 @@ import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
@ -48,10 +48,12 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc
return err
}
rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build())
dbb := dbbuild.NewBuilder(nil, &conf.RedisConfig)
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
longServer := NewWsServer(
conf,
WithPort(wsPort),

View File

@ -16,34 +16,22 @@ package msgtransfer
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
"google.golang.org/grpc"
@ -74,25 +62,19 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig)
builder := mqbuild.NewBuilder(&config.KafkaConfig)
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", config.Index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
//client, err := discRegister.NewDiscoveryRegister(&config.Discovery, nil)
//if err != nil {
// return err
//}
//client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
// grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
@ -193,67 +175,6 @@ func (m *MsgTransfer) Start(ctx context.Context, index int, config *Config, clie
return err
}
registerIP, err := network.GetRpcRegisterIP("")
if err != nil {
return err
}
getAutoPort := func() (net.Listener, int, error) {
registerAddr := net.JoinHostPort(registerIP, "0")
listener, err := net.Listen("tcp", registerAddr)
if err != nil {
return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr)
}
_, portStr, _ := net.SplitHostPort(listener.Addr().String())
port, _ := strconv.Atoi(portStr)
return listener, port, nil
}
if config.Discovery.Enable != conf.Standalone && config.MsgTransfer.Prometheus.Enable {
if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
var (
listener net.Listener
prometheusPort int
)
if config.MsgTransfer.Prometheus.AutoSetPorts {
listener, prometheusPort, err = getAutoPort()
if err != nil {
return err
}
etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
_, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort)))
if err != nil {
return errs.WrapMsg(err, "etcd put err")
}
} else {
prometheusPort, err = datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
if err != nil {
return err
}
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort))
}
}
go func() {
defer func() {
if r := recover(); r != nil {
log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r))
}
}()
if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
netDone <- struct{}{}
}
}()
}
// todo
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

View File

@ -41,14 +41,15 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
}
func NewOnlinePusher(disCov discovery.Conn, config *Config) (OnlinePusher, error) {
if conf.Standalone() {
return NewDefaultAllNode(disCov, config), nil
}
if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES {
return NewDefaultAllNode(disCov, config), nil
}
switch config.Discovery.Enable {
case conf.ETCD:
return NewDefaultAllNode(disCov, config), nil
case conf.Standalone:
return NewDefaultAllNode(disCov, config), nil
default:
return nil, errs.New(fmt.Sprintf("unsupported discovery type %s", config.Discovery.Enable))
}

View File

@ -9,9 +9,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
@ -46,7 +46,8 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
dbb := dbbuild.NewBuilder(nil, &config.RedisConfig)
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
@ -55,7 +56,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil {
return err
}
builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig)
builder := mqbuild.NewBuilder(&config.KafkaConfig)
offlinePushProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToOfflinePushTopic)
if err != nil {

View File

@ -18,11 +18,11 @@ import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
@ -56,7 +56,8 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
dbb := dbbuild.NewBuilder(nil, &config.RedisConfig)
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}

View File

@ -19,24 +19,22 @@ import (
"sort"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -67,11 +65,12 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}

View File

@ -23,6 +23,7 @@ import (
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -43,8 +44,6 @@ import (
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -78,11 +77,12 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
@ -98,11 +98,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil {
return err
}
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
//msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
//conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err

View File

@ -17,23 +17,21 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/mqbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/notification"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
)
@ -79,16 +77,17 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.Discovery, &config.KafkaConfig)
builder := mqbuild.NewBuilder(&config.KafkaConfig)
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
if err != nil {
return err
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}

View File

@ -17,26 +17,24 @@ package relation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
@ -68,11 +66,12 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}

View File

@ -19,6 +19,7 @@ import (
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -30,8 +31,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cos"
@ -62,14 +61,16 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
logdb, err := mgo.NewLogMongo(mgocli.GetDB())
if err != nil {
return err

View File

@ -23,27 +23,25 @@ import (
"time"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
friendpb "github.com/openimsdk/protocol/relation"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -77,14 +75,16 @@ type Config struct {
}
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
users := make([]*tablerelation.User, 0)
for _, v := range config.Share.IMAdminUserID {

View File

@ -4,41 +4,34 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/robfig/cron/v3"
"google.golang.org/grpc"
)
type CronTaskConfig struct {
type Config struct {
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
}
func Start(ctx context.Context, conf *CronTaskConfig) error {
// func Start(ctx context.Context, conf *CronTaskConfig) error {
func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
if conf.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
return nil
}
client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, nil)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg)
@ -91,7 +84,7 @@ func Start(ctx context.Context, conf *CronTaskConfig) error {
type cronServer struct {
ctx context.Context
config *CronTaskConfig
config *Config
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient

View File

@ -74,12 +74,15 @@ func (a *ApiCmd) Exec() error {
func (a *ApiCmd) runE() error {
a.apiConfig.Index = config.Index(a.Index())
var prometheus config.Prometheus
prometheus := config.Prometheus{
Enable: a.apiConfig.API.Prometheus.Enable,
Ports: a.apiConfig.API.Prometheus.Ports,
}
return startrpc.Start(
a.ctx, &a.apiConfig.Discovery,
&prometheus,
a.apiConfig.API.Api.ListenIP, "",
false,
a.apiConfig.API.Prometheus.AutoSetPorts,
nil, int(a.apiConfig.Index),
a.apiConfig.Discovery.RpcService.MessageGateway,
&a.apiConfig.Notification,

View File

@ -19,6 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@ -28,11 +29,11 @@ type CronTaskCmd struct {
*RootCmd
ctx context.Context
configMap map[string]any
cronTaskConfig *tools.CronTaskConfig
cronTaskConfig *tools.Config
}
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig tools.CronTaskConfig
var cronTaskConfig tools.Config
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
@ -52,5 +53,18 @@ func (a *CronTaskCmd) Exec() error {
}
func (a *CronTaskCmd) runE() error {
return tools.Start(a.ctx, a.cronTaskConfig)
var prometheus config.Prometheus
return startrpc.Start(
a.ctx, &a.cronTaskConfig.Discovery,
&prometheus,
"", "",
false,
nil, 0,
"",
nil,
a.cronTaskConfig,
[]string{},
[]string{},
tools.Start,
)
}

View File

@ -323,6 +323,7 @@ type RPC struct {
}
type Redis struct {
Disable bool `yaml:"disable"`
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`

View File

@ -23,7 +23,7 @@ const (
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = runtimeenv.Kubernetes
ETCD = "etcd"
Standalone = "standalone"
//Standalone = "standalone"
)
const (

View File

@ -0,0 +1,11 @@
package config
var standalone bool
func SetStandalone() {
standalone = true
}
func Standalone() bool {
return standalone
}

View File

@ -31,6 +31,9 @@ import (
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if config.Standalone() {
return standalone.GetSvcDiscoveryRegistry(), nil
}
if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions(
@ -40,8 +43,6 @@ func NewDiscoveryRegister(discovery *config.Discovery, watchNames []string) (dis
}
switch discovery.Enable {
case config.Standalone:
return standalone.GetSvcDiscoveryRegistry(), nil
case config.ETCD:
return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,

View File

@ -16,6 +16,7 @@ package startrpc
import (
"context"
"errors"
"fmt"
"net"
"os"
@ -26,6 +27,7 @@ import (
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/network"
"google.golang.org/grpc/status"
@ -39,7 +41,11 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
func init() {
prommetrics.RegistryAll()
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error,
@ -49,10 +55,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
conf.InitNotification(notification)
}
if discovery.Enable == conf.Standalone {
return nil
}
options = append(options, mw.GrpcServer())
registerIP, err := network.GetRpcRegisterIP(registerIP)
@ -83,7 +85,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
watchConfigNames = append(watchConfigNames, conf.LogConfigFileName)
client, err := kdisc.NewDiscoveryRegister(discovery, watchServiceNames)
client, err := kdisc.NewDiscoveryRegister(disc, watchServiceNames)
if err != nil {
return err
}
@ -116,15 +118,20 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
if err != nil {
return err
}
if err := client.Register(ctx, "prometheus_"+rpcRegisterName, registerIP, prometheusPort); err != nil {
log.ZDebug(ctx, "prometheus start", "addr", prometheusListener.Addr(), "rpcRegisterName", rpcRegisterName)
target, err := jsonutil.JsonMarshal(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))
if err != nil {
return err
}
cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery)
if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
if !errors.Is(err, discovery.ErrNotSupported) {
return err
}
}
go func() {
err := prommetrics.RpcInit(cs, prometheusListener)
err := prommetrics.Start(prometheusListener)
if err == nil {
err = fmt.Errorf("serve end")
err = fmt.Errorf("listener done")
}
cancel(fmt.Errorf("prommetrics %s %w", rpcRegisterName, err))
}()

25
pkg/dbbuild/builder.go Normal file
View File

@ -0,0 +1,25 @@
package dbbuild
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/redis/go-redis/v9"
)
type Builder interface {
Mongo(ctx context.Context) (*mongoutil.Client, error)
Redis(ctx context.Context) (redis.UniversalClient, error)
}
func NewBuilder(mongoConf *config.Mongo, redisConf *config.Redis) Builder {
if config.Standalone() {
globalStandalone.setConfig(mongoConf, redisConf)
return globalStandalone
}
return &microservices{
mongo: mongoConf,
redis: redisConf,
}
}

View File

@ -0,0 +1,26 @@
package dbbuild
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
)
type microservices struct {
mongo *config.Mongo
redis *config.Redis
}
func (x *microservices) Mongo(ctx context.Context) (*mongoutil.Client, error) {
return mongoutil.NewMongoDB(ctx, x.mongo.Build())
}
func (x *microservices) Redis(ctx context.Context) (redis.UniversalClient, error) {
if x.redis.Disable {
return nil, nil
}
return redisutil.NewRedisClient(ctx, x.redis.Build())
}

76
pkg/dbbuild/standalone.go Normal file
View File

@ -0,0 +1,76 @@
package dbbuild
import (
"context"
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
)
const (
standaloneMongo = "mongo"
standaloneRedis = "redis"
)
var globalStandalone = &standalone{}
type standaloneConn[C any] struct {
Conn C
Err error
}
func (x *standaloneConn[C]) result() (C, error) {
return x.Conn, x.Err
}
type standalone struct {
lock sync.Mutex
mongo *config.Mongo
redis *config.Redis
conn map[string]any
}
func (x *standalone) setConfig(mongoConf *config.Mongo, redisConf *config.Redis) {
x.lock.Lock()
defer x.lock.Unlock()
x.mongo = mongoConf
x.redis = redisConf
}
func (x *standalone) Mongo(ctx context.Context) (*mongoutil.Client, error) {
x.lock.Lock()
defer x.lock.Unlock()
if x.conn == nil {
x.conn = make(map[string]any)
}
v, ok := x.conn[standaloneMongo]
if !ok {
var val standaloneConn[*mongoutil.Client]
val.Conn, val.Err = mongoutil.NewMongoDB(ctx, x.mongo.Build())
v = &val
x.conn[standaloneMongo] = v
}
return v.(*standaloneConn[*mongoutil.Client]).result()
}
func (x *standalone) Redis(ctx context.Context) (redis.UniversalClient, error) {
x.lock.Lock()
defer x.lock.Unlock()
if x.redis.Disable {
return nil, nil
}
if x.conn == nil {
x.conn = make(map[string]any)
}
v, ok := x.conn[standaloneRedis]
if !ok {
var val standaloneConn[redis.UniversalClient]
val.Conn, val.Err = redisutil.NewRedisClient(ctx, x.redis.Build())
v = &val
x.conn[standaloneRedis] = v
}
return v.(*standaloneConn[redis.UniversalClient]).result()
}

View File

@ -15,8 +15,8 @@ type Builder interface {
GetTopicConsumer(ctx context.Context, topic string) (mq.Consumer, error)
}
func NewBuilder(discovery *config.Discovery, kafka *config.Kafka) Builder {
if discovery.Enable == config.Standalone {
func NewBuilder(kafka *config.Kafka) Builder {
if config.Standalone() {
return standaloneBuilder{}
}
return &kafkaBuilder{

View File

@ -3,15 +3,16 @@ package rpccache
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
@ -51,10 +52,11 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.
x.CurrentPhase.Store(DoSubscribeOver)
x.Cond.Broadcast()
}
go func() {
x.doSubscribe(ctx, rdb, fn)
}()
if rdb != nil {
go func() {
x.doSubscribe(ctx, rdb, fn)
}()
}
return x, nil
}