push update

This commit is contained in:
Gordon 2023-01-02 19:07:05 +08:00
parent d53507b7b6
commit 1bf4fa0d87
5 changed files with 93 additions and 1115 deletions

View File

@ -225,6 +225,54 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
//r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,)
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: req.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil && userConn.IsBackground {
temp := &pbRelay.SingleMsgToUserPlatform{
RecvID: v,
RecvPlatFormID: int32(platform),
}
if constant.PlatformIDToClass(int(userConn.PlatformID)) == constant.TerminalPC || userConn.PlatformID == constant.WebPlatformID {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
}
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList

View File

@ -205,11 +205,25 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
return
}
var onlineSuccessUserIDList []string
var WebAndPcBackgroundUserIDList []string
onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID)
for _, v := range wsResult {
if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID {
onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID)
}
if !v.OnlinePush {
if len(v.Resp) != 0 {
for _, singleResult := range v.Resp {
if singleResult.ResultCode == -2 {
if constant.PlatformIDToClass(int(singleResult.RecvPlatFormID)) == constant.TerminalPC ||
singleResult.RecvPlatFormID == constant.WebPlatformID {
WebAndPcBackgroundUserIDList = append(WebAndPcBackgroundUserIDList, v.UserID)
}
}
}
}
}
}
onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, pushToUserIDList)
//Use offline push messaging
@ -280,6 +294,22 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
promePkg.PromeInc(promePkg.MsgOfflinePushSuccessCounter)
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
needBackgroupPushUserID := utils.IntersectString(needOfflinePushUserIDList, WebAndPcBackgroundUserIDList)
grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID)
if len(needBackgroupPushUserID) > 0 {
//Online push message
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
for _, v := range grpcCons {
msgClient := pbRelay.NewRelayClient(v)
_, err := msgClient.SuperGroupBackgroundOnlinePush(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData,
PushToUserIDList: needBackgroupPushUserID})
if err != nil {
log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
continue
}
}
}
}
}
}

View File

@ -57,7 +57,7 @@ var PlatformName2ID = map[string]int{
IPadPlatformStr: IPadPlatformID,
AdminPlatformStr: AdminPlatformID,
}
var Platform2class = map[string]string{
var PlatformName2class = map[string]string{
IOSPlatformStr: TerminalMobile,
AndroidPlatformStr: TerminalMobile,
MiniWebPlatformStr: WebPlatformStr,
@ -66,6 +66,15 @@ var Platform2class = map[string]string{
OSXPlatformStr: TerminalPC,
LinuxPlatformStr: TerminalPC,
}
var PlatformID2class = map[int]string{
IOSPlatformID: TerminalMobile,
AndroidPlatformID: TerminalMobile,
MiniWebPlatformID: WebPlatformStr,
WebPlatformID: WebPlatformStr,
WindowsPlatformID: TerminalPC,
OSXPlatformID: TerminalPC,
LinuxPlatformID: TerminalPC,
}
func PlatformIDToName(num int) string {
return PlatformID2Name[num]
@ -74,5 +83,8 @@ func PlatformNameToID(name string) int {
return PlatformName2ID[name]
}
func PlatformNameToClass(name string) string {
return Platform2class[name]
return PlatformName2class[name]
}
func PlatformIDToClass(num int) string {
return PlatformID2class[num]
}

File diff suppressed because it is too large Load Diff

View File

@ -98,5 +98,6 @@ service relay {
rpc SuperGroupOnlineBatchPushOneMsg(OnlineBatchPushOneMsgReq) returns(OnlineBatchPushOneMsgResp);
rpc KickUserOffline(KickUserOfflineReq) returns(KickUserOfflineResp);
rpc MultiTerminalLoginCheck(MultiTerminalLoginCheckReq) returns(MultiTerminalLoginCheckResp);
rpc SuperGroupBackgroundOnlinePush(OnlineBatchPushOneMsgReq) returns(OnlineBatchPushOneMsgResp);
}