This commit is contained in:
withchao 2025-02-07 18:43:59 +08:00
parent 60e65a4de6
commit 1f337b416c
7 changed files with 137 additions and 96 deletions

View File

@ -45,17 +45,17 @@ func main() {
}
}
cmd := newCmds(configPath)
putCmd1(cmd, auth.Start)
putCmd1(cmd, conversation.Start)
putCmd1(cmd, relation.Start)
putCmd1(cmd, group.Start)
putCmd1(cmd, msg.Start)
putCmd1(cmd, third.Start)
putCmd1(cmd, user.Start)
putCmd1(cmd, push.Start)
putCmd2(cmd, msggateway.Start)
putCmd2(cmd, msgtransfer.Start)
putCmd2(cmd, api.Start)
putCmd1(cmd, false, auth.Start)
putCmd1(cmd, false, conversation.Start)
putCmd1(cmd, false, relation.Start)
putCmd1(cmd, false, group.Start)
putCmd1(cmd, false, msg.Start)
putCmd1(cmd, false, third.Start)
putCmd1(cmd, false, user.Start)
putCmd1(cmd, false, push.Start)
putCmd3(cmd, true, msggateway.Start)
putCmd2(cmd, true, msgtransfer.Start)
putCmd2(cmd, true, api.Start)
ctx := context.Background()
if err := cmd.run(ctx); err != nil {
fmt.Println(err)
@ -176,8 +176,8 @@ func (x *cmds) parseConf(conf any) error {
return nil
}
func (x *cmds) add(name string, fn func(ctx context.Context) error) {
x.cmds = append(x.cmds, cmdName{Name: name, Func: fn})
func (x *cmds) add(name string, block bool, fn func(ctx context.Context) error) {
x.cmds = append(x.cmds, cmdName{Name: name, Block: block, Func: fn})
}
func (x *cmds) run(ctx context.Context) error {
@ -193,13 +193,14 @@ func (x *cmds) run(ctx context.Context) error {
for i := range x.cmds {
cmd := x.cmds[i]
go func() {
fmt.Println("start", cmd.Name)
//fmt.Println("start", cmd.Name)
if err := cmd.Func(ctx); err != nil {
fmt.Println("start failed", cmd.Name, err)
cancel(err)
cancel(fmt.Errorf("server %s exit %w", cmd.Name, err))
return
}
fmt.Println("start end", cmd.Name)
if cmd.Block {
cancel(fmt.Errorf("server %s exit", cmd.Name))
}
}()
}
<-ctx.Done()
@ -207,8 +208,9 @@ func (x *cmds) run(ctx context.Context) error {
}
type cmdName struct {
Name string
Func func(ctx context.Context) error
Name string
Func func(ctx context.Context) error
Block bool
}
func getFuncPacketName(fn any) string {
@ -219,8 +221,8 @@ func getFuncPacketName(fn any) string {
return name
}
func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
cmd.add(getFuncPacketName(fn), func(ctx context.Context) error {
func putCmd1[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
@ -229,8 +231,8 @@ func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client di
})
}
func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C) error) {
cmd.add(getFuncPacketName(fn), func(ctx context.Context) error {
func putCmd2[C any](cmd *cmds, block bool, fn func(ctx context.Context, index int, config *C) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
@ -238,3 +240,13 @@ func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C
return fn(ctx, 0, &conf)
})
}
func putCmd3[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar, index int) error) {
cmd.add(getFuncPacketName(fn), block, func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
}
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar(), 0)
})
}

3
go.mod
View File

@ -220,4 +220,5 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
)
replace github.com/openimsdk/tools => github.com/withchao/tools v0.0.50-alpha.38.0.20250207092500-cc9b95a0cdcc
//replace github.com/openimsdk/tools => github.com/withchao/tools v0.0.50-alpha.38.0.20250207092500-cc9b95a0cdcc
replace github.com/openimsdk/tools => /Users/chao/Desktop/code/tools

View File

