mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	Merge branch 'pre-release-v3.8.4' into cherry-pick-1697651
This commit is contained in:
		
						commit
						7c6b9a3c0f
					
				
							
								
								
									
										25
									
								
								.github/workflows/go-build-test.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										25
									
								
								.github/workflows/go-build-test.yml
									
									
									
									
										vendored
									
									
								
							| @ -12,6 +12,10 @@ jobs: | ||||
|   go-build: | ||||
|     name: Test with go ${{ matrix.go_version }} on ${{ matrix.os }} | ||||
|     runs-on: ${{ matrix.os }} | ||||
| 
 | ||||
|     env: | ||||
|       SHARE_CONFIG_PATH: config/share.yml | ||||
| 
 | ||||
|     permissions: | ||||
|       contents: write | ||||
|       pull-requests: write | ||||
| @ -40,6 +44,10 @@ jobs: | ||||
|         with: | ||||
|           compose-file: "./docker-compose.yml" | ||||
| 
 | ||||
|       - name: Modify Server Configuration | ||||
|         run: | | ||||
|           yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} | ||||
| 
 | ||||
|       # - name: Get Internal IP Address | ||||
|       #   id: get-ip | ||||
|       #   run: | | ||||
| @ -71,6 +79,11 @@ jobs: | ||||
|           go mod download | ||||
|           go install github.com/magefile/mage@latest | ||||
| 
 | ||||
|       - name: Modify Chat Configuration | ||||
|         run: | | ||||
|           cd ${{ github.workspace }}/chat-repo | ||||
|           yq e '.openIM.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} | ||||
| 
 | ||||
|       - name: Build and test Chat Services | ||||
|         run: | | ||||
|           cd ${{ github.workspace }}/chat-repo | ||||
| @ -132,7 +145,7 @@ jobs: | ||||
| 
 | ||||
|           # Test get admin token | ||||
|           get_admin_token_response=$(curl -X POST -H "Content-Type: application/json" -H "operationID: imAdmin" -d '{ | ||||
|             "secret": "openIM123", | ||||
|             "secret": "123456", | ||||
|             "platformID": 2, | ||||
|             "userID": "imAdmin" | ||||
|           }' http://127.0.0.1:10002/auth/get_admin_token) | ||||
| @ -169,7 +182,8 @@ jobs: | ||||
|       contents: write | ||||
|     env: | ||||
|       SDK_DIR: openim-sdk-core | ||||
|       CONFIG_PATH: config/notification.yml | ||||
|       NOTIFICATION_CONFIG_PATH: config/notification.yml | ||||
|       SHARE_CONFIG_PATH: config/share.yml | ||||
| 
 | ||||
|     strategy: | ||||
|       matrix: | ||||
| @ -184,7 +198,7 @@ jobs: | ||||
|         uses: actions/checkout@v4 | ||||
|         with: | ||||
|           repository: "openimsdk/openim-sdk-core" | ||||
|           ref: "release-v3.8" | ||||
|           ref: "main" | ||||
|           path: ${{ env.SDK_DIR }} | ||||
| 
 | ||||
|       - name: Set up Go ${{ matrix.go_version }} | ||||
| @ -199,8 +213,9 @@ jobs: | ||||
| 
 | ||||
|       - name: Modify Server Configuration | ||||
|         run: | | ||||
|           yq e '.groupCreated.isSendMsg = true' -i ${{ env.CONFIG_PATH }} | ||||
|           yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.CONFIG_PATH }} | ||||
|           yq e '.groupCreated.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }} | ||||
|           yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }} | ||||
|           yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }} | ||||
| 
 | ||||
|       - name: Start Server Services | ||||
|         run: | | ||||
|  | ||||
| @ -1,9 +1,9 @@ | ||||
| secret: openIM123 | ||||
| 
 | ||||
| imAdminUserID: [ imAdmin ] | ||||
| imAdminUserID: [imAdmin] | ||||
| 
 | ||||
| # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time | ||||
| multiLogin: | ||||
|   policy: 1 | ||||
|   # max num of tokens in one end | ||||
|   maxNumOneEnd: 30 | ||||
|   maxNumOneEnd: 30 | ||||
|  | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -219,3 +219,5 @@ require ( | ||||
| 	golang.org/x/crypto v0.27.0 // indirect | ||||
| 	gopkg.in/ini.v1 v1.67.0 // indirect | ||||
| ) | ||||
| 
 | ||||
| //replace github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol | ||||
|  | ||||
| @ -144,24 +144,23 @@ func Start(ctx context.Context, index int, config *Config) error { | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	if config.Discovery.Enable == conf.ETCD { | ||||
| 		cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) | ||||
| 		cm.Watch(ctx) | ||||
| 	} | ||||
| 
 | ||||
| 	sigs := make(chan os.Signal, 1) | ||||
| 	signal.Notify(sigs, syscall.SIGTERM) | ||||
| 
 | ||||
| 	shutdown := func() error { | ||||
| 		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) | ||||
| 		defer cancel() | ||||
| 		err := server.Shutdown(ctx) | ||||
| 		if err != nil { | ||||
| 			return errs.WrapMsg(err, "shutdown err") | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 	disetcd.RegisterShutDown(shutdown) | ||||
| 	//if config.Discovery.Enable == conf.ETCD { | ||||
| 	//	cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) | ||||
| 	//	cm.Watch(ctx) | ||||
| 	//} | ||||
| 	//sigs := make(chan os.Signal, 1) | ||||
| 	//signal.Notify(sigs, syscall.SIGTERM) | ||||
| 	//select { | ||||
| 	//case val := <-sigs: | ||||
| 	//	log.ZDebug(ctx, "recv exit", "signal", val.String()) | ||||
| 	//	cancel(fmt.Errorf("signal %s", val.String())) | ||||
| 	//case <-ctx.Done(): | ||||
| 	//} | ||||
| 	<-apiCtx.Done() | ||||
| 	exitCause := context.Cause(apiCtx) | ||||
| 	log.ZWarn(ctx, "api server exit", exitCause) | ||||
| 	timer := time.NewTimer(time.Second * 15) | ||||
| 	defer timer.Stop() | ||||
| 	select { | ||||
| 	case <-sigs: | ||||
| 		program.SIGTERMExit() | ||||
|  | ||||
| @ -124,6 +124,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf | ||||
| 		userRouterGroup.POST("/add_notification_account", u.AddNotificationAccount) | ||||
| 		userRouterGroup.POST("/update_notification_account", u.UpdateNotificationAccountInfo) | ||||
| 		userRouterGroup.POST("/search_notification_account", u.SearchNotificationAccount) | ||||
| 
 | ||||
| 		userRouterGroup.POST("/get_user_client_config", u.GetUserClientConfig) | ||||
| 		userRouterGroup.POST("/set_user_client_config", u.SetUserClientConfig) | ||||
| 		userRouterGroup.POST("/del_user_client_config", u.DelUserClientConfig) | ||||
| 		userRouterGroup.POST("/page_user_client_config", u.PageUserClientConfig) | ||||
| 	} | ||||
| 	// friend routing group | ||||
| 	{ | ||||
|  | ||||
| @ -242,3 +242,19 @@ func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) { | ||||
| func (u *UserApi) SearchNotificationAccount(c *gin.Context) { | ||||
| 	a2r.Call(c, user.UserClient.SearchNotificationAccount, u.Client) | ||||
| } | ||||
| 
 | ||||
| func (u *UserApi) GetUserClientConfig(c *gin.Context) { | ||||
| 	a2r.Call(c, user.UserClient.GetUserClientConfig, u.Client) | ||||
| } | ||||
| 
 | ||||
| func (u *UserApi) SetUserClientConfig(c *gin.Context) { | ||||
| 	a2r.Call(c, user.UserClient.SetUserClientConfig, u.Client) | ||||
| } | ||||
| 
 | ||||
| func (u *UserApi) DelUserClientConfig(c *gin.Context) { | ||||
| 	a2r.Call(c, user.UserClient.DelUserClientConfig, u.Client) | ||||
| } | ||||
| 
 | ||||
| func (u *UserApi) PageUserClientConfig(c *gin.Context) { | ||||
| 	a2r.Call(c, user.UserClient.PageUserClientConfig, u.Client) | ||||
| } | ||||
|  | ||||
| @ -62,7 +62,7 @@ func (t *thirdServer) InitiateMultipartUpload(ctx context.Context, req *third.In | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	expireTime := time.Now().Add(t.defaultExpire) | ||||
| 	result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts)) | ||||
| 	result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts), req.ContentType) | ||||
| 	if err != nil { | ||||
| 		if haErr, ok := errs.Unwrap(err).(*cont.HashAlreadyExistsError); ok { | ||||
| 			obj := &model.Object{ | ||||
|  | ||||
							
								
								
									
										71
									
								
								internal/rpc/user/config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										71
									
								
								internal/rpc/user/config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,71 @@ | ||||
| package user | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	pbuser "github.com/openimsdk/protocol/user" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| ) | ||||
| 
 | ||||
| func (s *userServer) GetUserClientConfig(ctx context.Context, req *pbuser.GetUserClientConfigReq) (*pbuser.GetUserClientConfigResp, error) { | ||||
| 	if req.UserID != "" { | ||||
| 		if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	res, err := s.clientConfig.GetUserConfig(ctx, req.UserID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &pbuser.GetUserClientConfigResp{Configs: res}, nil | ||||
| } | ||||
| 
 | ||||
| func (s *userServer) SetUserClientConfig(ctx context.Context, req *pbuser.SetUserClientConfigReq) (*pbuser.SetUserClientConfigResp, error) { | ||||
| 	if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if req.UserID != "" { | ||||
| 		if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	if err := s.clientConfig.SetUserConfig(ctx, req.UserID, req.Configs); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &pbuser.SetUserClientConfigResp{}, nil | ||||
| } | ||||
| 
 | ||||
| func (s *userServer) DelUserClientConfig(ctx context.Context, req *pbuser.DelUserClientConfigReq) (*pbuser.DelUserClientConfigResp, error) { | ||||
| 	if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if err := s.clientConfig.DelUserConfig(ctx, req.UserID, req.Keys); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &pbuser.DelUserClientConfigResp{}, nil | ||||
| } | ||||
| 
 | ||||
| func (s *userServer) PageUserClientConfig(ctx context.Context, req *pbuser.PageUserClientConfigReq) (*pbuser.PageUserClientConfigResp, error) { | ||||
| 	if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	total, res, err := s.clientConfig.GetUserConfigPage(ctx, req.UserID, req.Key, req.Pagination) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &pbuser.PageUserClientConfigResp{ | ||||
| 		Total: total, | ||||
| 		Configs: datautil.Slice(res, func(e *model.ClientConfig) *pbuser.ClientConfig { | ||||
| 			return &pbuser.ClientConfig{ | ||||
| 				UserID: e.UserID, | ||||
| 				Key:    e.Key, | ||||
| 				Value:  e.Value, | ||||
| 			} | ||||
| 		}), | ||||
| 	}, nil | ||||
| } | ||||
| @ -51,6 +51,10 @@ import ( | ||||
| 	"google.golang.org/grpc" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	defaultSecret = "openIM123" | ||||
| ) | ||||
| 
 | ||||
| type userServer struct { | ||||
| 	pbuser.UnimplementedUserServer | ||||
| 	online                   cache.OnlineCache | ||||
| @ -62,6 +66,7 @@ type userServer struct { | ||||
| 	webhookClient            *webhook.Client | ||||
| 	groupClient              *rpcli.GroupClient | ||||
| 	relationClient           *rpcli.RelationClient | ||||
| 	clientConfig             controller.ClientConfigDatabase | ||||
| } | ||||
| 
 | ||||
| type Config struct { | ||||
| @ -94,6 +99,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	clientConfigDB, err := mgo.NewClientConfig(mgocli.GetDB()) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @ -118,9 +127,9 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi | ||||
| 		userNotificationSender:   NewUserNotificationSender(config, msgClient, WithUserFunc(database.FindWithError)), | ||||
| 		config:                   config, | ||||
| 		webhookClient:            webhook.NewWebhookClient(config.WebhooksConfig.URL), | ||||
| 
 | ||||
| 		groupClient:    rpcli.NewGroupClient(groupConn), | ||||
| 		relationClient: rpcli.NewRelationClient(friendConn), | ||||
| 		clientConfig:             controller.NewClientConfigDatabase(clientConfigDB, redis.NewClientConfigCache(rdb, clientConfigDB), mgocli.GetTx()), | ||||
| 		groupClient:              rpcli.NewGroupClient(groupConn), | ||||
| 		relationClient:           rpcli.NewRelationClient(friendConn), | ||||
| 	} | ||||
| 	pbuser.RegisterUserServer(server, u) | ||||
| 	return u.db.InitOnce(context.Background(), users) | ||||
| @ -273,6 +282,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR | ||||
| 	if len(req.Users) == 0 { | ||||
| 		return nil, errs.ErrArgs.WrapMsg("users is empty") | ||||
| 	} | ||||
| 	// check if secret is changed | ||||
| 	if s.config.Share.Secret == defaultSecret { | ||||
| 		return nil, servererrs.ErrSecretNotChanged.Wrap() | ||||
| 	} | ||||
| 
 | ||||
| 	if err = authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { | ||||
| 		return nil, err | ||||
|  | ||||
| @ -37,7 +37,8 @@ const ( | ||||
| 
 | ||||
| // General error codes. | ||||
| const ( | ||||
| 	NoError       = 0     // No error | ||||
| 	NoError = 0 // No error | ||||
| 
 | ||||
| 	DatabaseError = 90002 // Database error (redis/mysql, etc.) | ||||
| 	NetworkError  = 90004 // Network error | ||||
| 	DataError     = 90007 // Data error | ||||
| @ -45,11 +46,12 @@ const ( | ||||
| 	CallbackError = 80000 | ||||
| 
 | ||||
| 	// General error codes. | ||||
| 	ServerInternalError = 500  // Server internal error | ||||
| 	ArgsError           = 1001 // Input parameter error | ||||
| 	NoPermissionError   = 1002 // Insufficient permission | ||||
| 	DuplicateKeyError   = 1003 | ||||
| 	RecordNotFoundError = 1004 // Record does not exist | ||||
| 	ServerInternalError   = 500  // Server internal error | ||||
| 	ArgsError             = 1001 // Input parameter error | ||||
| 	NoPermissionError     = 1002 // Insufficient permission | ||||
| 	DuplicateKeyError     = 1003 | ||||
| 	RecordNotFoundError   = 1004 // Record does not exist | ||||
| 	SecretNotChangedError = 1050 // secret not changed | ||||
| 
 | ||||
| 	// Account error codes. | ||||
| 	UserIDNotFoundError    = 1101 // UserID does not exist or is not registered | ||||
|  | ||||
| @ -17,6 +17,8 @@ package servererrs | ||||
| import "github.com/openimsdk/tools/errs" | ||||
| 
 | ||||
| var ( | ||||
| 	ErrSecretNotChanged = errs.NewCodeError(SecretNotChangedError, "secret not changed, please change secret in config/share.yml for security reasons") | ||||
| 
 | ||||
| 	ErrDatabase         = errs.NewCodeError(DatabaseError, "DatabaseError") | ||||
| 	ErrNetwork          = errs.NewCodeError(NetworkError, "NetworkError") | ||||
| 	ErrCallback         = errs.NewCodeError(CallbackError, "CallbackError") | ||||
|  | ||||
							
								
								
									
										10
									
								
								pkg/common/storage/cache/cachekey/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								pkg/common/storage/cache/cachekey/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,10 @@ | ||||
| package cachekey | ||||
| 
 | ||||
| const ClientConfig = "CLIENT_CONFIG" | ||||
| 
 | ||||
| func GetClientConfigKey(userID string) string { | ||||
| 	if userID == "" { | ||||
| 		return ClientConfig | ||||
| 	} | ||||
| 	return ClientConfig + ":" + userID | ||||
| } | ||||
							
								
								
									
										8
									
								
								pkg/common/storage/cache/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								pkg/common/storage/cache/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,8 @@ | ||||
| package cache | ||||
| 
 | ||||
| import "context" | ||||
| 
 | ||||
| type ClientConfigCache interface { | ||||
| 	DeleteUserCache(ctx context.Context, userIDs []string) error | ||||
| 	GetUserConfig(ctx context.Context, userID string) (map[string]string, error) | ||||
| } | ||||
							
								
								
									
										69
									
								
								pkg/common/storage/cache/redis/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								pkg/common/storage/cache/redis/client_config.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,69 @@ | ||||
| package redis | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/redis/go-redis/v9" | ||||
| ) | ||||
| 
 | ||||
| func NewClientConfigCache(rdb redis.UniversalClient, mgo database.ClientConfig) cache.ClientConfigCache { | ||||
| 	rc := newRocksCacheClient(rdb) | ||||
| 	return &ClientConfigCache{ | ||||
| 		mgo:      mgo, | ||||
| 		rcClient: rc, | ||||
| 		delete:   rc.GetBatchDeleter(), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type ClientConfigCache struct { | ||||
| 	mgo      database.ClientConfig | ||||
| 	rcClient *rocksCacheClient | ||||
| 	delete   cache.BatchDeleter | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfigCache) getExpireTime(userID string) time.Duration { | ||||
| 	if userID == "" { | ||||
| 		return time.Hour * 24 | ||||
| 	} else { | ||||
| 		return time.Hour | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfigCache) getClientConfigKey(userID string) string { | ||||
| 	return cachekey.GetClientConfigKey(userID) | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfigCache) GetConfig(ctx context.Context, userID string) (map[string]string, error) { | ||||
| 	return getCache(ctx, x.rcClient, x.getClientConfigKey(userID), x.getExpireTime(userID), func(ctx context.Context) (map[string]string, error) { | ||||
| 		return x.mgo.Get(ctx, userID) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfigCache) DeleteUserCache(ctx context.Context, userIDs []string) error { | ||||
| 	keys := make([]string, 0, len(userIDs)) | ||||
| 	for _, userID := range userIDs { | ||||
| 		keys = append(keys, x.getClientConfigKey(userID)) | ||||
| 	} | ||||
| 	return x.delete.ExecDelWithKeys(ctx, keys) | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfigCache) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) { | ||||
| 	config, err := x.GetConfig(ctx, "") | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if userID != "" { | ||||
| 		userConfig, err := x.GetConfig(ctx, userID) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		for k, v := range userConfig { | ||||
| 			config[k] = v | ||||
| 		} | ||||
| 	} | ||||
| 	return config, nil | ||||
| } | ||||
							
								
								
									
										58
									
								
								pkg/common/storage/controller/client_config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										58
									
								
								pkg/common/storage/controller/client_config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,58 @@ | ||||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"github.com/openimsdk/tools/db/pagination" | ||||
| 	"github.com/openimsdk/tools/db/tx" | ||||
| ) | ||||
| 
 | ||||
| type ClientConfigDatabase interface { | ||||
| 	SetUserConfig(ctx context.Context, userID string, config map[string]string) error | ||||
| 	GetUserConfig(ctx context.Context, userID string) (map[string]string, error) | ||||
| 	DelUserConfig(ctx context.Context, userID string, keys []string) error | ||||
| 	GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) | ||||
| } | ||||
| 
 | ||||
| func NewClientConfigDatabase(db database.ClientConfig, cache cache.ClientConfigCache, tx tx.Tx) ClientConfigDatabase { | ||||
| 	return &clientConfigDatabase{ | ||||
| 		tx:    tx, | ||||
| 		db:    db, | ||||
| 		cache: cache, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type clientConfigDatabase struct { | ||||
| 	tx    tx.Tx | ||||
| 	db    database.ClientConfig | ||||
| 	cache cache.ClientConfigCache | ||||
| } | ||||
| 
 | ||||
| func (x *clientConfigDatabase) SetUserConfig(ctx context.Context, userID string, config map[string]string) error { | ||||
| 	return x.tx.Transaction(ctx, func(ctx context.Context) error { | ||||
| 		if err := x.db.Set(ctx, userID, config); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		return x.cache.DeleteUserCache(ctx, []string{userID}) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (x *clientConfigDatabase) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) { | ||||
| 	return x.cache.GetUserConfig(ctx, userID) | ||||
| } | ||||
| 
 | ||||
| func (x *clientConfigDatabase) DelUserConfig(ctx context.Context, userID string, keys []string) error { | ||||
| 	return x.tx.Transaction(ctx, func(ctx context.Context) error { | ||||
| 		if err := x.db.Del(ctx, userID, keys); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		return x.cache.DeleteUserCache(ctx, []string{userID}) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (x *clientConfigDatabase) GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) { | ||||
| 	return x.db.GetPage(ctx, userID, key, pagination) | ||||
| } | ||||
| @ -33,7 +33,7 @@ type S3Database interface { | ||||
| 	PartLimit() (*s3.PartLimit, error) | ||||
| 	PartSize(ctx context.Context, size int64) (int64, error) | ||||
| 	AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error) | ||||
| 	InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) | ||||
| 	InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error) | ||||
| 	CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) | ||||
| 	AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) | ||||
| 	SetObject(ctx context.Context, info *model.Object) error | ||||
| @ -73,8 +73,8 @@ func (s *s3Database) AuthSign(ctx context.Context, uploadID string, partNumbers | ||||
| 	return s.s3.AuthSign(ctx, uploadID, partNumbers) | ||||
| } | ||||
| 
 | ||||
| func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) { | ||||
| 	return s.s3.InitiateUpload(ctx, hash, size, expire, maxParts) | ||||
| func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error) { | ||||
| 	return s.s3.InitiateUploadContentType(ctx, hash, size, expire, maxParts, contentType) | ||||
| } | ||||
| 
 | ||||
| func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) { | ||||
|  | ||||
							
								
								
									
										15
									
								
								pkg/common/storage/database/client_config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								pkg/common/storage/database/client_config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,15 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"github.com/openimsdk/tools/db/pagination" | ||||
| ) | ||||
| 
 | ||||
| type ClientConfig interface { | ||||
| 	Set(ctx context.Context, userID string, config map[string]string) error | ||||
| 	Get(ctx context.Context, userID string) (map[string]string, error) | ||||
| 	Del(ctx context.Context, userID string, keys []string) error | ||||
| 	GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) | ||||
| } | ||||
							
								
								
									
										99
									
								
								pkg/common/storage/database/mgo/client_config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								pkg/common/storage/database/mgo/client_config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,99 @@ | ||||
| // Copyright © 2023 OpenIM open source community. All rights reserved. | ||||
| // | ||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| // you may not use this file except in compliance with the License. | ||||
| // You may obtain a copy of the License at | ||||
| // | ||||
| //     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| // | ||||
| // Unless required by applicable law or agreed to in writing, software | ||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| // See the License for the specific language governing permissions and | ||||
| // limitations under the License. | ||||
| 
 | ||||
| package mgo | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	"github.com/openimsdk/tools/db/mongoutil" | ||||
| 	"github.com/openimsdk/tools/db/pagination" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"go.mongodb.org/mongo-driver/mongo/options" | ||||
| 
 | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| ) | ||||
| 
 | ||||
| func NewClientConfig(db *mongo.Database) (database.ClientConfig, error) { | ||||
| 	coll := db.Collection("config") | ||||
| 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ | ||||
| 		{ | ||||
| 			Keys: bson.D{ | ||||
| 				{Key: "key", Value: 1}, | ||||
| 				{Key: "user_id", Value: 1}, | ||||
| 			}, | ||||
| 			Options: options.Index().SetUnique(true), | ||||
| 		}, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, errs.Wrap(err) | ||||
| 	} | ||||
| 	return &ClientConfig{ | ||||
| 		coll: coll, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| type ClientConfig struct { | ||||
| 	coll *mongo.Collection | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfig) Set(ctx context.Context, userID string, config map[string]string) error { | ||||
| 	if len(config) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	for key, value := range config { | ||||
| 		filter := bson.M{"key": key, "user_id": userID} | ||||
| 		update := bson.M{ | ||||
| 			"value": value, | ||||
| 		} | ||||
| 		err := mongoutil.UpdateOne(ctx, x.coll, filter, bson.M{"$set": update}, false, options.Update().SetUpsert(true)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfig) Get(ctx context.Context, userID string) (map[string]string, error) { | ||||
| 	cs, err := mongoutil.Find[*model.ClientConfig](ctx, x.coll, bson.M{"user_id": userID}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	cm := make(map[string]string) | ||||
| 	for _, config := range cs { | ||||
| 		cm[config.Key] = config.Value | ||||
| 	} | ||||
| 	return cm, nil | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfig) Del(ctx context.Context, userID string, keys []string) error { | ||||
| 	if len(keys) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return mongoutil.DeleteMany(ctx, x.coll, bson.M{"key": bson.M{"$in": keys}, "user_id": userID}) | ||||
| } | ||||
| 
 | ||||
| func (x *ClientConfig) GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) { | ||||
| 	filter := bson.M{} | ||||
| 	if userID != "" { | ||||
| 		filter["user_id"] = userID | ||||
| 	} | ||||
| 	if key != "" { | ||||
| 		filter["key"] = key | ||||
| 	} | ||||
| 	return mongoutil.FindPage[*model.ClientConfig](ctx, x.coll, filter, pagination) | ||||
| } | ||||
							
								
								
									
										7
									
								
								pkg/common/storage/model/client_config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								pkg/common/storage/model/client_config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,7 @@ | ||||
| package model | ||||
| 
 | ||||
| type ClientConfig struct { | ||||
| 	Key    string `bson:"key"` | ||||
| 	UserID string `bson:"user_id"` | ||||
| 	Value  string `bson:"value"` | ||||
| } | ||||
| @ -4,6 +4,11 @@ import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"path/filepath" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/mitchellh/mapstructure" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" | ||||
| @ -19,10 +24,6 @@ import ( | ||||
| 	"github.com/openimsdk/tools/s3/oss" | ||||
| 	"github.com/spf13/viper" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"path/filepath" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| const defaultTimeout = time.Second * 10 | ||||
| @ -159,7 +160,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour) | ||||
| 	putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour, &s3.PutOption{ContentType: obj.ContentType}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -176,7 +177,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res | ||||
| 		return nil, fmt.Errorf("download object failed %s", downloadResp.Status) | ||||
| 	} | ||||
| 	log.Printf("file size %d", obj.Size) | ||||
| 	request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body) | ||||
| 	request, err := http.NewRequest(http.MethodPut, putURL.URL, downloadResp.Body) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
							
								
								
									
										25
									
								
								tools/stress-test/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								tools/stress-test/README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,25 @@ | ||||
| # Stress Test | ||||
| 
 | ||||
| ## Usage | ||||
| 
 | ||||
| You need set `TestTargetUserList` and `DefaultGroupID` variables. | ||||
| 
 | ||||
| ### Build | ||||
| 
 | ||||
| ```bash | ||||
| go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go | ||||
| 
 | ||||
| # or | ||||
| 
 | ||||
| go build -o tools/stress-test/stress-test tools/stress-test/main.go | ||||
| ``` | ||||
| 
 | ||||
| ### Excute | ||||
| 
 | ||||
| ```bash | ||||
| _output/bin/tools/linux/amd64/stress-test -c config/ | ||||
| 
 | ||||
| #or | ||||
| 
 | ||||
| tools/stress-test/stress-test -c config/ | ||||
| ``` | ||||
							
								
								
									
										453
									
								
								tools/stress-test/main.go
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										453
									
								
								tools/stress-test/main.go
									
									
									
									
									
										Executable file
									
								
							| @ -0,0 +1,453 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"flag" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"os/signal" | ||||
| 	"sync" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/protocol/auth" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/protocol/group" | ||||
| 	"github.com/openimsdk/protocol/relation" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
| 	pbuser "github.com/openimsdk/protocol/user" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| ) | ||||
| 
 | ||||
| /* | ||||
|  1. Create one user every minute | ||||
|  2. Import target users as friends | ||||
|  3. Add users to the default group | ||||
|  4. Send a message to the default group every second, containing index and current timestamp | ||||
|  5. Create a new group every minute and invite target users to join | ||||
| */ | ||||
| 
 | ||||
| // !!! ATTENTION: This variable is must be added! | ||||
| var ( | ||||
| 	//  Use default userIDs List for testing, need to be created. | ||||
| 	TestTargetUserList = []string{ | ||||
| 		"<need-update-it>", | ||||
| 	} | ||||
| 	DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created. | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	ApiAddress string | ||||
| 
 | ||||
| 	// API method | ||||
| 	GetAdminToken = "/auth/get_admin_token" | ||||
| 	CreateUser    = "/user/user_register" | ||||
| 	ImportFriend  = "/friend/import_friend" | ||||
| 	InviteToGroup = "/group/invite_user_to_group" | ||||
| 	SendMsg       = "/msg/send_msg" | ||||
| 	CreateGroup   = "/group/create_group" | ||||
| 	GetUserToken  = "/auth/user_token" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	MaxUser  = 10000 | ||||
| 	MaxGroup = 1000 | ||||
| 
 | ||||
| 	CreateUserTicker  = 1 * time.Minute // Ticker is 1min in create user | ||||
| 	SendMessageTicker = 1 * time.Second // Ticker is 1s in send message | ||||
| 	CreateGroupTicker = 1 * time.Minute | ||||
| ) | ||||
| 
 | ||||
| type BaseResp struct { | ||||
| 	ErrCode int             `json:"errCode"` | ||||
| 	ErrMsg  string          `json:"errMsg"` | ||||
| 	Data    json.RawMessage `json:"data"` | ||||
| } | ||||
| 
 | ||||
| type StressTest struct { | ||||
| 	Conf              *conf | ||||
| 	AdminUserID       string | ||||
| 	AdminToken        string | ||||
| 	DefaultGroupID    string | ||||
| 	DefaultSendUserID string | ||||
| 	UserCounter       int | ||||
| 	GroupCounter      int | ||||
| 	MsgCounter        int | ||||
| 	CreatedUsers      []string | ||||
| 	CreatedGroups     []string | ||||
| 	Mutex             sync.Mutex | ||||
| 	Ctx               context.Context | ||||
| 	Cancel            context.CancelFunc | ||||
| 	HttpClient        *http.Client | ||||
| 	Wg                sync.WaitGroup | ||||
| 	Once              sync.Once | ||||
| } | ||||
| 
 | ||||
| type conf struct { | ||||
| 	Share config.Share | ||||
| 	Api   config.API | ||||
| } | ||||
| 
 | ||||
| func initConfig(configDir string) (*config.Share, *config.API, error) { | ||||
| 	var ( | ||||
| 		share     = &config.Share{} | ||||
| 		apiConfig = &config.API{} | ||||
| 	) | ||||
| 
 | ||||
| 	err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return share, apiConfig, nil | ||||
| } | ||||
| 
 | ||||
| // Post Request | ||||
| func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { | ||||
| 	// Marshal body | ||||
| 	jsonBody, err := json.Marshal(reqbody) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	req.Header.Set("operationID", st.AdminUserID) | ||||
| 	if st.AdminToken != "" { | ||||
| 		req.Header.Set("token", st.AdminToken) | ||||
| 	} | ||||
| 
 | ||||
| 	// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) | ||||
| 
 | ||||
| 	resp, err := st.HttpClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 
 | ||||
| 	respBody, err := io.ReadAll(resp.Body) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to read response body", err, "url", url) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	var baseResp BaseResp | ||||
| 	if err := json.Unmarshal(respBody, &baseResp); err != nil { | ||||
| 		log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if baseResp.ErrCode != 0 { | ||||
| 		err = fmt.Errorf(baseResp.ErrMsg) | ||||
| 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return baseResp.Data, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { | ||||
| 	req := auth.GetAdminTokenReq{ | ||||
| 		Secret: st.Conf.Share.Secret, | ||||
| 		UserID: st.AdminUserID, | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	data := &auth.GetAdminTokenResp{} | ||||
| 	if err := json.Unmarshal(resp, &data); err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return data.Token, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { | ||||
| 	user := &sdkws.UserInfo{ | ||||
| 		UserID:   userID, | ||||
| 		Nickname: userID, | ||||
| 	} | ||||
| 
 | ||||
| 	req := pbuser.UserRegisterReq{ | ||||
| 		Users: []*sdkws.UserInfo{user}, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	st.UserCounter++ | ||||
| 	return userID, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { | ||||
| 	req := relation.ImportFriendReq{ | ||||
| 		OwnerUserID:   userID, | ||||
| 		FriendUserIDs: TestTargetUserList, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { | ||||
| 	req := group.InviteUserToGroupReq{ | ||||
| 		GroupID:        st.DefaultGroupID, | ||||
| 		InvitedUserIDs: []string{userID}, | ||||
| 	} | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) SendMsg(ctx context.Context, userID string) error { | ||||
| 	contentObj := map[string]any{ | ||||
| 		"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), | ||||
| 	} | ||||
| 
 | ||||
| 	req := map[string]any{ | ||||
| 		"sendID":      userID, | ||||
| 		"groupID":     st.DefaultGroupID, | ||||
| 		"contentType": constant.Text, | ||||
| 		"sessionType": constant.ReadGroupChatType, | ||||
| 		"content":     contentObj, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	st.MsgCounter++ | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { | ||||
| 	groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) | ||||
| 
 | ||||
| 	req := map[string]any{ | ||||
| 		"memberUserIDs": TestTargetUserList, | ||||
| 		"ownerUserID":   userID, | ||||
| 		"groupInfo": map[string]any{ | ||||
| 			"groupID":   groupID, | ||||
| 			"groupName": groupID, | ||||
| 			"groupType": constant.WorkingGroup, | ||||
| 		}, | ||||
| 	} | ||||
| 	resp := group.CreateGroupResp{} | ||||
| 
 | ||||
| 	response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := json.Unmarshal(response, &resp); err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	st.GroupCounter++ | ||||
| 
 | ||||
| 	return resp.GroupInfo.GroupID, nil | ||||
| } | ||||
| 
 | ||||
| func main() { | ||||
| 	var configPath string | ||||
| 	// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") | ||||
| 	// flag.StringVar(&configPath, "c", defaultConfigDir, "config path") | ||||
| 	flag.StringVar(&configPath, "c", "", "config path") | ||||
| 	flag.Parse() | ||||
| 
 | ||||
| 	if configPath == "" { | ||||
| 		_, _ = fmt.Fprintln(os.Stderr, "config path is empty") | ||||
| 		os.Exit(1) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	fmt.Printf(" Config Path: %s\n", configPath) | ||||
| 
 | ||||
| 	share, apiConfig, err := initConfig(configPath) | ||||
| 	if err != nil { | ||||
| 		program.ExitWithError(err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) | ||||
| 
 | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	ch := make(chan struct{}) | ||||
| 
 | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	st := &StressTest{ | ||||
| 		Conf: &conf{ | ||||
| 			Share: *share, | ||||
| 			Api:   *apiConfig, | ||||
| 		}, | ||||
| 		AdminUserID: share.IMAdminUserID[0], | ||||
| 		Ctx:         ctx, | ||||
| 		Cancel:      cancel, | ||||
| 		HttpClient: &http.Client{ | ||||
| 			Timeout: 50 * time.Second, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	c := make(chan os.Signal, 1) | ||||
| 	signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||||
| 	go func() { | ||||
| 		<-c | ||||
| 		fmt.Println("\nReceived stop signal, stopping...") | ||||
| 
 | ||||
| 		select { | ||||
| 		case <-ch: | ||||
| 		default: | ||||
| 			close(ch) | ||||
| 		} | ||||
| 
 | ||||
| 		st.Cancel() | ||||
| 	}() | ||||
| 
 | ||||
| 	token, err := st.GetAdminToken(st.Ctx) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) | ||||
| 	} | ||||
| 
 | ||||
| 	st.AdminToken = token | ||||
| 	fmt.Println("Admin Token:", st.AdminToken) | ||||
| 	fmt.Println("ApiAddress:", ApiAddress) | ||||
| 
 | ||||
| 	st.DefaultGroupID = DefaultGroupID | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(CreateUserTicker) | ||||
| 		defer ticker.Stop() | ||||
| 
 | ||||
| 		for st.UserCounter < MaxUser { | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 				// Create User | ||||
| 				userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) | ||||
| 
 | ||||
| 				userCreatedID, err := st.CreateUser(st.Ctx, userID) | ||||
| 				if err != nil { | ||||
| 					log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 				// fmt.Println("User Created ID:", userCreatedID) | ||||
| 
 | ||||
| 				// Import Friend | ||||
| 				if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				// Invite To Group | ||||
| 				if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				st.Once.Do(func() { | ||||
| 					st.DefaultSendUserID = userCreatedID | ||||
| 					fmt.Println("Default Send User Created ID:", userCreatedID) | ||||
| 					close(ch) | ||||
| 				}) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(SendMessageTicker) | ||||
| 		defer ticker.Stop() | ||||
| 		<-ch | ||||
| 
 | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 				// Send Message | ||||
| 				if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(CreateGroupTicker) | ||||
| 		defer ticker.Stop() | ||||
| 		<-ch | ||||
| 
 | ||||
| 		for st.GroupCounter < MaxGroup { | ||||
| 
 | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 
 | ||||
| 				// Create Group | ||||
| 				_, err := st.CreateGroup(st.Ctx, st.DefaultSendUserID) | ||||
| 				if err != nil { | ||||
| 					log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultSendUserID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				// fmt.Println("Group Created ID:", groupID) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Wait() | ||||
| } | ||||
| @ -1,6 +1,14 @@ | ||||
| package version | ||||
| 
 | ||||
| import _ "embed" | ||||
| import ( | ||||
| 	_ "embed" | ||||
| 	"strings" | ||||
| ) | ||||
| 
 | ||||
| //go:embed version | ||||
| var Version string | ||||
| 
 | ||||
| func init() { | ||||
| 	Version = strings.Trim(Version, "\n") | ||||
| 	Version = strings.TrimSpace(Version) | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user