mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-06-26 06:15:20 +08:00
Merge branch 'openimsdk:main' into feat/etcd-pwd
This commit is contained in:
commit
fbb63d91b4
17
cmd/main.go
17
cmd/main.go
@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -39,7 +38,6 @@ import (
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/system/program"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/openimsdk/tools/utils/network"
|
||||
"github.com/spf13/viper"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
@ -250,23 +248,12 @@ func (x *cmds) run(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
ip, err := network.GetLocalIP()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
||||
if err != nil {
|
||||
return fmt.Errorf("prometheus listen %d error %w", port, err)
|
||||
}
|
||||
defer listener.Close()
|
||||
log.ZDebug(ctx, "prometheus start", "addr", listener.Addr())
|
||||
target, err := json.Marshal(prommetrics.BuildDefaultTarget(ip, listener.Addr().(*net.TCPAddr).Port))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := standalone.GetKeyValue().SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
err := prommetrics.Start(listener)
|
||||
if err == nil {
|
||||
@ -342,7 +329,7 @@ func (x *cmds) run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
|
||||
func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error) {
|
||||
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
|
||||
if index := strings.Index(name, "."); index >= 0 {
|
||||
name = name[:index]
|
||||
@ -352,7 +339,7 @@ func putCmd[C any](cmd *cmds, block bool, fn func(ctx context.Context, config *C
|
||||
if err := cmd.parseConf(&conf); err != nil {
|
||||
return err
|
||||
}
|
||||
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar())
|
||||
return fn(ctx, &conf, standalone.GetSvcDiscoveryRegistry(), standalone.GetServiceRegistrar())
|
||||
})
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -13,7 +13,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12
|
||||
github.com/openimsdk/tools v0.0.50-alpha.84
|
||||
github.com/openimsdk/tools v0.0.50-alpha.85
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
|
4
go.sum
4
go.sum
@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b
|
||||
github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
|
||||
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.84 h1:jN60Ys/0edZjL/TDmm/5VSJFP4pGYRipkWqhILJbq/8=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.84/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.85 h1:OqTUYx6r7Zp/eH8FKB08XeNjPV405TUIG9QT6QQ+F+s=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.85/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
|
@ -39,7 +39,7 @@ type Config struct {
|
||||
Index conf.Index
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, int(config.Index))
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -6,35 +6,29 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/tools/apiresp"
|
||||
"github.com/openimsdk/tools/discovery"
|
||||
"github.com/openimsdk/tools/discovery/etcd"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type PrometheusDiscoveryApi struct {
|
||||
config *Config
|
||||
client *clientv3.Client
|
||||
kv discovery.KeyValue
|
||||
}
|
||||
|
||||
func NewPrometheusDiscoveryApi(config *Config, client discovery.Conn) *PrometheusDiscoveryApi {
|
||||
func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi {
|
||||
api := &PrometheusDiscoveryApi{
|
||||
config: config,
|
||||
}
|
||||
if config.Discovery.Enable == conf.ETCD {
|
||||
api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
|
||||
kv: client,
|
||||
}
|
||||
return api
|
||||
}
|
||||
|
||||
func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
||||
value, err := p.kv.GetKey(c, prommetrics.BuildDiscoveryKey(key))
|
||||
value, err := p.kv.GetKeyWithPrefix(c, prommetrics.BuildDiscoveryKeyPrefix(key))
|
||||
if err != nil {
|
||||
if errors.Is(err, discovery.ErrNotSupportedKeyValue) {
|
||||
if errors.Is(err, discovery.ErrNotSupported) {
|
||||
c.JSON(http.StatusOK, []struct{}{})
|
||||
return
|
||||
}
|
||||
@ -46,10 +40,17 @@ func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) {
|
||||
return
|
||||
}
|
||||
var resp prommetrics.RespTarget
|
||||
if err := json.Unmarshal(value, &resp); err != nil {
|
||||
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
||||
return
|
||||
for i := range value {
|
||||
var tmp prommetrics.Target
|
||||
if err = json.Unmarshal(value[i], &tmp); err != nil {
|
||||
apiresp.GinError(c, errs.WrapMsg(err, "json unmarshal err"))
|
||||
return
|
||||
}
|
||||
|
||||
resp.Targets = append(resp.Targets, tmp.Target)
|
||||
resp.Labels = tmp.Labels // default label is fixed. See prommetrics.BuildDefaultTarget
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, []*prommetrics.RespTarget{&resp})
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func prommetricsGin() gin.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin.Engine, error) {
|
||||
func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) {
|
||||
authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -39,7 +39,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Start run ws server.
|
||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", runtimeenv.RuntimeEnvironment(),
|
||||
"rpcPorts", conf.MsgGateway.RPC.Ports,
|
||||
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
|
||||
|
124
internal/msgtransfer/callback.go
Normal file
124
internal/msgtransfer/callback.go
Normal file
@ -0,0 +1,124 @@
|
||||
package msgtransfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
)
|
||||
|
||||
func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq {
|
||||
return cbapi.CommonCallbackReq{
|
||||
SendID: msg.SendID,
|
||||
ServerMsgID: msg.ServerMsgID,
|
||||
CallbackCommand: command,
|
||||
ClientMsgID: msg.ClientMsgID,
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
SenderPlatformID: msg.SenderPlatformID,
|
||||
SenderNickname: msg.SenderNickname,
|
||||
SessionType: msg.SessionType,
|
||||
MsgFrom: msg.MsgFrom,
|
||||
ContentType: msg.ContentType,
|
||||
Status: msg.Status,
|
||||
SendTime: msg.SendTime,
|
||||
CreateTime: msg.CreateTime,
|
||||
AtUserIDList: msg.AtUserIDList,
|
||||
SenderFaceURL: msg.SenderFaceURL,
|
||||
Content: GetContent(msg),
|
||||
Seq: uint32(msg.Seq),
|
||||
Ex: msg.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func GetContent(msg *sdkws.MsgData) string {
|
||||
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
|
||||
var tips sdkws.TipsComm
|
||||
_ = proto.Unmarshal(msg.Content, &tips)
|
||||
content := tips.JsonDetail
|
||||
return content
|
||||
} else {
|
||||
return string(msg.Content)
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return
|
||||
}
|
||||
if !filterAfterMsg(msg, after) {
|
||||
return
|
||||
}
|
||||
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||
RecvID: msg.RecvID,
|
||||
}
|
||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||
}
|
||||
|
||||
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return
|
||||
}
|
||||
if !filterAfterMsg(msg, after) {
|
||||
return
|
||||
}
|
||||
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
|
||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
|
||||
GroupID: msg.GroupID,
|
||||
}
|
||||
|
||||
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
|
||||
}
|
||||
|
||||
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
|
||||
keyMsgData := apistruct.KeyMsgData{
|
||||
SendID: msg.SendID,
|
||||
RecvID: msg.RecvID,
|
||||
GroupID: msg.GroupID,
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
|
||||
}
|
||||
}
|
||||
|
||||
func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool {
|
||||
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
|
||||
}
|
||||
|
||||
func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool {
|
||||
// According to the attentionIds configuration, only some users are sent
|
||||
if len(attentionIds) != 0 && !datautil.Contain(msg.RecvID, attentionIds...) {
|
||||
return false
|
||||
}
|
||||
|
||||
if defaultDeniedTypes(msg.ContentType) {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func defaultDeniedTypes(contentType int32) bool {
|
||||
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
|
||||
return true
|
||||
}
|
||||
if contentType == constant.Typing {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
@ -58,7 +58,7 @@ type Config struct {
|
||||
Index conf.Index
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||
|
||||
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
|
||||
@ -134,7 +134,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase)
|
||||
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase,config)
|
||||
|
||||
msgTransfer := &MsgTransfer{
|
||||
historyConsumer: historyConsumer,
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
pbmsg "github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -26,11 +28,15 @@ import (
|
||||
|
||||
type OnlineHistoryMongoConsumerHandler struct {
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
config *Config
|
||||
webhookClient *webhook.Client
|
||||
}
|
||||
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase) *OnlineHistoryMongoConsumerHandler {
|
||||
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler {
|
||||
return &OnlineHistoryMongoConsumerHandler{
|
||||
msgTransferDatabase: database,
|
||||
config: config,
|
||||
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,6 +59,16 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(ctx context.Cont
|
||||
} else {
|
||||
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||
}
|
||||
|
||||
for _, msgData := range msgFromMQ.MsgData {
|
||||
switch msgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
|
||||
case constant.ReadGroupChatType:
|
||||
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
|
||||
}
|
||||
}
|
||||
|
||||
//var seqs []int64
|
||||
//for _, msg := range msgFromMQ.MsgData {
|
||||
// seqs = append(seqs, msg.Seq)
|
||||
|
@ -50,7 +50,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
|
||||
return &pbpush.DelUserPushTokenResp{}, nil
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||
rdb, err := dbb.Redis(ctx)
|
||||
if err != nil {
|
||||
|
@ -59,7 +59,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongoConfig, &config.RedisConfig)
|
||||
rdb, err := dbb.Redis(ctx)
|
||||
if err != nil {
|
||||
|
@ -69,7 +69,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
@ -76,7 +76,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
@ -86,7 +86,8 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq,
|
||||
go m.setConversationAtInfo(ctx, req.MsgData)
|
||||
}
|
||||
|
||||
m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||
// m.webhookAfterSendGroupMsg(ctx, &m.config.WebhooksConfig.AfterSendGroupMsg, req)
|
||||
|
||||
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
||||
resp = &pbmsg.SendMsgResp{}
|
||||
resp.SendTime = req.MsgData.SendTime
|
||||
@ -192,7 +193,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||
return nil, err
|
||||
}
|
||||
m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||
|
||||
// m.webhookAfterSendSingleMsg(ctx, &m.config.WebhooksConfig.AfterSendSingleMsg, req)
|
||||
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
||||
return &pbmsg.SendMsgResp{
|
||||
ServerMsgID: req.MsgData.ServerMsgID,
|
||||
|
@ -78,7 +78,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
|
||||
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
builder := mqbuild.NewBuilder(&config.KafkaConfig)
|
||||
redisProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToRedisTopic)
|
||||
if err != nil {
|
||||
|
@ -66,7 +66,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
@ -64,7 +64,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
@ -79,7 +79,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
|
||||
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
|
||||
mgocli, err := dbb.Mongo(ctx)
|
||||
if err != nil {
|
||||
|
@ -25,7 +25,7 @@ type Config struct {
|
||||
Discovery config.Discovery
|
||||
}
|
||||
|
||||
func Start(ctx context.Context, conf *Config, client discovery.Conn, service grpc.ServiceRegistrar) error {
|
||||
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
|
||||
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
|
||||
if conf.CronTask.RetainChatRecords < 1 {
|
||||
log.ZInfo(ctx, "disable cron")
|
||||
@ -58,6 +58,11 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
cm.Watch(ctx)
|
||||
}
|
||||
|
||||
locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srv := &cronServer{
|
||||
ctx: ctx,
|
||||
config: conf,
|
||||
@ -65,6 +70,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
msgClient: msg.NewMsgClient(msgConn),
|
||||
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||
thirdClient: third.NewThirdClient(thirdConn),
|
||||
locker: locker,
|
||||
}
|
||||
|
||||
if err := srv.registerClearS3(); err != nil {
|
||||
@ -81,6 +87,8 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
log.ZDebug(ctx, "cron task server is running")
|
||||
<-ctx.Done()
|
||||
log.ZDebug(ctx, "cron task server is shutting down")
|
||||
srv.cron.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -91,6 +99,7 @@ type cronServer struct {
|
||||
msgClient msg.MsgClient
|
||||
conversationClient pbconversation.ConversationClient
|
||||
thirdClient third.ThirdClient
|
||||
locker *EtcdLocker
|
||||
}
|
||||
|
||||
func (c *cronServer) registerClearS3() error {
|
||||
@ -98,7 +107,9 @@ func (c *cronServer) registerClearS3() error {
|
||||
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
|
||||
return nil
|
||||
}
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register clear s3 cron task")
|
||||
}
|
||||
|
||||
@ -107,11 +118,15 @@ func (c *cronServer) registerDeleteMsg() error {
|
||||
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
|
||||
return nil
|
||||
}
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register delete msg cron task")
|
||||
}
|
||||
|
||||
func (c *cronServer) registerClearUserMsg() error {
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register clear user msg cron task")
|
||||
}
|
||||
|
89
internal/tools/cron/dist_look.go
Normal file
89
internal/tools/cron/dist_look.go
Normal file
@ -0,0 +1,89 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/tools/log"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
const (
|
||||
lockLeaseTTL = 300
|
||||
)
|
||||
|
||||
type EtcdLocker struct {
|
||||
client *clientv3.Client
|
||||
instanceID string
|
||||
}
|
||||
|
||||
// NewEtcdLocker creates a new etcd distributed lock
|
||||
func NewEtcdLocker(client *clientv3.Client) (*EtcdLocker, error) {
|
||||
hostname, _ := os.Hostname()
|
||||
pid := os.Getpid()
|
||||
instanceID := fmt.Sprintf("%s-pid-%d-%d", hostname, pid, time.Now().UnixNano())
|
||||
|
||||
locker := &EtcdLocker{
|
||||
client: client,
|
||||
instanceID: instanceID,
|
||||
}
|
||||
|
||||
return locker, nil
|
||||
}
|
||||
|
||||
func (e *EtcdLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) {
|
||||
session, err := concurrency.NewSession(e.client, concurrency.WithTTL(lockLeaseTTL))
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "Failed to create etcd session", err,
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
return
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
lockKey := fmt.Sprintf("openim/crontask/%s", taskName)
|
||||
mutex := concurrency.NewMutex(session, lockKey)
|
||||
|
||||
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
err = mutex.TryLock(ctxWithTimeout)
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
log.ZDebug(ctx, "Task is being executed by another instance, skipping",
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
} else {
|
||||
log.ZWarn(ctx, "Failed to acquire task lock", err,
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := mutex.Unlock(ctx); err != nil {
|
||||
log.ZWarn(ctx, "Failed to release task lock", err,
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
} else {
|
||||
log.ZInfo(ctx, "Successfully released task lock",
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
}
|
||||
}()
|
||||
|
||||
log.ZInfo(ctx, "Successfully acquired task lock, starting execution",
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID,
|
||||
"sessionID", session.Lease())
|
||||
|
||||
task()
|
||||
|
||||
log.ZInfo(ctx, "Task execution completed",
|
||||
"taskName", taskName,
|
||||
"instanceID", e.instanceID)
|
||||
}
|
@ -19,6 +19,7 @@ import (
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/api"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
||||
"github.com/openimsdk/open-im-server/v3/version"
|
||||
"github.com/openimsdk/tools/system/program"
|
||||
@ -84,7 +85,7 @@ func (a *ApiCmd) runE() error {
|
||||
a.apiConfig.API.Api.ListenIP, "",
|
||||
a.apiConfig.API.Prometheus.AutoSetPorts,
|
||||
nil, int(a.apiConfig.Index),
|
||||
a.apiConfig.Discovery.RpcService.MessageGateway,
|
||||
prommetrics.APIKeyName,
|
||||
&a.apiConfig.Notification,
|
||||
a.apiConfig,
|
||||
[]string{},
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
||||
"github.com/openimsdk/open-im-server/v3/version"
|
||||
"github.com/openimsdk/tools/system/program"
|
||||
@ -65,7 +66,7 @@ func (m *MsgTransferCmd) runE() error {
|
||||
"", "",
|
||||
true,
|
||||
nil, int(m.msgTransferConfig.Index),
|
||||
"",
|
||||
prommetrics.MessageTransferKeyName,
|
||||
nil,
|
||||
m.msgTransferConfig,
|
||||
[]string{},
|
||||
|
@ -328,7 +328,7 @@ type Redis struct {
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
ClusterMode bool `yaml:"clusterMode"`
|
||||
DB int `yaml:"storage"`
|
||||
DB int `yaml:"db"`
|
||||
MaxRetry int `yaml:"maxRetry"`
|
||||
PoolSize int `yaml:"poolSize"`
|
||||
}
|
||||
|
@ -85,6 +85,8 @@ func Start(listener net.Listener) error {
|
||||
const (
|
||||
APIKeyName = "api"
|
||||
MessageTransferKeyName = "message-transfer"
|
||||
|
||||
TTL = 300
|
||||
)
|
||||
|
||||
type Target struct {
|
||||
@ -97,10 +99,14 @@ type RespTarget struct {
|
||||
Labels map[string]string `json:"labels"`
|
||||
}
|
||||
|
||||
func BuildDiscoveryKey(name string) string {
|
||||
func BuildDiscoveryKeyPrefix(name string) string {
|
||||
return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name)
|
||||
}
|
||||
|
||||
func BuildDiscoveryKey(name string, index int) string {
|
||||
return fmt.Sprintf("%s/%s/%s/%d", "openim", "prometheus_discovery", name, index)
|
||||
}
|
||||
|
||||
func BuildDefaultTarget(host string, ip int) Target {
|
||||
return Target{
|
||||
Target: fmt.Sprintf("%s:%d", host, ip),
|
||||
|
@ -50,7 +50,7 @@ func init() {
|
||||
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
|
||||
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
|
||||
watchConfigNames []string, watchServiceNames []string,
|
||||
rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error,
|
||||
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error,
|
||||
options ...grpc.ServerOption) error {
|
||||
|
||||
if notification != nil {
|
||||
@ -148,9 +148,11 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := client.SetKey(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), target); err != nil {
|
||||
if !errors.Is(err, discovery.ErrNotSupportedKeyValue) {
|
||||
return err
|
||||
if autoSetPorts {
|
||||
if err = client.SetWithLease(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName, index), target, prommetrics.TTL); err != nil {
|
||||
if !errors.Is(err, discovery.ErrNotSupported) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
|
@ -1,6 +1,6 @@
|
||||
serviceBinaries:
|
||||
openim-api: 1
|
||||
openim-crontask: 1
|
||||
openim-crontask: 4
|
||||
openim-rpc-user: 1
|
||||
openim-msggateway: 1
|
||||
openim-push: 8
|
||||
|
Loading…
x
Reference in New Issue
Block a user