remove mysql for test

This commit is contained in:
Gordon 2022-02-25 19:39:42 +08:00
parent 2427671093
commit 4f01b0b2ad
3 changed files with 92 additions and 71 deletions

View File

@ -11,6 +11,7 @@ import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka" "Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"time"
) )
var ( var (
@ -33,4 +34,15 @@ func init() {
func Run() { func Run() {
go rpcServer.run() go rpcServer.run()
go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh)
go stat()
}
func stat() {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
for {
select {
case <-t.C:
}
log.Debug("", "10 second handle msg to mongo is ")
}
} }

View File

@ -7,16 +7,12 @@
package logic package logic
import ( import (
push "Open_IM/internal/push/jpush"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"
"context" "context"
"encoding/json"
"strings" "strings"
) )
@ -34,7 +30,7 @@ type AtContent struct {
func MsgToUser(pushMsg *pbPush.PushMsgReq) { func MsgToUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingleMsgToUser var wsResult []*pbRelay.SingleMsgToUser
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) //isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
//Online push message //Online push message
@ -51,66 +47,66 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
} }
} }
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData) log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData)
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { //if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult { // for _, v := range wsResult {
if v.ResultCode == 0 { // if v.ResultCode == 0 {
continue // continue
} // }
//supported terminal // //supported terminal
for _, t := range pushTerminal { // for _, t := range pushTerminal {
if v.RecvPlatFormID == t { // if v.RecvPlatFormID == t {
//Use offline push messaging // //Use offline push messaging
var UIDList []string // var UIDList []string
UIDList = append(UIDList, v.RecvID) // UIDList = append(UIDList, v.RecvID)
customContent := OpenIMContent{ // customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType), // SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID, // From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID, // To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq, // Seq: pushMsg.MsgData.Seq,
} // }
bCustomContent, _ := json.Marshal(customContent) // bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent) // jsonCustomContent := string(bCustomContent)
var content string // var content string
if pushMsg.MsgData.OfflinePushInfo != nil { // if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title // content = pushMsg.MsgData.OfflinePushInfo.Title
//
} else { // } else {
switch pushMsg.MsgData.ContentType { // switch pushMsg.MsgData.ContentType {
case constant.Text: // case constant.Text:
content = constant.ContentType2PushContent[constant.Text] // content = constant.ContentType2PushContent[constant.Text]
case constant.Picture: // case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture] // content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice: // case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice] // content = constant.ContentType2PushContent[constant.Voice]
case constant.Video: // case constant.Video:
content = constant.ContentType2PushContent[constant.Video] // content = constant.ContentType2PushContent[constant.Video]
case constant.File: // case constant.File:
content = constant.ContentType2PushContent[constant.File] // content = constant.ContentType2PushContent[constant.File]
case constant.AtText: // case constant.AtText:
a := AtContent{} // a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) // _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(v.RecvID, a.AtUserList) { // if utils.IsContain(v.RecvID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] // content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else { // } else {
content = constant.ContentType2PushContent[constant.GroupMsg] // content = constant.ContentType2PushContent[constant.GroupMsg]
} // }
default: // default:
content = constant.ContentType2PushContent[constant.Common] // content = constant.ContentType2PushContent[constant.Common]
} // }
} // }
//
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) // pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t))
if err != nil { // if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) // log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t))
} else { // } else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t)) // log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t))
} // }
//
} // }
} // }
} // }
//
} //}
} }
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { //func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {

View File

@ -1,20 +1,33 @@
package statistics package statistics
import "time" import (
"Open_IM/pkg/common/log"
"time"
)
type Statistics struct { type Statistics struct {
Count *uint64 Count *uint64
Dr int ModuleName string
PrintArgs string
SleepTime int
} }
func (s *Statistics) output() { func (s *Statistics) output() {
t := time.NewTicker(time.Duration(s.SleepTime) * time.Second)
defer t.Stop()
var sum uint64
for { for {
time.Sleep(time.Duration(s.Dr) * time.Second) sum = *s.Count
select {
case <-t.C:
}
log.Debug(s.ModuleName, s.PrintArgs, *s.Count-sum)
} }
} }
func NewStatistics(count *uint64, dr int) *Statistics { func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics {
p := &Statistics{Count: count} p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs}
go p.output() go p.output()
return p
} }