diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 9bf13e57a..43bd240c4 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -39,15 +39,13 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { } func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) { operationID := utils.OperationIDGenerator() - for { - err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) - if err != nil { - log.Error(operationID, "TriggerCmd failed ", err.Error(), status) - continue - } - log.Debug(operationID, "TriggerCmd success", status) + err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) + if err != nil { + log.Error(operationID, "TriggerCmd failed ", err.Error(), status) return } + log.Debug(operationID, "TriggerCmd success", status) + } func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { var flag = 0 diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 8f7e7f427..16e4c5ab6 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -24,6 +24,9 @@ import ( "time" ) +//When the number of group members is greater than this value,Online users will be sent first,Guaranteed service availability +const GroupMemberNum = 500 + type MsgCallBackReq struct { SendID string `json:"sendID"` RecvID string `json:"recvID"` @@ -254,34 +257,32 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S default: } - onUserIDList, offUserIDList := getOnlineAndOfflineUserIDList(memberUserIDList, pb.OperationID) - log.Debug(pb.OperationID, onUserIDList, offUserIDList) + m := make(map[string][]string, 2) + if len(memberUserIDList) > GroupMemberNum { + getOnlineAndOfflineUserIDList(memberUserIDList, m, pb.OperationID) + log.Debug(pb.OperationID, m[constant.OnlineStatus], m[constant.OfflineStatus]) + } else { + m[constant.OnlineStatus] = memberUserIDList + } + //split parallel send var wg sync.WaitGroup var sendTag bool var split = 50 - remain := len(onUserIDList) % split - for i := 0; i < len(onUserIDList)/split; i++ { - wg.Add(1) - go rpc.sendMsgToGroup(onUserIDList[i*split:(i+1)*split], *pb, constant.OnlineStatus, &sendTag, &wg) + for k, v := range m { + remain := len(v) % split + for i := 0; i < len(v)/split; i++ { + wg.Add(1) + go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg) + } + if remain > 0 { + wg.Add(1) + go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg) + } } - if remain > 0 { - wg.Add(1) - go rpc.sendMsgToGroup(onUserIDList[split*(len(onUserIDList)/split):], *pb, constant.OnlineStatus, &sendTag, &wg) - } - wg.Wait() - remain = len(offUserIDList) % split - for i := 0; i < len(offUserIDList)/split; i++ { - wg.Add(1) - go rpc.sendMsgToGroup(offUserIDList[i*split:(i+1)*split], *pb, constant.OfflineStatus, &sendTag, &wg) - } - if remain > 0 { - wg.Add(1) - go rpc.sendMsgToGroup(offUserIDList[split*(len(offUserIDList)/split):], *pb, constant.OfflineStatus, &sendTag, &wg) - } - wg.Wait() wg.Add(1) - rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) + go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg) + wg.Wait() // callback if err := callbackAfterSendGroupMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) @@ -695,7 +696,8 @@ func Notification(n *NotificationMsg) { log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg) } } -func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onllUserIDList []string, offlUserIDList []string) { +func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, operationID string) { + var onllUserIDList, offlUserIDList []string var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult req := &pbRelay.GetUsersOnlineStatusReq{} req.UserIDList = memberList @@ -731,7 +733,8 @@ func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onl offlUserIDList = append(offlUserIDList, v1) } } - return onllUserIDList, offlUserIDList + m[constant.OnlineStatus] = onllUserIDList + m[constant.OfflineStatus] = offlUserIDList } func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {