From c754ec6e97e457540375109158bff93fc5a70f14 Mon Sep 17 00:00:00 2001
From: Brabem <69128477+luhaoling@users.noreply.github.com>
Date: Sun, 18 Feb 2024 20:16:47 +0800
Subject: [PATCH] fix: refactoring code of graceful exits (#1885)

* fix: plant a layer

* fix: print chanal

* fix: print sigs

* fix: print the sigs

* fix: reconstruct exit gracefully

* fix: fix the timeout

* fix: fix the netDone

* fix: fix the process exit

* fix: refactor the elegant startup code

* fix: fix the Signal.Notify

* fix: fix the code

* fix: remove not used header import.

* Update init.go

* fix: fix the InitConfig error

* fix: fix branch name

* fix: fix the signal value

* fix: replace the signal with SIGTERM

* fix: fix the script

* fix: fix the unsolve error

* fix: return the SIGTERM received,shutting down

* fix: fix the tranfer exit error

* fix: fix the error

* fix: replace the SIGnal

* fix: del the error return in tranfer

* fix: fix SIGTERM error

* fix: del the unreachalbe code

* fix: fix the make stop print  error

---------

Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
---
 cmd/openim-api/main.go                        |  44 +++++---
 cmd/openim-cmdutils/main.go                   |   7 +-
 cmd/openim-crontask/main.go                   |   7 +-
 cmd/openim-msggateway/main.go                 |   7 +-
 cmd/openim-msgtransfer/main.go                |   7 +-
 cmd/openim-push/main.go                       |   7 +-
 cmd/openim-rpc/openim-rpc-auth/main.go        |   8 +-
 .../openim-rpc-conversation/main.go           |   7 +-
 cmd/openim-rpc/openim-rpc-friend/main.go      |   7 +-
 cmd/openim-rpc/openim-rpc-group/main.go       |   7 +-
 cmd/openim-rpc/openim-rpc-msg/main.go         |   7 +-
 cmd/openim-rpc/openim-rpc-third/main.go       |   7 +-
 cmd/openim-rpc/openim-rpc-user/main.go        |   7 +-
 internal/msggateway/init.go                   |  21 +---
 internal/msggateway/n_ws_server.go            |  69 ++++++------
 internal/msgtransfer/init.go                  |  91 +++++++---------
 internal/push/callback.go                     |   2 +-
 internal/push/consumer_init.go                |  10 +-
 internal/push/push_rpc_server.go              |  23 ++--
 internal/tools/cron_task.go                   |   2 +-
 pkg/common/ginprometheus/ginprometheus.go     |  22 ++--
 pkg/common/kafka/consumer_group.go            |  31 ++----
 pkg/common/startrpc/start.go                  | 100 +++++++++++-------
 pkg/util/genutil/genutil.go                   |  15 +++
 scripts/githooks/pre-commit.sh                |   2 +-
 scripts/install/openim-push.sh                |   2 +-
 scripts/lib/util.sh                           |   4 +-
 27 files changed, 248 insertions(+), 275 deletions(-)

diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go
index bbb5eb968..c8746bc20 100644
--- a/cmd/openim-api/main.go
+++ b/cmd/openim-api/main.go
@@ -17,6 +17,7 @@ package main
 import (
 	"context"
 	"fmt"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 	"net"
 	"net/http"
 	_ "net/http/pprof"
@@ -46,8 +47,7 @@ func main() {
 	apiCmd.AddPrometheusPortFlag()
 	apiCmd.AddApi(run)
 	if err := apiCmd.Execute(); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
 
@@ -76,12 +76,21 @@ func run(port int, proPort int) error {
 	if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
 		return err
 	}
-
+	var (
+		netDone = make(chan struct{}, 1)
+		netErr  error
+	)
 	router := api.NewGinRouter(client, rdb)
 	if config.Config.Prometheus.Enable {
-		p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
-		p.SetListenAddress(fmt.Sprintf(":%d", proPort))
-		p.Use(router)
+		go func() {
+			p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
+			p.SetListenAddress(fmt.Sprintf(":%d", proPort))
+			if err = p.Use(router); err != nil && err != http.ErrServerClosed {
+				netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", proPort))
+				netDone <- struct{}{}
+			}
+		}()
+
 	}
 
 	var address string
@@ -92,24 +101,31 @@ func run(port int, proPort int) error {
 	}
 
 	server := http.Server{Addr: address, Handler: router}
+
 	go func() {
 		err = server.ListenAndServe()
 		if err != nil && err != http.ErrServerClosed {
-			os.Exit(1)
+			netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr))
+			netDone <- struct{}{}
+
 		}
 	}()
 
 	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	<-sigs
+	signal.Notify(sigs, syscall.SIGTERM)
 
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()
-
-	// graceful shutdown operation.
-	if err := server.Shutdown(ctx); err != nil {
-		return err
+	select {
+	case <-sigs:
+		util.SIGUSR1Exit()
+		err := server.Shutdown(ctx)
+		if err != nil {
+			return errs.Wrap(err, "shutdown err")
+		}
+	case <-netDone:
+		close(netDone)
+		return netErr
 	}
-
 	return nil
 }
diff --git a/cmd/openim-cmdutils/main.go b/cmd/openim-cmdutils/main.go
index 45b324766..f6b788933 100644
--- a/cmd/openim-cmdutils/main.go
+++ b/cmd/openim-cmdutils/main.go
@@ -15,10 +15,8 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -57,7 +55,6 @@ func main() {
 	// openIM clear msg --clearAll
 	msgUtilsCmd.AddCommand(&getCmd.Command, &fixCmd.Command, &clearCmd.Command)
 	if err := msgUtilsCmd.Execute(); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go
index 324001690..b284fd773 100644
--- a/cmd/openim-crontask/main.go
+++ b/cmd/openim-crontask/main.go
@@ -15,17 +15,14 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/tools"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
 	cronTaskCmd := cmd.NewCronTaskCmd()
 	if err := cronTaskCmd.Exec(tools.StartTask); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-msggateway/main.go b/cmd/openim-msggateway/main.go
index 5339891c8..ed67b8f5d 100644
--- a/cmd/openim-msggateway/main.go
+++ b/cmd/openim-msggateway/main.go
@@ -15,10 +15,8 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -28,7 +26,6 @@ func main() {
 	msgGatewayCmd.AddPrometheusPortFlag()
 
 	if err := msgGatewayCmd.Exec(); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go
index cf1b44a55..84fbbd2ea 100644
--- a/cmd/openim-msgtransfer/main.go
+++ b/cmd/openim-msgtransfer/main.go
@@ -15,10 +15,8 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -26,7 +24,6 @@ func main() {
 	msgTransferCmd.AddPrometheusPortFlag()
 	msgTransferCmd.AddTransferProgressFlag()
 	if err := msgTransferCmd.Exec(); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-push/main.go b/cmd/openim-push/main.go
index 77f75cb4e..e0539fa52 100644
--- a/cmd/openim-push/main.go
+++ b/cmd/openim-push/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/push"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := pushCmd.StartSvr(config.Config.RpcRegisterName.OpenImPushName, push.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go
index b29efd484..b526c3b86 100644
--- a/cmd/openim-rpc/openim-rpc-auth/main.go
+++ b/cmd/openim-rpc/openim-rpc-auth/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,7 @@ func main() {
 		panic(err.Error())
 	}
 	if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
+
 }
diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go
index f9ac8cd27..bde191c51 100644
--- a/cmd/openim-rpc/openim-rpc-conversation/main.go
+++ b/cmd/openim-rpc/openim-rpc-conversation/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImConversationName, conversation.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-friend/main.go b/cmd/openim-rpc/openim-rpc-friend/main.go
index 82d71d522..8eeb9c8e1 100644
--- a/cmd/openim-rpc/openim-rpc-friend/main.go
+++ b/cmd/openim-rpc/openim-rpc-friend/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImFriendName, friend.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go
index 360042f84..a5842ffd1 100644
--- a/cmd/openim-rpc/openim-rpc-group/main.go
+++ b/cmd/openim-rpc/openim-rpc-group/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, group.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-msg/main.go b/cmd/openim-rpc/openim-rpc-msg/main.go
index bed57f522..b3895a502 100644
--- a/cmd/openim-rpc/openim-rpc-msg/main.go
+++ b/cmd/openim-rpc/openim-rpc-msg/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImMsgName, msg.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-third/main.go b/cmd/openim-rpc/openim-rpc-third/main.go
index 4868ce149..8f390bb6a 100644
--- a/cmd/openim-rpc/openim-rpc-third/main.go
+++ b/cmd/openim-rpc/openim-rpc-third/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImThirdName, third.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/cmd/openim-rpc/openim-rpc-user/main.go b/cmd/openim-rpc/openim-rpc-user/main.go
index a77a2f768..6994ea2b1 100644
--- a/cmd/openim-rpc/openim-rpc-user/main.go
+++ b/cmd/openim-rpc/openim-rpc-user/main.go
@@ -15,12 +15,10 @@
 package main
 
 import (
-	"fmt"
-	"os"
-
 	"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 )
 
 func main() {
@@ -31,7 +29,6 @@ func main() {
 		panic(err.Error())
 	}
 	if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil {
-		fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err)
-		os.Exit(-1)
+		util.ExitWithError(err)
 	}
 }
diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go
index aeba0a24a..321407f7e 100644
--- a/internal/msggateway/init.go
+++ b/internal/msggateway/init.go
@@ -18,9 +18,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/OpenIMSDK/tools/utils"
-	"golang.org/x/sync/errgroup"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 )
 
@@ -46,20 +43,12 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
 	}
 
 	hubServer := NewServer(rpcPort, prometheusPort, longServer)
-
-	wg := errgroup.Group{}
-	wg.Go(func() error {
+	netDone := make(chan error)
+	go func() {
 		err = hubServer.Start()
 		if err != nil {
-			return utils.Wrap1(err)
+			netDone <- err
 		}
-		return err
-	})
-
-	wg.Go(func() error {
-		return hubServer.LongConnServer.Run()
-	})
-
-	err = wg.Wait()
-	return err
+	}()
+	return hubServer.LongConnServer.Run(netDone)
 }
diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go
index 01d92b92a..c16da7c64 100644
--- a/internal/msggateway/n_ws_server.go
+++ b/internal/msggateway/n_ws_server.go
@@ -20,12 +20,9 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
-	"os"
-	"os/signal"
 	"strconv"
 	"sync"
 	"sync/atomic"
-	"syscall"
 	"time"
 
 	"github.com/OpenIMSDK/tools/apiresp"
@@ -49,7 +46,7 @@ import (
 )
 
 type LongConnServer interface {
-	Run() error
+	Run(done chan error) error
 	wsHandler(w http.ResponseWriter, r *http.Request)
 	GetUserAllCons(userID string) ([]*Client, bool)
 	GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
@@ -169,23 +166,20 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
 	}, nil
 }
 
-func (ws *WsServer) Run() error {
+func (ws *WsServer) Run(done chan error) error {
 	var (
-		client *Client
-		wg     errgroup.Group
-
-		sigs = make(chan os.Signal, 1)
-		done = make(chan struct{}, 1)
+		client       *Client
+		netErr       error
+		shutdownDone = make(chan struct{}, 1)
 	)
 
 	server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil}
 
-	wg.Go(func() error {
+	go func() {
 		for {
 			select {
-			case <-done:
-				return nil
-
+			case <-shutdownDone:
+				return
 			case client = <-ws.registerChan:
 				ws.registerClient(client)
 			case client = <-ws.unregisterChan:
@@ -194,33 +188,32 @@ func (ws *WsServer) Run() error {
 				ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
 			}
 		}
-	})
-
-	wg.Go(func() error {
-		http.HandleFunc("/", ws.wsHandler)
-		return server.ListenAndServe()
-	})
-
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	<-sigs
-
-	go func() {
-		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-		defer cancel()
-
-		// graceful exit operation for server
-		_ = server.Shutdown(ctx)
-		_ = wg.Wait()
-		close(done)
 	}()
-
+	netDone := make(chan struct{}, 1)
+	go func() {
+		http.HandleFunc("/", ws.wsHandler)
+		err := server.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			netErr = errs.Wrap(err, "ws start err", server.Addr)
+			close(netDone)
+		}
+	}()
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	var err error
 	select {
-	case <-done:
-		return nil
-
-	case <-time.After(15 * time.Second):
-		return utils.Wrap1(errors.New("timeout exit"))
+	case err = <-done:
+		sErr := server.Shutdown(ctx)
+		if sErr != nil {
+			return errs.Wrap(sErr, "shutdown err")
+		}
+		close(shutdownDone)
+		if err != nil {
+			return err
+		}
+	case <-netDone:
 	}
+	return netErr
 
 }
 
diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index 65518c324..062017f44 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -18,23 +18,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"net/http"
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"
-	"time"
-
 	"github.com/OpenIMSDK/tools/errs"
-
 	"github.com/OpenIMSDK/tools/log"
-	"github.com/OpenIMSDK/tools/mw"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/collectors"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 
+	"github.com/OpenIMSDK/tools/mw"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@@ -42,12 +30,22 @@ import (
 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/collectors"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
 )
 
 type MsgTransfer struct {
 	historyCH      *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
 	historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
-	// modifyCH       *ModifyMsgConsumerHandler          // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
+	ctx            context.Context
+	cancel         context.CancelFunc
 }
 
 func StartTransfer(prometheusPort int) error {
@@ -65,10 +63,6 @@ func StartTransfer(prometheusPort int) error {
 		return err
 	}
 	client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
-	/*
-		client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
-			openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
-				config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/
 	if err != nil {
 		return err
 	}
@@ -109,27 +103,22 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
 }
 
 func (m *MsgTransfer) Start(prometheusPort int) error {
-	ctx := context.Background()
 	fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
 	if prometheusPort <= 0 {
 		return errs.Wrap(errors.New("prometheusPort not correct"))
 	}
+	m.ctx, m.cancel = context.WithCancel(context.Background())
 
-	var wg sync.WaitGroup
+	var (
+		netDone = make(chan struct{}, 1)
+		netErr  error
+	)
 
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)
-	}()
+	onError := func(ctx context.Context, err error, errInfo string) {
+		log.ZWarn(ctx, errInfo, err)
+	}
+	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError)
+	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError)
 
 	if config.Config.Prometheus.Enable {
 		go func() {
@@ -141,30 +130,28 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
 			http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
 			err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
 			if err != nil && err != http.ErrServerClosed {
-				panic(err)
+				netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
+				netDone <- struct{}{}
 			}
 		}()
 	}
 
 	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	<-sigs
-
-	// graceful close kafka client.
-	go m.historyCH.historyConsumerGroup.Close()
-	go m.historyMongoCH.historyConsumerGroup.Close()
-
-	done := make(chan struct{}, 1)
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-
+	signal.Notify(sigs, syscall.SIGTERM)
 	select {
-	case <-done:
-		log.ZInfo(context.Background(), "msgtrasfer exit successfully")
-	case <-time.After(15 * time.Second):
-		log.ZError(context.Background(), "msgtransfer force to exit, timeout 15s", nil)
+	case <-sigs:
+		util.SIGUSR1Exit()
+		// graceful close kafka client.
+		m.cancel()
+		m.historyCH.historyConsumerGroup.Close()
+		m.historyMongoCH.historyConsumerGroup.Close()
+
+	case <-netDone:
+		m.cancel()
+		m.historyCH.historyConsumerGroup.Close()
+		m.historyMongoCH.historyConsumerGroup.Close()
+		close(netDone)
+		return netErr
 	}
 
 	return nil
diff --git a/internal/push/callback.go b/internal/push/callback.go
index 99a58fb07..a572fa572 100644
--- a/internal/push/callback.go
+++ b/internal/push/callback.go
@@ -130,7 +130,7 @@ func callbackBeforeSuperGroupOnlinePush(
 	if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
 		return err
 	}
-	return nil
+
 	if len(resp.UserIDs) != 0 {
 		*pushToUserIDs = resp.UserIDs
 	}
diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go
index ceab86165..80478de99 100644
--- a/internal/push/consumer_init.go
+++ b/internal/push/consumer_init.go
@@ -14,7 +14,10 @@
 
 package push
 
-import "context"
+import (
+	"context"
+	"github.com/OpenIMSDK/tools/log"
+)
 
 type Consumer struct {
 	pushCh       ConsumerHandler
@@ -32,6 +35,9 @@ func NewConsumer(pusher *Pusher) (*Consumer, error) {
 }
 
 func (c *Consumer) Start() {
+	onError := func(ctx context.Context, err error, errInfo string) {
+		log.ZWarn(ctx, errInfo, err)
+	}
+	go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh, onError)
 
-	go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh)
 }
diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go
index c1226ce6b..caaf95525 100644
--- a/internal/push/push_rpc_server.go
+++ b/internal/push/push_rpc_server.go
@@ -16,8 +16,6 @@ package push
 
 import (
 	"context"
-	"sync"
-
 	"github.com/OpenIMSDK/tools/utils"
 
 	"google.golang.org/grpc"
@@ -58,23 +56,18 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
 		&groupRpcClient,
 		&msgRpcClient,
 	)
-	var wg sync.WaitGroup
-	wg.Add(2)
-	go func() {
-		defer wg.Done()
-		pbpush.RegisterPushMsgServiceServer(server, &pushServer{
-			pusher: pusher,
-		})
-	}()
+
+	pbpush.RegisterPushMsgServiceServer(server, &pushServer{
+		pusher: pusher,
+	})
+
 	consumer, err := NewConsumer(pusher)
 	if err != nil {
 		return err
 	}
-	go func() {
-		defer wg.Done()
-		consumer.Start()
-	}()
-	wg.Wait()
+
+	consumer.Start()
+
 	return nil
 }
 
diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go
index 40e1c0a87..decc1aa82 100644
--- a/internal/tools/cron_task.go
+++ b/internal/tools/cron_task.go
@@ -64,7 +64,7 @@ func StartTask() error {
 	crontab.Start()
 
 	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	signal.Notify(sigs, syscall.SIGTERM)
 	<-sigs
 
 	// stop crontab, Wait for the running task to exit.
diff --git a/pkg/common/ginprometheus/ginprometheus.go b/pkg/common/ginprometheus/ginprometheus.go
index 1ee8f8e34..c2e6bdcca 100644
--- a/pkg/common/ginprometheus/ginprometheus.go
+++ b/pkg/common/ginprometheus/ginprometheus.go
@@ -197,30 +197,32 @@ func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Eng
 }
 
 // SetMetricsPath set metrics paths.
-func (p *Prometheus) SetMetricsPath(e *gin.Engine) {
+func (p *Prometheus) SetMetricsPath(e *gin.Engine) error {
 
 	if p.listenAddress != "" {
 		p.router.GET(p.MetricsPath, prometheusHandler())
-		p.runServer()
+		return p.runServer()
 	} else {
 		e.GET(p.MetricsPath, prometheusHandler())
+		return nil
 	}
 }
 
 // SetMetricsPathWithAuth set metrics paths with authentication.
-func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) {
+func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error {
 
 	if p.listenAddress != "" {
 		p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
-		p.runServer()
+		return p.runServer()
 	} else {
 		e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler())
+		return nil
 	}
 
 }
 
-func (p *Prometheus) runServer() {
-	go p.router.Run(p.listenAddress)
+func (p *Prometheus) runServer() error {
+	return p.router.Run(p.listenAddress)
 }
 
 func (p *Prometheus) getMetrics() []byte {
@@ -366,15 +368,15 @@ func (p *Prometheus) registerMetrics(subsystem string) {
 }
 
 // Use adds the middleware to a gin engine.
-func (p *Prometheus) Use(e *gin.Engine) {
+func (p *Prometheus) Use(e *gin.Engine) error {
 	e.Use(p.HandlerFunc())
-	p.SetMetricsPath(e)
+	return p.SetMetricsPath(e)
 }
 
 // UseWithAuth adds the middleware to a gin engine with BasicAuth.
-func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) {
+func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error {
 	e.Use(p.HandlerFunc())
-	p.SetMetricsPathWithAuth(e, accounts)
+	return p.SetMetricsPathWithAuth(e, accounts)
 }
 
 // HandlerFunc defines handler function for middleware.
diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go
index 5bff50d88..908b8f088 100644
--- a/pkg/common/kafka/consumer_group.go
+++ b/pkg/common/kafka/consumer_group.go
@@ -17,19 +17,16 @@ package kafka
 import (
 	"context"
 	"errors"
-	"strings"
-
+	"fmt"
 	"github.com/IBM/sarama"
 	"github.com/OpenIMSDK/tools/errs"
 	"github.com/OpenIMSDK/tools/log"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	"strings"
 )
 
 type MConsumerGroup struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-
 	sarama.ConsumerGroup
 	groupID string
 	topics  []string
@@ -57,9 +54,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
 		return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password)
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
 	return &MConsumerGroup{
-		ctx, cancel,
 		consumerGroup,
 		groupID,
 		topics,
@@ -70,27 +65,21 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex
 	return GetContextWithMQHeader(cMsg.Headers)
 }
 
-func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
-	log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID)
+func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler, onError func(context.Context, error, string)) {
+	log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID)
 	for {
-		err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler)
-		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
+		err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
+		if errors.Is(err, sarama.ErrClosedConsumerGroup) || errors.Is(err, context.Canceled) {
 			return
 		}
-		if mc.ctx.Err() != nil {
-			return
-		}
-
 		if err != nil {
-			log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
-		}
-		if ctx.Err() != nil {
+			errInfo := fmt.Sprintf("consume err: %v, topic: %v, groupID: %s", err, strings.Join(mc.topics, ", "), mc.groupID)
+			onError(ctx, err, errInfo) // 调用回调函数处理错误
 			return
 		}
 	}
 }
 
-func (mc *MConsumerGroup) Close() {
-	mc.cancel()
-	mc.ConsumerGroup.Close()
+func (mc *MConsumerGroup) Close() error {
+	return mc.ConsumerGroup.Close()
 }
diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go
index f6cda2ffb..4b032e9d6 100644
--- a/pkg/common/startrpc/start.go
+++ b/pkg/common/startrpc/start.go
@@ -15,8 +15,11 @@
 package startrpc
 
 import (
+	"context"
 	"errors"
 	"fmt"
+	"github.com/OpenIMSDK/tools/errs"
+	util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
 	"net"
 	"net/http"
 	"os"
@@ -26,14 +29,10 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/OpenIMSDK/tools/errs"
-
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-	"golang.org/x/sync/errgroup"
-
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"google.golang.org/grpc"
@@ -56,12 +55,13 @@ func Start(
 ) error {
 	fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n",
 		rpcRegisterName, rpcPort, prometheusPort, config.Version)
+	rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))
 	listener, err := net.Listen(
 		"tcp",
-		net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)),
+		rpcTcpAddr,
 	)
 	if err != nil {
-		return errs.Wrap(err, network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))
+		return errs.Wrap(err, "rpc start err", rpcTcpAddr)
 	}
 
 	defer listener.Close()
@@ -108,46 +108,64 @@ func Start(
 		return errs.Wrap(err)
 	}
 
-	var wg errgroup.Group
-
-	wg.Go(func() error {
+	var (
+		netDone    = make(chan struct{}, 2)
+		netErr     error
+		httpServer *http.Server
+	)
+	go func() {
 		if config.Config.Prometheus.Enable && prometheusPort != 0 {
 			metric.InitializeMetrics(srv)
 			// Create a HTTP server for prometheus.
-			httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
-			if err := httpServer.ListenAndServe(); err != nil {
-				fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v PrometheusPort: %d \n\n", err, prometheusPort)
-				os.Exit(-1)
+			httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
+			if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				netErr = errs.Wrap(err, "prometheus start err", httpServer.Addr)
+				netDone <- struct{}{}
 			}
 		}
-		return nil
-	})
-
-	wg.Go(func() error {
-		return errs.Wrap(srv.Serve(listener))
-	})
-
-	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	<-sigs
-
-	var (
-		done = make(chan struct{}, 1)
-		gerr error
-	)
-
-	go func() {
-		once.Do(srv.GracefulStop)
-		gerr = wg.Wait()
-		close(done)
 	}()
 
+	go func() {
+		err := srv.Serve(listener)
+		if err != nil {
+			netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr)
+			netDone <- struct{}{}
+		}
+	}()
+
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGTERM)
 	select {
-	case <-done:
-		return gerr
-
-	case <-time.After(15 * time.Second):
-		return errs.Wrap(errors.New("timeout exit"))
+	case <-sigs:
+		util.SIGUSR1Exit()
+		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+		defer cancel()
+		if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil {
+			return err
+		}
+		ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
+		defer cancel()
+		err := httpServer.Shutdown(ctx)
+		if err != nil {
+			return errs.Wrap(err, "shutdown err")
+		}
+		return errors.New("SIGTERM EXIT")
+	case <-netDone:
+		close(netDone)
+		return netErr
+	}
+}
+
+func gracefulStopWithCtx(ctx context.Context, f func()) error {
+	done := make(chan struct{}, 1)
+	go func() {
+		f()
+		close(done)
+	}()
+	select {
+	case <-ctx.Done():
+		return errs.Wrap(errors.New("timeout, ctx graceful stop"))
+	case <-done:
+		return nil
 	}
-
 }
diff --git a/pkg/util/genutil/genutil.go b/pkg/util/genutil/genutil.go
index 0948a7c49..f97b803f6 100644
--- a/pkg/util/genutil/genutil.go
+++ b/pkg/util/genutil/genutil.go
@@ -15,6 +15,7 @@
 package genutil
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -39,3 +40,17 @@ func OutDir(path string) (string, error) {
 	outDir += "/"
 	return outDir, nil
 }
+
+func ExitWithError(err error) {
+	if errors.Is(err, errors.New("SIGTERM EXIT")) {
+		os.Exit(-1)
+	}
+	progName := filepath.Base(os.Args[0])
+	fmt.Fprintf(os.Stderr, "\n\n%s exit -1: \n%+v\n\n", progName, err)
+	os.Exit(-1)
+}
+
+func SIGUSR1Exit() {
+	progName := filepath.Base(os.Args[0])
+	fmt.Printf("\n\n%s receive process terminal SIGTERM exit 0\n\n", progName)
+}
diff --git a/scripts/githooks/pre-commit.sh b/scripts/githooks/pre-commit.sh
index cc756c9ad..d8396b560 100644
--- a/scripts/githooks/pre-commit.sh
+++ b/scripts/githooks/pre-commit.sh
@@ -105,7 +105,7 @@ fi
 if [[ ! $local_branch =~ $valid_branch_regex ]]
 then
     printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex.
-Your commit will be rejected. You should rename your branch to a valid name(feat/name OR bug/name) and try again."
+Your commit will be rejected. You should rename your branch to a valid name(feat/name OR fix/name) and try again."
     printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694"
     exit 1
 fi
\ No newline at end of file
diff --git a/scripts/install/openim-push.sh b/scripts/install/openim-push.sh
index ab12735c1..95da16c8a 100755
--- a/scripts/install/openim-push.sh
+++ b/scripts/install/openim-push.sh
@@ -73,7 +73,7 @@ function openim::push::start() {
   
   for (( i=0; i<${#OPENIM_PUSH_PORTS_ARRAY[@]}; i++ )); do
     openim::log::info "start push process, port: ${OPENIM_PUSH_PORTS_ARRAY[$i]}, prometheus port: ${PUSH_PROM_PORTS_ARRAY[$i]}"
-    nohup ${OPENIM_PUSH_BINARY} --port ${OPENIM_PUSH_PORTS_ARRAY[$i]} -c ${OPENIM_PUSH_CONFIG} --prometheus_port ${PUSH_PROM_PORTS_ARRAY[$i]} >${LOG_FILE} 2> >(tee -a "${STDERR_LOG_FILE}" "$TMP_LOG_FILE" >&2) &
+    nohup ${OPENIM_PUSH_BINARY} --port ${OPENIM_PUSH_PORTS_ARRAY[$i]} -c ${OPENIM_PUSH_CONFIG} --prometheus_port ${PUSH_PROM_PORTS_ARRAY[$i]} >> ${LOG_FILE} 2> >(tee -a "${STDERR_LOG_FILE}" "$TMP_LOG_FILE" >&2) &
   done
 
   openim::util::check_process_names ${SERVER_NAME}
diff --git a/scripts/lib/util.sh b/scripts/lib/util.sh
index 7acb1fcdd..1bdb7f640 100755
--- a/scripts/lib/util.sh
+++ b/scripts/lib/util.sh
@@ -486,7 +486,7 @@ openim::util::stop_services_on_ports() {
         local pid=$(echo $line | awk '{print $2}')
         
         # Try to stop the service by killing its process.
-        if kill -10 $pid; then
+        if kill -15 $pid; then
           stopped+=($port)
         else
           not_stopped+=($port)
@@ -561,7 +561,7 @@ openim::util::stop_services_with_name() {
             # If there's a Process ID, it means the service with the name is running.
             if [[ -n $pid ]]; then
                 # Try to stop the service by killing its process.
-                if kill -10 $pid 2>/dev/null; then
+                if kill -15 $pid 2>/dev/null; then
                     stopped_this_time=true
                 fi
             fi