diff --git a/internal/rpc/msg/del_msg.go b/internal/rpc/msg/del_msg.go index 17610a384..c6856bb98 100644 --- a/internal/rpc/msg/del_msg.go +++ b/internal/rpc/msg/del_msg.go @@ -2,23 +2,28 @@ package msg import ( "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" commonPb "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" + "time" ) func (rpc *rpcChat) DelMsgList(_ context.Context, req *commonPb.DelMsgListReq) (*commonPb.DelMsgListResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) resp := &commonPb.DelMsgListResp{} - if err := db.DB.DelMsgBySeqList(req.UserID, req.SeqList, req.OperationID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelMsg failed", err.Error()) - resp.ErrMsg = constant.ErrDB.ErrMsg - resp.ErrCode = constant.ErrDB.ErrCode + select { + case rpc.delMsgCh <- deleteMsg{ + UserID: req.UserID, + OpUserID: req.OpUserID, + SeqList: req.SeqList, + OperationID: req.OperationID, + }: + case <-time.After(1 * time.Second): + resp.ErrCode = constant.ErrSendLimit.ErrCode + resp.ErrMsg = constant.ErrSendLimit.ErrMsg return resp, nil } - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String()) return resp, nil } diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 002918bae..fd8b5ccf5 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -3,6 +3,7 @@ package msg import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -21,6 +22,14 @@ type rpcChat struct { etcdAddr []string onlineProducer *kafka.Producer offlineProducer *kafka.Producer + delMsgCh chan deleteMsg +} + +type deleteMsg struct { + UserID string + OpUserID string + SeqList []uint32 + OperationID string } func NewRpcChatServer(port int) *rpcChat { @@ -33,6 +42,7 @@ func NewRpcChatServer(port int) *rpcChat { } rc.onlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) + rc.delMsgCh = make(chan deleteMsg, 1000) return &rc } @@ -75,3 +85,15 @@ func (rpc *rpcChat) Run() { } log.Info("", "rpc rpcChat init success") } + +func (rpc *rpcChat) runCh() { + log.NewInfo("", "start del msg chan ") + for { + select { + case msg := <-rpc.delMsgCh: + if err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID); err != nil { + log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList qrgs: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) + } + } + } +} diff --git a/pkg/common/constant/error.go b/pkg/common/constant/error.go index 2537c3706..b0c28725e 100644 --- a/pkg/common/constant/error.go +++ b/pkg/common/constant/error.go @@ -54,7 +54,7 @@ var ( ErrArgs = ErrInfo{ErrCode: 803, ErrMsg: ArgsMsg.Error()} ErrStatus = ErrInfo{ErrCode: 804, ErrMsg: StatusMsg.Error()} ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} - ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many users"} + ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"} ) var (