mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	new mongo
This commit is contained in:
		
							parent
							
								
									c6942f07da
								
							
						
					
					
						commit
						82c6b005e9
					
				
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							@ -14,8 +14,8 @@ require (
 | 
				
			|||||||
	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
						github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
 | 
				
			||||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
						github.com/mitchellh/mapstructure v1.5.0
 | 
				
			||||||
	github.com/openimsdk/protocol v0.0.65
 | 
						github.com/openimsdk/protocol v0.0.65
 | 
				
			||||||
	github.com/openimsdk/tools v0.0.49-alpha.19
 | 
						github.com/openimsdk/tools v0.0.49-alpha.23
 | 
				
			||||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
						github.com/pkg/errors v0.9.1
 | 
				
			||||||
	github.com/prometheus/client_golang v1.18.0
 | 
						github.com/prometheus/client_golang v1.18.0
 | 
				
			||||||
	github.com/stretchr/testify v1.9.0
 | 
						github.com/stretchr/testify v1.9.0
 | 
				
			||||||
	go.mongodb.org/mongo-driver v1.14.0
 | 
						go.mongodb.org/mongo-driver v1.14.0
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
 | 
				
			|||||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
					github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
					github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
					github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.23 h1:/KkJ7vfx8FAoJhq3veH9PWnxbSkEf+dTSshvDrHBR38=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.23/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
 | 
				
			||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
					github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
				
			||||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
					github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 | 
				
			||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
					github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										250
									
								
								pkg/common/db/dataver/common.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										250
									
								
								pkg/common/db/dataver/common.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,250 @@
 | 
				
			|||||||
 | 
					package dataver
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/utils/datautil"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						FirstVersion         = 1
 | 
				
			||||||
 | 
						DefaultDeleteVersion = 0
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type WriteLog struct {
 | 
				
			||||||
 | 
						DID        string    `bson:"d_id"`
 | 
				
			||||||
 | 
						Logs       []Elem    `bson:"logs"`
 | 
				
			||||||
 | 
						Version    uint      `bson:"version"`
 | 
				
			||||||
 | 
						Deleted    uint      `bson:"deleted"`
 | 
				
			||||||
 | 
						LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
 | 
						LogLen     int       `bson:"log_len"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Elem struct {
 | 
				
			||||||
 | 
						EID        string    `bson:"e_id"`
 | 
				
			||||||
 | 
						Deleted    bool      `bson:"deleted"`
 | 
				
			||||||
 | 
						Version    uint      `bson:"version"`
 | 
				
			||||||
 | 
						LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type DataLog interface {
 | 
				
			||||||
 | 
						WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error
 | 
				
			||||||
 | 
						FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLog, error)
 | 
				
			||||||
 | 
						DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewDataLog(coll *mongo.Collection) (DataLog, error) {
 | 
				
			||||||
 | 
						lm := &logModel{coll: coll}
 | 
				
			||||||
 | 
						if lm.initIndex(context.Background()) != nil {
 | 
				
			||||||
 | 
							return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return lm, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type logModel struct {
 | 
				
			||||||
 | 
						coll *mongo.Collection
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) initIndex(ctx context.Context) error {
 | 
				
			||||||
 | 
						_, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{
 | 
				
			||||||
 | 
							Keys: bson.M{
 | 
				
			||||||
 | 
								"d_id": 1,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) WriteLog(ctx context.Context, dId string, eIds []string, deleted bool) error {
 | 
				
			||||||
 | 
						if len(eIds) == 0 {
 | 
				
			||||||
 | 
							return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if datautil.Duplicate(eIds) {
 | 
				
			||||||
 | 
							return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						now := time.Now()
 | 
				
			||||||
 | 
						res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if res.MatchedCount > 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						} else if !mongo.IsDuplicateKeyError(err) {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						} else if res.ModifiedCount == 0 {
 | 
				
			||||||
 | 
							return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) error {
 | 
				
			||||||
 | 
						type tableWriteLog struct {
 | 
				
			||||||
 | 
							DID        string    `bson:"d_id"`
 | 
				
			||||||
 | 
							Logs       []Elem    `bson:"logs"`
 | 
				
			||||||
 | 
							Version    uint      `bson:"version"`
 | 
				
			||||||
 | 
							Deleted    uint      `bson:"deleted"`
 | 
				
			||||||
 | 
							LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						wl := tableWriteLog{
 | 
				
			||||||
 | 
							DID:        dId,
 | 
				
			||||||
 | 
							Logs:       make([]Elem, 0, len(eIds)),
 | 
				
			||||||
 | 
							Version:    FirstVersion,
 | 
				
			||||||
 | 
							Deleted:    DefaultDeleteVersion,
 | 
				
			||||||
 | 
							LastUpdate: now,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, eId := range eIds {
 | 
				
			||||||
 | 
							wl.Logs = append(wl.Logs, Elem{
 | 
				
			||||||
 | 
								EID:        eId,
 | 
				
			||||||
 | 
								Deleted:    deleted,
 | 
				
			||||||
 | 
								Version:    FirstVersion,
 | 
				
			||||||
 | 
								LastUpdate: now,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err := l.coll.InsertOne(ctx, &wl)
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
 | 
				
			||||||
 | 
						if len(eIds) == 0 {
 | 
				
			||||||
 | 
							return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						filter := bson.M{
 | 
				
			||||||
 | 
							"d_id": dId,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						elems := make([]bson.M, 0, len(eIds))
 | 
				
			||||||
 | 
						for _, eId := range eIds {
 | 
				
			||||||
 | 
							elems = append(elems, bson.M{
 | 
				
			||||||
 | 
								"e_id":        eId,
 | 
				
			||||||
 | 
								"version":     "$version",
 | 
				
			||||||
 | 
								"deleted":     deleted,
 | 
				
			||||||
 | 
								"last_update": now,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pipeline := []bson.M{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"delete_e_ids": eIds,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"version":     bson.M{"$add": []any{"$version", 1}},
 | 
				
			||||||
 | 
									"last_update": now,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$filter": bson.M{
 | 
				
			||||||
 | 
											"input": "$logs",
 | 
				
			||||||
 | 
											"as":    "log",
 | 
				
			||||||
 | 
											"cond": bson.M{
 | 
				
			||||||
 | 
												"$not": bson.M{
 | 
				
			||||||
 | 
													"$in": []any{"$$log.e_id", "$delete_e_ids"},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$concatArrays": []any{
 | 
				
			||||||
 | 
											"$logs",
 | 
				
			||||||
 | 
											elems,
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$unset": "delete_e_ids",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLog, error) {
 | 
				
			||||||
 | 
						pipeline := []bson.M{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$match": bson.M{
 | 
				
			||||||
 | 
									"d_id": did,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$cond": bson.M{
 | 
				
			||||||
 | 
											"if": bson.M{
 | 
				
			||||||
 | 
												"$or": []bson.M{
 | 
				
			||||||
 | 
													{"$lt": []any{"$version", version}},
 | 
				
			||||||
 | 
													{"$gte": []any{"$deleted", version}},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
											"then": []any{},
 | 
				
			||||||
 | 
											"else": "$logs",
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$filter": bson.M{
 | 
				
			||||||
 | 
											"input": "$logs",
 | 
				
			||||||
 | 
											"as":    "l",
 | 
				
			||||||
 | 
											"cond": bson.M{
 | 
				
			||||||
 | 
												"$gt": []any{"$$l.version", version},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"log_len": bson.M{"$size": "$logs"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$cond": bson.M{
 | 
				
			||||||
 | 
											"if": bson.M{
 | 
				
			||||||
 | 
												"$gt": []any{"$log_len", limit},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
											"then": []any{},
 | 
				
			||||||
 | 
											"else": "$logs",
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if limit <= 0 {
 | 
				
			||||||
 | 
							pipeline = pipeline[:len(pipeline)-1]
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						res, err := mongoutil.Aggregate[*WriteLog](ctx, l.coll, pipeline)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(res) == 0 {
 | 
				
			||||||
 | 
							return nil, errs.Wrap(mongo.ErrNoDocuments)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return res[0], nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *logModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error {
 | 
				
			||||||
 | 
						return mongoutil.DeleteMany(ctx, l.coll, bson.M{
 | 
				
			||||||
 | 
							"last_update": bson.M{
 | 
				
			||||||
 | 
								"$lt": deadline,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -16,6 +16,7 @@ package mgo
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/db/dataver"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
 | 
				
			||||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
@ -27,7 +28,9 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// FriendMgo implements FriendModelInterface using MongoDB as the storage backend.
 | 
					// FriendMgo implements FriendModelInterface using MongoDB as the storage backend.
 | 
				
			||||||
type FriendMgo struct {
 | 
					type FriendMgo struct {
 | 
				
			||||||
	coll *mongo.Collection
 | 
						coll   *mongo.Collection
 | 
				
			||||||
 | 
						owner  dataver.DataLog
 | 
				
			||||||
 | 
						friend dataver.DataLog
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database.
 | 
					// NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database.
 | 
				
			||||||
@ -43,12 +46,25 @@ func NewFriendMongo(db *mongo.Database) (relation.FriendModelInterface, error) {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &FriendMgo{coll: coll}, nil
 | 
						owner, err := dataver.NewDataLog(db.Collection("friend_owner_log"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						friend, err := dataver.NewDataLog(db.Collection("friend_log"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &FriendMgo{coll: coll, owner: owner, friend: friend}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Create inserts multiple friend records.
 | 
					// Create inserts multiple friend records.
 | 
				
			||||||
func (f *FriendMgo) Create(ctx context.Context, friends []*relation.FriendModel) error {
 | 
					func (f *FriendMgo) Create(ctx context.Context, friends []*relation.FriendModel) error {
 | 
				
			||||||
	return mongoutil.InsertMany(ctx, f.coll, friends)
 | 
						return Success(func() error {
 | 
				
			||||||
 | 
							return mongoutil.InsertMany(ctx, f.coll, friends)
 | 
				
			||||||
 | 
						}, func() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Delete removes specified friends of the owner user.
 | 
					// Delete removes specified friends of the owner user.
 | 
				
			||||||
@ -57,7 +73,13 @@ func (f *FriendMgo) Delete(ctx context.Context, ownerUserID string, friendUserID
 | 
				
			|||||||
		"owner_user_id":  ownerUserID,
 | 
							"owner_user_id":  ownerUserID,
 | 
				
			||||||
		"friend_user_id": bson.M{"$in": friendUserIDs},
 | 
							"friend_user_id": bson.M{"$in": friendUserIDs},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return mongoutil.DeleteOne(ctx, f.coll, filter)
 | 
						return Success(func() error {
 | 
				
			||||||
 | 
							return mongoutil.DeleteOne(ctx, f.coll, filter)
 | 
				
			||||||
 | 
						}, func() error {
 | 
				
			||||||
 | 
							return f.owner.WriteLog(ctx, ownerUserID, friendUserIDs, true)
 | 
				
			||||||
 | 
						}, func() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdateByMap updates specific fields of a friend document using a map.
 | 
					// UpdateByMap updates specific fields of a friend document using a map.
 | 
				
			||||||
@ -69,18 +91,13 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU
 | 
				
			|||||||
		"owner_user_id":  ownerUserID,
 | 
							"owner_user_id":  ownerUserID,
 | 
				
			||||||
		"friend_user_id": friendUserID,
 | 
							"friend_user_id": friendUserID,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
 | 
						return Success(func() error {
 | 
				
			||||||
 | 
							return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
 | 
				
			||||||
 | 
						}, func() error {
 | 
				
			||||||
 | 
							return f.owner.WriteLog(ctx, ownerUserID, []string{friendUserID}, false)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Update modifies multiple friend documents.
 | 
					 | 
				
			||||||
// func (f *FriendMgo) Update(ctx context.Context, friends []*relation.FriendModel) error {
 | 
					 | 
				
			||||||
// 	filter := bson.M{
 | 
					 | 
				
			||||||
// 		"owner_user_id":  ownerUserID,
 | 
					 | 
				
			||||||
// 		"friend_user_id": friendUserID,
 | 
					 | 
				
			||||||
// 	}
 | 
					 | 
				
			||||||
// 	return mgotool.UpdateMany(ctx, f.coll, filter, friends)
 | 
					 | 
				
			||||||
// }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// UpdateRemark updates the remark for a specific friend.
 | 
					// UpdateRemark updates the remark for a specific friend.
 | 
				
			||||||
func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) error {
 | 
					func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) error {
 | 
				
			||||||
	return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark})
 | 
						return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark})
 | 
				
			||||||
@ -157,7 +174,18 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien
 | 
				
			|||||||
	// Create an update document
 | 
						// Create an update document
 | 
				
			||||||
	update := bson.M{"$set": val}
 | 
						update := bson.M{"$set": val}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Perform the update operation for all matching documents
 | 
						return Success(func() error {
 | 
				
			||||||
	_, err := mongoutil.UpdateMany(ctx, f.coll, filter, update)
 | 
							return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
 | 
				
			||||||
	return err
 | 
						}, func() error {
 | 
				
			||||||
 | 
							return f.owner.WriteLog(ctx, ownerUserID, friendUserIDs, false)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func Success(fns ...func() error) error {
 | 
				
			||||||
 | 
						for _, fn := range fns {
 | 
				
			||||||
 | 
							if err := fn(); err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -4,6 +4,7 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/utils/datautil"
 | 
				
			||||||
	"github.com/pkg/errors"
 | 
						"github.com/pkg/errors"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/bson"
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
@ -17,6 +18,11 @@ var (
 | 
				
			|||||||
	ErrNotFound     = mongo.ErrNoDocuments
 | 
						ErrNotFound     = mongo.ErrNoDocuments
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						FirstVersion         = 1
 | 
				
			||||||
 | 
						DefaultDeleteVersion = 0
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Elem struct {
 | 
					type Elem struct {
 | 
				
			||||||
	ID      string
 | 
						ID      string
 | 
				
			||||||
	Version uint
 | 
						Version uint
 | 
				
			||||||
@ -28,18 +34,27 @@ type ChangeLog struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type WriteLog struct {
 | 
					type WriteLog struct {
 | 
				
			||||||
	DID           string    `bson:"d_id"`
 | 
						DID        string    `bson:"d_id"`
 | 
				
			||||||
	Logs          []LogElem `bson:"logs"`
 | 
						Logs       []LogElem `bson:"logs"`
 | 
				
			||||||
	Version       uint      `bson:"version"`
 | 
						Version    uint      `bson:"version"`
 | 
				
			||||||
	LastUpdate    time.Time `bson:"last_update"`
 | 
						Deleted    uint      `bson:"deleted"`
 | 
				
			||||||
	DeleteVersion uint      `bson:"delete_version"`
 | 
						LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type WriteLogLen struct {
 | 
				
			||||||
 | 
						DID        string    `bson:"d_id"`
 | 
				
			||||||
 | 
						Logs       []LogElem `bson:"logs"`
 | 
				
			||||||
 | 
						Version    uint      `bson:"version"`
 | 
				
			||||||
 | 
						Deleted    uint      `bson:"deleted"`
 | 
				
			||||||
 | 
						LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
 | 
						LogLen     int       `bson:"log_len"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type LogElem struct {
 | 
					type LogElem struct {
 | 
				
			||||||
	EID        string    `bson:"e_id"`
 | 
						EID        string    `bson:"e_id"`
 | 
				
			||||||
	Deleted    bool      `bson:"deleted"`
 | 
						Deleted    bool      `bson:"deleted"`
 | 
				
			||||||
	Version    uint      `bson:"version"`
 | 
						Version    uint      `bson:"version"`
 | 
				
			||||||
	UpdateTime time.Time `bson:"update_time"`
 | 
						LastUpdate time.Time `bson:"last_update"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type LogModel struct {
 | 
					type LogModel struct {
 | 
				
			||||||
@ -47,7 +62,24 @@ type LogModel struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *LogModel) InitIndex(ctx context.Context) error {
 | 
					func (l *LogModel) InitIndex(ctx context.Context) error {
 | 
				
			||||||
	return nil
 | 
						_, err := l.coll.Indexes().CreateOne(ctx, mongo.IndexModel{
 | 
				
			||||||
 | 
							Keys: bson.M{
 | 
				
			||||||
 | 
								"d_id": 1,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *LogModel) WriteLog1(ctx context.Context, dId string, eId string, deleted bool) {
 | 
				
			||||||
 | 
						if err := l.WriteLog(ctx, dId, eId, deleted); err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *LogModel) WriteLogBatch1(ctx context.Context, dId string, eIds []string, deleted bool) {
 | 
				
			||||||
 | 
						if err := l.WriteLogBatch(ctx, dId, eIds, deleted); err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted bool) error {
 | 
					func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted bool) error {
 | 
				
			||||||
@ -65,13 +97,13 @@ func (l *LogModel) WriteLog(ctx context.Context, dId string, eId string, deleted
 | 
				
			|||||||
			{
 | 
								{
 | 
				
			||||||
				EID:        eId,
 | 
									EID:        eId,
 | 
				
			||||||
				Deleted:    deleted,
 | 
									Deleted:    deleted,
 | 
				
			||||||
				Version:    1,
 | 
									Version:    FirstVersion,
 | 
				
			||||||
				UpdateTime: now,
 | 
									LastUpdate: now,
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		Version:       1,
 | 
							Version:    FirstVersion,
 | 
				
			||||||
		LastUpdate:    now,
 | 
							Deleted:    DefaultDeleteVersion,
 | 
				
			||||||
		DeleteVersion: 0,
 | 
							LastUpdate: now,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, err := l.coll.InsertOne(ctx, &wl); err == nil {
 | 
						if _, err := l.coll.InsertOne(ctx, &wl); err == nil {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
@ -94,7 +126,7 @@ func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted
 | 
				
			|||||||
		"e_id":        eId,
 | 
							"e_id":        eId,
 | 
				
			||||||
		"version":     "$version",
 | 
							"version":     "$version",
 | 
				
			||||||
		"deleted":     deleted,
 | 
							"deleted":     deleted,
 | 
				
			||||||
		"update_time": now,
 | 
							"last_update": now,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pipeline := []bson.M{
 | 
						pipeline := []bson.M{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@ -107,7 +139,7 @@ func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted
 | 
				
			|||||||
		{
 | 
							{
 | 
				
			||||||
			"$set": bson.M{
 | 
								"$set": bson.M{
 | 
				
			||||||
				"version":     bson.M{"$add": []any{"$version", 1}},
 | 
									"version":     bson.M{"$add": []any{"$version", 1}},
 | 
				
			||||||
				"update_time": now,
 | 
									"last_update": now,
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@ -158,6 +190,181 @@ func (l *LogModel) writeLog(ctx context.Context, dId string, eId string, deleted
 | 
				
			|||||||
	return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
 | 
						return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint) (*ChangeLog, error) {
 | 
					func (l *LogModel) WriteLogBatch(ctx context.Context, dId string, eIds []string, deleted bool) error {
 | 
				
			||||||
	return nil, nil
 | 
						if len(eIds) == 0 {
 | 
				
			||||||
 | 
							return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if datautil.Duplicate(eIds) {
 | 
				
			||||||
 | 
							return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						now := time.Now()
 | 
				
			||||||
 | 
						res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if res.MatchedCount > 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						wl := WriteLog{
 | 
				
			||||||
 | 
							DID:        dId,
 | 
				
			||||||
 | 
							Logs:       make([]LogElem, 0, len(eIds)),
 | 
				
			||||||
 | 
							Version:    FirstVersion,
 | 
				
			||||||
 | 
							Deleted:    DefaultDeleteVersion,
 | 
				
			||||||
 | 
							LastUpdate: now,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, eId := range eIds {
 | 
				
			||||||
 | 
							wl.Logs = append(wl.Logs, LogElem{
 | 
				
			||||||
 | 
								EID:        eId,
 | 
				
			||||||
 | 
								Deleted:    deleted,
 | 
				
			||||||
 | 
								Version:    FirstVersion,
 | 
				
			||||||
 | 
								LastUpdate: now,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if _, err := l.coll.InsertOne(ctx, &wl); err == nil {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						} else if !mongo.IsDuplicateKeyError(err) {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						} else if res.ModifiedCount == 0 {
 | 
				
			||||||
 | 
							return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *LogModel) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) {
 | 
				
			||||||
 | 
						if len(eIds) == 0 {
 | 
				
			||||||
 | 
							return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						filter := bson.M{
 | 
				
			||||||
 | 
							"d_id": dId,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						elems := make([]bson.M, 0, len(eIds))
 | 
				
			||||||
 | 
						for _, eId := range eIds {
 | 
				
			||||||
 | 
							elems = append(elems, bson.M{
 | 
				
			||||||
 | 
								"e_id":        eId,
 | 
				
			||||||
 | 
								"version":     "$version",
 | 
				
			||||||
 | 
								"deleted":     deleted,
 | 
				
			||||||
 | 
								"last_update": now,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pipeline := []bson.M{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"delete_e_ids": eIds,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"version":     bson.M{"$add": []any{"$version", 1}},
 | 
				
			||||||
 | 
									"last_update": now,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$filter": bson.M{
 | 
				
			||||||
 | 
											"input": "$logs",
 | 
				
			||||||
 | 
											"as":    "log",
 | 
				
			||||||
 | 
											"cond": bson.M{
 | 
				
			||||||
 | 
												"$not": bson.M{
 | 
				
			||||||
 | 
													"$in": []any{"$$log.e_id", "$delete_e_ids"},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$set": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$concatArrays": []any{
 | 
				
			||||||
 | 
											"$logs",
 | 
				
			||||||
 | 
											elems,
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$unset": "delete_e_ids",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *LogModel) FindChangeLog(ctx context.Context, did string, version uint, limit int) (*WriteLogLen, error) {
 | 
				
			||||||
 | 
						pipeline := []bson.M{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$match": bson.M{
 | 
				
			||||||
 | 
									"d_id": did,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$cond": bson.M{
 | 
				
			||||||
 | 
											"if": bson.M{
 | 
				
			||||||
 | 
												"$or": []bson.M{
 | 
				
			||||||
 | 
													{"$lt": []any{"$version", version}},
 | 
				
			||||||
 | 
													{"$gte": []any{"$deleted", version}},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
											"then": []any{},
 | 
				
			||||||
 | 
											"else": "$logs",
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$filter": bson.M{
 | 
				
			||||||
 | 
											"input": "$logs",
 | 
				
			||||||
 | 
											"as":    "l",
 | 
				
			||||||
 | 
											"cond": bson.M{
 | 
				
			||||||
 | 
												"$gt": []any{"$$l.version", version},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"log_len": bson.M{"$size": "$logs"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								"$addFields": bson.M{
 | 
				
			||||||
 | 
									"logs": bson.M{
 | 
				
			||||||
 | 
										"$cond": bson.M{
 | 
				
			||||||
 | 
											"if": bson.M{
 | 
				
			||||||
 | 
												"$gt": []any{"$log_len", limit},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
											"then": []any{},
 | 
				
			||||||
 | 
											"else": "$logs",
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if limit <= 0 {
 | 
				
			||||||
 | 
							pipeline = pipeline[:len(pipeline)-1]
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						res, err := mongoutil.Aggregate[*WriteLogLen](ctx, l.coll, pipeline)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(res) == 0 {
 | 
				
			||||||
 | 
							return nil, ErrNotFound
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return res[0], nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *LogModel) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error {
 | 
				
			||||||
 | 
						return mongoutil.DeleteMany(ctx, l.coll, bson.M{
 | 
				
			||||||
 | 
							"last_update": bson.M{
 | 
				
			||||||
 | 
								"$lt": deadline,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -2,7 +2,6 @@ package listdemo
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo/options"
 | 
						"go.mongodb.org/mongo-driver/mongo/options"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
@ -24,7 +23,7 @@ func Check(err error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestName(t *testing.T) {
 | 
					func TestName(t *testing.T) {
 | 
				
			||||||
	cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
 | 
						cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
 | 
				
			||||||
	coll := cli.Database("openim_v3").Collection("demo")
 | 
						coll := cli.Database("openim_v3").Collection("friend_version")
 | 
				
			||||||
	_ = coll
 | 
						_ = coll
 | 
				
			||||||
	//Result(coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
 | 
						//Result(coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
 | 
				
			||||||
	//	{
 | 
						//	{
 | 
				
			||||||
@ -35,27 +34,24 @@ func TestName(t *testing.T) {
 | 
				
			|||||||
	//	},
 | 
						//	},
 | 
				
			||||||
	//}))
 | 
						//}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	wl := WriteLog{
 | 
						const num = 1
 | 
				
			||||||
		DID: "100",
 | 
						lm := &LogModel{coll: coll}
 | 
				
			||||||
		Logs: []LogElem{
 | 
					
 | 
				
			||||||
			{
 | 
						//start := time.Now()
 | 
				
			||||||
				EID:        "1000",
 | 
						//eIds := make([]string, 0, num)
 | 
				
			||||||
				Deleted:    false,
 | 
						//for i := 0; i < num; i++ {
 | 
				
			||||||
				Version:    1,
 | 
						//	eIds = append(eIds, strconv.Itoa(1000+(i)))
 | 
				
			||||||
				UpdateTime: time.Now(),
 | 
						//}
 | 
				
			||||||
			},
 | 
						//lm.WriteLogBatch1(context.Background(), "100", eIds, false)
 | 
				
			||||||
			{
 | 
						//end := time.Now()
 | 
				
			||||||
				EID:        "2000",
 | 
						//t.Log(end.Sub(start))       // 509.962208ms
 | 
				
			||||||
				Deleted:    false,
 | 
						//t.Log(end.Sub(start) / num) // 511.496µs
 | 
				
			||||||
				Version:    1,
 | 
					
 | 
				
			||||||
				UpdateTime: time.Now(),
 | 
						start := time.Now()
 | 
				
			||||||
			},
 | 
						wll, err := lm.FindChangeLog(context.Background(), "100", 3, 100)
 | 
				
			||||||
		},
 | 
						if err != nil {
 | 
				
			||||||
		Version:       2,
 | 
							panic(err)
 | 
				
			||||||
		DeleteVersion: 0,
 | 
					 | 
				
			||||||
		LastUpdate:    time.Now(),
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						t.Log(time.Since(start))
 | 
				
			||||||
	fmt.Println(Result(coll.InsertOne(context.Background(), wl)))
 | 
						t.Log(wll)
 | 
				
			||||||
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,4 @@
 | 
				
			|||||||
db.demo.updateMany(
 | 
					db.friend_version.updateMany(
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        "d_id": "100"
 | 
					        "d_id": "100"
 | 
				
			||||||
    },
 | 
					    },
 | 
				
			||||||
@ -18,7 +18,7 @@ db.demo.updateMany(
 | 
				
			|||||||
                version: {
 | 
					                version: {
 | 
				
			||||||
                    $add: ["$version", 1]
 | 
					                    $add: ["$version", 1]
 | 
				
			||||||
                },
 | 
					                },
 | 
				
			||||||
                update_time: new Date(),
 | 
					                last_update: new Date(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
@ -35,7 +35,7 @@ db.demo.updateMany(
 | 
				
			|||||||
                                [
 | 
					                                [
 | 
				
			||||||
                                    {
 | 
					                                    {
 | 
				
			||||||
                                        e_id: "1000",
 | 
					                                        e_id: "1000",
 | 
				
			||||||
                                        update_time: new Date(),
 | 
					                                        last_update: new Date(),
 | 
				
			||||||
                                        version: "$version",
 | 
					                                        version: "$version",
 | 
				
			||||||
                                        deleted: false
 | 
					                                        deleted: false
 | 
				
			||||||
                                    }
 | 
					                                    }
 | 
				
			||||||
@ -57,7 +57,7 @@ db.demo.updateMany(
 | 
				
			|||||||
                                        },
 | 
					                                        },
 | 
				
			||||||
                                        then: {
 | 
					                                        then: {
 | 
				
			||||||
                                            e_id: "1000",
 | 
					                                            e_id: "1000",
 | 
				
			||||||
                                            update_time: new Date(),
 | 
					                                            last_update: new Date(),
 | 
				
			||||||
                                            version: "$version",
 | 
					                                            version: "$version",
 | 
				
			||||||
                                            deleted: false
 | 
					                                            deleted: false
 | 
				
			||||||
                                        },
 | 
					                                        },
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										63
									
								
								pkg/common/listdemo2/demo2.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								pkg/common/listdemo2/demo2.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,63 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					db.friend_version.updateMany(
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "d_id": "100"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    [
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            $addFields: {
 | 
				
			||||||
 | 
					                update_elem_ids: ["1000", "1001","1003", "2000"]
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            $set: {
 | 
				
			||||||
 | 
					                version: {
 | 
				
			||||||
 | 
					                    $add: ["$version", 1]
 | 
				
			||||||
 | 
					                },
 | 
				
			||||||
 | 
					                last_update: new Date(),
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            $set: {
 | 
				
			||||||
 | 
					                logs: {
 | 
				
			||||||
 | 
					                    $filter: {
 | 
				
			||||||
 | 
					                        input: "$logs",
 | 
				
			||||||
 | 
					                        as: "log",
 | 
				
			||||||
 | 
					                        cond: {
 | 
				
			||||||
 | 
					                            "$not": {
 | 
				
			||||||
 | 
					                                $in: ["$$log.e_id", "$update_elem_ids"]
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            $set: {
 | 
				
			||||||
 | 
					                logs: {
 | 
				
			||||||
 | 
					                    $concatArrays: [
 | 
				
			||||||
 | 
					                        "$logs",
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            {
 | 
				
			||||||
 | 
					                                e_id: "1003",
 | 
				
			||||||
 | 
					                                last_update: ISODate("2024-05-25T06:32:10.238Z"),
 | 
				
			||||||
 | 
					                                version: "$version",
 | 
				
			||||||
 | 
					                                deleted: false
 | 
				
			||||||
 | 
					                            },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        ]
 | 
				
			||||||
 | 
					                    ]
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            $unset: ["update_elem_ids"]
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
							
								
								
									
										59
									
								
								pkg/common/listdemo2/demo3.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								pkg/common/listdemo2/demo3.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,59 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					db.friend_version.aggregate([
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$match": {
 | 
				
			||||||
 | 
					            "d_id": "100",
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$project": {
 | 
				
			||||||
 | 
					            "_id": 0,
 | 
				
			||||||
 | 
					            "d_id": 0,
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$addFields": {
 | 
				
			||||||
 | 
					            "logs": {
 | 
				
			||||||
 | 
					                $cond: {
 | 
				
			||||||
 | 
					                    if: {
 | 
				
			||||||
 | 
					                        $or: [
 | 
				
			||||||
 | 
					                            {$lt: ["$version", 3]},
 | 
				
			||||||
 | 
					                            {$gte: ["$deleted", 3]},
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    },
 | 
				
			||||||
 | 
					                    then: [],
 | 
				
			||||||
 | 
					                    else: "$logs",
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$addFields": {
 | 
				
			||||||
 | 
					            "logs": {
 | 
				
			||||||
 | 
					                "$filter": {
 | 
				
			||||||
 | 
					                    input: "$logs",
 | 
				
			||||||
 | 
					                    as: "l",
 | 
				
			||||||
 | 
					                    cond: { $gt: ["$$l.version", 3] }
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$addFields": {
 | 
				
			||||||
 | 
					            "log_len": {
 | 
				
			||||||
 | 
					                $size: "$logs"
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "$addFields": {
 | 
				
			||||||
 | 
					            "logs": {
 | 
				
			||||||
 | 
					                $cond: {
 | 
				
			||||||
 | 
					                    if: {$gt: ["$log_len", 1]},
 | 
				
			||||||
 | 
					                    then: [],
 | 
				
			||||||
 | 
					                    else: "$logs",
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					])
 | 
				
			||||||
							
								
								
									
										10
									
								
								pkg/common/listdemo2/demo5.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								pkg/common/listdemo2/demo5.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,10 @@
 | 
				
			|||||||
 | 
					db.friend_version.updateMany(
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        "d_id": "100"
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					    [
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ],
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user