diff --git a/internal/api/office/tag.go b/internal/api/office/tag.go index c187ecef5..02278f60e 100644 --- a/internal/api/office/tag.go +++ b/internal/api/office/tag.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/utils" "context" "github.com/gin-gonic/gin" + "google.golang.org/grpc" "net/http" "strings" ) @@ -260,7 +261,8 @@ func GetTagSendLogs(c *gin.Context) { } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfficeName) client := pbOffice.NewOfficeServiceClient(etcdConn) - respPb, err := client.GetTagSendLogs(context.Background(), &reqPb) + maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20) + respPb, err := client.GetTagSendLogs(context.Background(), &reqPb, maxSizeOption) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTagSendLogs failed", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetTagSendLogs rpc server failed" + err.Error()}) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 5cf682b84..60c97f1de 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -19,6 +19,8 @@ var ( sendMsgFailedCount uint64 sendMsgSuccessCount uint64 userCount uint64 + + sendMsgAllCountLock sync.RWMutex ) func Init(rpcPort, wsPort int) { diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 51955f0fe..5644f1cb3 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -143,7 +143,9 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { + sendMsgAllCountLock.Lock() sendMsgAllCount++ + sendMsgAllCountLock.Unlock() log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) nReply := new(pbChat.SendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 1cade65eb..753fca492 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -28,15 +28,18 @@ type officeServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + ch chan tagSendStruct } func NewOfficeServer(port int) *officeServer { log.NewPrivateLog(constant.LogFileName) + ch := make(chan tagSendStruct, 100000) return &officeServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, + ch: ch, } } @@ -80,6 +83,7 @@ func (s *officeServer) Run() { log.NewError("0", "RegisterEtcd failed ", err.Error()) return } + go s.sendTagMsgRoutine() err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) @@ -88,6 +92,25 @@ func (s *officeServer) Run() { log.NewInfo("0", "message cms rpc success") } +type tagSendStruct struct { + operationID string + user *db.User + userID string + content string + senderPlatformID int32 +} + +func (s *officeServer) sendTagMsgRoutine() { + log.NewInfo("", utils.GetSelfFuncName(), "start") + for { + select { + case v := <-s.ch: + msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) + time.Sleep(time.Millisecond * 100) + } + } +} + func (s *officeServer) GetUserTags(_ context.Context, req *pbOffice.GetUserTagsReq) (resp *pbOffice.GetUserTagsResp, err error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req ", req.String()) resp = &pbOffice.GetUserTagsResp{ @@ -228,30 +251,46 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR resp.CommonResp.ErrCode = constant.ErrDB.ErrCode return resp, nil } - var wg sync.WaitGroup - wg.Add(len(userIDList)) + var successUserIDList []string for _, userID := range userIDList { - go func(userID string) { - defer wg.Done() - msg.TagSendMessage(req.OperationID, user, userID, req.Content, req.SenderPlatformID) - }(userID) + t := tagSendStruct{ + operationID: req.OperationID, + user: user, + userID: userID, + content: req.Content, + senderPlatformID: req.SenderPlatformID, + } + select { + case s.ch <- t: + log.NewDebug(t.operationID, utils.GetSelfFuncName(), "msg: ", t, "send success") + successUserIDList = append(successUserIDList, userID) + // if channel is full, return grpc req + case <-time.After(1 * time.Second): + log.NewError(t.operationID, utils.GetSelfFuncName(), s.ch, "channel is full") + resp.CommonResp.ErrCode = constant.ErrSendLimit.ErrCode + resp.CommonResp.ErrMsg = constant.ErrSendLimit.ErrMsg + return resp, nil + } } - wg.Wait() - var tagSendLogs db.TagSendLog - wg.Add(len(userIDList)) - for _, userID := range userIDList { + var tagSendLogs db.TagSendLog + var wg sync.WaitGroup + wg.Add(len(successUserIDList)) + var lock sync.Mutex + for _, userID := range successUserIDList { go func(userID string) { defer wg.Done() userName, err := im_mysql_model.GetUserNameByUserID(userID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error()) + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error(), userID) return } + lock.Lock() tagSendLogs.UserList = append(tagSendLogs.UserList, db.TagUser{ UserID: userID, UserName: userName, }) + lock.Unlock() }(userID) } wg.Wait() diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 621bf9eb1..66a4439e3 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -273,4 +273,4 @@ const BigVersion = "v3" const LogFileName = "OpenIM.log" -const StatisticsTimeInterval = 300 +const StatisticsTimeInterval = 60