msg status

This commit is contained in:
withchao 2023-02-13 10:33:54 +08:00
parent 217d6380f7
commit 52e393dc45
5 changed files with 65 additions and 75 deletions

View File

@ -1,46 +1,24 @@
package msg
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
pbMsg "Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
goRedis "github.com/go-redis/redis/v8"
)
func (rpc *msgServer) SetSendMsgStatus(_ context.Context, req *pbMsg.SetSendMsgStatusReq) (resp *pbMsg.SetSendMsgStatusResp, err error) {
resp = &pbMsg.SetSendMsgStatusResp{}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String())
if err := db.DB.SetSendMsgStatus(req.Status, req.OperationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
resp.ErrCode = constant.ErrDatabase.ErrCode
resp.ErrMsg = err.Error()
return resp, nil
func (s *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) {
resp := &pbMsg.SetSendMsgStatusResp{}
if err := s.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil {
return nil, err
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String())
return resp, nil
}
func (rpc *msgServer) GetSendMsgStatus(_ context.Context, req *pbMsg.GetSendMsgStatusReq) (resp *pbMsg.GetSendMsgStatusResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.String())
func (s *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (resp *pbMsg.GetSendMsgStatusResp, err error) {
resp = &pbMsg.GetSendMsgStatusResp{}
status, err := db.DB.GetSendMsgStatus(req.OperationID)
resp.Status, err = s.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx))
if err != nil {
resp.Status = constant.MsgStatusNotExist
if err == goRedis.Nil {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), req.OperationID, "not exist")
return resp, nil
} else {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
resp.ErrMsg = err.Error()
resp.ErrCode = constant.ErrDB.ErrCode
return resp, nil
}
return nil, err
}
resp.Status = int32(status)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp.String())
return resp, nil
}

View File

@ -1,47 +1,37 @@
package msg
import (
commonDB "Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
go_redis "github.com/go-redis/redis/v8"
)
func (rpc *msgServer) GetSuperGroupMsg(context context.Context, req *msg.GetSuperGroupMsgReq) (*msg.GetSuperGroupMsgResp, error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), req.String())
resp := new(msg.GetSuperGroupMsgResp)
redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.GroupID, []uint32{req.Seq}, req.OperationID)
if err != nil {
if err != go_redis.Nil {
promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList))
log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList)
} else {
log.Debug(req.OperationID, "get message from redis is nil", failedSeqList)
}
msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID)
if err1 != nil {
promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error())
resp.ErrCode = 201
resp.ErrMsg = err.Error()
return resp, nil
} else {
promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList))
redisMsgList = append(redisMsgList, msgList...)
for _, m := range msgList {
resp.MsgData = m
}
}
} else {
promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList))
for _, m := range redisMsgList {
resp.MsgData = m
}
}
log.Debug(req.OperationID, utils.GetSelfFuncName(), resp.String())
return resp, nil
}
//func (rpc *msgServer) GetSuperGroupMsg(context context.Context, req *msg.GetSuperGroupMsgReq) (*msg.GetSuperGroupMsgResp, error) {
// log.Debug(req.OperationID, utils.GetSelfFuncName(), req.String())
// resp := new(msg.GetSuperGroupMsgResp)
// redisMsgList, failedSeqList, err := commonDB.DB.GetMessageListBySeq(req.GroupID, []uint32{req.Seq}, req.OperationID)
// if err != nil {
// if err != go_redis.Nil {
// promePkg.PromeAdd(promePkg.MsgPullFromRedisFailedCounter, len(failedSeqList))
// log.Error(req.OperationID, "get message from redis exception", err.Error(), failedSeqList)
// } else {
// log.Debug(req.OperationID, "get message from redis is nil", failedSeqList)
// }
// msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID)
// if err1 != nil {
// promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
// log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error())
// resp.ErrCode = 201
// resp.ErrMsg = err.Error()
// return resp, nil
// } else {
// promePkg.PromeAdd(promePkg.MsgPullFromMongoSuccessCounter, len(msgList))
// redisMsgList = append(redisMsgList, msgList...)
// for _, m := range msgList {
// resp.MsgData = m
// }
//
// }
// } else {
// promePkg.PromeAdd(promePkg.MsgPullFromRedisSuccessCounter, len(redisMsgList))
// for _, m := range redisMsgList {
// resp.MsgData = m
// }
// }
// log.Debug(req.OperationID, utils.GetSelfFuncName(), resp.String())
// return resp, nil
//}

View File

@ -5,6 +5,7 @@ import (
"Open_IM/internal/common/rpcserver"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
@ -23,6 +24,7 @@ import (
type msgServer struct {
*rpcserver.RpcServer
MsgInterface controller.MsgInterface
}
type deleteMsg struct {

16
internal/rpc/msg/utils.go Normal file
View File

@ -0,0 +1,16 @@
package msg
import (
"Open_IM/pkg/utils"
"github.com/go-redis/redis/v8"
"gorm.io/gorm"
)
func IsNotFound(err error) bool {
switch utils.Unwrap(err) {
case gorm.ErrRecordNotFound, redis.Nil:
return true
default:
return false
}
}

View File

@ -18,6 +18,10 @@ type MsgInterface interface {
DelMsgLogic(ctx context.Context, userID string, seqList []uint32) error
DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error)
ReplaceMsgToBlankByIndex(docID string, index int) (replaceMaxSeq uint32, err error)
// status
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) // 不存在返回 constant.MsgStatusNotExist
}
type MsgDatabaseInterface interface {