diff --git a/internal/rpc/msg/msg_status.go b/internal/rpc/msg/msg_status.go index 6a38def31..fe05f5194 100644 --- a/internal/rpc/msg/msg_status.go +++ b/internal/rpc/msg/msg_status.go @@ -1,46 +1,24 @@ package msg import ( - "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" + "Open_IM/pkg/common/tracelog" pbMsg "Open_IM/pkg/proto/msg" - "Open_IM/pkg/utils" "context" - - goRedis "github.com/go-redis/redis/v8" ) -func (rpc *msgServer) SetSendMsgStatus(_ context.Context, req *pbMsg.SetSendMsgStatusReq) (resp *pbMsg.SetSendMsgStatusResp, err error) { - resp = &pbMsg.SetSendMsgStatusResp{} - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String()) - if err := db.DB.SetSendMsgStatus(req.Status, req.OperationID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) - resp.ErrCode = constant.ErrDatabase.ErrCode - resp.ErrMsg = err.Error() - return resp, nil +func (s *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) { + resp := &pbMsg.SetSendMsgStatusResp{} + if err := s.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil { + return nil, err } - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String()) return resp, nil } -func (rpc *msgServer) GetSendMsgStatus(_ context.Context, req *pbMsg.GetSendMsgStatusReq) (resp *pbMsg.GetSendMsgStatusResp, err error) { - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String()) +func (s *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (resp *pbMsg.GetSendMsgStatusResp, err error) { resp = &pbMsg.GetSendMsgStatusResp{} - status, err := db.DB.GetSendMsgStatus(req.OperationID) + resp.Status, err = s.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx)) if err != nil { - resp.Status = constant.MsgStatusNotExist - if err == goRedis.Nil { - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.OperationID, "not exist") - return resp, nil - } else { - log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error()) - resp.ErrMsg = err.Error() - resp.ErrCode = constant.ErrDB.ErrCode - return resp, nil - } + return nil, err } - resp.Status = int32(status) - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String()) return resp, nil } diff --git a/internal/rpc/msg/query_msg.go b/internal/rpc/msg/query_msg.go index e0f763f92..66f2f0810 100644 --- a/internal/rpc/msg/query_msg.go +++ b/internal/rpc/msg/query_msg.go @@ -1,47 +1,37 @@ package msg -import ( - commonDB "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" - promePkg "Open_IM/pkg/common/prometheus" - "Open_IM/pkg/proto/msg" - "Open_IM/pkg/utils" - "context" - go_redis "github.com/go-redis/redis/v8" -) - -func (rpc *msgServer) GetSuperGroupMsg(context context.Context, req *msg.GetSuperGroupMsgReq) (*msg.GetSuperGroupMsgResp, error) { - log.Debug(req.OperationID, utils.GetSelfFuncName(), req.String()) - resp := new(msg.GetSuperGroupMsgResp) - redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.GroupID, []uint32{req.Seq}, req.OperationID) - if err != nil { - if err != go_redis.Nil { - promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) - log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList) - } else { - log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) - } - msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) - if err1 != nil { - promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) - log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error()) - resp.ErrCode = 201 - resp.ErrMsg = err.Error() - return resp, nil - } else { - promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) - redisMsgList = append(redisMsgList, msgList...) - for _, m := range msgList { - resp.MsgData = m - } - - } - } else { - promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) - for _, m := range redisMsgList { - resp.MsgData = m - } - } - log.Debug(req.OperationID, utils.GetSelfFuncName(), resp.String()) - return resp, nil -} +//func (rpc *msgServer) GetSuperGroupMsg(context context.Context, req *msg.GetSuperGroupMsgReq) (*msg.GetSuperGroupMsgResp, error) { +// log.Debug(req.OperationID, utils.GetSelfFuncName(), req.String()) +// resp := new(msg.GetSuperGroupMsgResp) +// redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.GroupID, []uint32{req.Seq}, req.OperationID) +// if err != nil { +// if err != go_redis.Nil { +// promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList)) +// log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList) +// } else { +// log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) +// } +// msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) +// if err1 != nil { +// promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) +// log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error()) +// resp.ErrCode = 201 +// resp.ErrMsg = err.Error() +// return resp, nil +// } else { +// promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList)) +// redisMsgList = append(redisMsgList, msgList...) +// for _, m := range msgList { +// resp.MsgData = m +// } +// +// } +// } else { +// promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList)) +// for _, m := range redisMsgList { +// resp.MsgData = m +// } +// } +// log.Debug(req.OperationID, utils.GetSelfFuncName(), resp.String()) +// return resp, nil +//} diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index ea2187127..b62822cf0 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -5,6 +5,7 @@ import ( "Open_IM/internal/common/rpcserver" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" @@ -23,6 +24,7 @@ import ( type msgServer struct { *rpcserver.RpcServer + MsgInterface controller.MsgInterface } type deleteMsg struct { diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go new file mode 100644 index 000000000..fd0f4e9b3 --- /dev/null +++ b/internal/rpc/msg/utils.go @@ -0,0 +1,16 @@ +package msg + +import ( + "Open_IM/pkg/utils" + "github.com/go-redis/redis/v8" + "gorm.io/gorm" +) + +func IsNotFound(err error) bool { + switch utils.Unwrap(err) { + case gorm.ErrRecordNotFound, redis.Nil: + return true + default: + return false + } +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 15cbe872a..fe8bf604f 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -18,6 +18,10 @@ type MsgInterface interface { DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error) ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error) + + // status + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) // 不存在返回 constant.MsgStatusNotExist } type MsgDatabaseInterface interface {