diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index b8941cc6f..3581a361e 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "time" ) var ( @@ -33,4 +34,15 @@ func init() { func Run() { go rpcServer.run() go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) + go stat() +} +func stat() { + t := time.NewTicker(time.Second * 10) + defer t.Stop() + for { + select { + case <-t.C: + } + log.Debug("", "10 second handle msg to mongo is ") + } } diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 41c943c4b..fa147e762 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -7,16 +7,12 @@ package logic import ( - push "Open_IM/internal/push/jpush" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" - "Open_IM/pkg/utils" "context" - "encoding/json" "strings" ) @@ -34,7 +30,7 @@ type AtContent struct { func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingleMsgToUser - isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) + //isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) //Online push message @@ -51,66 +47,66 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData) - if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { - for _, v := range wsResult { - if v.ResultCode == 0 { - continue - } - //supported terminal - for _, t := range pushTerminal { - if v.RecvPlatFormID == t { - //Use offline push messaging - var UIDList []string - UIDList = append(UIDList, v.RecvID) - customContent := OpenIMContent{ - SessionType: int(pushMsg.MsgData.SessionType), - From: pushMsg.MsgData.SendID, - To: pushMsg.MsgData.RecvID, - Seq: pushMsg.MsgData.Seq, - } - bCustomContent, _ := json.Marshal(customContent) - jsonCustomContent := string(bCustomContent) - var content string - if pushMsg.MsgData.OfflinePushInfo != nil { - content = pushMsg.MsgData.OfflinePushInfo.Title - - } else { - switch pushMsg.MsgData.ContentType { - case constant.Text: - content = constant.ContentType2PushContent[constant.Text] - case constant.Picture: - content = constant.ContentType2PushContent[constant.Picture] - case constant.Voice: - content = constant.ContentType2PushContent[constant.Voice] - case constant.Video: - content = constant.ContentType2PushContent[constant.Video] - case constant.File: - content = constant.ContentType2PushContent[constant.File] - case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - if utils.IsContain(v.RecvID, a.AtUserList) { - content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - content = constant.ContentType2PushContent[constant.GroupMsg] - } - default: - content = constant.ContentType2PushContent[constant.Common] - } - } - - pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) - if err != nil { - log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) - } else { - log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t)) - } - - } - } - } - - } + //if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { + // for _, v := range wsResult { + // if v.ResultCode == 0 { + // continue + // } + // //supported terminal + // for _, t := range pushTerminal { + // if v.RecvPlatFormID == t { + // //Use offline push messaging + // var UIDList []string + // UIDList = append(UIDList, v.RecvID) + // customContent := OpenIMContent{ + // SessionType: int(pushMsg.MsgData.SessionType), + // From: pushMsg.MsgData.SendID, + // To: pushMsg.MsgData.RecvID, + // Seq: pushMsg.MsgData.Seq, + // } + // bCustomContent, _ := json.Marshal(customContent) + // jsonCustomContent := string(bCustomContent) + // var content string + // if pushMsg.MsgData.OfflinePushInfo != nil { + // content = pushMsg.MsgData.OfflinePushInfo.Title + // + // } else { + // switch pushMsg.MsgData.ContentType { + // case constant.Text: + // content = constant.ContentType2PushContent[constant.Text] + // case constant.Picture: + // content = constant.ContentType2PushContent[constant.Picture] + // case constant.Voice: + // content = constant.ContentType2PushContent[constant.Voice] + // case constant.Video: + // content = constant.ContentType2PushContent[constant.Video] + // case constant.File: + // content = constant.ContentType2PushContent[constant.File] + // case constant.AtText: + // a := AtContent{} + // _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) + // if utils.IsContain(v.RecvID, a.AtUserList) { + // content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + // } else { + // content = constant.ContentType2PushContent[constant.GroupMsg] + // } + // default: + // content = constant.ContentType2PushContent[constant.Common] + // } + // } + // + // pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) + // if err != nil { + // log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) + // } else { + // log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t)) + // } + // + // } + // } + // } + // + //} } //func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 51ff4c871..eb018fbf0 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -1,20 +1,33 @@ package statistics -import "time" +import ( + "Open_IM/pkg/common/log" + "time" +) type Statistics struct { - Count *uint64 - Dr int + Count *uint64 + ModuleName string + PrintArgs string + SleepTime int } func (s *Statistics) output() { + t := time.NewTicker(time.Duration(s.SleepTime) * time.Second) + defer t.Stop() + var sum uint64 for { - time.Sleep(time.Duration(s.Dr) * time.Second) + sum = *s.Count + select { + case <-t.C: + } + log.Debug(s.ModuleName, s.PrintArgs, *s.Count-sum) } } -func NewStatistics(count *uint64, dr int) *Statistics { - p := &Statistics{Count: count} +func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics { + p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} go p.output() + return p }