mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 02:16:16 +08:00
exception msgs
This commit is contained in:
parent
f6c6ea8706
commit
d18c7e2c52
@ -1,6 +1,8 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
|
||||||
@ -24,7 +26,7 @@ func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry, rdb redis.Universal
|
|||||||
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
|
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
|
||||||
_ = v.RegisterValidation("required_if", RequiredIf)
|
_ = v.RegisterValidation("required_if", RequiredIf)
|
||||||
}
|
}
|
||||||
log.Info("load config: ", config.Config)
|
log.ZInfo(context.Background(), "load config", config.Config)
|
||||||
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
|
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
|
||||||
if config.Config.Prometheus.Enable {
|
if config.Config.Prometheus.Enable {
|
||||||
prome.NewApiRequestCounter()
|
prome.NewApiRequestCounter()
|
||||||
|
@ -444,8 +444,8 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq
|
|||||||
return seqMsgs, unExistSeqs, nil
|
return seqMsgs, unExistSeqs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, allSeqs []int64, begin, end, num int64) (seqMsgs []*sdkws.MsgData, err error) {
|
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
|
||||||
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end, "num", num)
|
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
|
||||||
m := db.msg.GetDocIDSeqsMap(conversationID, allSeqs)
|
m := db.msg.GetDocIDSeqsMap(conversationID, allSeqs)
|
||||||
var totalNotExistSeqs []int64
|
var totalNotExistSeqs []int64
|
||||||
// mongo index
|
// mongo index
|
||||||
@ -460,21 +460,25 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
|
|||||||
}
|
}
|
||||||
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
|
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs)
|
||||||
// find by next doc
|
// find by next doc
|
||||||
|
var missedSeqs []int64
|
||||||
if len(totalNotExistSeqs) > 0 {
|
if len(totalNotExistSeqs) > 0 {
|
||||||
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
|
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
|
||||||
for docID, seqs := range m {
|
for docID, seqs := range m {
|
||||||
docID = db.msg.ToNextDoc(docID)
|
docID = db.msg.ToNextDoc(docID)
|
||||||
msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
|
msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
missedSeqs = append(missedSeqs, seqs...)
|
||||||
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
|
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
missedSeqs = append(missedSeqs, unExistSeqs...)
|
||||||
seqMsgs = append(seqMsgs, msgs...)
|
seqMsgs = append(seqMsgs, msgs...)
|
||||||
if len(unExistSeqs) > 0 {
|
if len(unExistSeqs) > 0 {
|
||||||
log.ZWarn(ctx, "some seqs lost in mongo", err, "docID", docID, "seqs", seqs, "unExistSeqs", unExistSeqs)
|
log.ZWarn(ctx, "some seqs lost in mongo", err, "docID", docID, "seqs", seqs, "unExistSeqs", unExistSeqs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(missedSeqs)...)
|
||||||
var delSeqs []int64
|
var delSeqs []int64
|
||||||
for _, msg := range seqMsgs {
|
for _, msg := range seqMsgs {
|
||||||
if msg.Status == constant.MsgDeleted {
|
if msg.Status == constant.MsgDeleted {
|
||||||
@ -515,7 +519,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation
|
|||||||
// get from cache or db
|
// get from cache or db
|
||||||
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
|
||||||
if len(failedSeqs) > 0 {
|
if len(failedSeqs) > 0 {
|
||||||
mongoMsgs, err := db.getMsgBySeqsRange(ctx, conversationID, failedSeqs, begin, end, num-int64(len(successMsgs)))
|
mongoMsgs, err := db.getMsgBySeqsRange(ctx, conversationID, failedSeqs, begin, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user