mirror of
https://github.com/gogf/gf.git
synced 2025-04-05 11:18:50 +08:00
improve address configuration for grpc server (#2982)
This commit is contained in:
parent
569cbb09bf
commit
30040332a7
@ -400,11 +400,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/polarismesh/polaris-go v1.5.3 h1:RL1m6FThsYCzKYGOLp5HXNCnzeqa5NEsgO0h5kxZXRM=
|
||||
github.com/polarismesh/polaris-go v1.5.3/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8=
|
||||
github.com/polarismesh/polaris-go v1.5.4 h1:Y/FaZk7OpdjVeRh3b4ZHYXF6xtjTkP0oCmVNVdm/GoQ=
|
||||
github.com/polarismesh/polaris-go v1.5.4/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8=
|
||||
github.com/polarismesh/specification v1.4.0 h1:fm7sUtFZC2g9+lLmRCtjGrUow47CY5JDFoZXwwCQGGY=
|
||||
github.com/polarismesh/specification v1.4.0/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
|
||||
github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A=
|
||||
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
|
||||
|
@ -400,11 +400,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/polarismesh/polaris-go v1.5.3 h1:RL1m6FThsYCzKYGOLp5HXNCnzeqa5NEsgO0h5kxZXRM=
|
||||
github.com/polarismesh/polaris-go v1.5.3/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8=
|
||||
github.com/polarismesh/polaris-go v1.5.4 h1:Y/FaZk7OpdjVeRh3b4ZHYXF6xtjTkP0oCmVNVdm/GoQ=
|
||||
github.com/polarismesh/polaris-go v1.5.4/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8=
|
||||
github.com/polarismesh/specification v1.4.0 h1:fm7sUtFZC2g9+lLmRCtjGrUow47CY5JDFoZXwwCQGGY=
|
||||
github.com/polarismesh/specification v1.4.0/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
|
||||
github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A=
|
||||
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
|
||||
|
@ -22,8 +22,8 @@ type (
|
||||
)
|
||||
|
||||
const (
|
||||
// FreePortAddress marks the server listens using random free port.
|
||||
FreePortAddress = ":0"
|
||||
FreePortAddress = ":0" // FreePortAddress marks the server listens using random free port.
|
||||
defaultListenAddress = ":0" // Default listening address for grpc server if no address configured.
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/net/gipv4"
|
||||
"github.com/gogf/gf/v2/net/gsvc"
|
||||
"github.com/gogf/gf/v2/net/gtcp"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gproc"
|
||||
@ -37,6 +36,7 @@ type GrpcServer struct {
|
||||
services []gsvc.Service
|
||||
waitGroup sync.WaitGroup
|
||||
registrar gsvc.Registrar
|
||||
serviceMu sync.Mutex
|
||||
}
|
||||
|
||||
// Service implements gsvc.Service interface.
|
||||
@ -59,11 +59,7 @@ func (s modServer) New(conf ...*GrpcServerConfig) *GrpcServer {
|
||||
config = s.NewConfig()
|
||||
}
|
||||
if config.Address == "" {
|
||||
randomPort, err := gtcp.GetFreePort()
|
||||
if err != nil {
|
||||
g.Log().Fatalf(ctx, `%+v`, err)
|
||||
}
|
||||
config.Address = fmt.Sprintf(`:%d`, randomPort)
|
||||
config.Address = defaultListenAddress
|
||||
}
|
||||
if !gstr.Contains(config.Address, ":") {
|
||||
g.Log().Fatal(ctx, "invalid service address, should contain listening port")
|
||||
@ -94,6 +90,8 @@ func (s modServer) New(conf ...*GrpcServerConfig) *GrpcServer {
|
||||
// Service binds service list to current server.
|
||||
// Server will automatically register the service list after it starts.
|
||||
func (s *GrpcServer) Service(services ...gsvc.Service) {
|
||||
s.serviceMu.Lock()
|
||||
defer s.serviceMu.Unlock()
|
||||
s.services = append(s.services, services...)
|
||||
}
|
||||
|
||||
@ -148,6 +146,8 @@ func (s *GrpcServer) doServiceRegister() {
|
||||
if s.registrar == nil {
|
||||
return
|
||||
}
|
||||
s.serviceMu.Lock()
|
||||
defer s.serviceMu.Unlock()
|
||||
if len(s.services) == 0 {
|
||||
s.services = []gsvc.Service{&gsvc.LocalService{
|
||||
Name: s.config.Name,
|
||||
@ -186,6 +186,8 @@ func (s *GrpcServer) doServiceDeregister() {
|
||||
if s.registrar == nil {
|
||||
return
|
||||
}
|
||||
s.serviceMu.Lock()
|
||||
defer s.serviceMu.Unlock()
|
||||
var ctx = gctx.GetInitCtx()
|
||||
for _, service := range s.services {
|
||||
s.Logger().Debugf(ctx, `service deregister: %+v`, service)
|
||||
@ -257,19 +259,17 @@ func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoi
|
||||
)
|
||||
if len(addrArray) == 1 {
|
||||
configItemName := "address"
|
||||
if len(addresses) != 0 {
|
||||
if len(s.config.Endpoints) != 0 {
|
||||
configItemName = "endpoint"
|
||||
}
|
||||
panic(gerror.NewCodef(
|
||||
gcode.CodeInvalidConfiguration,
|
||||
`invalid %s configuration "%s", missing port`,
|
||||
`invalid "%s" configuration "%s", missing port`,
|
||||
configItemName, address,
|
||||
))
|
||||
}
|
||||
// IPs.
|
||||
switch addrArray[0] {
|
||||
case "127.0.0.1":
|
||||
// Nothing to do.
|
||||
case "0.0.0.0", "":
|
||||
intranetIps, err := gipv4.GetIntranetIpArray()
|
||||
if err != nil {
|
||||
@ -305,7 +305,10 @@ func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoi
|
||||
}
|
||||
for _, ip := range listenedIps {
|
||||
for _, port := range listenedPorts {
|
||||
endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)))
|
||||
endpoints = append(
|
||||
endpoints,
|
||||
gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
|
||||
// GrpcServerConfig is the configuration for server.
|
||||
type GrpcServerConfig struct {
|
||||
Address string // (optional) Address for server listening.
|
||||
Address string // (optional) Single address for server listening, use `:0` or `ip:0` to serve random port.
|
||||
Name string // (optional) Name for current service.
|
||||
Logger *glog.Logger // (optional) Logger for server.
|
||||
LogPath string // (optional) LogPath specifies the directory for storing logging files.
|
||||
|
@ -1,11 +1,47 @@
|
||||
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
package grpcx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
"github.com/gogf/gf/v2/text/gstr"
|
||||
)
|
||||
|
||||
func Test_Grpcx_Grpc_Server(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
s := Server.New()
|
||||
s.Start()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
defer s.Stop()
|
||||
s.serviceMu.Lock()
|
||||
defer s.serviceMu.Unlock()
|
||||
t.Assert(len(s.services) != 0, true)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Grpcx_Grpc_Server_Address(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
c := Server.NewConfig()
|
||||
c.Address = "127.0.0.1:0"
|
||||
s := Server.New(c)
|
||||
s.Start()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
defer s.Stop()
|
||||
|
||||
s.serviceMu.Lock()
|
||||
defer s.serviceMu.Unlock()
|
||||
t.Assert(len(s.services) != 0, true)
|
||||
t.Assert(gstr.Contains(s.services[0].GetEndpoints().String(), "127.0.0.1:"), true)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Grpcx_Grpc_Server_Config(t *testing.T) {
|
||||
cfg := Server.NewConfig()
|
||||
addr := "10.0.0.29:80"
|
||||
|
@ -24,6 +24,7 @@ var (
|
||||
// Use internal variable to guarantee concurrent safety
|
||||
// when multiple Listen happen.
|
||||
signalChan = make(chan os.Signal, 1)
|
||||
signalHandlerMu sync.Mutex
|
||||
signalHandlerMap = make(map[os.Signal][]SigHandler)
|
||||
shutdownSignalMap = map[os.Signal]struct{}{
|
||||
syscall.SIGINT: {},
|
||||
@ -42,6 +43,8 @@ func init() {
|
||||
|
||||
// AddSigHandler adds custom signal handler for custom one or more signals.
|
||||
func AddSigHandler(handler SigHandler, signals ...os.Signal) {
|
||||
signalHandlerMu.Lock()
|
||||
defer signalHandlerMu.Unlock()
|
||||
for _, sig := range signals {
|
||||
signalHandlerMap[sig] = append(signalHandlerMap[sig], handler)
|
||||
}
|
||||
@ -54,6 +57,8 @@ func AddSigHandler(handler SigHandler, signals ...os.Signal) {
|
||||
// syscall.SIGTERM,
|
||||
// syscall.SIGABRT.
|
||||
func AddSigHandlerShutdown(handler ...SigHandler) {
|
||||
signalHandlerMu.Lock()
|
||||
defer signalHandlerMu.Unlock()
|
||||
for _, h := range handler {
|
||||
for sig := range shutdownSignalMap {
|
||||
signalHandlerMap[sig] = append(signalHandlerMap[sig], h)
|
||||
@ -64,19 +69,16 @@ func AddSigHandlerShutdown(handler ...SigHandler) {
|
||||
// Listen blocks and does signal listening and handling.
|
||||
func Listen() {
|
||||
var (
|
||||
signals = make([]os.Signal, 0)
|
||||
signals = getHandlerSignals()
|
||||
ctx = context.Background()
|
||||
wg = sync.WaitGroup{}
|
||||
sig os.Signal
|
||||
)
|
||||
for s := range signalHandlerMap {
|
||||
signals = append(signals, s)
|
||||
}
|
||||
signal.Notify(signalChan, signals...)
|
||||
for {
|
||||
sig = <-signalChan
|
||||
intlog.Printf(ctx, `signal received: %s`, sig.String())
|
||||
if handlers, ok := signalHandlerMap[sig]; ok {
|
||||
if handlers := getHandlersBySignal(sig); len(handlers) > 0 {
|
||||
for _, handler := range handlers {
|
||||
wg.Add(1)
|
||||
var (
|
||||
@ -105,3 +107,19 @@ func Listen() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getHandlerSignals() []os.Signal {
|
||||
signalHandlerMu.Lock()
|
||||
defer signalHandlerMu.Unlock()
|
||||
var signals = make([]os.Signal, 0)
|
||||
for s := range signalHandlerMap {
|
||||
signals = append(signals, s)
|
||||
}
|
||||
return signals
|
||||
}
|
||||
|
||||
func getHandlersBySignal(sig os.Signal) []SigHandler {
|
||||
signalHandlerMu.Lock()
|
||||
defer signalHandlerMu.Unlock()
|
||||
return signalHandlerMap[sig]
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user