Compare commits

...

3 Commits

Author SHA1 Message Date
wsy6543
4398afe207
Merge d2409dff07d0418817677e7e006aac22cc17d896 into a0e6d9aa69f8a49852c1d82a2dc13221da7ad540 2026-01-19 19:53:46 +07:00
chao
a0e6d9aa69
fix: Mongo Malloc upsert overwrites min_seq initialization (#3657)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation

* refactor: replace LongConn with ClientConn interface and simplify message handling

* refactor: replace LongConn with ClientConn interface and simplify message handling

* fix: seq use $setOnInsert for min_seq in conversation update
2026-01-15 06:24:17 +00:00
wushengyu
d2409dff07 fix GetMaxSeqs 2025-09-11 11:12:19 +08:00
2 changed files with 4 additions and 4 deletions

View File

@ -32,7 +32,7 @@ func (x *seqConversationCache) GetMinSeq(ctx context.Context, conversationID str
func (x *seqConversationCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
res := make(map[string]int64)
for _, conversationID := range conversationIDs {
seq, err := x.GetMinSeq(ctx, conversationID)
seq, err := x.GetMaxSeq(ctx, conversationID)
if err != nil {
return nil, err
}
@ -44,7 +44,7 @@ func (x *seqConversationCache) GetMaxSeqs(ctx context.Context, conversationIDs [
func (x *seqConversationCache) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
res := make(map[string]database.SeqTime)
for _, conversationID := range conversationIDs {
seq, err := x.GetMinSeq(ctx, conversationID)
seq, err := x.GetMaxSeq(ctx, conversationID)
if err != nil {
return nil, err
}

View File

@ -57,8 +57,8 @@ func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string
}
filter := map[string]any{"conversation_id": conversationID}
update := map[string]any{
"$inc": map[string]any{"max_seq": size},
"$set": map[string]any{"min_seq": int64(0)},
"$inc": map[string]any{"max_seq": size},
"$setOnInsert": map[string]any{"min_seq": int64(0)},
}
opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1})
lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt)