From 6ce1d60ebb03ec6c47797fe73b1a72d80571bdb8 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 7 Jun 2022 18:09:11 +0800 Subject: [PATCH 1/3] log --- internal/rpc/group/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index c2b2d27ca..28ffa119d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1539,7 +1539,7 @@ func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMu // errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupAdmin " + req.GroupID + req.OpUserID + err.Error() // return &pbGroup.CancelMuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil //} - + log.Debug(req.OperationID, "UpdateGroupInfoDefaultZero ", req.GroupID, map[string]interface{}{"status": constant.GroupOk}) err = imdb.UpdateGroupInfoDefaultZero(req.GroupID, map[string]interface{}{"status": constant.GroupOk}) if err != nil { log.Error(req.OperationID, "UpdateGroupInfoDefaultZero failed ", err.Error(), req.GroupID) From 0463f5e83d88de56cbd634f0668c9b24e0b2bebc Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 7 Jun 2022 18:25:08 +0800 Subject: [PATCH 2/3] fix bug --- internal/rpc/group/group.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index fb77a2cf1..60127c425 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1448,11 +1448,11 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } if mutedInfo.RoleLevel == constant.GroupOwner && opFlag != 1 { - errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupOwner " + req.GroupID + req.UserID + err.Error() + errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupOwner " + req.GroupID + req.UserID return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } if mutedInfo.RoleLevel == constant.GroupAdmin && opFlag == 3 { - errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupAdmin " + req.GroupID + req.UserID + err.Error() + errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupAdmin " + req.GroupID + req.UserID return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } @@ -1490,11 +1490,11 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } if mutedInfo.RoleLevel == constant.GroupOwner && opFlag != 1 { - errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupOwner " + req.GroupID + req.UserID + err.Error() + errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupOwner " + req.GroupID + req.UserID return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } if mutedInfo.RoleLevel == constant.GroupAdmin && opFlag == 3 { - errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupAdmin " + req.GroupID + req.UserID + err.Error() + errMsg := req.OperationID + " mutedInfo.RoleLevel == constant.GroupAdmin " + req.GroupID + req.UserID return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: errMsg}}, nil } From a87e41eecd704ad46408dae5a366e744362782b9 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 7 Jun 2022 18:39:43 +0800 Subject: [PATCH 3/3] super group push --- internal/push/logic/push_to_client.go | 207 ++++++++++++++------------ 1 file changed, 111 insertions(+), 96 deletions(-) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 271bb4792..386c951e8 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -12,6 +12,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbCache "Open_IM/pkg/proto/cache" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" pbRtc "Open_IM/pkg/proto/rtc" @@ -138,102 +139,116 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { - return - //var wsResult []*pbRelay.SingelMsgToUserResultList - //isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) - //log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) - //if len(grpcCons) == 0 { - // log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ") - // grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) - //} - ////Online push message - //log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) - //for _, v := range grpcCons { - // msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) - // reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushMsg.PushToUserID}) - // if err != nil { - // log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err) - // continue - // } - // if reply != nil && reply.SinglePushResult != nil { - // wsResult = append(wsResult, reply.SinglePushResult...) - // } - //} - //log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) - //successCount++ - //if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { - // for _, v := range wsResult { - // if v.ResultCode == 0 { - // if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { - // break - // } - // continue - // } - // if utils.IsContainInt32(v.RecvPlatFormID, pushTerminal) { - // //Use offline push messaging - // var UIDList []string - // UIDList = append(UIDList, v.RecvID) - // customContent := OpenIMContent{ - // SessionType: int(pushMsg.MsgData.SessionType), - // From: pushMsg.MsgData.SendID, - // To: pushMsg.MsgData.RecvID, - // Seq: pushMsg.MsgData.Seq, - // } - // bCustomContent, _ := json.Marshal(customContent) - // jsonCustomContent := string(bCustomContent) - // var content string - // if pushMsg.MsgData.OfflinePushInfo != nil { - // content = pushMsg.MsgData.OfflinePushInfo.Title - // - // } else { - // switch pushMsg.MsgData.ContentType { - // case constant.Text: - // content = constant.ContentType2PushContent[constant.Text] - // case constant.Picture: - // content = constant.ContentType2PushContent[constant.Picture] - // case constant.Voice: - // content = constant.ContentType2PushContent[constant.Voice] - // case constant.Video: - // content = constant.ContentType2PushContent[constant.Video] - // case constant.File: - // content = constant.ContentType2PushContent[constant.File] - // case constant.AtText: - // a := AtContent{} - // _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - // if utils.IsContain(v.RecvID, a.AtUserList) { - // content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - // } else { - // content = constant.ContentType2PushContent[constant.GroupMsg] - // } - // default: - // content = constant.ContentType2PushContent[constant.Common] - // } - // } - // callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, v.RecvPlatFormID) - // log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") - // if callbackResp.ErrCode != 0 { - // log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) - // } - // if callbackResp.ActionCode != constant.ActionAllow { - // log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") - // break - // } - // - // if offlinePusher == nil { - // offlinePusher = jpush.JPushClient - // } - // pushResult, err := offlinePusher.Push(UIDList, content, jsonCustomContent, pushMsg.OperationID) - // if err != nil { - // log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) - // } else { - // log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) - // } - // break - // } - // - // } - // - //} + var wsResult []*pbRelay.SingelMsgToUserResultList + isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) + log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String()) + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID} + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return + } + if cacheResp.CommonResp.ErrCode != 0 { + log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + return + } + if len(grpcCons) == 0 { + log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ") + grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + } + //Online push message + log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) + for _, v := range grpcCons { + msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) + reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: cacheResp.UserIDList}) + if err != nil { + log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err) + continue + } + if reply != nil && reply.SinglePushResult != nil { + wsResult = append(wsResult, reply.SinglePushResult...) + } + } + log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) + successCount++ + if isOfflinePush { + var onlineSuccessUserIDList []string + onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID) + for _, v := range wsResult { + if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID { + onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID) + } + } + onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, cacheResp.UserIDList) + //Use offline push messaging + customContent := OpenIMContent{ + SessionType: int(pushMsg.MsgData.SessionType), + From: pushMsg.MsgData.SendID, + To: pushMsg.MsgData.RecvID, + Seq: pushMsg.MsgData.Seq, + } + bCustomContent, _ := json.Marshal(customContent) + jsonCustomContent := string(bCustomContent) + var content string + if pushMsg.MsgData.OfflinePushInfo != nil { + content = pushMsg.MsgData.OfflinePushInfo.Title + + } else { + switch pushMsg.MsgData.ContentType { + case constant.Text: + content = constant.ContentType2PushContent[constant.Text] + case constant.Picture: + content = constant.ContentType2PushContent[constant.Picture] + case constant.Voice: + content = constant.ContentType2PushContent[constant.Voice] + case constant.Video: + content = constant.ContentType2PushContent[constant.Video] + case constant.File: + content = constant.ContentType2PushContent[constant.File] + case constant.AtText: + a := AtContent{} + _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) + if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) { + content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + } else { + content = constant.ContentType2PushContent[constant.GroupMsg] + } + case constant.SignalingNotification: + content = constant.ContentType2PushContent[constant.SignalMsg] + default: + content = constant.ContentType2PushContent[constant.Common] + + } + } + + callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList[0], pushMsg.MsgData) + log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") + if callbackResp.ErrCode != 0 { + log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) + } + if callbackResp.ActionCode != constant.ActionAllow { + log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") + return + } + if offlinePusher == nil { + return + } + opts, err := GetOfflinePushOpts(pushMsg) + if err != nil { + log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error()) + } + log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), onlineFailedUserIDList, content, jsonCustomContent, "opts:", opts) + pushResult, err := offlinePusher.Push(onlineFailedUserIDList, content, jsonCustomContent, pushMsg.OperationID, opts) + if err != nil { + log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) + } else { + log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) + } + + } + } func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) {