super group push

This commit is contained in:
Gordon 2022-06-07 18:39:43 +08:00
parent 5fa62c9fbd
commit a87e41eecd

View File

@ -12,6 +12,7 @@ import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbCache "Open_IM/pkg/proto/cache"
pbPush "Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay" pbRelay "Open_IM/pkg/proto/relay"
pbRtc "Open_IM/pkg/proto/rtc" pbRtc "Open_IM/pkg/proto/rtc"
@ -138,102 +139,116 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
} }
func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
return var wsResult []*pbRelay.SingelMsgToUserResultList
//var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
//isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String())
//log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID}
//if len(grpcCons) == 0 { etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
// log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ") client := pbCache.NewCacheClient(etcdConn)
// grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
//} if err != nil {
////Online push message log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
//log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) return
//for _, v := range grpcCons { }
// msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) if cacheResp.CommonResp.ErrCode != 0 {
// reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushMsg.PushToUserID}) log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
// if err != nil { return
// log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err) }
// continue if len(grpcCons) == 0 {
// } log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ")
// if reply != nil && reply.SinglePushResult != nil { grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
// wsResult = append(wsResult, reply.SinglePushResult...) }
// } //Online push message
//} log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
//log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) for _, v := range grpcCons {
//successCount++ msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
//if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: cacheResp.UserIDList})
// for _, v := range wsResult { if err != nil {
// if v.ResultCode == 0 { log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
// if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { continue
// break }
// } if reply != nil && reply.SinglePushResult != nil {
// continue wsResult = append(wsResult, reply.SinglePushResult...)
// } }
// if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { }
// //Use offline push messaging log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
// var UIDList []string successCount++
// UIDList = append(UIDList, v.RecvID) if isOfflinePush {
// customContent := OpenIMContent{ var onlineSuccessUserIDList []string
// SessionType: int(pushMsg.MsgData.SessionType), onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID)
// From: pushMsg.MsgData.SendID, for _, v := range wsResult {
// To: pushMsg.MsgData.RecvID, if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID {
// Seq: pushMsg.MsgData.Seq, onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID)
// } }
// bCustomContent, _ := json.Marshal(customContent) }
// jsonCustomContent := string(bCustomContent) onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, cacheResp.UserIDList)
// var content string //Use offline push messaging
// if pushMsg.MsgData.OfflinePushInfo != nil { customContent := OpenIMContent{
// content = pushMsg.MsgData.OfflinePushInfo.Title SessionType: int(pushMsg.MsgData.SessionType),
// From: pushMsg.MsgData.SendID,
// } else { To: pushMsg.MsgData.RecvID,
// switch pushMsg.MsgData.ContentType { Seq: pushMsg.MsgData.Seq,
// case constant.Text: }
// content = constant.ContentType2PushContent[constant.Text] bCustomContent, _ := json.Marshal(customContent)
// case constant.Picture: jsonCustomContent := string(bCustomContent)
// content = constant.ContentType2PushContent[constant.Picture] var content string
// case constant.Voice: if pushMsg.MsgData.OfflinePushInfo != nil {
// content = constant.ContentType2PushContent[constant.Voice] content = pushMsg.MsgData.OfflinePushInfo.Title
// case constant.Video:
// content = constant.ContentType2PushContent[constant.Video] } else {
// case constant.File: switch pushMsg.MsgData.ContentType {
// content = constant.ContentType2PushContent[constant.File] case constant.Text:
// case constant.AtText: content = constant.ContentType2PushContent[constant.Text]
// a := AtContent{} case constant.Picture:
// _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) content = constant.ContentType2PushContent[constant.Picture]
// if utils.IsContain(v.RecvID, a.AtUserList) { case constant.Voice:
// content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] content = constant.ContentType2PushContent[constant.Voice]
// } else { case constant.Video:
// content = constant.ContentType2PushContent[constant.GroupMsg] content = constant.ContentType2PushContent[constant.Video]
// } case constant.File:
// default: content = constant.ContentType2PushContent[constant.File]
// content = constant.ContentType2PushContent[constant.Common] case constant.AtText:
// } a := AtContent{}
// } _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
// callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, v.RecvPlatFormID) if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) {
// log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
// if callbackResp.ErrCode != 0 { } else {
// log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) content = constant.ContentType2PushContent[constant.GroupMsg]
// } }
// if callbackResp.ActionCode != constant.ActionAllow { case constant.SignalingNotification:
// log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") content = constant.ContentType2PushContent[constant.SignalMsg]
// break default:
// } content = constant.ContentType2PushContent[constant.Common]
//
// if offlinePusher == nil { }
// offlinePusher = jpush.JPushClient }
// }
// pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID) callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList[0], pushMsg.MsgData)
// if err != nil { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
// log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) if callbackResp.ErrCode != 0 {
// } else { log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
// log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) }
// } if callbackResp.ActionCode != constant.ActionAllow {
// break log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop")
// } return
// }
// } if offlinePusher == nil {
// return
//} }
opts, err := GetOfflinePushOpts(pushMsg)
if err != nil {
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error())
}
log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), onlineFailedUserIDList, content, jsonCustomContent, "opts:", opts)
pushResult, err := offlinePusher.Push(onlineFailedUserIDList, content, jsonCustomContent, pushMsg.OperationID, opts)
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error())
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
}
}
} }
func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) { func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) {