refactor: unified naming for module startup functions.

This commit is contained in:
Gordon 2024-03-19 11:52:38 +08:00
parent 817153be37
commit 993e97019a
25 changed files with 83 additions and 61 deletions

6
go.mod
View File

@ -5,7 +5,7 @@ go 1.19
require (
firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.56
github.com/OpenIMSDK/tools v0.0.43
github.com/OpenIMSDK/tools v0.0.46-alpha.4
github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.18.0
@ -91,6 +91,9 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect
github.com/klauspost/compress v1.17.4 // indirect
@ -144,6 +147,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.25.8 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)

12
go.sum
View File

@ -20,8 +20,8 @@ github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/OpenIMSDK/protocol v0.0.56 h1:mbVFyDBachEsmJLfYW5AU1z2KL8AUEpoHG8RPCIxjgg=
github.com/OpenIMSDK/protocol v0.0.56/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.43 h1:ijDnAvEyu4jkWGiklidc/QQulqzqKkxEhIpIjznd6dY=
github.com/OpenIMSDK/tools v0.0.43/go.mod h1:GMOTbvwdexVk1KDeVuFSAr4dj9O6qVVlw7i+gepy2r4=
github.com/OpenIMSDK/tools v0.0.46-alpha.4 h1:fkr6T0rxQPJZMakR95zLDTXPwO0V1qTMGGPmhSXEqpg=
github.com/OpenIMSDK/tools v0.0.46-alpha.4/go.mod h1:kGgLtr1egGJnnGaBsF7SNcylx7KB4Md5MyRqz+GUMS0=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@ -198,6 +198,12 @@ github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
@ -531,6 +537,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.25.8 h1:WAGEZ/aEcznN4D03laj8DKnehe1e9gYQAjW8xyPRdeo=
gorm.io/gorm v1.25.8/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -48,21 +48,7 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.WrapMsg(err, func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(gz)
gzipBuffer := bytes.Buffer{}
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error writing data")
}
if err := gz.Close(); err != nil {
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
}
return gzipBuffer.Bytes(), nil
}"GzipCompressor.Compress: writing to gzip writer failed")
return nil, errs.WrapMsg(err, "GzipCompressor.Compress: writing to gzip writer failed")
}
if err := gz.Close(); err != nil {

View File

@ -45,7 +45,7 @@ func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistr
}
func (s *Server) Start(conf *config.GlobalConfig) error {
return startrpc.Start(
return startrpc.Start(context.Background(),
s.rpcPort,
conf.RpcRegisterName.OpenImMessageGatewayName,
s.prometheusPort,

View File

@ -40,7 +40,7 @@ func StartTask(config *config.GlobalConfig) error {
msgTool.convertTools()
rdb, err := cache.NewRedis(config)
rdb, err := cache.NewRedis(&config.Redis)
if err != nil {
return err
}

View File

@ -65,11 +65,11 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle
}
func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) {
rdb, err := cache.NewRedis(config)
rdb, err := cache.NewRedis(&config.Redis)
if err != nil {
return nil, err
}
mongo, err := unrelation.NewMongo(config)
mongo, err := unrelation.NewMongo(&config.Mongo)
if err != nil {
return nil, err
}
@ -116,7 +116,7 @@ func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) {
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
ctxTx,
)
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config)
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName)
msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
return msgTool, nil

View File

@ -16,6 +16,7 @@ package cmd
import (
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/spf13/cobra"
"github.com/openimsdk/open-im-server/v3/internal/api"
@ -26,7 +27,7 @@ type ApiCmd struct {
}
func NewApiCmd(name string) *ApiCmd {
ret := &ApiCmd{RootCmd: NewRootCmd(name)}
ret := &ApiCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name)}
ret.SetRootCmdPt(ret)
ret.addPreRun()
ret.addRunE()

View File

@ -15,6 +15,7 @@
package cmd
import (
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/spf13/cobra"
"github.com/openimsdk/open-im-server/v3/internal/tools"
@ -27,7 +28,7 @@ type CronTaskCmd struct {
}
func NewCronTaskCmd(name string) *CronTaskCmd {
ret := &CronTaskCmd{RootCmd: NewRootCmd(name, WithCronTaskLogName()),
ret := &CronTaskCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name, WithCronTaskLogName()),
initFunc: tools.StartTask}
ret.addRunE()
ret.SetRootCmdPt(ret)

View File

@ -15,6 +15,7 @@
package cmd
import (
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"log"
"github.com/OpenIMSDK/protocol/constant"
@ -28,7 +29,7 @@ type MsgGatewayCmd struct {
}
func NewMsgGatewayCmd(name string) *MsgGatewayCmd {
ret := &MsgGatewayCmd{NewRootCmd(name)}
ret := &MsgGatewayCmd{NewRootCmd(genutil.GetProcessName(), name)}
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret

View File

@ -16,6 +16,7 @@ package cmd
import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/OpenIMSDK/protocol/constant"
"github.com/spf13/cobra"
@ -28,7 +29,7 @@ type MsgTransferCmd struct {
}
func NewMsgTransferCmd(name string) *MsgTransferCmd {
ret := &MsgTransferCmd{NewRootCmd(name)}
ret := &MsgTransferCmd{NewRootCmd(genutil.GetProcessName(), name)}
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret

View File

@ -33,6 +33,7 @@ type RootCmdPt interface {
type RootCmd struct {
Command cobra.Command
Name string
processName string
port int
prometheusPort int
cmdItf RootCmdPt
@ -59,8 +60,8 @@ func WithLogName(logName string) func(*CmdOpts) {
}
}
func NewRootCmd(name string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{Name: name, config: config.NewGlobalConfig()}
func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{processName: processName, Name: name, config: config.NewGlobalConfig()}
cmd := cobra.Command{
Use: "Start openIM application",
Short: fmt.Sprintf(`Start %s `, name),
@ -104,7 +105,7 @@ func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
logConfig := rc.config.Log
return log.InitFromConfig(
err := log.InitFromConfig(
cmdOpts.loggerPrefixName,
rc.Name,
@ -115,6 +116,11 @@ func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
logConfig.RemainRotationCount,
logConfig.RotationTime,
)
if err != nil {
return errs.Wrap(err)
}
return errs.Wrap(log.InitConsoleLogger(rc.Name, logConfig.RemainLogLevel, logConfig.IsJson))
}
func defaultCmdOpts() *CmdOpts {

View File

@ -15,11 +15,12 @@
package cmd
import (
"context"
"errors"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/spf13/cobra"
"google.golang.org/grpc"
@ -33,10 +34,12 @@ type RpcCmd struct {
*RootCmd
RpcRegisterName string
initFunc rpcInitFuc
ctx context.Context
}
func NewRpcCmd(name string, initFunc rpcInitFuc) *RpcCmd {
ret := &RpcCmd{RootCmd: NewRootCmd(name), initFunc: initFunc}
ret := &RpcCmd{RootCmd: NewRootCmd(genutil.GetProcessName(), name), initFunc: initFunc}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.addPreRun()
ret.addRunE()
ret.SetRootCmdPt(ret)
@ -69,7 +72,7 @@ func (a *RpcCmd) StartSvr(name string, rpcFn func(config *config2.GlobalConfig,
if a.GetPortFlag() == 0 {
return errs.Wrap(errors.New("port is required"))
}
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), a.config, rpcFn)
return startrpc.Start(a.ctx, a.GetPortFlag(), name, a.GetPrometheusPortFlag(), a.config, rpcFn)
}
func (a *RpcCmd) GetPortFromConfig(portType string) int {

View File

@ -391,7 +391,7 @@ func (g *GroupCacheRedis) GetGroupOwner(ctx context.Context, groupID string) (*r
return nil, err
}
if len(members) == 0 {
return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("group %s owner not found", groupID))
return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("group %s owner not found", groupID))
}
return members[0], nil
}

View File

@ -78,7 +78,7 @@ func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) {
if err != nil {
errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username,
redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline)
return nil, errs.Wrap(err, errMsg)
return nil, errs.WrapMsg(err, errMsg)
}
redisClient = rdb
return rdb, err

View File

@ -168,7 +168,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
err = json.Unmarshal([]byte(v), &t)
if err != nil {
errInfo := fmt.Sprintf("cache json.Unmarshal failed, key:%s, value:%s, expire:%s", key, v, expire)
return t, errs.Wrap(err, errInfo)
return t, errs.WrapMsg(err, errInfo)
}
return t, nil

View File

@ -146,9 +146,9 @@ func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheMo
}
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) {
cacheModel := cache.NewMsgCacheModel(rdb, config)
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
msgDocModel := unrelation.NewMsgMongoDriver(database)
return NewCommonMsgDatabase(msgDocModel, cacheModel, config)
return NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka)
}
type commonMsgDatabase struct {

View File

@ -221,7 +221,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
}
md5Sum := md5.Sum([]byte(strings.Join([]string{uploadInfo.ETag}, partSeparator)))
if md5val := hex.EncodeToString(md5Sum[:]); md5val != upload.Hash {
return nil, errs.ErrArgs.Wrap(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash))
return nil, errs.ErrArgs.WrapMsg(fmt.Sprintf("md5 mismatching %s != %s", md5val, upload.Hash))
}
// Prevents concurrent operations at this time that cause files to be overwritten
copyInfo, err := c.impl.CopyObject(ctx, uploadInfo.Key, upload.Key+"."+c.UUID())

View File

@ -56,7 +56,7 @@ func NewMongo(mongoConf *config.Mongo) (*Mongo, error) {
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
if err = mongoClient.Ping(ctx, nil); err != nil {
return nil, errs.Wrap(err, uri)
return nil, errs.WrapMsg(err, uri)
}
return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil
}
@ -65,7 +65,7 @@ func NewMongo(mongoConf *config.Mongo) (*Mongo, error) {
continue
}
}
return nil, errs.Wrap(err, uri)
return nil, errs.WrapMsg(err, uri)
}
func buildMongoURI(mongoConf *config.Mongo) string {

View File

@ -130,7 +130,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, doc
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}},
)
if err != nil {
return errs.Wrap(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex))
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex))
}
return nil
}
@ -156,12 +156,12 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex(
findOpts,
)
if err != nil {
return nil, errs.Wrap(err, fmt.Sprintf("conversationID is %s", conversationID))
return nil, errs.WrapMsg(err, fmt.Sprintf("conversationID is %s", conversationID))
}
var msgs []table.MsgDocModel
err = cursor.All(ctx, &msgs)
if err != nil {
return nil, errs.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
return nil, errs.WrapMsg(err, fmt.Sprintf("cursor is %s", cursor.Current.String()))
}
if len(msgs) > 0 {
return &msgs[0], nil
@ -212,7 +212,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st
}
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
if err != nil {
return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
}
return nil
}
@ -279,7 +279,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
defer cur.Close(ctx)
var msgDocModel []table.MsgDocModel
if err := cur.All(ctx, &msgDocModel); err != nil {
return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
}
if len(msgDocModel) == 0 {
return nil, errs.Wrap(mongo.ErrNoDocuments)
@ -306,14 +306,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
}
data, err := json.Marshal(&revokeContent)
if err != nil {
return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
}
elem := sdkws.NotificationElem{
Detail: string(data),
}
content, err := json.Marshal(&elem)
if err != nil {
return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
return nil, errs.WrapMsg(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
}
msg.Msg.ContentType = constant.MsgRevokeNotification
msg.Msg.Content = string(content)
@ -326,7 +326,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {
count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID})
if err != nil {
return false, errs.Wrap(err, fmt.Sprintf("docID is %s", docID))
return false, errs.WrapMsg(err, fmt.Sprintf("docID is %s", docID))
}
return count > 0, nil
}
@ -351,7 +351,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID st
updates = append(updates, updateModel)
}
_, err := m.MsgCollection.BulkWrite(ctx, updates)
return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
return errs.WrapMsg(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
}
// RangeUserSendCount

