1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 11:18:50 +08:00

enhance from issue #1589

This commit is contained in:
John Guo 2022-02-16 20:51:39 +08:00
parent fa39b9ee54
commit 948cb9ff7c
3 changed files with 77 additions and 58 deletions

View File

@ -9,6 +9,7 @@ package gproc
import (
"context"
"fmt"
"sync"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/errors/gerror"
@ -20,10 +21,10 @@ import (
// MsgRequest is the request structure for process communication.
type MsgRequest struct {
SendPid int // Sender PID.
RecvPid int // Receiver PID.
Group string // Message group name.
Data []byte // Request data.
SenderPid int // Sender PID.
ReceiverPid int // Receiver PID.
Group string // Message group name.
Data []byte // Request data.
}
// MsgResponse is the response structure for process communication.
@ -47,37 +48,11 @@ var (
// commPidFolderPath specifies the folder path storing pid to port mapping files.
commPidFolderPath string
// commPidFolderPathOnce is used for lazy calculation for `commPidFolderPath` is necessary.
commPidFolderPathOnce sync.Once
)
func init() {
availablePaths := []string{
"/var/tmp",
"/var/run",
}
if homePath, _ := gfile.Home(); homePath != "" {
availablePaths = append(availablePaths, gfile.Join(homePath, ".config"))
}
availablePaths = append(availablePaths, gfile.Temp())
for _, availablePath := range availablePaths {
checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm)
if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil {
continue
}
if gfile.IsWritable(checkPath) {
commPidFolderPath = checkPath
break
}
}
if commPidFolderPath == "" {
intlog.Errorf(
context.TODO(),
`cannot find available folder for storing pid to port mapping files in paths: %+v, process communication feature will fail`,
availablePaths,
)
}
}
// getConnByPid creates and returns a TCP connection for specified pid.
func getConnByPid(pid int) (*gtcp.PoolConn, error) {
port := getPortByPid(pid)
@ -94,10 +69,49 @@ func getConnByPid(pid int) (*gtcp.PoolConn, error) {
// getPortByPid returns the listening port for specified pid.
// It returns 0 if no port found for the specified pid.
func getPortByPid(pid int) int {
return gconv.Int(gfile.GetContentsWithCache(getCommFilePath(pid)))
path := getCommFilePath(pid)
if path == "" {
return 0
}
return gconv.Int(gfile.GetContentsWithCache(path))
}
// getCommFilePath returns the pid to port mapping file path for given pid.
func getCommFilePath(pid int) string {
return gfile.Join(commPidFolderPath, gconv.String(pid))
path, err := getCommPidFolderPath()
if err != nil {
intlog.Errorf(context.TODO(), `%+v`, err)
return ""
}
return gfile.Join(path, gconv.String(pid))
}
// getCommPidFolderPath retrieves and returns the available directory for storing pid mapping files.
func getCommPidFolderPath() (folderPath string, err error) {
commPidFolderPathOnce.Do(func() {
availablePaths := []string{
"/var/tmp",
"/var/run",
}
if path, _ := gfile.Home(".config"); path != "" {
availablePaths = append(availablePaths, path)
}
availablePaths = append(availablePaths, gfile.Temp())
for _, availablePath := range availablePaths {
checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm)
if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil {
continue
}
if gfile.IsWritable(checkPath) {
commPidFolderPath = checkPath
break
}
}
err = gerror.Newf(
`cannot find available folder for storing pid to port mapping files in paths: %+v`,
availablePaths,
)
})
folderPath = commPidFolderPath
return
}

View File

@ -13,6 +13,7 @@ import (
"github.com/gogf/gf/v2/container/gqueue"
"github.com/gogf/gf/v2/container/gtype"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/json"
"github.com/gogf/gf/v2/net/gtcp"
"github.com/gogf/gf/v2/os/gfile"
@ -51,26 +52,27 @@ func Receive(group ...string) *MsgRequest {
// receiveTcpListening scans local for available port and starts listening.
func receiveTcpListening() {
var listen *net.TCPListener
// Scan the available port for listening.
for i := defaultTcpPortForProcComm; ; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", i))
if err != nil {
continue
}
listen, err = net.ListenTCP("tcp", addr)
if err != nil {
continue
}
// Save the port to the pid file.
if err := gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)); err != nil {
panic(err)
}
break
var (
listen *net.TCPListener
conn net.Conn
port = gtcp.MustGetFreePort()
address = fmt.Sprintf("127.0.0.1:%d", port)
)
tcpAddress, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
panic(gerror.Wrap(err, `net.ResolveTCPAddr failed`))
}
listen, err = net.ListenTCP("tcp", tcpAddress)
if err != nil {
panic(gerror.Wrapf(err, `net.ListenTCP failed for address "%s"`, address))
}
// Save the port to the pid file.
if err = gfile.PutContents(getCommFilePath(Pid()), gconv.String(port)); err != nil {
panic(err)
}
// Start listening.
for {
if conn, err := listen.Accept(); err != nil {
if conn, err = listen.Accept(); err != nil {
glog.Error(context.TODO(), err)
} else if conn != nil {
go receiveTcpHandler(gtcp.NewConnByNetConn(conn))
@ -96,9 +98,12 @@ func receiveTcpHandler(conn *gtcp.Conn) {
if err = json.UnmarshalUseNumber(buffer, msg); err != nil {
continue
}
if msg.RecvPid != Pid() {
if msg.ReceiverPid != Pid() {
// Not mine package.
response.Message = fmt.Sprintf("receiver pid not match, target: %d, current: %d", msg.RecvPid, Pid())
response.Message = fmt.Sprintf(
"receiver pid not match, target: %d, current: %d",
msg.ReceiverPid, Pid(),
)
} else if v := commReceiveQueues.Get(msg.Group); v == nil {
// Group check.
response.Message = fmt.Sprintf("group [%s] does not exist", msg.Group)

View File

@ -17,10 +17,10 @@ import (
// Send sends data to specified process of given pid.
func Send(pid int, data []byte, group ...string) error {
msg := MsgRequest{
SendPid: Pid(),
RecvPid: pid,
Group: defaultGroupNameForProcComm,
Data: data,
SenderPid: Pid(),
ReceiverPid: pid,
Group: defaultGroupNameForProcComm,
Data: data,
}
if len(group) > 0 {
msg.Group = group[0]