mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-26 03:26:57 +08:00
Merge remote-tracking branch 'origin/tuoyun' into tuoyun
This commit is contained in:
commit
6c03211ac4
@ -1 +1 @@
|
||||
Subproject commit 992f76df0ee500a0377523b0780d3a85f2275755
|
||||
Subproject commit 1c6c7af5393b3e9eefbaf16b673519ca863a6c2c
|
@ -8,6 +8,7 @@ import (
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbMsg "Open_IM/pkg/proto/chat"
|
||||
pbPush "Open_IM/pkg/proto/push"
|
||||
"Open_IM/pkg/statistics"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"github.com/Shopify/sarama"
|
||||
@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string)
|
||||
type HistoryConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
singleMsgCount uint64
|
||||
groupMsgCount uint64
|
||||
}
|
||||
|
||||
func (mc *HistoryConsumerHandler) Init() {
|
||||
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10)
|
||||
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10)
|
||||
|
||||
mc.msgHandle = make(map[string]fcb)
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
mc.singleMsgCount++
|
||||
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
||||
}
|
||||
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
||||
@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
||||
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
||||
return
|
||||
}
|
||||
mc.groupMsgCount++
|
||||
}
|
||||
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
||||
default:
|
||||
|
@ -21,7 +21,7 @@ func (s *Statistics) output() {
|
||||
select {
|
||||
case <-t.C:
|
||||
}
|
||||
log.Debug(s.ModuleName, s.PrintArgs, *s.Count-sum)
|
||||
log.NewWarn(s.ModuleName, s.PrintArgs, *s.Count-sum)
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user