mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	sync
This commit is contained in:
		
							parent
							
								
									58c4c13cf1
								
							
						
					
					
						commit
						caebdf32ca
					
				@ -17,7 +17,9 @@ package group
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
			
		||||
@ -287,6 +289,15 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint64, collName string, id string) {
 | 
			
		||||
	for _, coll := range versionctx.GetVersionLog(ctx).Get() {
 | 
			
		||||
		if coll.Name == collName && coll.Doc.DID == id {
 | 
			
		||||
			*version = uint64(coll.Doc.Version)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
 | 
			
		||||
	var err error
 | 
			
		||||
	defer func() {
 | 
			
		||||
@ -297,6 +308,7 @@ func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context,
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -380,6 +392,7 @@ func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, me
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	tips := &sdkws.MemberQuitTips{Group: group, QuitUser: member}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, member.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -467,6 +480,7 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, req.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -480,6 +494,7 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -503,6 +518,7 @@ func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context,
 | 
			
		||||
	}
 | 
			
		||||
	tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
 | 
			
		||||
	err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -524,6 +540,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -564,6 +581,7 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -588,6 +606,7 @@ func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -666,6 +685,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -689,6 +709,7 @@ func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -713,5 +734,6 @@ func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx c
 | 
			
		||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
			
		||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import (
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
			
		||||
	"github.com/openimsdk/tools/system/program"
 | 
			
		||||
	"github.com/spf13/cobra"
 | 
			
		||||
)
 | 
			
		||||
@ -58,5 +59,5 @@ func (a *GroupRpcCmd) Exec() error {
 | 
			
		||||
func (a *GroupRpcCmd) runE() error {
 | 
			
		||||
	return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
 | 
			
		||||
		a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports,
 | 
			
		||||
		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start)
 | 
			
		||||
		a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -27,7 +27,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewBlackMongo(db *mongo.Database) (database.Black, error) {
 | 
			
		||||
	coll := db.Collection("black")
 | 
			
		||||
	coll := db.Collection(database.BlackName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "owner_user_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@ package mgo
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@ -29,7 +30,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
 | 
			
		||||
	coll := db.Collection("conversation")
 | 
			
		||||
	coll := db.Collection(database.ConversationName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "owner_user_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,7 @@ type FriendMgo struct {
 | 
			
		||||
 | 
			
		||||
// NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database.
 | 
			
		||||
func NewFriendMongo(db *mongo.Database) (database.Friend, error) {
 | 
			
		||||
	coll := db.Collection("friend")
 | 
			
		||||
	coll := db.Collection(database.FriendName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "owner_user_id", Value: 1},
 | 
			
		||||
@ -45,7 +45,7 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	owner, err := NewVersionLog(db.Collection("friend_version"))
 | 
			
		||||
	owner, err := NewVersionLog(db.Collection(database.FriendVersionName))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -27,7 +27,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewFriendRequestMongo(db *mongo.Database) (database.FriendRequest, error) {
 | 
			
		||||
	coll := db.Collection("friend_request")
 | 
			
		||||
	coll := db.Collection(database.FriendRequestName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "from_user_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -30,7 +30,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewGroupMongo(db *mongo.Database) (database.Group, error) {
 | 
			
		||||
	coll := db.Collection("group")
 | 
			
		||||
	coll := db.Collection(database.GroupName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "group_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewGroupMember(db *mongo.Database) (database.GroupMember, error) {
 | 
			
		||||
	coll := db.Collection("group_member")
 | 
			
		||||
	coll := db.Collection(database.GroupMemberName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "group_id", Value: 1},
 | 
			
		||||
@ -40,11 +40,11 @@ func NewGroupMember(db *mongo.Database) (database.GroupMember, error) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	member, err := NewVersionLog(db.Collection("group_member_version"))
 | 
			
		||||
	member, err := NewVersionLog(db.Collection(database.GroupMemberVersionName))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	join, err := NewVersionLog(db.Collection("group_join_version"))
 | 
			
		||||
	join, err := NewVersionLog(db.Collection(database.GroupJoinVersionName))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -28,7 +28,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewGroupRequestMgo(db *mongo.Database) (database.GroupRequest, error) {
 | 
			
		||||
	coll := db.Collection("group_request")
 | 
			
		||||
	coll := db.Collection(database.GroupRequestName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "group_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -28,7 +28,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewLogMongo(db *mongo.Database) (database.Log, error) {
 | 
			
		||||
	coll := db.Collection("log")
 | 
			
		||||
	coll := db.Collection(database.LogName)
 | 
			
		||||
	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
 | 
			
		||||
		{
 | 
			
		||||
			Keys: bson.D{
 | 
			
		||||
 | 
			
		||||
@ -27,7 +27,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) {
 | 
			
		||||
	coll := db.Collection("s3")
 | 
			
		||||
	coll := db.Collection(database.ObjectName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "name", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -31,7 +31,7 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewUserMongo(db *mongo.Database) (database.User, error) {
 | 
			
		||||
	coll := db.Collection("user")
 | 
			
		||||
	coll := db.Collection(database.UserName)
 | 
			
		||||
	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
 | 
			
		||||
		Keys: bson.D{
 | 
			
		||||
			{Key: "user_id", Value: 1},
 | 
			
		||||
 | 
			
		||||
@ -5,9 +5,9 @@ import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
			
		||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson/primitive"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
			
		||||
@ -37,34 +37,41 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error {
 | 
			
		||||
	if len(eIds) == 0 {
 | 
			
		||||
		return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
			
		||||
	}
 | 
			
		||||
	if datautil.Duplicate(eIds) {
 | 
			
		||||
		return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds)
 | 
			
		||||
	}
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	res, err := l.writeLogBatch(ctx, dId, eIds, state, now)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
	_, err := l.IncrVersionResult(ctx, dId, eIds, state)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
	if res.MatchedCount > 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if _, err := l.initDoc(ctx, dId, eIds, state, now); err == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	} else if !mongo.IsDuplicateKeyError(err) {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if res, err := l.writeLogBatch(ctx, dId, eIds, state, now); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	} else if res.MatchedCount == 0 {
 | 
			
		||||
		return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLogTable, error) {
 | 
			
		||||
func (l *VersionLogMgo) IncrVersionResult(ctx context.Context, dId string, eIds []string, state int32) (*model.VersionLog, error) {
 | 
			
		||||
	vl, err := l.incrVersionResult(ctx, dId, eIds, state)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	versionctx.GetVersionLog(ctx).Append(versionctx.Collection{
 | 
			
		||||
		Name: l.coll.Name(),
 | 
			
		||||
		Doc:  vl,
 | 
			
		||||
	})
 | 
			
		||||
	return vl, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) incrVersionResult(ctx context.Context, dId string, eIds []string, state int32) (*model.VersionLog, error) {
 | 
			
		||||
	if len(eIds) == 0 {
 | 
			
		||||
		return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId)
 | 
			
		||||
	}
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	if res, err := l.writeLogBatch2(ctx, dId, eIds, state, now); err == nil {
 | 
			
		||||
		return res, nil
 | 
			
		||||
	} else if !errors.Is(err, mongo.ErrNoDocuments) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if res, err := l.initDoc(ctx, dId, eIds, state, now); err == nil {
 | 
			
		||||
		return res, nil
 | 
			
		||||
	} else if !mongo.IsDuplicateKeyError(err) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return l.writeLogBatch2(ctx, dId, eIds, state, now)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLog, error) {
 | 
			
		||||
	wl := model.VersionLogTable{
 | 
			
		||||
		ID:         primitive.NewObjectID(),
 | 
			
		||||
		DID:        dId,
 | 
			
		||||
@ -81,11 +88,13 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string,
 | 
			
		||||
			LastUpdate: now,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	_, err := l.coll.InsertOne(ctx, &wl)
 | 
			
		||||
	return &wl, err
 | 
			
		||||
	if _, err := l.coll.InsertOne(ctx, &wl); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return wl.VersionLog(), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*mongo.UpdateResult, error) {
 | 
			
		||||
func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLog, error) {
 | 
			
		||||
	if eIds == nil {
 | 
			
		||||
		eIds = []string{}
 | 
			
		||||
	}
 | 
			
		||||
@ -142,7 +151,8 @@ func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []st
 | 
			
		||||
			"$unset": "delete_e_ids",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline)
 | 
			
		||||
	opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0})
 | 
			
		||||
	return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
 | 
			
		||||
@ -160,7 +170,7 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil {
 | 
			
		||||
		return res.VersionLog(), nil
 | 
			
		||||
		return res, nil
 | 
			
		||||
	} else if mongo.IsDuplicateKeyError(err) {
 | 
			
		||||
		return l.findChangeLog(ctx, dId, version, limit)
 | 
			
		||||
	} else {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										39
									
								
								pkg/common/storage/database/mgo/version_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								pkg/common/storage/database/mgo/version_test.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,39 @@
 | 
			
		||||
package mgo
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/mongo/options"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Result[V any](val V, err error) V {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return val
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Check(err error) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestName(t *testing.T) {
 | 
			
		||||
	cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
 | 
			
		||||
	coll := cli.Database("openim_v3").Collection("version_test")
 | 
			
		||||
	tmp, err := NewVersionLog(coll)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	vl := tmp.(*VersionLogMgo)
 | 
			
		||||
	res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Log(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	t.Logf("%+v", res)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										17
									
								
								pkg/common/storage/database/name.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								pkg/common/storage/database/name.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,17 @@
 | 
			
		||||
package database
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	BlackName              = "black"
 | 
			
		||||
	ConversationName       = "conversation"
 | 
			
		||||
	FriendName             = "friend"
 | 
			
		||||
	FriendVersionName      = "friend_version"
 | 
			
		||||
	FriendRequestName      = "friend_request"
 | 
			
		||||
	GroupName              = "group"
 | 
			
		||||
	GroupMemberName        = "group_member"
 | 
			
		||||
	GroupMemberVersionName = "group_member_version"
 | 
			
		||||
	GroupJoinVersionName   = "group_join_version"
 | 
			
		||||
	GroupRequestName       = "group_request"
 | 
			
		||||
	LogName                = "log"
 | 
			
		||||
	ObjectName             = "s3"
 | 
			
		||||
	UserName               = "user"
 | 
			
		||||
)
 | 
			
		||||
@ -38,7 +38,7 @@ func (v *VersionLogTable) VersionLog() *VersionLog {
 | 
			
		||||
		Version:    v.Version,
 | 
			
		||||
		Deleted:    v.Deleted,
 | 
			
		||||
		LastUpdate: v.LastUpdate,
 | 
			
		||||
		LogLen:     0,
 | 
			
		||||
		LogLen:     len(v.Logs),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										14
									
								
								pkg/common/storage/versionctx/rpc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										14
									
								
								pkg/common/storage/versionctx/rpc.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,14 @@
 | 
			
		||||
package versionctx
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func EnableVersionCtx() grpc.ServerOption {
 | 
			
		||||
	return grpc.ChainUnaryInterceptor(enableVersionCtxInterceptor)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func enableVersionCtxInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
 | 
			
		||||
	return handler(WithVersionLog(ctx), req)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										48
									
								
								pkg/common/storage/versionctx/version.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								pkg/common/storage/versionctx/version.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,48 @@
 | 
			
		||||
package versionctx
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Collection struct {
 | 
			
		||||
	Name string
 | 
			
		||||
	Doc  *tablerelation.VersionLog
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type versionKey struct{}
 | 
			
		||||
 | 
			
		||||
func WithVersionLog(ctx context.Context) context.Context {
 | 
			
		||||
	return context.WithValue(ctx, versionKey{}, &VersionLog{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetVersionLog(ctx context.Context) *VersionLog {
 | 
			
		||||
	if v, ok := ctx.Value(versionKey{}).(*VersionLog); ok {
 | 
			
		||||
		return v
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type VersionLog struct {
 | 
			
		||||
	lock sync.Mutex
 | 
			
		||||
	data []Collection
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *VersionLog) Append(data ...Collection) {
 | 
			
		||||
	if v == nil || len(data) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	v.lock.Lock()
 | 
			
		||||
	defer v.lock.Unlock()
 | 
			
		||||
	v.data = append(v.data, data...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *VersionLog) Get() []Collection {
 | 
			
		||||
	if v == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	v.lock.Lock()
 | 
			
		||||
	defer v.lock.Unlock()
 | 
			
		||||
	return v.data
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user