From 888c837cddfe12ad7f00bff4aa8c67be8e807c2d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 15:10:44 +0800 Subject: [PATCH] office tag async send --- internal/rpc/office/office.go | 53 ++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 91b8ab669..016e6278f 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -28,15 +28,18 @@ type officeServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + ch chan tagSendStruct } func NewOfficeServer(port int) *officeServer { log.NewPrivateLog(constant.LogFileName) + ch := make(chan tagSendStruct, 10000) return &officeServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, + ch: ch, } } @@ -80,6 +83,7 @@ func (s *officeServer) Run() { log.NewError("0", "RegisterEtcd failed ", err.Error()) return } + go s.sendTagMsgRoutine() err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) @@ -88,6 +92,23 @@ func (s *officeServer) Run() { log.NewInfo("0", "message cms rpc success") } +type tagSendStruct struct { + operationID string + user *db.User + userID string + content string + senderPlatformID int32 +} + +func (s *officeServer) sendTagMsgRoutine() { + log.NewInfo("", utils.GetSelfFuncName(), "start") + select { + case v := <-s.ch: + msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) + time.Sleep(time.Millisecond * 500) + } +} + func (s *officeServer) GetUserTags(_ context.Context, req *pbOffice.GetUserTagsReq) (resp *pbOffice.GetUserTagsResp, err error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req ", req.String()) resp = &pbOffice.GetUserTagsResp{ @@ -228,17 +249,29 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR resp.CommonResp.ErrCode = constant.ErrDB.ErrCode return resp, nil } - var wg sync.WaitGroup - wg.Add(len(userIDList)) - for _, userID := range userIDList { - go func(userID string) { - defer wg.Done() - msg.TagSendMessage(req.OperationID, user, userID, req.Content, req.SenderPlatformID) - }(userID) - } - wg.Wait() - var tagSendLogs db.TagSendLog + for _, userID := range userIDList { + t := tagSendStruct{ + operationID: req.OperationID, + user: user, + userID: userID, + content: req.Content, + senderPlatformID: 0, + } + select { + case s.ch <- t: + log.NewDebug(t.operationID, utils.GetSelfFuncName(), "msg: ", t, "send success") + // if channel is full, return grpc req + case <-time.After(1 * time.Second): + log.NewError(t.operationID, utils.GetSelfFuncName(), s.ch, "channel is full") + resp.CommonResp.ErrCode = constant.ErrSendLimit.ErrCode + resp.CommonResp.ErrMsg = constant.ErrSendLimit.ErrMsg + return resp, nil + } + } + + var tagSendLogs db.TagSendLog + var wg sync.WaitGroup wg.Add(len(userIDList)) var lock sync.Mutex for _, userID := range userIDList {