mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-08 13:37:13 +08:00
fix: fix tim file rename
This commit is contained in:
parent
8dca680c3c
commit
6ee88fea7e
@ -17,13 +17,12 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
"net"
|
"net"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
|
ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
"github.com/OpenIMSDK/protocol/constant"
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
@ -33,6 +32,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||||
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -65,7 +65,7 @@ func run(port int, proPort int) error {
|
|||||||
var client discoveryregistry.SvcDiscoveryRegistry
|
var client discoveryregistry.SvcDiscoveryRegistry
|
||||||
|
|
||||||
// Determine whether zk is passed according to whether it is a clustered deployment
|
// Determine whether zk is passed according to whether it is a clustered deployment
|
||||||
client, err = discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(context.Background(), "Failed to initialize discovery register", err)
|
log.ZError(context.Background(), "Failed to initialize discovery register", err)
|
||||||
|
|
||||||
@ -86,7 +86,7 @@ func run(port int, proPort int) error {
|
|||||||
router := api.NewGinRouter(client, rdb)
|
router := api.NewGinRouter(client, rdb)
|
||||||
//////////////////////////////
|
//////////////////////////////
|
||||||
if config.Config.Prometheus.Enable {
|
if config.Config.Prometheus.Enable {
|
||||||
p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api"))
|
p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
|
||||||
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
|
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
|
||||||
p.Use(router)
|
p.Use(router)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,6 @@ package msggateway
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
@ -33,6 +32,7 @@ import (
|
|||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
|
||||||
@ -221,7 +221,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
|||||||
if !userOK {
|
if !userOK {
|
||||||
ws.clients.Set(client.UserID, client)
|
ws.clients.Set(client.UserID, client)
|
||||||
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
|
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
|
||||||
prom_metrics.OnlineUserGauge.Add(1)
|
prommetrics.OnlineUserGauge.Add(1)
|
||||||
ws.onlineUserNum.Add(1)
|
ws.onlineUserNum.Add(1)
|
||||||
ws.onlineUserConnNum.Add(1)
|
ws.onlineUserConnNum.Add(1)
|
||||||
} else {
|
} else {
|
||||||
@ -361,7 +361,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
|
|||||||
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
|
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
|
||||||
if isDeleteUser {
|
if isDeleteUser {
|
||||||
ws.onlineUserNum.Add(-1)
|
ws.onlineUserNum.Add(-1)
|
||||||
prom_metrics.OnlineUserGauge.Dec()
|
prommetrics.OnlineUserGauge.Dec()
|
||||||
}
|
}
|
||||||
ws.onlineUserConnNum.Add(-1)
|
ws.onlineUserConnNum.Add(-1)
|
||||||
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
|
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
|
||||||
|
|||||||
@ -17,16 +17,15 @@ package msgtransfer
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
|
"log"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/mw"
|
"github.com/OpenIMSDK/tools/mw"
|
||||||
|
|
||||||
@ -36,6 +35,8 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||||
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -65,7 +66,7 @@ func StartTransfer(prometheusPort int) error {
|
|||||||
if err := mongo.CreateMsgIndex(); err != nil {
|
if err := mongo.CreateMsgIndex(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
||||||
/*
|
/*
|
||||||
client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
||||||
openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
|
openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
|
||||||
@ -123,7 +124,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
|
|||||||
reg.MustRegister(
|
reg.MustRegister(
|
||||||
collectors.NewGoCollector(),
|
collectors.NewGoCollector(),
|
||||||
)
|
)
|
||||||
reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...)
|
reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
|
||||||
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
|
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
|
||||||
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
|
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@ package msgtransfer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
@ -27,6 +26,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||||
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
|
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
type OnlineHistoryMongoConsumerHandler struct {
|
type OnlineHistoryMongoConsumerHandler struct {
|
||||||
@ -75,9 +75,9 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
|
|||||||
"conversationID",
|
"conversationID",
|
||||||
msgFromMQ.ConversationID,
|
msgFromMQ.ConversationID,
|
||||||
)
|
)
|
||||||
prom_metrics.MsgInsertMongoFailedCounter.Inc()
|
prommetrics.MsgInsertMongoFailedCounter.Inc()
|
||||||
} else {
|
} else {
|
||||||
prom_metrics.MsgInsertMongoSuccessCounter.Inc()
|
prommetrics.MsgInsertMongoSuccessCounter.Inc()
|
||||||
}
|
}
|
||||||
var seqs []int64
|
var seqs []int64
|
||||||
for _, msg := range msgFromMQ.MsgData {
|
for _, msg := range msgFromMQ.MsgData {
|
||||||
|
|||||||
@ -19,6 +19,12 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/protocol/conversation"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
"github.com/OpenIMSDK/protocol/constant"
|
||||||
"github.com/OpenIMSDK/protocol/conversation"
|
"github.com/OpenIMSDK/protocol/conversation"
|
||||||
"github.com/OpenIMSDK/protocol/msggateway"
|
"github.com/OpenIMSDK/protocol/msggateway"
|
||||||
@ -37,8 +43,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -306,7 +311,7 @@ func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg
|
|||||||
}
|
}
|
||||||
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
|
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
prom_metrics.MsgOfflinePushFailedCounter.Inc()
|
prommetrics.MsgOfflinePushFailedCounter.Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -16,7 +16,6 @@ package auth
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
|
|
||||||
@ -35,6 +34,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -74,7 +74,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
prom_metrics.UserLoginCounter.Inc()
|
prommetrics.UserLoginCounter.Inc()
|
||||||
resp.Token = token
|
resp.Token = token
|
||||||
resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60
|
resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60
|
||||||
return &resp, nil
|
return &resp, nil
|
||||||
|
|||||||
@ -16,8 +16,8 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
"github.com/OpenIMSDK/protocol/constant"
|
||||||
@ -59,7 +59,7 @@ func (m *msgServer) sendMsgSuperGroupChat(
|
|||||||
req *pbmsg.SendMsgReq,
|
req *pbmsg.SendMsgReq,
|
||||||
) (resp *pbmsg.SendMsgResp, err error) {
|
) (resp *pbmsg.SendMsgResp, err error) {
|
||||||
if err = m.messageVerification(ctx, req); err != nil {
|
if err = m.messageVerification(ctx, req); err != nil {
|
||||||
prom_metrics.GroupChatMsgProcessFailedCounter.Inc()
|
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = callbackBeforeSendGroupMsg(ctx, req); err != nil {
|
if err = callbackBeforeSendGroupMsg(ctx, req); err != nil {
|
||||||
@ -78,7 +78,7 @@ func (m *msgServer) sendMsgSuperGroupChat(
|
|||||||
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
|
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
|
||||||
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
|
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
|
||||||
}
|
}
|
||||||
prom_metrics.GroupChatMsgProcessSuccessCounter.Inc()
|
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
|
||||||
resp = &pbmsg.SendMsgResp{}
|
resp = &pbmsg.SendMsgResp{}
|
||||||
resp.SendTime = req.MsgData.SendTime
|
resp.SendTime = req.MsgData.SendTime
|
||||||
resp.ServerMsgID = req.MsgData.ServerMsgID
|
resp.ServerMsgID = req.MsgData.ServerMsgID
|
||||||
@ -161,7 +161,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !isSend {
|
if !isSend {
|
||||||
prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, nil
|
return nil, nil
|
||||||
} else {
|
} else {
|
||||||
if err = callbackBeforeSendSingleMsg(ctx, req); err != nil {
|
if err = callbackBeforeSendSingleMsg(ctx, req); err != nil {
|
||||||
@ -171,7 +171,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
|
||||||
prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
|
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = callbackAfterSendSingleMsg(ctx, req)
|
err = callbackAfterSendSingleMsg(ctx, req)
|
||||||
@ -183,7 +183,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
|
|||||||
ClientMsgID: req.MsgData.ClientMsgID,
|
ClientMsgID: req.MsgData.ClientMsgID,
|
||||||
SendTime: req.MsgData.SendTime,
|
SendTime: req.MsgData.SendTime,
|
||||||
}
|
}
|
||||||
prom_metrics.SingleChatMsgProcessSuccessCounter.Inc()
|
prommetrics.SingleChatMsgProcessSuccessCounter.Inc()
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,7 +23,7 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ func InitMsgTool() (*MsgTool, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
discov, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
discov, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
||||||
/*
|
/*
|
||||||
discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
|
||||||
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
|
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
|
||||||
|
|||||||
@ -17,7 +17,6 @@ package controller
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
@ -31,6 +30,7 @@ import (
|
|||||||
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
|
||||||
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
pbmsg "github.com/OpenIMSDK/protocol/msg"
|
||||||
@ -376,20 +376,20 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
|||||||
}
|
}
|
||||||
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
|
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
prom_metrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
|
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
|
||||||
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
|
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
|
||||||
} else {
|
} else {
|
||||||
prom_metrics.MsgInsertRedisSuccessCounter.Inc()
|
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||||||
}
|
}
|
||||||
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
|
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
|
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
|
||||||
prom_metrics.SeqSetFailedCounter.Inc()
|
prommetrics.SeqSetFailedCounter.Inc()
|
||||||
}
|
}
|
||||||
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||||
prom_metrics.SeqSetFailedCounter.Inc()
|
prommetrics.SeqSetFailedCounter.Inc()
|
||||||
}
|
}
|
||||||
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
package discovery_register
|
package discoveryregister
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
407
pkg/common/discoveryregister/discoveryregister_test.go
Normal file
407
pkg/common/discoveryregister/discoveryregister_test.go
Normal file
@ -0,0 +1,407 @@
|
|||||||
|
package discoveryregister
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewDiscoveryRegister(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
envType string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want discoveryregistry.SvcDiscoveryRegistry
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := NewDiscoveryRegister(tt.args.envType)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewK8sDiscoveryRegister(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
want discoveryregistry.SvcDiscoveryRegistry
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := NewK8sDiscoveryRegister()
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_Register(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
serviceName string
|
||||||
|
host string
|
||||||
|
port int
|
||||||
|
opts []grpc.DialOption
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_UnRegister(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
serviceNames []string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
key string
|
||||||
|
conf []byte
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
key string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
want []byte
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
got, err := cli.GetConfFromRegistry(tt.args.key)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_GetConns(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
serviceName string
|
||||||
|
opts []grpc.DialOption
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
want []*grpc.ClientConn
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_GetConn(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
serviceName string
|
||||||
|
opts []grpc.DialOption
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
want *grpc.ClientConn
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if got := cli.GetSelfConnTarget(); got != tt.want {
|
||||||
|
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_AddOption(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
opts []grpc.DialOption
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
cli.AddOption(tt.args.opts...)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_CloseConn(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
cli.CloseConn(tt.args.conn)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_GetClientLocalConns(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
want map[string][]*grpc.ClientConn
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestK8sDR_Close(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
options []grpc.DialOption
|
||||||
|
rpcRegisterAddr string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
}{
|
||||||
|
// TODO: Add test cases.
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
cli := &K8sDR{
|
||||||
|
options: tt.fields.options,
|
||||||
|
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
|
||||||
|
}
|
||||||
|
cli.Close()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package ginPrometheus
|
package ginprometheus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -1,15 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package locker // import "github.com/openimsdk/open-im-server/v3/pkg/common/locker"
|
|
||||||
@ -1,72 +0,0 @@
|
|||||||
// Copyright © 2023 OpenIM. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package locker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
|
||||||
)
|
|
||||||
|
|
||||||
const GlOBALLOCK = "GLOBAL_LOCK"
|
|
||||||
|
|
||||||
type MessageLocker interface {
|
|
||||||
LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error)
|
|
||||||
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error
|
|
||||||
LockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
|
||||||
UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error)
|
|
||||||
}
|
|
||||||
type LockerMessage struct {
|
|
||||||
cache cache.MsgModel
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLockerMessage(cache cache.MsgModel) *LockerMessage {
|
|
||||||
return &LockerMessage{cache: cache}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) {
|
|
||||||
for i := 0; i < 3; i++ {
|
|
||||||
err = l.cache.LockMessageTypeKey(ctx, clientMsgID, typeKey)
|
|
||||||
if err != nil {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LockerMessage) LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) {
|
|
||||||
for i := 0; i < 3; i++ {
|
|
||||||
err = l.cache.LockMessageTypeKey(ctx, clientMsgID, GlOBALLOCK)
|
|
||||||
if err != nil {
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LockerMessage) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error {
|
|
||||||
return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LockerMessage) UnLockGlobalMessage(ctx context.Context, clientMsgID string) error {
|
|
||||||
return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, GlOBALLOCK)
|
|
||||||
}
|
|
||||||
@ -1,9 +1,9 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||||
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||||
)
|
)
|
||||||
@ -35,11 +35,11 @@ func GetGrpcCusMetrics(registerName string) []prometheus.Collector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetGinCusMetrics(name string) []*ginPrometheus.Metric {
|
func GetGinCusMetrics(name string) []*ginprometheus.Metric {
|
||||||
switch name {
|
switch name {
|
||||||
case "Api":
|
case "Api":
|
||||||
return []*ginPrometheus.Metric{ApiCustomCnt}
|
return []*ginprometheus.Metric{ApiCustomCnt}
|
||||||
default:
|
default:
|
||||||
return []*ginPrometheus.Metric{ApiCustomCnt}
|
return []*ginprometheus.Metric{ApiCustomCnt}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,6 +1,6 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
|
import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
|
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package prom_metrics
|
package prommetrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -16,18 +16,19 @@ package startrpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
|
||||||
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||||
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ func Start(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer listener.Close()
|
defer listener.Close()
|
||||||
client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return utils.Wrap1(err)
|
return utils.Wrap1(err)
|
||||||
}
|
}
|
||||||
@ -70,8 +71,8 @@ func Start(
|
|||||||
// ctx 中间件
|
// ctx 中间件
|
||||||
if config.Config.Prometheus.Enable {
|
if config.Config.Prometheus.Enable {
|
||||||
//////////////////////////
|
//////////////////////////
|
||||||
cusMetrics := prom_metrics.GetGrpcCusMetrics(rpcRegisterName)
|
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName)
|
||||||
reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics)
|
reg, metric, err = prommetrics.NewGrpcPromObj(cusMetrics)
|
||||||
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
|
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
|
||||||
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
|
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user