This commit is contained in:
withchao 2024-06-21 18:24:30 +08:00
parent 2d2941a526
commit 8e3890ed88
5 changed files with 130 additions and 27 deletions

View File

@ -75,21 +75,43 @@ func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s st
return m, nil return m, nil
} }
func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { //func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) // return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
} //}
//
func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { //func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) // return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey)
} //}
//
func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { //func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMaxSeqKey) // return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
} //}
//
func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { //func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) // return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
} //}
//
//func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
// for conversationID, seq := range seqs {
// if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil {
// return errs.Wrap(err)
// }
// }
// return nil
//}
//
//
//func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
// return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
//}
//
//func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
// return c.getSeq(ctx, conversationID, c.getMinSeqKey)
//}
//
//func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
// return c.setSeqs(ctx, seqs, c.getMinSeqKey)
//}
func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
for conversationID, seq := range seqs { for conversationID, seq := range seqs {
@ -100,18 +122,6 @@ func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey fu
return nil return nil
} }
func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
return c.getSeq(ctx, conversationID, c.getMinSeqKey)
}
func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()
if err != nil { if err != nil {

View File

@ -0,0 +1,71 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) {
coll := db.Collection(database.SeqConversationName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "user_id", Value: 1},
{Key: "conversation_id", Value: 1},
},
})
if err != nil {
return nil, err
}
return &seqUserMongo{coll: coll}, nil
}
type seqUserMongo struct {
coll *mongo.Collection
}
func (s *seqUserMongo) setSeq(ctx context.Context, userID string, conversationID string, seq int64, field string) error {
filter := map[string]any{
"user_id": userID,
"conversation_id": conversationID,
}
update := map[string]any{
"$set": map[string]any{"field": int64(0)},
}
opt := options.Update().SetUpsert(true)
return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt)
}
func (s *seqUserMongo) GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
//TODO implement me
panic("implement me")
}
func (s *seqUserMongo) SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
}
func (s *seqUserMongo) GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
//TODO implement me
panic("implement me")
}
func (s *seqUserMongo) SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
}
func (s *seqUserMongo) GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) {
//TODO implement me
panic("implement me")
}
func (s *seqUserMongo) SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error {
//TODO implement me
panic("implement me")
}

View File

@ -15,4 +15,5 @@ const (
ObjectName = "s3" ObjectName = "s3"
UserName = "user" UserName = "user"
SeqConversationName = "seq" SeqConversationName = "seq"
SeqUserName = "seq_user"
) )

View File

@ -0,0 +1,12 @@
package database
import "context"
type SeqUser interface {
GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error
GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error
GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error
}

View File

@ -0,0 +1,9 @@
package model
type SeqUser struct {
UserID string `bson:"user_id"`
ConversationID string `bson:"conversation_id"`
MinSeq int64 `bson:"min_seq"`
MaxSeq int64 `bson:"max_seq"`
ReadSeq int64 `bson:"read_seq"`
}