mirror of
https://github.com/gogf/gf.git
synced 2025-04-05 11:18:50 +08:00
fix(net/ghttp): occasional ci failed by unit testing cases using gctp.GetFreePort
(#3982)
This commit is contained in:
parent
2e788be1d3
commit
38bffc77e2
10
.github/workflows/golangci-lint.yml
vendored
10
.github/workflows/golangci-lint.yml
vendored
@ -15,15 +15,6 @@ on:
|
||||
- enhance/**
|
||||
- fix/**
|
||||
- feat/**
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
- develop
|
||||
- personal/**
|
||||
- feature/**
|
||||
- enhance/**
|
||||
- fix/**
|
||||
- feat/**
|
||||
|
||||
jobs:
|
||||
golangci:
|
||||
@ -64,7 +55,6 @@ jobs:
|
||||
-s "prefix(github.com/gogf/gf/example)" \
|
||||
./
|
||||
- name: Check for changes
|
||||
id: check_changes
|
||||
run: |
|
||||
if [[ -n "$(git status --porcelain)" ]]; then
|
||||
echo "HAS_CHANGES=true" >> $GITHUB_ENV
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/gogf/gf/v2/container/gtype"
|
||||
"github.com/gogf/gf/v2/errors/gcode"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/net/ghttp/internal/graceful"
|
||||
"github.com/gogf/gf/v2/net/goai"
|
||||
"github.com/gogf/gf/v2/net/gsvc"
|
||||
"github.com/gogf/gf/v2/os/gcache"
|
||||
@ -34,7 +35,7 @@ type (
|
||||
instance string // Instance name of current HTTP server.
|
||||
config ServerConfig // Server configuration.
|
||||
plugins []Plugin // Plugin array to extend server functionality.
|
||||
servers []*gracefulServer // Underlying http.Server array.
|
||||
servers []*graceful.Server // Underlying http.Server array.
|
||||
serverCount *gtype.Int // Underlying http.Server number for internal usage.
|
||||
closeChan chan struct{} // Used for underlying server closing event notification.
|
||||
serveTree map[string]interface{} // The route maps tree.
|
||||
@ -69,7 +70,7 @@ type (
|
||||
Method string // Handler method name.
|
||||
Route string // Route URI.
|
||||
Priority int // Just for reference.
|
||||
IsServiceHandler bool // Is service handler.
|
||||
IsServiceHandler bool // Is a service handler.
|
||||
}
|
||||
|
||||
// HandlerFunc is request handler function.
|
||||
@ -127,42 +128,41 @@ type (
|
||||
|
||||
const (
|
||||
// FreePortAddress marks the server listens using random free port.
|
||||
FreePortAddress = ":0"
|
||||
FreePortAddress = graceful.FreePortAddress
|
||||
)
|
||||
|
||||
const (
|
||||
HeaderXUrlPath = "x-url-path" // Used for custom route handler, which does not change URL.Path.
|
||||
HookBeforeServe HookName = "HOOK_BEFORE_SERVE" // Hook handler before route handler/file serving.
|
||||
HookAfterServe HookName = "HOOK_AFTER_SERVE" // Hook handler after route handler/file serving.
|
||||
HookBeforeOutput HookName = "HOOK_BEFORE_OUTPUT" // Hook handler before response output.
|
||||
HookAfterOutput HookName = "HOOK_AFTER_OUTPUT" // Hook handler after response output.
|
||||
ServerStatusStopped ServerStatus = 0
|
||||
ServerStatusRunning ServerStatus = 1
|
||||
DefaultServerName = "default"
|
||||
DefaultDomainName = "default"
|
||||
HandlerTypeHandler HandlerType = "handler"
|
||||
HandlerTypeObject HandlerType = "object"
|
||||
HandlerTypeMiddleware HandlerType = "middleware"
|
||||
HandlerTypeHook HandlerType = "hook"
|
||||
HeaderXUrlPath = "x-url-path" // Used for custom route handler, which does not change URL.Path.
|
||||
HookBeforeServe HookName = "HOOK_BEFORE_SERVE" // Hook handler before route handler/file serving.
|
||||
HookAfterServe HookName = "HOOK_AFTER_SERVE" // Hook handler after route handler/file serving.
|
||||
HookBeforeOutput HookName = "HOOK_BEFORE_OUTPUT" // Hook handler before response output.
|
||||
HookAfterOutput HookName = "HOOK_AFTER_OUTPUT" // Hook handler after response output.
|
||||
DefaultServerName = "default"
|
||||
DefaultDomainName = "default"
|
||||
HandlerTypeHandler HandlerType = "handler"
|
||||
HandlerTypeObject HandlerType = "object"
|
||||
HandlerTypeMiddleware HandlerType = "middleware"
|
||||
HandlerTypeHook HandlerType = "hook"
|
||||
ServerStatusStopped = graceful.ServerStatusStopped
|
||||
ServerStatusRunning = graceful.ServerStatusRunning
|
||||
)
|
||||
|
||||
const (
|
||||
supportedHttpMethods = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE"
|
||||
defaultMethod = "ALL"
|
||||
routeCacheDuration = time.Hour
|
||||
ctxKeyForRequest gctx.StrKey = "gHttpRequestObject"
|
||||
contentTypeXml = "text/xml"
|
||||
contentTypeHtml = "text/html"
|
||||
contentTypeJson = "application/json"
|
||||
contentTypeJavascript = "application/javascript"
|
||||
swaggerUIPackedPath = "/goframe/swaggerui"
|
||||
responseHeaderTraceID = "Trace-ID"
|
||||
responseHeaderContentLength = "Content-Length"
|
||||
specialMethodNameInit = "Init"
|
||||
specialMethodNameShut = "Shut"
|
||||
specialMethodNameIndex = "Index"
|
||||
defaultEndpointPort = 80
|
||||
noPrintInternalRoute = "internalMiddlewareServerTracing"
|
||||
supportedHttpMethods = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE"
|
||||
defaultMethod = "ALL"
|
||||
routeCacheDuration = time.Hour
|
||||
ctxKeyForRequest gctx.StrKey = "gHttpRequestObject"
|
||||
contentTypeXml = "text/xml"
|
||||
contentTypeHtml = "text/html"
|
||||
contentTypeJson = "application/json"
|
||||
contentTypeJavascript = "application/javascript"
|
||||
swaggerUIPackedPath = "/goframe/swaggerui"
|
||||
responseHeaderTraceID = "Trace-ID"
|
||||
specialMethodNameInit = "Init"
|
||||
specialMethodNameShut = "Shut"
|
||||
specialMethodNameIndex = "Index"
|
||||
defaultEndpointPort = 80
|
||||
noPrintInternalRoute = "internalMiddlewareServerTracing"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/gogf/gf/v2/errors/gcode"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/internal/intlog"
|
||||
"github.com/gogf/gf/v2/net/ghttp/internal/graceful"
|
||||
"github.com/gogf/gf/v2/net/ghttp/internal/swaggerui"
|
||||
"github.com/gogf/gf/v2/net/goai"
|
||||
"github.com/gogf/gf/v2/net/gsvc"
|
||||
@ -97,7 +98,7 @@ func GetServer(name ...interface{}) *Server {
|
||||
s := &Server{
|
||||
instance: serverName,
|
||||
plugins: make([]Plugin, 0),
|
||||
servers: make([]*gracefulServer, 0),
|
||||
servers: make([]*graceful.Server, 0),
|
||||
closeChan: make(chan struct{}, 10000),
|
||||
serverCount: gtype.NewInt(),
|
||||
statusHandlerMap: make(map[string][]HandlerFunc),
|
||||
@ -535,9 +536,9 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if fd > 0 {
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc, fd))
|
||||
} else {
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc))
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc, 0))
|
||||
}
|
||||
s.servers[len(s.servers)-1].isHttps = true
|
||||
s.servers[len(s.servers)-1].SetIsHttps(true)
|
||||
}
|
||||
}
|
||||
// HTTP
|
||||
@ -570,7 +571,7 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if fd > 0 {
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc, fd))
|
||||
} else {
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc))
|
||||
s.servers = append(s.servers, s.newGracefulServer(itemFunc, 0))
|
||||
}
|
||||
}
|
||||
// Start listening asynchronously.
|
||||
@ -583,11 +584,11 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Server) startGracefulServer(ctx context.Context, wg *sync.WaitGroup, server *gracefulServer) {
|
||||
func (s *Server) startGracefulServer(ctx context.Context, wg *sync.WaitGroup, server *graceful.Server) {
|
||||
s.serverCount.Add(1)
|
||||
var err error
|
||||
// Create listener.
|
||||
if server.isHttps {
|
||||
if server.IsHttps() {
|
||||
err = server.CreateListenerTLS(
|
||||
s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig,
|
||||
)
|
||||
@ -621,7 +622,7 @@ func (s *Server) Status() ServerStatus {
|
||||
}
|
||||
// If any underlying server is running, the server status is running.
|
||||
for _, v := range s.servers {
|
||||
if v.status.Val() == ServerStatusRunning {
|
||||
if v.Status() == ServerStatusRunning {
|
||||
return ServerStatusRunning
|
||||
}
|
||||
}
|
||||
@ -636,8 +637,8 @@ func (s *Server) getListenerFdMap() map[string]string {
|
||||
"http": "",
|
||||
}
|
||||
for _, v := range s.servers {
|
||||
str := v.address + "#" + gconv.String(v.Fd()) + ","
|
||||
if v.isHttps {
|
||||
str := v.GetAddress() + "#" + gconv.String(v.Fd()) + ","
|
||||
if v.IsHttps() {
|
||||
if len(m["https"]) > 0 {
|
||||
m["https"] += ","
|
||||
}
|
||||
@ -653,12 +654,29 @@ func (s *Server) getListenerFdMap() map[string]string {
|
||||
}
|
||||
|
||||
// GetListenedPort retrieves and returns one port which is listened by current server.
|
||||
// It returns the normal HTTP port in most priority if both HTTP and HTTPS are enabled.
|
||||
func (s *Server) GetListenedPort() int {
|
||||
ports := s.GetListenedPorts()
|
||||
if len(ports) > 0 {
|
||||
return ports[0]
|
||||
for _, server := range s.servers {
|
||||
if !server.IsHttps() {
|
||||
return server.GetListenedPort()
|
||||
}
|
||||
}
|
||||
return 0
|
||||
for _, server := range s.servers {
|
||||
if server.IsHttps() {
|
||||
return server.GetListenedPort()
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// GetListenedHTTPSPort retrieves and returns one port which is listened using TLS by current server.
|
||||
func (s *Server) GetListenedHTTPSPort() int {
|
||||
for _, server := range s.servers {
|
||||
if server.IsHttps() {
|
||||
return server.GetListenedPort()
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// GetListenedPorts retrieves and returns the ports which are listened by current server.
|
||||
|
@ -95,7 +95,7 @@ func (s *Server) Shutdown() error {
|
||||
// Only shut down current servers.
|
||||
// It may have multiple underlying http servers.
|
||||
for _, v := range s.servers {
|
||||
v.shutdown(ctx)
|
||||
v.Shutdown(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ func shutdownWebServersGracefully(ctx context.Context, signal os.Signal) {
|
||||
server := v.(*Server)
|
||||
server.doServiceDeregister()
|
||||
for _, s := range server.servers {
|
||||
s.shutdown(ctx)
|
||||
s.Shutdown(ctx)
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -279,7 +279,7 @@ func forceCloseWebServers(ctx context.Context) {
|
||||
serverMapping.RLockFunc(func(m map[string]interface{}) {
|
||||
for _, v := range m {
|
||||
for _, s := range v.(*Server).servers {
|
||||
s.close(ctx)
|
||||
s.Close(ctx)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -399,6 +399,7 @@ func (s *Server) SetAddr(address string) {
|
||||
}
|
||||
|
||||
// SetPort sets the listening ports for the server.
|
||||
// It uses random port if the port is 0.
|
||||
// The listening ports can be multiple like: SetPort(80, 8080).
|
||||
func (s *Server) SetPort(port ...int) {
|
||||
if len(port) > 0 {
|
||||
@ -418,6 +419,7 @@ func (s *Server) SetHTTPSAddr(address string) {
|
||||
}
|
||||
|
||||
// SetHTTPSPort sets the HTTPS listening ports for the server.
|
||||
// It uses random port if the port is 0.
|
||||
// The listening ports can be multiple like: SetHTTPSPort(443, 500).
|
||||
func (s *Server) SetHTTPSPort(port ...int) {
|
||||
if len(port) > 0 {
|
||||
|
@ -6,269 +6,24 @@
|
||||
|
||||
package ghttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/container/gtype"
|
||||
"github.com/gogf/gf/v2/errors/gcode"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/os/gproc"
|
||||
"github.com/gogf/gf/v2/os/gres"
|
||||
"github.com/gogf/gf/v2/text/gstr"
|
||||
)
|
||||
|
||||
// gracefulServer wraps the net/http.Server with graceful reload/restart feature.
|
||||
type gracefulServer struct {
|
||||
server *Server // Belonged server.
|
||||
fd uintptr // File descriptor for passing to the child process when graceful reload.
|
||||
address string // Listening address like:":80", ":8080".
|
||||
httpServer *http.Server // Underlying http.Server.
|
||||
rawListener net.Listener // Underlying net.Listener.
|
||||
rawLnMu sync.RWMutex // Concurrent safety mutex for `rawListener`.
|
||||
listener net.Listener // Wrapped net.Listener.
|
||||
isHttps bool // Is HTTPS.
|
||||
status *gtype.Int // Status of current server. Using `gtype` to ensure concurrent safety.
|
||||
}
|
||||
import "github.com/gogf/gf/v2/net/ghttp/internal/graceful"
|
||||
|
||||
// newGracefulServer creates and returns a graceful http server with a given address.
|
||||
// The optional parameter `fd` specifies the file descriptor which is passed from parent server.
|
||||
func (s *Server) newGracefulServer(address string, fd ...int) *gracefulServer {
|
||||
// Change port to address like: 80 -> :80
|
||||
if gstr.IsNumeric(address) {
|
||||
address = ":" + address
|
||||
}
|
||||
gs := &gracefulServer{
|
||||
server: s,
|
||||
address: address,
|
||||
httpServer: s.newHttpServer(address),
|
||||
status: gtype.NewInt(),
|
||||
}
|
||||
if len(fd) > 0 && fd[0] > 0 {
|
||||
gs.fd = uintptr(fd[0])
|
||||
}
|
||||
if s.config.Listeners != nil {
|
||||
addrArray := gstr.SplitAndTrim(address, ":")
|
||||
addrPort, err := strconv.Atoi(addrArray[len(addrArray)-1])
|
||||
if err == nil {
|
||||
for _, v := range s.config.Listeners {
|
||||
if listenerPort := (v.Addr().(*net.TCPAddr)).Port; listenerPort == addrPort {
|
||||
gs.rawListener = v
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return gs
|
||||
}
|
||||
|
||||
// newHttpServer creates and returns an underlying http.Server with a given address.
|
||||
func (s *Server) newHttpServer(address string) *http.Server {
|
||||
server := &http.Server{
|
||||
Addr: address,
|
||||
Handler: http.HandlerFunc(s.config.Handler),
|
||||
ReadTimeout: s.config.ReadTimeout,
|
||||
WriteTimeout: s.config.WriteTimeout,
|
||||
IdleTimeout: s.config.IdleTimeout,
|
||||
MaxHeaderBytes: s.config.MaxHeaderBytes,
|
||||
ErrorLog: log.New(&errorLogger{logger: s.config.Logger}, "", 0),
|
||||
}
|
||||
server.SetKeepAlivesEnabled(s.config.KeepAlive)
|
||||
return server
|
||||
}
|
||||
|
||||
// Fd retrieves and returns the file descriptor of the current server.
|
||||
// It is available ony in *nix like operating systems like linux, unix, darwin.
|
||||
func (s *gracefulServer) Fd() uintptr {
|
||||
if ln := s.getRawListener(); ln != nil {
|
||||
file, err := ln.(*net.TCPListener).File()
|
||||
if err == nil {
|
||||
return file.Fd()
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// CreateListener creates listener on configured address.
|
||||
func (s *gracefulServer) CreateListener() error {
|
||||
ln, err := s.getNetListener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.listener = ln
|
||||
s.setRawListener(ln)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateListenerTLS creates listener on configured address with HTTPS.
|
||||
// The parameter `certFile` and `keyFile` specify the necessary certification and key files for HTTPS.
|
||||
// The optional parameter `tlsConfig` specifies the custom TLS configuration.
|
||||
func (s *gracefulServer) CreateListenerTLS(certFile, keyFile string, tlsConfig ...*tls.Config) error {
|
||||
var config *tls.Config
|
||||
if len(tlsConfig) > 0 && tlsConfig[0] != nil {
|
||||
config = tlsConfig[0]
|
||||
} else if s.httpServer.TLSConfig != nil {
|
||||
config = s.httpServer.TLSConfig
|
||||
} else {
|
||||
config = &tls.Config{}
|
||||
}
|
||||
if config.NextProtos == nil {
|
||||
config.NextProtos = []string{"http/1.1"}
|
||||
}
|
||||
var err error
|
||||
if len(config.Certificates) == 0 {
|
||||
config.Certificates = make([]tls.Certificate, 1)
|
||||
if gres.Contains(certFile) {
|
||||
config.Certificates[0], err = tls.X509KeyPair(
|
||||
gres.GetContent(certFile),
|
||||
gres.GetContent(keyFile),
|
||||
)
|
||||
} else {
|
||||
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return gerror.Wrapf(err, `open certFile "%s" and keyFile "%s" failed`, certFile, keyFile)
|
||||
}
|
||||
ln, err := s.getNetListener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.listener = tls.NewListener(ln, config)
|
||||
s.setRawListener(ln)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve starts the serving with blocking way.
|
||||
func (s *gracefulServer) Serve(ctx context.Context) error {
|
||||
if s.rawListener == nil {
|
||||
return gerror.NewCode(gcode.CodeInvalidOperation, `call CreateListener/CreateListenerTLS before Serve`)
|
||||
}
|
||||
|
||||
action := "started"
|
||||
if s.fd != 0 {
|
||||
action = "reloaded"
|
||||
}
|
||||
s.server.Logger().Infof(
|
||||
ctx,
|
||||
`pid[%d]: %s server %s listening on [%s]`,
|
||||
gproc.Pid(), s.getProto(), action, s.GetListenedAddress(),
|
||||
)
|
||||
s.status.Set(ServerStatusRunning)
|
||||
err := s.httpServer.Serve(s.listener)
|
||||
s.status.Set(ServerStatusStopped)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetListenedAddress retrieves and returns the address string which are listened by current server.
|
||||
func (s *gracefulServer) GetListenedAddress() string {
|
||||
if !gstr.Contains(s.address, FreePortAddress) {
|
||||
return s.address
|
||||
}
|
||||
func (s *Server) newGracefulServer(address string, fd int) *graceful.Server {
|
||||
var (
|
||||
address = s.address
|
||||
listenedPort = s.GetListenedPort()
|
||||
)
|
||||
address = gstr.Replace(address, FreePortAddress, fmt.Sprintf(`:%d`, listenedPort))
|
||||
return address
|
||||
}
|
||||
|
||||
// GetListenedPort retrieves and returns one port which is listened to by current server.
|
||||
// Note that this method is only available if the server is listening on one port.
|
||||
func (s *gracefulServer) GetListenedPort() int {
|
||||
if ln := s.getRawListener(); ln != nil {
|
||||
return ln.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// getProto retrieves and returns the proto string of current server.
|
||||
func (s *gracefulServer) getProto() string {
|
||||
proto := "http"
|
||||
if s.isHttps {
|
||||
proto = "https"
|
||||
}
|
||||
return proto
|
||||
}
|
||||
|
||||
// getNetListener retrieves and returns the wrapped net.Listener.
|
||||
func (s *gracefulServer) getNetListener() (net.Listener, error) {
|
||||
if s.rawListener != nil {
|
||||
return s.rawListener, nil
|
||||
}
|
||||
var (
|
||||
ln net.Listener
|
||||
err error
|
||||
)
|
||||
if s.fd > 0 {
|
||||
f := os.NewFile(s.fd, "")
|
||||
ln, err = net.FileListener(f)
|
||||
if err != nil {
|
||||
err = gerror.Wrap(err, "net.FileListener failed")
|
||||
return nil, err
|
||||
loggerWriter = &errorLogger{logger: s.config.Logger}
|
||||
serverConfig = graceful.ServerConfig{
|
||||
Listeners: s.config.Listeners,
|
||||
Handler: s.config.Handler,
|
||||
ReadTimeout: s.config.ReadTimeout,
|
||||
WriteTimeout: s.config.WriteTimeout,
|
||||
IdleTimeout: s.config.IdleTimeout,
|
||||
GracefulShutdownTimeout: s.config.GracefulTimeout,
|
||||
MaxHeaderBytes: s.config.MaxHeaderBytes,
|
||||
KeepAlive: s.config.KeepAlive,
|
||||
Logger: s.config.Logger,
|
||||
}
|
||||
} else {
|
||||
ln, err = net.Listen("tcp", s.httpServer.Addr)
|
||||
if err != nil {
|
||||
err = gerror.Wrapf(err, `net.Listen address "%s" failed`, s.httpServer.Addr)
|
||||
}
|
||||
}
|
||||
return ln, err
|
||||
}
|
||||
|
||||
// shutdown shuts down the server gracefully.
|
||||
func (s *gracefulServer) shutdown(ctx context.Context) {
|
||||
if s.status.Val() == ServerStatusStopped {
|
||||
return
|
||||
}
|
||||
timeoutCtx, cancelFunc := context.WithTimeout(
|
||||
ctx,
|
||||
time.Duration(s.server.config.GracefulShutdownTimeout)*time.Second,
|
||||
)
|
||||
defer cancelFunc()
|
||||
if err := s.httpServer.Shutdown(timeoutCtx); err != nil {
|
||||
s.server.Logger().Errorf(
|
||||
ctx,
|
||||
"%d: %s server [%s] shutdown error: %v",
|
||||
gproc.Pid(), s.getProto(), s.address, err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// setRawListener sets `rawListener` with given net.Listener.
|
||||
func (s *gracefulServer) setRawListener(ln net.Listener) {
|
||||
s.rawLnMu.Lock()
|
||||
defer s.rawLnMu.Unlock()
|
||||
s.rawListener = ln
|
||||
}
|
||||
|
||||
// setRawListener returns the `rawListener` of current server.
|
||||
func (s *gracefulServer) getRawListener() net.Listener {
|
||||
s.rawLnMu.RLock()
|
||||
defer s.rawLnMu.RUnlock()
|
||||
return s.rawListener
|
||||
}
|
||||
|
||||
// close shuts down the server forcibly.
|
||||
// for graceful shutdown, please use gracefulServer.shutdown.
|
||||
func (s *gracefulServer) close(ctx context.Context) {
|
||||
if s.status.Val() == ServerStatusStopped {
|
||||
return
|
||||
}
|
||||
if err := s.httpServer.Close(); err != nil {
|
||||
s.server.Logger().Errorf(
|
||||
ctx,
|
||||
"%d: %s server [%s] closed error: %v",
|
||||
gproc.Pid(), s.getProto(), s.address, err,
|
||||
)
|
||||
}
|
||||
return graceful.New(address, fd, loggerWriter, serverConfig)
|
||||
}
|
||||
|
@ -14,26 +14,13 @@ import (
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/net/ghttp"
|
||||
"github.com/gogf/gf/v2/net/gtcp"
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
"github.com/gogf/gf/v2/text/gstr"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
)
|
||||
|
||||
func Test_SetSingleCustomListener(t *testing.T) {
|
||||
var (
|
||||
p1 int
|
||||
ln1 net.Listener
|
||||
)
|
||||
for i := 0; i < 1000; i++ {
|
||||
p1, _ = gtcp.GetFreePort()
|
||||
if ln1 == nil {
|
||||
ln1, _ = net.Listen("tcp", fmt.Sprintf(":%d", p1))
|
||||
}
|
||||
if ln1 != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
ln1, _ := net.Listen("tcp", ":0")
|
||||
s := g.Server(guid.S())
|
||||
s.Group("/", func(group *ghttp.RouterGroup) {
|
||||
group.GET("/test", func(r *ghttp.Request) {
|
||||
@ -60,25 +47,8 @@ func Test_SetSingleCustomListener(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_SetMultipleCustomListeners(t *testing.T) {
|
||||
var (
|
||||
p1 int
|
||||
p2 int
|
||||
ln1 net.Listener
|
||||
ln2 net.Listener
|
||||
)
|
||||
for i := 0; i < 1000; i++ {
|
||||
p1, _ = gtcp.GetFreePort()
|
||||
p2, _ = gtcp.GetFreePort()
|
||||
if ln1 == nil {
|
||||
ln1, _ = net.Listen("tcp", fmt.Sprintf(":%d", p1))
|
||||
}
|
||||
if ln2 == nil {
|
||||
ln2, _ = net.Listen("tcp", fmt.Sprintf(":%d", p2))
|
||||
}
|
||||
if ln1 != nil && ln2 != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
ln1, _ := net.Listen("tcp", ":0")
|
||||
ln2, _ := net.Listen("tcp", ":0")
|
||||
s := g.Server(guid.S())
|
||||
s.Group("/", func(group *ghttp.RouterGroup) {
|
||||
group.GET("/test", func(r *ghttp.Request) {
|
||||
@ -95,15 +65,18 @@ func Test_SetMultipleCustomListeners(t *testing.T) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
ports := s.GetListenedPorts()
|
||||
t.Assert(len(ports), 2)
|
||||
|
||||
c := g.Client()
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", p1))
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", ports[0]))
|
||||
|
||||
t.Assert(
|
||||
gstr.Trim(c.GetContent(ctx, "/test")),
|
||||
"test",
|
||||
)
|
||||
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", p2))
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", ports[1]))
|
||||
|
||||
t.Assert(
|
||||
gstr.Trim(c.GetContent(ctx, "/test")),
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/net/ghttp"
|
||||
"github.com/gogf/gf/v2/net/gtcp"
|
||||
"github.com/gogf/gf/v2/os/gfile"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
@ -90,10 +89,6 @@ func Test_HTTPS_Resource(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_HTTPS_HTTP_Basic(t *testing.T) {
|
||||
var (
|
||||
portHttp, _ = gtcp.GetFreePort()
|
||||
portHttps, _ = gtcp.GetFreePort()
|
||||
)
|
||||
s := g.Server(gtime.TimestampNanoStr())
|
||||
s.Group("/", func(group *ghttp.RouterGroup) {
|
||||
group.GET("/test", func(r *ghttp.Request) {
|
||||
@ -104,8 +99,8 @@ func Test_HTTPS_HTTP_Basic(t *testing.T) {
|
||||
gtest.DataPath("https", "files", "server.crt"),
|
||||
gtest.DataPath("https", "files", "server.key"),
|
||||
)
|
||||
s.SetPort(portHttp)
|
||||
s.SetHTTPSPort(portHttps)
|
||||
s.SetPort(0)
|
||||
s.SetHTTPSPort(0)
|
||||
s.SetDumpRouterMap(false)
|
||||
s.Start()
|
||||
defer s.Shutdown()
|
||||
@ -115,14 +110,14 @@ func Test_HTTPS_HTTP_Basic(t *testing.T) {
|
||||
// HTTP
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
c := g.Client()
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", portHttp))
|
||||
c.SetPrefix(fmt.Sprintf("http://127.0.0.1:%d", s.GetListenedPort()))
|
||||
t.Assert(c.GetContent(ctx, "/"), "Not Found")
|
||||
t.Assert(c.GetContent(ctx, "/test"), "test")
|
||||
})
|
||||
// HTTPS
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
c := g.Client()
|
||||
c.SetPrefix(fmt.Sprintf("https://127.0.0.1:%d", portHttps))
|
||||
c.SetPrefix(fmt.Sprintf("https://127.0.0.1:%d", s.GetListenedHTTPSPort()))
|
||||
t.Assert(c.GetContent(ctx, "/"), "Not Found")
|
||||
t.Assert(c.GetContent(ctx, "/test"), "test")
|
||||
})
|
||||
|
378
net/ghttp/internal/graceful/graceful.go
Normal file
378
net/ghttp/internal/graceful/graceful.go
Normal file
@ -0,0 +1,378 @@
|
||||
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
// Package graceful implements graceful reload/restart features for HTTP servers.
|
||||
// It provides the ability to gracefully shutdown or restart HTTP servers without
|
||||
// interrupting existing connections. This is particularly useful for zero-downtime
|
||||
// deployments and maintenance operations.
|
||||
//
|
||||
// The package wraps the standard net/http.Server and provides additional functionality
|
||||
// for graceful server management, including:
|
||||
// - Graceful server shutdown with timeout
|
||||
// - Support for both HTTP and HTTPS servers
|
||||
// - File descriptor inheritance for server reload/restart
|
||||
// - Connection management during shutdown
|
||||
package graceful
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/container/gtype"
|
||||
"github.com/gogf/gf/v2/errors/gcode"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gproc"
|
||||
"github.com/gogf/gf/v2/os/gres"
|
||||
"github.com/gogf/gf/v2/text/gstr"
|
||||
)
|
||||
|
||||
// ServerStatus is the server status enum type.
|
||||
type ServerStatus = int
|
||||
|
||||
const (
|
||||
// FreePortAddress marks the server listens using random free port.
|
||||
FreePortAddress = ":0"
|
||||
// ServerStatusStopped indicates the server is stopped.
|
||||
ServerStatusStopped ServerStatus = 0
|
||||
// ServerStatusRunning indicates the server is running.
|
||||
ServerStatusRunning ServerStatus = 1
|
||||
)
|
||||
|
||||
// Server wraps the net/http.Server with graceful reload/restart feature.
|
||||
type Server struct {
|
||||
fd uintptr // File descriptor for passing to the child process when graceful reload.
|
||||
address string // Listening address like ":80", ":8080".
|
||||
httpServer *http.Server // Underlying http.Server.
|
||||
rawListener net.Listener // Underlying net.Listener.
|
||||
rawLnMu sync.RWMutex // Concurrent safety mutex for rawListener.
|
||||
listener net.Listener // Wrapped net.Listener with TLS support if necessary.
|
||||
isHttps bool // Whether server is running in HTTPS mode.
|
||||
status *gtype.Int // Server status using gtype for concurrent safety.
|
||||
config ServerConfig // Server configuration.
|
||||
}
|
||||
|
||||
// ServerConfig is the graceful Server configuration manager.
|
||||
type ServerConfig struct {
|
||||
// Listeners specifies the custom listeners.
|
||||
Listeners []net.Listener `json:"listeners"`
|
||||
|
||||
// Handler the handler for HTTP request.
|
||||
Handler func(w http.ResponseWriter, r *http.Request) `json:"-"`
|
||||
|
||||
// ReadTimeout is the maximum duration for reading the entire
|
||||
// request, including the body.
|
||||
//
|
||||
// Because ReadTimeout does not let Handlers make per-request
|
||||
// decisions on each request body's acceptable deadline or
|
||||
// upload rate, most users will prefer to use
|
||||
// ReadHeaderTimeout. It is valid to use them both.
|
||||
ReadTimeout time.Duration `json:"readTimeout"`
|
||||
|
||||
// WriteTimeout is the maximum duration before timing out
|
||||
// writes of the response. It is reset whenever a new
|
||||
// request's header is read. Like ReadTimeout, it does not
|
||||
// let Handlers make decisions on a per-request basis.
|
||||
WriteTimeout time.Duration `json:"writeTimeout"`
|
||||
|
||||
// IdleTimeout is the maximum amount of time to wait for the
|
||||
// next request when keep-alive are enabled. If IdleTimeout
|
||||
// is zero, the value of ReadTimeout is used. If both are
|
||||
// zero, there is no timeout.
|
||||
IdleTimeout time.Duration `json:"idleTimeout"`
|
||||
|
||||
// GracefulShutdownTimeout set the maximum survival time (seconds) before stopping the server.
|
||||
GracefulShutdownTimeout int `json:"gracefulShutdownTimeout"`
|
||||
|
||||
// MaxHeaderBytes controls the maximum number of bytes the
|
||||
// server will read parsing the request header's keys and
|
||||
// values, including the request line. It does not limit the
|
||||
// size of the request body.
|
||||
//
|
||||
// It can be configured in configuration file using string like: 1m, 10m, 500kb etc.
|
||||
// It's 10240 bytes in default.
|
||||
MaxHeaderBytes int `json:"maxHeaderBytes"`
|
||||
|
||||
// KeepAlive enables HTTP keep-alive.
|
||||
KeepAlive bool `json:"keepAlive"`
|
||||
|
||||
// Logger specifies the logger for server.
|
||||
Logger *glog.Logger `json:"logger"`
|
||||
}
|
||||
|
||||
// New creates and returns a graceful http server with a given address.
|
||||
// The optional parameter `fd` specifies the file descriptor which is passed from parent server.
|
||||
func New(
|
||||
address string,
|
||||
fd int,
|
||||
loggerWriter io.Writer,
|
||||
config ServerConfig,
|
||||
) *Server {
|
||||
// Change port to address like: 80 -> :80
|
||||
if gstr.IsNumeric(address) {
|
||||
address = ":" + address
|
||||
}
|
||||
gs := &Server{
|
||||
address: address,
|
||||
httpServer: newHttpServer(address, loggerWriter, config),
|
||||
status: gtype.NewInt(),
|
||||
config: config,
|
||||
}
|
||||
if fd != 0 {
|
||||
gs.fd = uintptr(fd)
|
||||
}
|
||||
if len(config.Listeners) > 0 {
|
||||
addrArray := gstr.SplitAndTrim(address, ":")
|
||||
addrPort, err := strconv.Atoi(addrArray[len(addrArray)-1])
|
||||
if err == nil {
|
||||
for _, v := range config.Listeners {
|
||||
if listenerPort := (v.Addr().(*net.TCPAddr)).Port; listenerPort == addrPort {
|
||||
gs.rawListener = v
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return gs
|
||||
}
|
||||
|
||||
// newHttpServer creates and returns an underlying http.Server with a given address.
|
||||
func newHttpServer(
|
||||
address string,
|
||||
loggerWriter io.Writer,
|
||||
config ServerConfig,
|
||||
) *http.Server {
|
||||
server := &http.Server{
|
||||
Addr: address,
|
||||
Handler: http.HandlerFunc(config.Handler),
|
||||
ReadTimeout: config.ReadTimeout,
|
||||
WriteTimeout: config.WriteTimeout,
|
||||
IdleTimeout: config.IdleTimeout,
|
||||
MaxHeaderBytes: config.MaxHeaderBytes,
|
||||
ErrorLog: log.New(loggerWriter, "", 0),
|
||||
}
|
||||
server.SetKeepAlivesEnabled(config.KeepAlive)
|
||||
return server
|
||||
}
|
||||
|
||||
// Fd retrieves and returns the file descriptor of the current server.
|
||||
// It is available ony in *nix like operating systems like linux, unix, darwin.
|
||||
func (s *Server) Fd() uintptr {
|
||||
if ln := s.getRawListener(); ln != nil {
|
||||
file, err := ln.(*net.TCPListener).File()
|
||||
if err == nil {
|
||||
return file.Fd()
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// CreateListener creates listener on configured address.
|
||||
func (s *Server) CreateListener() error {
|
||||
ln, err := s.getNetListener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.listener = ln
|
||||
s.setRawListener(ln)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsHttps returns whether the server is running in HTTPS mode.
|
||||
func (s *Server) IsHttps() bool {
|
||||
return s.isHttps
|
||||
}
|
||||
|
||||
// GetAddress returns the server's configured address.
|
||||
func (s *Server) GetAddress() string {
|
||||
return s.address
|
||||
}
|
||||
|
||||
// SetIsHttps sets the HTTPS mode for the server.
|
||||
// The parameter isHttps determines whether to enable HTTPS mode.
|
||||
func (s *Server) SetIsHttps(isHttps bool) {
|
||||
s.isHttps = isHttps
|
||||
}
|
||||
|
||||
// CreateListenerTLS creates listener on configured address with HTTPS.
|
||||
// The parameter `certFile` and `keyFile` specify the necessary certification and key files for HTTPS.
|
||||
// The optional parameter `tlsConfig` specifies the custom TLS configuration.
|
||||
func (s *Server) CreateListenerTLS(certFile, keyFile string, tlsConfig ...*tls.Config) error {
|
||||
var config *tls.Config
|
||||
if len(tlsConfig) > 0 && tlsConfig[0] != nil {
|
||||
config = tlsConfig[0]
|
||||
} else if s.httpServer.TLSConfig != nil {
|
||||
config = s.httpServer.TLSConfig
|
||||
} else {
|
||||
config = &tls.Config{}
|
||||
}
|
||||
if config.NextProtos == nil {
|
||||
config.NextProtos = []string{"http/1.1"}
|
||||
}
|
||||
var err error
|
||||
if len(config.Certificates) == 0 {
|
||||
config.Certificates = make([]tls.Certificate, 1)
|
||||
if gres.Contains(certFile) {
|
||||
config.Certificates[0], err = tls.X509KeyPair(
|
||||
gres.GetContent(certFile),
|
||||
gres.GetContent(keyFile),
|
||||
)
|
||||
} else {
|
||||
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return gerror.Wrapf(err, `open certFile "%s" and keyFile "%s" failed`, certFile, keyFile)
|
||||
}
|
||||
ln, err := s.getNetListener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.listener = tls.NewListener(ln, config)
|
||||
s.setRawListener(ln)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve starts the serving with blocking way.
|
||||
func (s *Server) Serve(ctx context.Context) error {
|
||||
if s.rawListener == nil {
|
||||
return gerror.NewCode(gcode.CodeInvalidOperation, `call CreateListener/CreateListenerTLS before Serve`)
|
||||
}
|
||||
|
||||
var action = "started"
|
||||
if s.fd != 0 {
|
||||
action = "reloaded"
|
||||
}
|
||||
s.config.Logger.Infof(
|
||||
ctx,
|
||||
`pid[%d]: %s server %s listening on [%s]`,
|
||||
gproc.Pid(), s.getProto(), action, s.GetListenedAddress(),
|
||||
)
|
||||
s.status.Set(ServerStatusRunning)
|
||||
err := s.httpServer.Serve(s.listener)
|
||||
s.status.Set(ServerStatusStopped)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetListenedAddress retrieves and returns the address string which are listened by current server.
|
||||
func (s *Server) GetListenedAddress() string {
|
||||
if !gstr.Contains(s.address, FreePortAddress) {
|
||||
return s.address
|
||||
}
|
||||
var (
|
||||
address = s.address
|
||||
listenedPort = s.GetListenedPort()
|
||||
)
|
||||
address = gstr.Replace(address, FreePortAddress, fmt.Sprintf(`:%d`, listenedPort))
|
||||
return address
|
||||
}
|
||||
|
||||
// GetListenedPort retrieves and returns one port which is listened to by current server.
|
||||
// Note that this method is only available if the server is listening on one port.
|
||||
func (s *Server) GetListenedPort() int {
|
||||
if ln := s.getRawListener(); ln != nil {
|
||||
return ln.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// Status returns the current status of the server.
|
||||
// It returns either ServerStatusStopped or ServerStatusRunning.
|
||||
func (s *Server) Status() ServerStatus {
|
||||
return s.status.Val()
|
||||
}
|
||||
|
||||
// getProto retrieves and returns the proto string of current server.
|
||||
func (s *Server) getProto() string {
|
||||
proto := "http"
|
||||
if s.isHttps {
|
||||
proto = "https"
|
||||
}
|
||||
return proto
|
||||
}
|
||||
|
||||
// getNetListener retrieves and returns the wrapped net.Listener.
|
||||
func (s *Server) getNetListener() (net.Listener, error) {
|
||||
if s.rawListener != nil {
|
||||
return s.rawListener, nil
|
||||
}
|
||||
var (
|
||||
ln net.Listener
|
||||
err error
|
||||
)
|
||||
if s.fd > 0 {
|
||||
f := os.NewFile(s.fd, "")
|
||||
ln, err = net.FileListener(f)
|
||||
if err != nil {
|
||||
err = gerror.Wrap(err, "net.FileListener failed")
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
ln, err = net.Listen("tcp", s.httpServer.Addr)
|
||||
if err != nil {
|
||||
err = gerror.Wrapf(err, `net.Listen address "%s" failed`, s.httpServer.Addr)
|
||||
}
|
||||
}
|
||||
return ln, err
|
||||
}
|
||||
|
||||
// Shutdown shuts down the server gracefully.
|
||||
func (s *Server) Shutdown(ctx context.Context) {
|
||||
if s.status.Val() == ServerStatusStopped {
|
||||
return
|
||||
}
|
||||
timeoutCtx, cancelFunc := context.WithTimeout(
|
||||
ctx,
|
||||
time.Duration(s.config.GracefulShutdownTimeout)*time.Second,
|
||||
)
|
||||
defer cancelFunc()
|
||||
if err := s.httpServer.Shutdown(timeoutCtx); err != nil {
|
||||
s.config.Logger.Errorf(
|
||||
ctx,
|
||||
"%d: %s server [%s] shutdown error: %v",
|
||||
gproc.Pid(), s.getProto(), s.address, err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// setRawListener sets `rawListener` with given net.Listener.
|
||||
func (s *Server) setRawListener(ln net.Listener) {
|
||||
s.rawLnMu.Lock()
|
||||
defer s.rawLnMu.Unlock()
|
||||
s.rawListener = ln
|
||||
}
|
||||
|
||||
// getRawListener returns the `rawListener` of current server.
|
||||
func (s *Server) getRawListener() net.Listener {
|
||||
s.rawLnMu.RLock()
|
||||
defer s.rawLnMu.RUnlock()
|
||||
return s.rawListener
|
||||
}
|
||||
|
||||
// Close shuts down the server forcibly.
|
||||
// for graceful shutdown, please use Server.shutdown.
|
||||
func (s *Server) Close(ctx context.Context) {
|
||||
if s.status.Val() == ServerStatusStopped {
|
||||
return
|
||||
}
|
||||
if err := s.httpServer.Close(); err != nil {
|
||||
s.config.Logger.Errorf(
|
||||
ctx,
|
||||
"%d: %s server [%s] closed error: %v",
|
||||
gproc.Pid(), s.getProto(), s.address, err,
|
||||
)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user