Merge branch 'superGroup' of github.com:OpenIMSDK/Open-IM-Server into superGroup

This commit is contained in:
wangchuxiao 2022-06-13 10:38:15 +08:00
commit 7d6701780c
6 changed files with 36 additions and 22 deletions

View File

@ -14,7 +14,6 @@ import (
commonPb "Open_IM/pkg/proto/sdk_ws" commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"

View File

@ -44,17 +44,23 @@ func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData
} }
func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData { func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData {
msgData.MsgDataList = nil
return []*sdk_ws.MsgData{msgData}
userConn := ws.getUserConn(pushToUserID, platformID) userConn := ws.getUserConn(pushToUserID, platformID)
if userConn == nil { if userConn == nil {
log.Debug(operationID, "userConn == nil")
return []*sdk_ws.MsgData{msgData} return []*sdk_ws.MsgData{msgData}
} }
if msgData.Seq <= userConn.PushedMaxSeq { if msgData.Seq <= userConn.PushedMaxSeq {
log.Debug(operationID, "msgData.Seq <= userConn.PushedMaxSeq", msgData.Seq, userConn.PushedMaxSeq)
return nil return nil
} }
msgList := r.GetSingleUserMsg(operationID, msgData.Seq, pushToUserID) msgList := r.GetSingleUserMsg(operationID, msgData.Seq, pushToUserID)
if msgList == nil { if msgList == nil {
log.Debug(operationID, "GetSingleUserMsg msgList == nil", msgData.Seq, userConn.PushedMaxSeq)
userConn.PushedMaxSeq = msgData.Seq userConn.PushedMaxSeq = msgData.Seq
return []*sdk_ws.MsgData{msgData} return []*sdk_ws.MsgData{msgData}
} }
@ -65,6 +71,7 @@ func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.
userConn.PushedMaxSeq = v.Seq userConn.PushedMaxSeq = v.Seq
} }
} }
log.Debug(operationID, "GetSingleUserMsg msgList len ", len(msgList), userConn.PushedMaxSeq)
return msgList return msgList
} }

View File

@ -205,20 +205,28 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
for k, _ := range userConnMap { for k, _ := range userConnMap {
platformList = append(platformList, k) platformList = append(platformList, k)
} }
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList) log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms ", req.MsgData.Seq, v, platformList, len(needPushMapList)) log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
for platform, list := range needPushMapList { for platform, list := range needPushMapList {
if list != nil { if list != nil {
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:", len(list))
for _, v := range list { for _, v := range list {
log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
} }
log.Debug(req.OperationID, "r.encodeWsData no string")
log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
if err != nil { if err != nil {
log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
continue continue
} }
log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true tempT.OnlinePush = true
@ -250,7 +258,13 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
}, nil }, nil
} }
func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) { func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
msgBytes, _ := proto.Marshal(wsData) log.Debug(operationID, "encodeWsData begin", wsData.String())
msgBytes, err := proto.Marshal(wsData)
if err != nil {
log.NewError(operationID, "Marshal", err.Error())
return bytes.Buffer{}, utils.Wrap(err, "")
}
log.Debug(operationID, "encodeWsData begin", wsData.String())
mReply := Resp{ mReply := Resp{
ReqIdentifier: constant.WSPushMsg, ReqIdentifier: constant.WSPushMsg,
OperationID: operationID, OperationID: operationID,
@ -258,10 +272,10 @@ func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (by
} }
var replyBytes bytes.Buffer var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes) enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply) err = enc.Encode(mReply)
if err != nil { if err != nil {
log.NewError(operationID, "data encode err", err.Error()) log.NewError(operationID, "data encode err", err.Error())
return bytes.Buffer{}, err return bytes.Buffer{}, utils.Wrap(err, "")
} }
return replyBytes, nil return replyBytes, nil
} }

View File

@ -47,7 +47,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
} }
//Online push message //Online push message
log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
for _, v := range grpcCons { for _, v := range grpcCons {
msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}}) reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}})

View File

@ -41,7 +41,7 @@ for ((i = 0; i < ${#service_source_root[*]}; i++)); do
echo "wait pid: " ${build_pid_array[i]} ${service_names[$i]} echo "wait pid: " ${build_pid_array[i]} ${service_names[$i]}
wait ${build_pid_array[i]} wait ${build_pid_array[i]}
stat=$? stat=$?
echo ${build_pid_array[i]} " " $stat echo ${service_names[$i]} "pid: " ${build_pid_array[i]} "stat: " $stat
if [ $stat == 0 ] if [ $stat == 0 ]
then then
echo -e "${GREEN_PREFIX}${service_names[$i]} successfully be built ${COLOR_SUFFIX}\n" echo -e "${GREEN_PREFIX}${service_names[$i]} successfully be built ${COLOR_SUFFIX}\n"

View File

@ -21,25 +21,19 @@ echo "==========================================================">>../logs/openI
echo "==========================================================">>../logs/openIM.log 2>&1 & echo "==========================================================">>../logs/openIM.log 2>&1 &
build_pid_array=() build_pid_array=()
idx=0
for i in ${need_to_start_server_shell[*]}; do for i in ${need_to_start_server_shell[*]}; do
chmod +x $i chmod +x $i
./$i & ./$i &
build_pid=$! build_pid=$!
build_pid_array[i]=$build_pid echo "build_pid " $build_pid
build_pid_array[idx]=$build_pid
let idx=idx+1
done done
echo "wait all start finish....." echo "wait all start finish....."
for i in ${need_to_start_server_shell[*]}; do exit 0
chmod +x $i
./$i &
if [ $? -ne 0 ]; then
exit -1
fi
done
success_num=0 success_num=0
for ((i = 0; i < ${#need_to_start_server_shell[*]}; i++)); do for ((i = 0; i < ${#need_to_start_server_shell[*]}; i++)); do
@ -49,11 +43,11 @@ for ((i = 0; i < ${#need_to_start_server_shell[*]}; i++)); do
echo ${build_pid_array[i]} " " $stat echo ${build_pid_array[i]} " " $stat
if [ $stat == 0 ] if [ $stat == 0 ]
then then
echo -e "${GREEN_PREFIX}${need_to_start_server_shell[$i]} successfully be built ${COLOR_SUFFIX}\n" # echo -e "${GREEN_PREFIX}${need_to_start_server_shell[$i]} successfully be built ${COLOR_SUFFIX}\n"
let success_num=$success_num+1 let success_num=$success_num+1
else else
echo -e "${RED_PREFIX}${need_to_start_server_shell[$i]} build failed ${COLOR_SUFFIX}\n" #echo -e "${RED_PREFIX}${need_to_start_server_shell[$i]} build failed ${COLOR_SUFFIX}\n"
exit -1 exit -1
fi fi
done done