diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index a98027561..b827f9928 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -23,8 +23,8 @@ func Init(rpcPort, wsPort int) { log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName) rwLock = new(sync.RWMutex) validate = validator.New() - statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 10) - statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 10) + statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300) + statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 300) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) } diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index 294b5ed70..1281b1732 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -26,8 +26,8 @@ type HistoryConsumerHandler struct { } func (mc *HistoryConsumerHandler) Init() { - statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10) - statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10) + statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 300) + statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 300) mc.msgHandle = make(map[string]fcb) mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 02d96fda4..6f6f061b8 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -31,7 +31,7 @@ func Init(rpcPort int) { } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) - statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 10), 10) + statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 300), 300) } func Run() { diff --git a/pkg/grpc-etcdv3/getcdv3/register.go b/pkg/grpc-etcdv3/getcdv3/register.go index 3230590b6..898a4b8c0 100644 --- a/pkg/grpc-etcdv3/getcdv3/register.go +++ b/pkg/grpc-etcdv3/getcdv3/register.go @@ -78,6 +78,16 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin log.Debug("", "KeepAlive kresp ok", pv) } else { log.Error("", "KeepAlive kresp failed", pv) + t := time.NewTicker(time.Duration(ttl) * time.Second) + for { + select { + case <-t.C: + } + if _, err := cli.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(resp.ID)); err != nil { + log.Error("", "etcd Put failed ", err.Error(), serviceKey, serviceValue, resp.ID) + } + log.Info("", "etcd Put ok", serviceKey, serviceValue, resp.ID) + } } } } diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 2711c5d91..bea4479bd 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -1,6 +1,7 @@ package statistics import ( + "Open_IM/pkg/common/log" "time" ) @@ -14,13 +15,13 @@ type Statistics struct { func (s *Statistics) output() { t := time.NewTicker(time.Duration(s.SleepTime) * time.Second) defer t.Stop() - //var sum uint64 + var sum uint64 for { - //sum = *s.Count + sum = *s.Count select { case <-t.C: } - //log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count) + log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count) } }