View File

@ -109,7 +109,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
}
if len(connections) == 0 {
return nil, errs.Wrap(errors.New("no connections found for service"), "serviceName", serviceName)
return nil, errs.WrapMsg(errors.New("no connections found for service"), "serviceName", serviceName)
}
return connections, nil
}
@ -120,7 +120,7 @@ func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...g
&cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)
address, ok := addresses[serviceName]
if !ok {
return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName)
return nil, errs.WrapMsg(errors.New("unknown service name"), "serviceName", serviceName)
}
var result string
for _, addr := range address {

View File

@ -23,8 +23,6 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
@ -42,7 +40,7 @@ func NewZookeeperDiscoveryRegister(zkConf *config.Zookeeper) (discoveryregistry.
openkeeper.WithUserNameAndPassword(username, password),
openkeeper.WithRoundRobin(),
openkeeper.WithTimeout(10),
openkeeper.WithLogger(log.NewZkLogger()),
//openkeeper.WithLogger(log.NewZkLogger()),
)
if err != nil {
uriFormat := "address:%s, username:%s, password:%s, schema:%s."
@ -51,7 +49,7 @@ func NewZookeeperDiscoveryRegister(zkConf *config.Zookeeper) (discoveryregistry.
zkConf.Username,
zkConf.Password,
zkConf.Schema)
return nil, errs.Wrap(err, errInfo)
return nil, errs.WrapMsg(err, errInfo)
}
return zk, nil
}

View File

@ -118,7 +118,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
return nil
}
log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input)
return errs.ErrNetwork.Wrap(err.Error())
return errs.ErrNetwork.WrapMsg(err.Error())
}
if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {

View File

@ -52,7 +52,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
SetupTLSConfig(consumerGroupConfig, tlsConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil {
return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, consumerConfig.UserName, consumerConfig.Password)
return nil, errs.WrapMsg(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, consumerConfig.UserName, consumerConfig.Password)
}
return &MConsumerGroup{

View File

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/log"
"net"
"net/http"
"os"
@ -46,6 +47,7 @@ import (
// Start rpc server.
func Start(
ctx context.Context,
rpcPort int,
rpcRegisterName string,
prometheusPort int,
@ -53,8 +55,8 @@ func Start(
rpcFn func(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption,
) error {
fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n",
rpcRegisterName, rpcPort, prometheusPort, config2.Version)
log.CInfo(ctx, "rpc server starting", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort,
"prometheusPort", prometheusPort)
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Rpc.ListenIP), strconv.Itoa(rpcPort))
listener, err := net.Listen(
"tcp",
@ -80,7 +82,7 @@ func Start(
var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics
if config.Prometheus.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, config)
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, &config.RpcRegisterName)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))

View File

@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/OpenIMSDK/tools/errs"
@ -55,6 +56,16 @@ func ExitWithError(err error) {
fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err)
os.Exit(-1)
}
func GetProcessName() string {
args := os.Args
if len(args) > 0 {
segments := strings.Split(args[0], "/")
if len(segments) > 0 {
return segments[len(segments)-1]
}
}
return ""
}
func SIGTERMExit() {
progName := filepath.Base(os.Args[0])