@ -22,7 +22,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
@ -51,26 +50,26 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
return nil
}
func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index,
conf.Discovery.RpcService.MessageGateway,
nil,
conf,
[]string{
conf.Share.GetConfigFileName(),
conf.Discovery.GetConfigFileName(),
conf.MsgGateway.GetConfigFileName(),
conf.WebhooksConfig.GetConfigFileName(),
conf.RedisConfig.GetConfigFileName(),
},
[]string{
conf.Discovery.RpcService.MessageGateway,
},
s.InitServer,
)
}
//func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
// return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
// conf.MsgGateway.RPC.RegisterIP,
// conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index,
// conf.Discovery.RpcService.MessageGateway,
// nil,
// conf,
// []string{
// conf.Share.GetConfigFileName(),
// conf.Discovery.GetConfigFileName(),
// conf.MsgGateway.GetConfigFileName(),
// conf.WebhooksConfig.GetConfigFileName(),
// conf.RedisConfig.GetConfigFileName(),
// },
// []string{
// conf.Discovery.RpcService.MessageGateway,
// },
// s.InitServer,
// )
//}
type Server struct {
msggateway.UnimplementedMsgGatewayServer

View File

@ -21,8 +21,10 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
"github.com/openimsdk/tools/log"
)
@ -36,7 +38,7 @@ type Config struct {
}
// Start run ws server.
func Start(ctx context.Context, index int, conf *Config) error {
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar, index int) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
"rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
@ -63,12 +65,55 @@ func Start(ctx context.Context, index int, conf *Config) error {
return err
})
if err := hubServer.InitServer(ctx, conf, client, server); err != nil {
return err
}
go longServer.ChangeOnlineStatus(4)
netDone := make(chan error)
go func() {
err = hubServer.Start(ctx, index, conf)
netDone <- err
}()
return hubServer.LongConnServer.Run(netDone)
//netDone := make(chan error)
//go func() {
// err = hubServer.Start(ctx, index, conf)
// netDone <- err
//}()
return hubServer.LongConnServer.Run(ctx)
}
//
//// Start run ws server.
//func Start(ctx context.Context, index int, conf *Config) error {
// log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
// "rpcPorts", conf.MsgGateway.RPC.Ports,
// "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
// wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
// if err != nil {
// return err
// }
//
// rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build())
// if err != nil {
// return err
// }
// longServer := NewWsServer(
// conf,
// WithPort(wsPort),
// WithMaxConnNum(int64(conf.MsgGateway.LongConnSvr.WebsocketMaxConnNum)),
// WithHandshakeTimeout(time.Duration(conf.MsgGateway.LongConnSvr.WebsocketTimeout)*time.Second),
// WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen),
// )
//
// hubServer := NewServer(longServer, conf, func(srv *Server) error {
// var err error
// longServer.online, err = rpccache.NewOnlineCache(srv.userClient, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
// return err
// })
//
// go longServer.ChangeOnlineStatus(4)
//
// netDone := make(chan error)
// go func() {
// err = hubServer.Start(ctx, index, conf)
// netDone <- err
// }()
// return hubServer.LongConnServer.Run(netDone)
//}

View File

@ -2,7 +2,6 @@ package msggateway
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
@ -11,7 +10,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth"
@ -30,7 +28,7 @@ import (
)
type LongConnServer interface {
Run(done chan error) error
Run(ctx context.Context) error
wsHandler(w http.ResponseWriter, r *http.Request)
GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
@ -158,19 +156,14 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
}
}
func (ws *WsServer) Run(done chan error) error {
var (
client *Client
netErr error
shutdownDone = make(chan struct{}, 1)
)
server := http.Server{Addr: ":" + stringutil.IntToString(ws.port), Handler: nil}
func (ws *WsServer) Run(ctx context.Context) error {
var client *Client
ctx, cancel := context.WithCancelCause(ctx)
go func() {
for {
select {
case <-shutdownDone:
case <-ctx.Done():
return
case client = <-ws.registerChan:
ws.registerClient(client)
@ -181,42 +174,31 @@ func (ws *WsServer) Run(done chan error) error {
}
}
}()
netDone := make(chan struct{}, 1)
server := http.Server{Addr: fmt.Sprintf(":%d", ws.port), Handler: nil}
go func() {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, "ws start err", server.Addr)
netDone <- struct{}{}
if err == nil {
err = fmt.Errorf("http server closed")
}
cancel(fmt.Errorf("msg gateway %w", err))
}()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
shutDown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
sErr := server.Shutdown(ctx)
if sErr != nil {
return errs.WrapMsg(sErr, "shutdown err")
}
close(shutdownDone)
return nil
}
etcd.RegisterShutDown(shutDown)
defer cancel()
var err error
select {
case err = <-done:
if err := shutDown(); err != nil {
return err
}
if err != nil {
return err
}
case <-netDone:
}
return netErr
<-ctx.Done()
return shutDown()
}
var concurrentRequest = 3
const concurrentRequest = 3
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Discovery.RpcService.MessageGateway)

View File

@ -2,7 +2,7 @@ package push
import (
"context"
"errors"
"fmt"
"sync"
"github.com/openimsdk/protocol/msggateway"
@ -40,18 +40,17 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
return nil
}
func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher {
func NewOnlinePusher(disCov discovery.Conn, config *Config) (OnlinePusher, error) {
if runtimeenv.RuntimeEnvironment() == conf.KUBERNETES {
return NewDefaultAllNode(disCov, config)
return NewDefaultAllNode(disCov, config), nil
}
switch config.Discovery.Enable {
case conf.ETCD:
return NewDefaultAllNode(disCov, config)
return NewDefaultAllNode(disCov, config), nil
case conf.Standalone:
return nil
return NewDefaultAllNode(disCov, config), nil
default:
log.ZWarn(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
return nil
return nil, errs.New(fmt.Sprintf("unsupported discovery type %s", config.Discovery.Enable))
}
}

View File

@ -45,8 +45,6 @@ type ConsumerHandler struct {
}
func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, client discovery.Conn) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
var err error
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return nil, err
@ -63,13 +61,18 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller
if err != nil {
return nil, err
}
onlinePusher, err := NewOnlinePusher(client, config)
if err != nil {
return nil, err
}
var consumerHandler ConsumerHandler
consumerHandler.userClient = rpcli.NewUserClient(userConn)
consumerHandler.groupClient = rpcli.NewGroupClient(groupConn)
consumerHandler.msgClient = rpcli.NewMsgClient(msgConn)
consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn)
consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config)
consumerHandler.onlinePusher = onlinePusher
consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupClient, &config.LocalCacheConfig, rdb)
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationClient, &config.LocalCacheConfig, rdb)
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)