mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
refactor: lower the level of code nesting (#1396)
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
This commit is contained in:
parent
05ab3fcd06
commit
7502b4ac0f
@ -17,22 +17,19 @@ package msggateway
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/msggateway"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
|
||||
)
|
||||
|
||||
@ -41,6 +38,7 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgModel := cache.NewMsgCacheModel(rdb)
|
||||
s.LongConnServer.SetDiscoveryRegistry(disCov)
|
||||
s.LongConnServer.SetCacheHandler(msgModel)
|
||||
@ -97,22 +95,25 @@ func (s *Server) GetUsersOnlineStatus(
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
|
||||
temp.UserID = userID
|
||||
|
||||
uresp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
|
||||
uresp.UserID = userID
|
||||
for _, client := range clients {
|
||||
if client != nil {
|
||||
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
|
||||
ps.Platform = constant.PlatformIDToName(client.PlatformID)
|
||||
ps.Status = constant.OnlineStatus
|
||||
ps.ConnID = client.ctx.GetConnID()
|
||||
ps.Token = client.token
|
||||
ps.IsBackground = client.IsBackground
|
||||
temp.Status = constant.OnlineStatus
|
||||
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
|
||||
ps.Platform = constant.PlatformIDToName(client.PlatformID)
|
||||
ps.Status = constant.OnlineStatus
|
||||
ps.ConnID = client.ctx.GetConnID()
|
||||
ps.Token = client.token
|
||||
ps.IsBackground = client.IsBackground
|
||||
uresp.Status = constant.OnlineStatus
|
||||
uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps)
|
||||
}
|
||||
if temp.Status == constant.OnlineStatus {
|
||||
resp.SuccessResult = append(resp.SuccessResult, temp)
|
||||
if uresp.Status == constant.OnlineStatus {
|
||||
resp.SuccessResult = append(resp.SuccessResult, uresp)
|
||||
}
|
||||
}
|
||||
return &resp, nil
|
||||
@ -129,50 +130,55 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
|
||||
ctx context.Context,
|
||||
req *msggateway.OnlineBatchPushOneMsgReq,
|
||||
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
|
||||
var singleUserResult []*msggateway.SingleMsgToUserResults
|
||||
|
||||
var singleUserResults []*msggateway.SingleMsgToUserResults
|
||||
|
||||
for _, v := range req.PushToUserIDs {
|
||||
var resp []*msggateway.SingleMsgToUserPlatform
|
||||
tempT := &msggateway.SingleMsgToUserResults{
|
||||
results := &msggateway.SingleMsgToUserResults{
|
||||
UserID: v,
|
||||
}
|
||||
clients, ok := s.LongConnServer.GetUserAllCons(v)
|
||||
if !ok {
|
||||
log.ZDebug(ctx, "push user not online", "userID", v)
|
||||
tempT.Resp = resp
|
||||
singleUserResult = append(singleUserResult, tempT)
|
||||
results.Resp = resp
|
||||
singleUserResults = append(singleUserResults, results)
|
||||
continue
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
|
||||
for _, client := range clients {
|
||||
if client != nil {
|
||||
temp := &msggateway.SingleMsgToUserPlatform{
|
||||
RecvID: v,
|
||||
RecvPlatFormID: int32(client.PlatformID),
|
||||
}
|
||||
if !client.IsBackground ||
|
||||
(client.IsBackground == true && client.PlatformID != constant.IOSPlatformID) {
|
||||
err := client.PushMessage(ctx, req.MsgData)
|
||||
if err != nil {
|
||||
temp.ResultCode = -2
|
||||
resp = append(resp, temp)
|
||||
} else {
|
||||
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
|
||||
tempT.OnlinePush = true
|
||||
resp = append(resp, temp)
|
||||
}
|
||||
}
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
userPlatform := &msggateway.SingleMsgToUserPlatform{
|
||||
RecvID: v,
|
||||
RecvPlatFormID: int32(client.PlatformID),
|
||||
}
|
||||
if !client.IsBackground ||
|
||||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
||||
err := client.PushMessage(ctx, req.MsgData)
|
||||
if err != nil {
|
||||
userPlatform.ResultCode = -2
|
||||
resp = append(resp, userPlatform)
|
||||
} else {
|
||||
temp.ResultCode = -3
|
||||
resp = append(resp, temp)
|
||||
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
|
||||
results.OnlinePush = true
|
||||
resp = append(resp, userPlatform)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
userPlatform.ResultCode = -3
|
||||
resp = append(resp, userPlatform)
|
||||
}
|
||||
}
|
||||
tempT.Resp = resp
|
||||
singleUserResult = append(singleUserResult, tempT)
|
||||
results.Resp = resp
|
||||
singleUserResults = append(singleUserResults, results)
|
||||
}
|
||||
|
||||
return &msggateway.OnlineBatchPushOneMsgResp{
|
||||
SinglePushResult: singleUserResult,
|
||||
SinglePushResult: singleUserResults,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -181,17 +187,21 @@ func (s *Server) KickUserOffline(
|
||||
req *msggateway.KickUserOfflineReq,
|
||||
) (*msggateway.KickUserOfflineResp, error) {
|
||||
for _, v := range req.KickUserIDList {
|
||||
if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok {
|
||||
for _, client := range clients {
|
||||
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
|
||||
if err := client.longConnServer.KickUserConn(client); err != nil {
|
||||
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
|
||||
if !ok {
|
||||
log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, client := range clients {
|
||||
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
|
||||
if err := client.longConnServer.KickUserConn(client); err != nil {
|
||||
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return &msggateway.KickUserOfflineResp{}, nil
|
||||
}
|
||||
|
||||
|
@ -57,12 +57,6 @@ type LongConnServer interface {
|
||||
MessageHandler
|
||||
}
|
||||
|
||||
var bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 1024)
|
||||
},
|
||||
}
|
||||
|
||||
type WsServer struct {
|
||||
port int
|
||||
wsMaxConnNum int64
|
||||
@ -374,7 +368,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
|
||||
}
|
||||
ws.onlineUserConnNum.Add(-1)
|
||||
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
|
||||
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num",
|
||||
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num",
|
||||
ws.onlineUserConnNum.Load(),
|
||||
)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user