From c2a45cdbeb83c4f430643d53dbecdffa051ac580 Mon Sep 17 00:00:00 2001
From: Gordon <1432970085@qq.com>
Date: Mon, 29 Nov 2021 16:26:57 +0800
Subject: [PATCH] get all node online user status

---
 internal/api/manage/management_user.go  | 71 +++++++++++++++++++++++++
 internal/msg_gateway/gate/rpc_server.go | 24 +++++++++
 internal/push/logic/push_to_client.go   |  3 +-
 pkg/common/constant/constant.go         |  3 ++
 4 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/internal/api/manage/management_user.go b/internal/api/manage/management_user.go
index dce66a41e..19b25e8a0 100644
--- a/internal/api/manage/management_user.go
+++ b/internal/api/manage/management_user.go
@@ -8,9 +8,13 @@ package manage
 
 import (
 	"Open_IM/pkg/common/config"
+	"Open_IM/pkg/common/constant"
 	"Open_IM/pkg/common/log"
+	"Open_IM/pkg/common/token_verify"
 	"Open_IM/pkg/grpc-etcdv3/getcdv3"
+	pbRelay "Open_IM/pkg/proto/relay"
 	pbUser "Open_IM/pkg/proto/user"
+	"Open_IM/pkg/utils"
 	"context"
 	"github.com/gin-gonic/gin"
 	"net/http"
@@ -24,6 +28,10 @@ type paramsDeleteUsers struct {
 type paramsGetAllUsersUid struct {
 	OperationID string `json:"operationID" binding:"required"`
 }
+type paramsGetUsersOnlineStatus struct {
+	OperationID string   `json:"operationID" binding:"required"`
+	UserIDList  []string `json:"userIDList" binding:"required,lte=200"`
+}
 
 func DeleteUser(c *gin.Context) {
 	params := paramsDeleteUsers{}
@@ -80,3 +88,66 @@ func GetAllUsersUid(c *gin.Context) {
 	c.JSON(http.StatusOK, resp)
 
 }
+func GetUsersOnlineStatus(c *gin.Context) {
+	params := paramsGetUsersOnlineStatus{}
+	if err := c.BindJSON(&params); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
+		return
+	}
+	claims, err := token_verify.ParseToken(c.Request.Header.Get("token"))
+	if err != nil {
+		log.ErrorByKv("parse token failed", params.OperationID, "err", err.Error())
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": err.Error()})
+		return
+	}
+	if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
+		log.ErrorByKv(" Authentication failed", params.OperationID, "args", c)
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 402, "errMsg": "not authorized"})
+		return
+	}
+	req := &pbRelay.GetUsersOnlineStatusReq{
+		OperationID: params.OperationID,
+		UserIDList:  params.UserIDList,
+	}
+	var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
+	var respResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
+	flag := false
+	log.NewDebug(params.OperationID, "GetUsersOnlineStatus req come here", params.UserIDList)
+	grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
+	for _, v := range grpcCons {
+		client := pbRelay.NewOnlineMessageRelayServiceClient(v)
+		reply, err := client.GetUsersOnlineStatus(context.Background(), req)
+		if err != nil {
+			log.NewError(params.OperationID, "GetUsersOnlineStatus rpc  err", req.String(), err.Error())
+			continue
+		} else {
+			if reply.ErrCode == 0 {
+				wsResult = append(wsResult, reply.SuccessResult...)
+			}
+		}
+	}
+	log.NewDebug(params.OperationID, "call GetUsersOnlineStatus rpc server is success", wsResult)
+	//Online data merge of each node
+	for _, v1 := range params.UserIDList {
+		flag = false
+		temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
+		for _, v2 := range wsResult {
+			if v2.UserID == v1 {
+				flag = true
+				temp.UserID = v1
+				temp.Status = constant.OnlineStatus
+				temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, v2.DetailPlatformStatus...)
+			}
+
+		}
+		if !flag {
+			temp.UserID = v1
+			temp.Status = constant.OfflineStatus
+		}
+		respResult = append(respResult, temp)
+	}
+	log.NewDebug(params.OperationID, "Finished merged data", respResult)
+	resp := gin.H{"errCode": 0, "errMsg": "", "successResult": respResult}
+	c.JSON(http.StatusOK, resp)
+
+}
diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go
index f87e8a4ff..7a0632fad 100644
--- a/internal/msg_gateway/gate/rpc_server.go
+++ b/internal/msg_gateway/gate/rpc_server.go
@@ -122,7 +122,31 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
 		Resp: resp,
 	}, nil
 }
+func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
+	log.NewDebug(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
+	var UIDAndPID []string
+	var resp pbRelay.GetUsersOnlineStatusResp
+	for _, v1 := range req.UserIDList {
+		userIDList := genUidPlatformArray(v1)
+		temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
+		temp.UserID = v1
+		for _, v2 := range userIDList {
+			UIDAndPID = strings.Split(v2, " ")
+			if conn := ws.getUserConn(v2); conn != nil {
+				ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
+				ps.Platform = UIDAndPID[1]
+				ps.Status = constant.OnlineStatus
+				temp.Status = constant.OnlineStatus
+				temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
 
+			}
+		}
+		if temp.Status == constant.OnlineStatus {
+			resp.SuccessResult = append(resp.SuccessResult, temp)
+		}
+	}
+	return &resp, nil
+}
 func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
 	err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
 	if err != nil {
diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go
index 21916a68d..c2d986a59 100644
--- a/internal/push/logic/push_to_client.go
+++ b/internal/push/logic/push_to_client.go
@@ -48,8 +48,9 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
 		reply, err := msgClient.MsgToUser(context.Background(), sendPbData)
 		if err != nil {
 			log.InfoByKv("push data to client rpc err", sendPbData.OperationID, "err", err)
+			continue
 		}
-		if reply != nil && reply.Resp != nil && err == nil {
+		if reply != nil && reply.Resp != nil {
 			wsResult = append(wsResult, reply.Resp...)
 		}
 	}
diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go
index 00c73ff4e..39487fcd0 100644
--- a/pkg/common/constant/constant.go
+++ b/pkg/common/constant/constant.go
@@ -86,6 +86,9 @@ const (
 	WebAndOther = 3
 	//Pc端互斥,移动端互斥,但是web端可以同时在线
 	PcMobileAndWeb = 4
+
+	OnlineStatus  = "Online"
+	OfflineStatus = "Offline"
 )
 
 var ContentType2PushContent = map[int64]string{