Merge branch 'pre-release-v3.8.4' into cherry-pick-fa3d251

This commit is contained in:
chao 2025-05-14 16:37:14 +08:00 committed by GitHub
commit 24d13b2ee1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 2983 additions and 56 deletions

View File

@ -12,6 +12,10 @@ jobs:
go-build:
name: Test with go ${{ matrix.go_version }} on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
env:
SHARE_CONFIG_PATH: config/share.yml
permissions:
contents: write
pull-requests: write
@ -40,6 +44,10 @@ jobs:
with:
compose-file: "./docker-compose.yml"
- name: Modify Server Configuration
run: |
yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
# - name: Get Internal IP Address
# id: get-ip
# run: |
@ -71,6 +79,11 @@ jobs:
go mod download
go install github.com/magefile/mage@latest
- name: Modify Chat Configuration
run: |
cd ${{ github.workspace }}/chat-repo
yq e '.openIM.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
- name: Build and test Chat Services
run: |
cd ${{ github.workspace }}/chat-repo
@ -132,7 +145,7 @@ jobs:
# Test get admin token
get_admin_token_response=$(curl -X POST -H "Content-Type: application/json" -H "operationID: imAdmin" -d '{
"secret": "openIM123",
"secret": "123456",
"platformID": 2,
"userID": "imAdmin"
}' http://127.0.0.1:10002/auth/get_admin_token)
@ -169,7 +182,8 @@ jobs:
contents: write
env:
SDK_DIR: openim-sdk-core
CONFIG_PATH: config/notification.yml
NOTIFICATION_CONFIG_PATH: config/notification.yml
SHARE_CONFIG_PATH: config/share.yml
strategy:
matrix:
@ -184,7 +198,7 @@ jobs:
uses: actions/checkout@v4
with:
repository: "openimsdk/openim-sdk-core"
ref: "release-v3.8"
ref: "main"
path: ${{ env.SDK_DIR }}
- name: Set up Go ${{ matrix.go_version }}
@ -199,8 +213,9 @@ jobs:
- name: Modify Server Configuration
run: |
yq e '.groupCreated.isSendMsg = true' -i ${{ env.CONFIG_PATH }}
yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.CONFIG_PATH }}
yq e '.groupCreated.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }}
yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }}
yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
- name: Start Server Services
run: |

View File

@ -28,6 +28,8 @@ run:
# - util
# - .*~
# - api/swagger/docs
# - server/docs
# - components/mnt/config/certs
# - logs

View File

@ -131,7 +131,15 @@ Thank you for contributing to building a powerful instant messaging solution!
## :closed_book: License
This software is licensed under the Apache License 2.0
This software is licensed under a dual-license model:
- The GNU Affero General Public License (AGPL), Version 3 or later; **OR**
- Commercial license terms from OpenIMSDK.
If you wish to use this software under commercial terms, please contact us at: contact@openim.io
For more information, see: https://www.openim.io/en/licensing

View File

@ -131,9 +131,17 @@
感谢您的贡献,一起来打造强大的即时通讯解决方案!
## :closed_book: 许可证
## :closed_book: 开源许可证 License
本软件采用双重授权模型:
GNU Affero 通用公共许可证AGPL第 3 版或更高版本;或
来自 OpenIMSDK 的商业授权条款。
如需商用请联系contact@openim.io
详见https://www.openim.io/en/licensing
This software is licensed under the Apache License 2.0
## 🔮 Thanks to our contributors!

View File

@ -1,6 +1,6 @@
secret: openIM123
imAdminUserID: [ imAdmin ]
imAdminUserID: [imAdmin]
# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
multiLogin:

6
go.mod
View File

@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.79
github.com/openimsdk/tools v0.0.50-alpha.74
github.com/openimsdk/protocol v0.0.73-alpha.6
github.com/openimsdk/tools v0.0.50-alpha.79
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
@ -219,3 +219,5 @@ require (
golang.org/x/crypto v0.27.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
//replace github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol

12
go.sum
View File

@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s=
github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc=
github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc=
github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.79 h1:jxYEbrzaze4Z2r4NrKad816buZ690ix0L9MTOOOH3ik=
github.com/openimsdk/tools v0.0.50-alpha.79/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

View File

@ -144,24 +144,23 @@ func Start(ctx context.Context, index int, config *Config) error {
}
}()
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := server.Shutdown(ctx)
if err != nil {
return errs.WrapMsg(err, "shutdown err")
}
return nil
}
disetcd.RegisterShutDown(shutdown)
//if config.Discovery.Enable == conf.ETCD {
// cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
// cm.Watch(ctx)
//}
//sigs := make(chan os.Signal, 1)
//signal.Notify(sigs, syscall.SIGTERM)
//select {
//case val := <-sigs:
// log.ZDebug(ctx, "recv exit", "signal", val.String())
// cancel(fmt.Errorf("signal %s", val.String()))
//case <-ctx.Done():
//}
<-apiCtx.Done()
exitCause := context.Cause(apiCtx)
log.ZWarn(ctx, "api server exit", exitCause)
timer := time.NewTimer(time.Second * 15)
defer timer.Stop()
select {
case <-sigs:
program.SIGTERMExit()

View File

@ -124,6 +124,11 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
userRouterGroup.POST("/add_notification_account", u.AddNotificationAccount)
userRouterGroup.POST("/update_notification_account", u.UpdateNotificationAccountInfo)
userRouterGroup.POST("/search_notification_account", u.SearchNotificationAccount)
userRouterGroup.POST("/get_user_client_config", u.GetUserClientConfig)
userRouterGroup.POST("/set_user_client_config", u.SetUserClientConfig)
userRouterGroup.POST("/del_user_client_config", u.DelUserClientConfig)
userRouterGroup.POST("/page_user_client_config", u.PageUserClientConfig)
}
// friend routing group
{

View File

@ -242,3 +242,19 @@ func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) {
func (u *UserApi) SearchNotificationAccount(c *gin.Context) {
a2r.Call(c, user.UserClient.SearchNotificationAccount, u.Client)
}
func (u *UserApi) GetUserClientConfig(c *gin.Context) {
a2r.Call(c, user.UserClient.GetUserClientConfig, u.Client)
}
func (u *UserApi) SetUserClientConfig(c *gin.Context) {
a2r.Call(c, user.UserClient.SetUserClientConfig, u.Client)
}
func (u *UserApi) DelUserClientConfig(c *gin.Context) {
a2r.Call(c, user.UserClient.DelUserClientConfig, u.Client)
}
func (u *UserApi) PageUserClientConfig(c *gin.Context) {
a2r.Call(c, user.UserClient.PageUserClientConfig, u.Client)
}

View File

@ -1382,6 +1382,7 @@ func (g *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
if err != nil {
return nil, err
}
group.Status = constant.GroupStatusDismissed
tips := &sdkws.GroupDismissedTips{
Group: g.groupDB2PB(group, owner.UserID, num),
OpUser: &sdkws.GroupMemberFullInfo{},

View File

@ -283,7 +283,8 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri
func (g *NotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
versions := versionctx.GetVersionLog(ctx).Get()
for _, coll := range versions {
for i := len(versions) - 1; i >= 0; i-- {
coll := versions[i]
if coll.Name == collName && coll.Doc.DID == id {
*version = uint64(coll.Doc.Version)
*versionID = coll.Doc.ID.Hex()

View File

@ -61,6 +61,13 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
return nil, err
}
resp := &msg.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*msg.Seqs)}
if req.ReturnPinned {
pinnedConversationIDs, err := m.ConversationLocalCache.GetPinnedConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
resp.PinnedConversationIDs = pinnedConversationIDs
}
for conversationID, maxSeq := range maxSeqs {
resp.Seqs[conversationID] = &msg.Seqs{
HasReadSeq: hasReadSeqs[conversationID],

View File

@ -62,7 +62,7 @@ func (t *thirdServer) InitiateMultipartUpload(ctx context.Context, req *third.In
return nil, err
}
expireTime := time.Now().Add(t.defaultExpire)
result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts))
result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, t.defaultExpire, int(req.MaxParts), req.ContentType)
if err != nil {
if haErr, ok := errs.Unwrap(err).(*cont.HashAlreadyExistsError); ok {
obj := &model.Object{

View File

@ -0,0 +1,71 @@
package user
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/utils/datautil"
)
func (s *userServer) GetUserClientConfig(ctx context.Context, req *pbuser.GetUserClientConfigReq) (*pbuser.GetUserClientConfigResp, error) {
if req.UserID != "" {
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil {
return nil, err
}
}
res, err := s.clientConfig.GetUserConfig(ctx, req.UserID)
if err != nil {
return nil, err
}
return &pbuser.GetUserClientConfigResp{Configs: res}, nil
}
func (s *userServer) SetUserClientConfig(ctx context.Context, req *pbuser.SetUserClientConfigReq) (*pbuser.SetUserClientConfigResp, error) {
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.UserID != "" {
if _, err := s.db.GetUserByID(ctx, req.UserID); err != nil {
return nil, err
}
}
if err := s.clientConfig.SetUserConfig(ctx, req.UserID, req.Configs); err != nil {
return nil, err
}
return &pbuser.SetUserClientConfigResp{}, nil
}
func (s *userServer) DelUserClientConfig(ctx context.Context, req *pbuser.DelUserClientConfigReq) (*pbuser.DelUserClientConfigResp, error) {
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if err := s.clientConfig.DelUserConfig(ctx, req.UserID, req.Keys); err != nil {
return nil, err
}
return &pbuser.DelUserClientConfigResp{}, nil
}
func (s *userServer) PageUserClientConfig(ctx context.Context, req *pbuser.PageUserClientConfigReq) (*pbuser.PageUserClientConfigResp, error) {
if err := authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err
}
total, res, err := s.clientConfig.GetUserConfigPage(ctx, req.UserID, req.Key, req.Pagination)
if err != nil {
return nil, err
}
return &pbuser.PageUserClientConfigResp{
Total: total,
Configs: datautil.Slice(res, func(e *model.ClientConfig) *pbuser.ClientConfig {
return &pbuser.ClientConfig{
UserID: e.UserID,
Key: e.Key,
Value: e.Value,
}
}),
}, nil
}

View File

@ -51,6 +51,10 @@ import (
"google.golang.org/grpc"
)
const (
defaultSecret = "openIM123"
)
type userServer struct {
pbuser.UnimplementedUserServer
online cache.OnlineCache
@ -62,6 +66,7 @@ type userServer struct {
webhookClient *webhook.Client
groupClient *rpcli.GroupClient
relationClient *rpcli.RelationClient
clientConfig controller.ClientConfigDatabase
}
type Config struct {
@ -94,6 +99,10 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
if err != nil {
return err
}
clientConfigDB, err := mgo.NewClientConfig(mgocli.GetDB())
if err != nil {
return err
}
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg)
if err != nil {
return err
@ -118,9 +127,9 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
userNotificationSender: NewUserNotificationSender(config, msgClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
clientConfig: controller.NewClientConfigDatabase(clientConfigDB, redis.NewClientConfigCache(rdb, clientConfigDB), mgocli.GetTx()),
groupClient: rpcli.NewGroupClient(groupConn),
relationClient: rpcli.NewRelationClient(friendConn),
}
pbuser.RegisterUserServer(server, u)
return u.db.InitOnce(context.Background(), users)
@ -273,6 +282,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if len(req.Users) == 0 {
return nil, errs.ErrArgs.WrapMsg("users is empty")
}
// check if secret is changed
if s.config.Share.Secret == defaultSecret {
return nil, servererrs.ErrSecretNotChanged.Wrap()
}
if err = authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err

View File

@ -37,7 +37,8 @@ const (
// General error codes.
const (
NoError = 0 // No error
NoError = 0 // No error
DatabaseError = 90002 // Database error (redis/mysql, etc.)
NetworkError = 90004 // Network error
DataError = 90007 // Data error
@ -45,11 +46,12 @@ const (
CallbackError = 80000
// General error codes.
ServerInternalError = 500 // Server internal error
ArgsError = 1001 // Input parameter error
NoPermissionError = 1002 // Insufficient permission
DuplicateKeyError = 1003
RecordNotFoundError = 1004 // Record does not exist
ServerInternalError = 500 // Server internal error
ArgsError = 1001 // Input parameter error
NoPermissionError = 1002 // Insufficient permission
DuplicateKeyError = 1003
RecordNotFoundError = 1004 // Record does not exist
SecretNotChangedError = 1050 // secret not changed
// Account error codes.
UserIDNotFoundError = 1101 // UserID does not exist or is not registered

View File

@ -17,6 +17,8 @@ package servererrs
import "github.com/openimsdk/tools/errs"
var (
ErrSecretNotChanged = errs.NewCodeError(SecretNotChangedError, "secret not changed, please change secret in config/share.yml for security reasons")
ErrDatabase = errs.NewCodeError(DatabaseError, "DatabaseError")
ErrNetwork = errs.NewCodeError(NetworkError, "NetworkError")
ErrCallback = errs.NewCodeError(CallbackError, "CallbackError")

View File

@ -0,0 +1,10 @@
package cachekey
const ClientConfig = "CLIENT_CONFIG"
func GetClientConfigKey(userID string) string {
if userID == "" {
return ClientConfig
}
return ClientConfig + ":" + userID
}

View File

@ -0,0 +1,8 @@
package cache
import "context"
type ClientConfigCache interface {
DeleteUserCache(ctx context.Context, userIDs []string) error
GetUserConfig(ctx context.Context, userID string) (map[string]string, error)
}

View File

@ -0,0 +1,69 @@
package redis
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/redis/go-redis/v9"
)
func NewClientConfigCache(rdb redis.UniversalClient, mgo database.ClientConfig) cache.ClientConfigCache {
rc := newRocksCacheClient(rdb)
return &ClientConfigCache{
mgo: mgo,
rcClient: rc,
delete: rc.GetBatchDeleter(),
}
}
type ClientConfigCache struct {
mgo database.ClientConfig
rcClient *rocksCacheClient
delete cache.BatchDeleter
}
func (x *ClientConfigCache) getExpireTime(userID string) time.Duration {
if userID == "" {
return time.Hour * 24
} else {
return time.Hour
}
}
func (x *ClientConfigCache) getClientConfigKey(userID string) string {
return cachekey.GetClientConfigKey(userID)
}
func (x *ClientConfigCache) GetConfig(ctx context.Context, userID string) (map[string]string, error) {
return getCache(ctx, x.rcClient, x.getClientConfigKey(userID), x.getExpireTime(userID), func(ctx context.Context) (map[string]string, error) {
return x.mgo.Get(ctx, userID)
})
}
func (x *ClientConfigCache) DeleteUserCache(ctx context.Context, userIDs []string) error {
keys := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
keys = append(keys, x.getClientConfigKey(userID))
}
return x.delete.ExecDelWithKeys(ctx, keys)
}
func (x *ClientConfigCache) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) {
config, err := x.GetConfig(ctx, "")
if err != nil {
return nil, err
}
if userID != "" {
userConfig, err := x.GetConfig(ctx, userID)
if err != nil {
return nil, err
}
for k, v := range userConfig {
config[k] = v
}
}
return config, nil
}

View File

@ -0,0 +1,58 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx"
)
type ClientConfigDatabase interface {
SetUserConfig(ctx context.Context, userID string, config map[string]string) error
GetUserConfig(ctx context.Context, userID string) (map[string]string, error)
DelUserConfig(ctx context.Context, userID string, keys []string) error
GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error)
}
func NewClientConfigDatabase(db database.ClientConfig, cache cache.ClientConfigCache, tx tx.Tx) ClientConfigDatabase {
return &clientConfigDatabase{
tx: tx,
db: db,
cache: cache,
}
}
type clientConfigDatabase struct {
tx tx.Tx
db database.ClientConfig
cache cache.ClientConfigCache
}
func (x *clientConfigDatabase) SetUserConfig(ctx context.Context, userID string, config map[string]string) error {
return x.tx.Transaction(ctx, func(ctx context.Context) error {
if err := x.db.Set(ctx, userID, config); err != nil {
return err
}
return x.cache.DeleteUserCache(ctx, []string{userID})
})
}
func (x *clientConfigDatabase) GetUserConfig(ctx context.Context, userID string) (map[string]string, error) {
return x.cache.GetUserConfig(ctx, userID)
}
func (x *clientConfigDatabase) DelUserConfig(ctx context.Context, userID string, keys []string) error {
return x.tx.Transaction(ctx, func(ctx context.Context) error {
if err := x.db.Del(ctx, userID, keys); err != nil {
return err
}
return x.cache.DeleteUserCache(ctx, []string{userID})
})
}
func (x *clientConfigDatabase) GetUserConfigPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) {
return x.db.GetPage(ctx, userID, key, pagination)
}

View File

@ -33,7 +33,7 @@ type S3Database interface {
PartLimit() (*s3.PartLimit, error)
PartSize(ctx context.Context, size int64) (int64, error)
AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error)
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error)
SetObject(ctx context.Context, info *model.Object) error
@ -73,8 +73,8 @@ func (s *s3Database) AuthSign(ctx context.Context, uploadID string, partNumbers
return s.s3.AuthSign(ctx, uploadID, partNumbers)
}
func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error) {
return s.s3.InitiateUpload(ctx, hash, size, expire, maxParts)
func (s *s3Database) InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int, contentType string) (*cont.InitiateUploadResult, error) {
return s.s3.InitiateUploadContentType(ctx, hash, size, expire, maxParts, contentType)
}
func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error) {

View File

@ -0,0 +1,15 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
)
type ClientConfig interface {
Set(ctx context.Context, userID string, config map[string]string) error
Get(ctx context.Context, userID string) (map[string]string, error)
Del(ctx context.Context, userID string, keys []string) error
GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error)
}

View File

@ -0,0 +1,99 @@
// Copyright © 2023 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/tools/errs"
)
func NewClientConfig(db *mongo.Database) (database.ClientConfig, error) {
coll := db.Collection("config")
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{
{Key: "key", Value: 1},
{Key: "user_id", Value: 1},
},
Options: options.Index().SetUnique(true),
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &ClientConfig{
coll: coll,
}, nil
}
type ClientConfig struct {
coll *mongo.Collection
}
func (x *ClientConfig) Set(ctx context.Context, userID string, config map[string]string) error {
if len(config) == 0 {
return nil
}
for key, value := range config {
filter := bson.M{"key": key, "user_id": userID}
update := bson.M{
"value": value,
}
err := mongoutil.UpdateOne(ctx, x.coll, filter, bson.M{"$set": update}, false, options.Update().SetUpsert(true))
if err != nil {
return err
}
}
return nil
}
func (x *ClientConfig) Get(ctx context.Context, userID string) (map[string]string, error) {
cs, err := mongoutil.Find[*model.ClientConfig](ctx, x.coll, bson.M{"user_id": userID})
if err != nil {
return nil, err
}
cm := make(map[string]string)
for _, config := range cs {
cm[config.Key] = config.Value
}
return cm, nil
}
func (x *ClientConfig) Del(ctx context.Context, userID string, keys []string) error {
if len(keys) == 0 {
return nil
}
return mongoutil.DeleteMany(ctx, x.coll, bson.M{"key": bson.M{"$in": keys}, "user_id": userID})
}
func (x *ClientConfig) GetPage(ctx context.Context, userID string, key string, pagination pagination.Pagination) (int64, []*model.ClientConfig, error) {
filter := bson.M{}
if userID != "" {
filter["user_id"] = userID
}
if key != "" {
filter["key"] = key
}
return mongoutil.FindPage[*model.ClientConfig](ctx, x.coll, filter, pagination)
}

View File

@ -0,0 +1,7 @@
package model
type ClientConfig struct {
Key string `bson:"key"`
UserID string `bson:"user_id"`
Value string `bson:"value"`
}

View File

@ -16,6 +16,7 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
@ -153,6 +154,26 @@ func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx con
}))
}
func (c *ConversationLocalCache) getPinnedConversationIDs(ctx context.Context, userID string) (val []string, err error) {
log.ZDebug(ctx, "ConversationLocalCache getPinnedConversations req", "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache getPinnedConversations return", "userID", userID, "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache getPinnedConversations return", err, "userID", userID)
}
}()
var cache cacheProto[pbconversation.GetPinnedConversationIDsResp]
resp, err := cache.Unmarshal(c.local.Get(ctx, cachekey.GetPinnedConversationIDs(userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "userID", userID)
return cache.Marshal(c.client.ConversationClient.GetPinnedConversationIDs(ctx, &pbconversation.GetPinnedConversationIDsReq{UserID: userID}))
}))
if err != nil {
return nil, err
}
return resp.ConversationIDs, nil
}
func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID)
if err != nil {
@ -168,3 +189,7 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx c
}
return datautil.SliceSet(res.UserIDs), nil
}
func (c *ConversationLocalCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) {
return c.getPinnedConversationIDs(ctx, userID)
}

View File

@ -0,0 +1,19 @@
# Stress Test V2
## Usage
You need set `TestTargetUserList` variables.
### Build
```bash
go build -o test/stress-test-v2/stress-test-v2 test/stress-test-v2/main.go
```
### Excute
```bash
tools/stress-test-v2/stress-test-v2 -c config/
```

759
test/stress-test-v2/main.go Normal file
View File

@ -0,0 +1,759 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
)
// 1. Create 100K New Users
// 2. Create 100 100K Groups
// 3. Create 1000 999 Groups
// 4. Send message to 100K Groups every second
// 5. Send message to 999 Groups every minute
var (
// Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{
// "<need-update-it>",
}
// DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
)
var (
ApiAddress string
// API method
GetAdminToken = "/auth/get_admin_token"
UserCheck = "/user/account_check"
CreateUser = "/user/user_register"
ImportFriend = "/friend/import_friend"
InviteToGroup = "/group/invite_user_to_group"
GetGroupMemberInfo = "/group/get_group_members_info"
SendMsg = "/msg/send_msg"
CreateGroup = "/group/create_group"
GetUserToken = "/auth/user_token"
)
const (
MaxUser = 100000
Max1kUser = 1000
Max100KGroup = 100
Max999Group = 1000
MaxInviteUserLimit = 999
CreateUserTicker = 1 * time.Second
CreateGroupTicker = 1 * time.Second
Create100KGroupTicker = 1 * time.Second
Create999GroupTicker = 1 * time.Second
SendMsgTo100KGroupTicker = 1 * time.Second
SendMsgTo999GroupTicker = 1 * time.Minute
)
type BaseResp struct {
ErrCode int `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data json.RawMessage `json:"data"`
}
type StressTest struct {
Conf *conf
AdminUserID string
AdminToken string
DefaultGroupID string
DefaultUserID string
UserCounter int
CreateUserCounter int
Create100kGroupCounter int
Create999GroupCounter int
MsgCounter int
CreatedUsers []string
CreatedGroups []string
Mutex sync.Mutex
Ctx context.Context
Cancel context.CancelFunc
HttpClient *http.Client
Wg sync.WaitGroup
Once sync.Once
}
type conf struct {
Share config.Share
Api config.API
}
func initConfig(configDir string) (*config.Share, *config.API, error) {
var (
share = &config.Share{}
apiConfig = &config.API{}
)
err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share)
if err != nil {
return nil, nil, err
}
err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig)
if err != nil {
return nil, nil, err
}
return share, apiConfig, nil
}
// Post Request
func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) {
// Marshal body
jsonBody, err := json.Marshal(reqbody)
if err != nil {
log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody)
return nil, err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("operationID", st.AdminUserID)
if st.AdminToken != "" {
req.Header.Set("token", st.AdminToken)
}
// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken)
resp, err := st.HttpClient.Do(req)
if err != nil {
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody)
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
log.ZError(ctx, "Failed to read response body", err, "url", url)
return nil, err
}
var baseResp BaseResp
if err := json.Unmarshal(respBody, &baseResp); err != nil {
log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody))
return nil, err
}
if baseResp.ErrCode != 0 {
err = fmt.Errorf(baseResp.ErrMsg)
// log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp)
return nil, err
}
return baseResp.Data, nil
}
func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) {
req := auth.GetAdminTokenReq{
Secret: st.Conf.Share.Secret,
UserID: st.AdminUserID,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req)
if err != nil {
return "", err
}
data := &auth.GetAdminTokenResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return "", err
}
return data.Token, nil
}
func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) {
req := pbuser.AccountCheckReq{
CheckUserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req)
if err != nil {
return nil, err
}
data := &pbuser.AccountCheckResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
unRegisteredUserIDs := make([]string, 0)
for _, res := range data.Results {
if res.AccountStatus == constant.UnRegistered {
unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID)
}
}
return unRegisteredUserIDs, nil
}
func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
req := pbuser.UserRegisterReq{
Users: []*sdkws.UserInfo{user},
}
_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return "", err
}
st.UserCounter++
return userID, nil
}
func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error {
// The method can import a large number of users at once.
var userList []*sdkws.UserInfo
defer st.Once.Do(
func() {
st.DefaultUserID = userIDs[0]
fmt.Println("Default Send User Created ID:", st.DefaultUserID)
})
needUserIDs, err := st.CheckUser(ctx, userIDs)
if err != nil {
return err
}
for _, userID := range needUserIDs {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
userList = append(userList, user)
}
req := pbuser.UserRegisterReq{
Users: userList,
}
_, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return err
}
st.UserCounter += len(userList)
return nil
}
func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) {
needInviteUserIDs := make([]string, 0)
const maxBatchSize = 500
if len(userIDs) > maxBatchSize {
for i := 0; i < len(userIDs); i += maxBatchSize {
end := min(i+maxBatchSize, len(userIDs))
batchUserIDs := userIDs[i:end]
// log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1,
// "batchUserCount", len(batchUserIDs))
// Process a single batch
batchReq := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: batchUserIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq)
if err != nil {
log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1)
continue
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1)
continue
}
// Process the batch results
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range batchUserIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
}
return needInviteUserIDs, nil
}
req := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req)
if err != nil {
return nil, err
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range userIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
return needInviteUserIDs, nil
}
func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error {
req := group.InviteUserToGroupReq{
GroupID: groupID,
InvitedUserIDs: userIDs,
}
_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error {
contentObj := map[string]any{
// "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
"content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")),
}
req := &apistruct.SendMsgReq{
SendMsg: apistruct.SendMsg{
SendID: userID,
SenderNickname: userID,
GroupID: groupID,
ContentType: constant.Text,
SessionType: constant.ReadGroupChatType,
Content: contentObj,
},
}
_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req)
if err != nil {
log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req)
return err
}
st.MsgCounter++
return nil
}
// Max userIDs number is 1000
func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) {
groupInfo := &sdkws.GroupInfo{
GroupID: groupID,
GroupName: groupID,
GroupType: constant.WorkingGroup,
}
req := group.CreateGroupReq{
OwnerUserID: userID,
MemberUserIDs: userIDsList,
GroupInfo: groupInfo,
}
resp := group.CreateGroupResp{}
response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req)
if err != nil {
return "", err
}
if err := json.Unmarshal(response, &resp); err != nil {
return "", err
}
// st.GroupCounter++
return resp.GroupInfo.GroupID, nil
}
func main() {
var configPath string
// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config")
// flag.StringVar(&configPath, "c", defaultConfigDir, "config path")
flag.StringVar(&configPath, "c", "", "config path")
flag.Parse()
if configPath == "" {
_, _ = fmt.Fprintln(os.Stderr, "config path is empty")
os.Exit(1)
return
}
fmt.Printf(" Config Path: %s\n", configPath)
share, apiConfig, err := initConfig(configPath)
if err != nil {
program.ExitWithError(err)
return
}
ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0]))
ctx, cancel := context.WithCancel(context.Background())
// ch := make(chan struct{})
st := &StressTest{
Conf: &conf{
Share: *share,
Api: *apiConfig,
},
AdminUserID: share.IMAdminUserID[0],
Ctx: ctx,
Cancel: cancel,
HttpClient: &http.Client{
Timeout: 50 * time.Second,
},
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived stop signal, stopping...")
go func() {
// time.Sleep(5 * time.Second)
fmt.Println("Force exit")
os.Exit(0)
}()
st.Cancel()
}()
token, err := st.GetAdminToken(st.Ctx)
if err != nil {
log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID)
}
st.AdminToken = token
fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress)
for i := range MaxUser {
userID := fmt.Sprintf("v2_StressTest_User_%d", i)
st.CreatedUsers = append(st.CreatedUsers, userID)
st.CreateUserCounter++
}
// err = st.CreateUserBatch(st.Ctx, st.CreatedUsers)
// if err != nil {
// log.ZError(ctx, "Create user failed.", err)
// }
const batchSize = 1000
totalUsers := len(st.CreatedUsers)
successCount := 0
if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 {
st.DefaultUserID = st.CreatedUsers[0]
}
for i := 0; i < totalUsers; i += batchSize {
end := min(i+batchSize, totalUsers)
userBatch := st.CreatedUsers[i:end]
log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch))
err = st.CreateUserBatch(st.Ctx, userBatch)
if err != nil {
log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1)
} else {
successCount += len(userBatch)
log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1,
"progress", fmt.Sprintf("%d/%d", successCount, totalUsers))
}
}
// Execute create 100k group
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create100kGroupTicker := time.NewTicker(Create100KGroupTicker)
defer create100kGroupTicker.Stop()
for i := range Max100KGroup {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 100K Group")
return
case <-create100kGroupTicker.C:
// Create 100K groups
st.Wg.Add(1)
go func(idx int) {
startTime := time.Now()
defer func() {
elapsedTime := time.Since(startTime)
log.ZInfo(st.Ctx, "100K group creation completed",
"groupID", fmt.Sprintf("v2_StressTest_Group_100K_%d", idx),
"index", idx,
"duration", elapsedTime.String())
}()
defer st.Wg.Done()
defer func() {
st.Mutex.Lock()
st.Create100kGroupCounter++
st.Mutex.Unlock()
}()
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i <= MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
// log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
// log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// create 999 groups
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create999GroupTicker := time.NewTicker(Create999GroupTicker)
defer create999GroupTicker.Stop()
for i := range Max999Group {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 999 Group")
return
case <-create999GroupTicker.C:
// Create 999 groups
st.Wg.Add(1)
go func(idx int) {
startTime := time.Now()
defer func() {
elapsedTime := time.Since(startTime)
log.ZInfo(st.Ctx, "999 group creation completed",
"groupID", fmt.Sprintf("v2_StressTest_Group_1K_%d", idx),
"index", idx,
"duration", elapsedTime.String())
}()
defer st.Wg.Done()
defer func() {
st.Mutex.Lock()
st.Create999GroupCounter++
st.Mutex.Unlock()
}()
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i <= Max1kUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, Max1kUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
// log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
// log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// Send message to 100K groups
st.Wg.Wait()
fmt.Println("All groups created successfully, starting to send messages...")
log.ZInfo(ctx, "All groups created successfully, starting to send messages...")
var groups100K []string
var groups999 []string
for i := range Max100KGroup {
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i)
groups100K = append(groups100K, groupID)
}
for i := range Max999Group {
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i)
groups999 = append(groups999, groupID)
}
send100kGroupLimiter := make(chan struct{}, 20)
send999GroupLimiter := make(chan struct{}, 100)
// execute Send message to 100K groups
go func() {
ticker := time.NewTicker(SendMsgTo100KGroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 100K Group")
return
case <-ticker.C:
// Send message to 100K groups
for _, groupID := range groups100K {
send100kGroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send100kGroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 100K group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 100K groups successfully.")
}
}
}()
// execute Send message to 999 groups
go func() {
ticker := time.NewTicker(SendMsgTo999GroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 999 Group")
return
case <-ticker.C:
// Send message to 999 groups
for _, groupID := range groups999 {
send999GroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send999GroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 999 group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 999 groups successfully.")
}
}
}()
<-st.Ctx.Done()
fmt.Println("Received signal to exit, shutting down...")
}

View File

@ -0,0 +1,19 @@
# Stress Test
## Usage
You need set `TestTargetUserList` and `DefaultGroupID` variables.
### Build
```bash
go build -o test/stress-test/stress-test test/stress-test/main.go
```
### Excute
```bash
tools/stress-test/stress-test -c config/
```

458
test/stress-test/main.go Executable file
View File

@ -0,0 +1,458 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
)
/*
1. Create one user every minute
2. Import target users as friends
3. Add users to the default group
4. Send a message to the default group every second, containing index and current timestamp
5. Create a new group every minute and invite target users to join
*/
// !!! ATTENTION: This variable is must be added!
var (
// Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{
"<need-update-it>",
}
DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
)
var (
ApiAddress string
// API method
GetAdminToken = "/auth/get_admin_token"
CreateUser = "/user/user_register"
ImportFriend = "/friend/import_friend"
InviteToGroup = "/group/invite_user_to_group"
SendMsg = "/msg/send_msg"
CreateGroup = "/group/create_group"
GetUserToken = "/auth/user_token"
)
const (
MaxUser = 10000
MaxGroup = 1000
CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user
SendMessageTicker = 1 * time.Second // Ticker is 1s in send message
CreateGroupTicker = 1 * time.Minute
)
type BaseResp struct {
ErrCode int `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data json.RawMessage `json:"data"`
}
type StressTest struct {
Conf *conf
AdminUserID string
AdminToken string
DefaultGroupID string
DefaultUserID string
UserCounter int
GroupCounter int
MsgCounter int
CreatedUsers []string
CreatedGroups []string
Mutex sync.Mutex
Ctx context.Context
Cancel context.CancelFunc
HttpClient *http.Client
Wg sync.WaitGroup
Once sync.Once
}
type conf struct {
Share config.Share
Api config.API
}
func initConfig(configDir string) (*config.Share, *config.API, error) {
var (
share = &config.Share{}
apiConfig = &config.API{}
)
err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share)
if err != nil {
return nil, nil, err
}
err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig)
if err != nil {
return nil, nil, err
}
return share, apiConfig, nil
}
// Post Request
func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) {
// Marshal body
jsonBody, err := json.Marshal(reqbody)
if err != nil {
log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody)
return nil, err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("operationID", st.AdminUserID)
if st.AdminToken != "" {
req.Header.Set("token", st.AdminToken)
}
// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken)
resp, err := st.HttpClient.Do(req)
if err != nil {
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody)
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
log.ZError(ctx, "Failed to read response body", err, "url", url)
return nil, err
}
var baseResp BaseResp
if err := json.Unmarshal(respBody, &baseResp); err != nil {
log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody))
return nil, err
}
if baseResp.ErrCode != 0 {
err = fmt.Errorf(baseResp.ErrMsg)
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp)
return nil, err
}
return baseResp.Data, nil
}
func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) {
req := auth.GetAdminTokenReq{
Secret: st.Conf.Share.Secret,
UserID: st.AdminUserID,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req)
if err != nil {
return "", err
}
data := &auth.GetAdminTokenResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return "", err
}
return data.Token, nil
}
func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
req := pbuser.UserRegisterReq{
Users: []*sdkws.UserInfo{user},
}
_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return "", err
}
st.UserCounter++
return userID, nil
}
func (st *StressTest) ImportFriend(ctx context.Context, userID string) error {
req := relation.ImportFriendReq{
OwnerUserID: userID,
FriendUserIDs: TestTargetUserList,
}
_, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error {
req := group.InviteUserToGroupReq{
GroupID: st.DefaultGroupID,
InvitedUserIDs: []string{userID},
}
_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) SendMsg(ctx context.Context, userID string) error {
contentObj := map[string]any{
"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
}
req := &apistruct.SendMsgReq{
SendMsg: apistruct.SendMsg{
SendID: userID,
SenderNickname: userID,
GroupID: st.DefaultGroupID,
ContentType: constant.Text,
SessionType: constant.ReadGroupChatType,
Content: contentObj,
},
}
_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req)
if err != nil {
log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req)
return err
}
st.MsgCounter++
return nil
}
func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) {
groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405"))
groupInfo := &sdkws.GroupInfo{
GroupID: groupID,
GroupName: groupID,
GroupType: constant.WorkingGroup,
}
req := group.CreateGroupReq{
OwnerUserID: userID,
MemberUserIDs: TestTargetUserList,
GroupInfo: groupInfo,
}
resp := group.CreateGroupResp{}
response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req)
if err != nil {
return "", err
}
if err := json.Unmarshal(response, &resp); err != nil {
return "", err
}
st.GroupCounter++
return resp.GroupInfo.GroupID, nil
}
func main() {
var configPath string
// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config")
// flag.StringVar(&configPath, "c", defaultConfigDir, "config path")
flag.StringVar(&configPath, "c", "", "config path")
flag.Parse()
if configPath == "" {
_, _ = fmt.Fprintln(os.Stderr, "config path is empty")
os.Exit(1)
return
}
fmt.Printf(" Config Path: %s\n", configPath)
share, apiConfig, err := initConfig(configPath)
if err != nil {
program.ExitWithError(err)
return
}
ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0]))
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
defer cancel()
st := &StressTest{
Conf: &conf{
Share: *share,
Api: *apiConfig,
},
AdminUserID: share.IMAdminUserID[0],
Ctx: ctx,
Cancel: cancel,
HttpClient: &http.Client{
Timeout: 50 * time.Second,
},
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived stop signal, stopping...")
select {
case <-ch:
default:
close(ch)
}
st.Cancel()
}()
token, err := st.GetAdminToken(st.Ctx)
if err != nil {
log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID)
}
st.AdminToken = token
fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress)
st.DefaultGroupID = DefaultGroupID
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(CreateUserTicker)
defer ticker.Stop()
for st.UserCounter < MaxUser {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done")
return
case <-ticker.C:
// Create User
userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405"))
userCreatedID, err := st.CreateUser(st.Ctx, userID)
if err != nil {
log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID)
os.Exit(1)
return
}
// fmt.Println("User Created ID:", userCreatedID)
// Import Friend
if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil {
log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID)
os.Exit(1)
return
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID)
os.Exit(1)
return
}
st.Once.Do(func() {
st.DefaultUserID = userCreatedID
fmt.Println("Default Send User Created ID:", userCreatedID)
close(ch)
})
}
}
}()
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(SendMessageTicker)
defer ticker.Stop()
<-ch
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done")
return
case <-ticker.C:
// Send Message
if err = st.SendMsg(st.Ctx, st.DefaultUserID); err != nil {
log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultUserID)
continue
}
}
}
}()
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(CreateGroupTicker)
defer ticker.Stop()
<-ch
for st.GroupCounter < MaxGroup {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done")
return
case <-ticker.C:
// Create Group
_, err := st.CreateGroup(st.Ctx, st.DefaultUserID)
if err != nil {
log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID)
os.Exit(1)
return
}
// fmt.Println("Group Created ID:", groupID)
}
}
}()
st.Wg.Wait()
}

View File

@ -4,6 +4,11 @@ import (
"context"
"errors"
"fmt"
"log"
"net/http"
"path/filepath"
"time"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@ -19,10 +24,6 @@ import (
"github.com/openimsdk/tools/s3/oss"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/mongo"
"log"
"net/http"
"path/filepath"
"time"
)
const defaultTimeout = time.Second * 10
@ -159,7 +160,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res
if err != nil {
return nil, err
}
putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour)
putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour, &s3.PutOption{ContentType: obj.ContentType})
if err != nil {
return nil, err
}
@ -176,7 +177,7 @@ func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Res
return nil, fmt.Errorf("download object failed %s", downloadResp.Status)
}
log.Printf("file size %d", obj.Size)
request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body)
request, err := http.NewRequest(http.MethodPut, putURL.URL, downloadResp.Body)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,736 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
)
// 1. Create 100K New Users
// 2. Create 100 100K Groups
// 3. Create 1000 999 Groups
// 4. Send message to 100K Groups every second
// 5. Send message to 999 Groups every minute
var (
// Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{
// "<need-update-it>",
}
// DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
)
var (
ApiAddress string
// API method
GetAdminToken = "/auth/get_admin_token"
UserCheck = "/user/account_check"
CreateUser = "/user/user_register"
ImportFriend = "/friend/import_friend"
InviteToGroup = "/group/invite_user_to_group"
GetGroupMemberInfo = "/group/get_group_members_info"
SendMsg = "/msg/send_msg"
CreateGroup = "/group/create_group"
GetUserToken = "/auth/user_token"
)
const (
MaxUser = 100000
Max100KGroup = 100
Max999Group = 1000
MaxInviteUserLimit = 999
CreateUserTicker = 1 * time.Second
CreateGroupTicker = 1 * time.Second
Create100KGroupTicker = 1 * time.Second
Create999GroupTicker = 1 * time.Second
SendMsgTo100KGroupTicker = 1 * time.Second
SendMsgTo999GroupTicker = 1 * time.Minute
)
type BaseResp struct {
ErrCode int `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data json.RawMessage `json:"data"`
}
type StressTest struct {
Conf *conf
AdminUserID string
AdminToken string
DefaultGroupID string
DefaultUserID string
UserCounter int
CreateUserCounter int
Create100kGroupCounter int
Create999GroupCounter int
MsgCounter int
CreatedUsers []string
CreatedGroups []string
Mutex sync.Mutex
Ctx context.Context
Cancel context.CancelFunc
HttpClient *http.Client
Wg sync.WaitGroup
Once sync.Once
}
type conf struct {
Share config.Share
Api config.API
}
func initConfig(configDir string) (*config.Share, *config.API, error) {
var (
share = &config.Share{}
apiConfig = &config.API{}
)
err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share)
if err != nil {
return nil, nil, err
}
err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig)
if err != nil {
return nil, nil, err
}
return share, apiConfig, nil
}
// Post Request
func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) {
// Marshal body
jsonBody, err := json.Marshal(reqbody)
if err != nil {
log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody)
return nil, err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("operationID", st.AdminUserID)
if st.AdminToken != "" {
req.Header.Set("token", st.AdminToken)
}
// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken)
resp, err := st.HttpClient.Do(req)
if err != nil {
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody)
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
log.ZError(ctx, "Failed to read response body", err, "url", url)
return nil, err
}
var baseResp BaseResp
if err := json.Unmarshal(respBody, &baseResp); err != nil {
log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody))
return nil, err
}
if baseResp.ErrCode != 0 {
err = fmt.Errorf(baseResp.ErrMsg)
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp)
return nil, err
}
return baseResp.Data, nil
}
func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) {
req := auth.GetAdminTokenReq{
Secret: st.Conf.Share.Secret,
UserID: st.AdminUserID,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req)
if err != nil {
return "", err
}
data := &auth.GetAdminTokenResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return "", err
}
return data.Token, nil
}
func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) {
req := pbuser.AccountCheckReq{
CheckUserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req)
if err != nil {
return nil, err
}
data := &pbuser.AccountCheckResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
unRegisteredUserIDs := make([]string, 0)
for _, res := range data.Results {
if res.AccountStatus == constant.UnRegistered {
unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID)
}
}
return unRegisteredUserIDs, nil
}
func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
req := pbuser.UserRegisterReq{
Users: []*sdkws.UserInfo{user},
}
_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return "", err
}
st.UserCounter++
return userID, nil
}
func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error {
// The method can import a large number of users at once.
var userList []*sdkws.UserInfo
defer st.Once.Do(
func() {
st.DefaultUserID = userIDs[0]
fmt.Println("Default Send User Created ID:", st.DefaultUserID)
})
needUserIDs, err := st.CheckUser(ctx, userIDs)
if err != nil {
return err
}
for _, userID := range needUserIDs {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
userList = append(userList, user)
}
req := pbuser.UserRegisterReq{
Users: userList,
}
_, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return err
}
st.UserCounter += len(userList)
return nil
}
func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) {
needInviteUserIDs := make([]string, 0)
const maxBatchSize = 500
if len(userIDs) > maxBatchSize {
for i := 0; i < len(userIDs); i += maxBatchSize {
end := min(i+maxBatchSize, len(userIDs))
batchUserIDs := userIDs[i:end]
// log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1,
// "batchUserCount", len(batchUserIDs))
// Process a single batch
batchReq := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: batchUserIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq)
if err != nil {
log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1)
continue
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1)
continue
}
// Process the batch results
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range batchUserIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
}
return needInviteUserIDs, nil
}
req := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req)
if err != nil {
return nil, err
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range userIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
return needInviteUserIDs, nil
}
func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error {
req := group.InviteUserToGroupReq{
GroupID: groupID,
InvitedUserIDs: userIDs,
}
_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error {
contentObj := map[string]any{
// "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
"content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")),
}
req := &apistruct.SendMsgReq{
SendMsg: apistruct.SendMsg{
SendID: userID,
SenderNickname: userID,
GroupID: groupID,
ContentType: constant.Text,
SessionType: constant.ReadGroupChatType,
Content: contentObj,
},
}
_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req)
if err != nil {
log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req)
return err
}
st.MsgCounter++
return nil
}
// Max userIDs number is 1000
func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) {
groupInfo := &sdkws.GroupInfo{
GroupID: groupID,
GroupName: groupID,
GroupType: constant.WorkingGroup,
}
req := group.CreateGroupReq{
OwnerUserID: userID,
MemberUserIDs: userIDsList,
GroupInfo: groupInfo,
}
resp := group.CreateGroupResp{}
response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req)
if err != nil {
return "", err
}
if err := json.Unmarshal(response, &resp); err != nil {
return "", err
}
// st.GroupCounter++
return resp.GroupInfo.GroupID, nil
}
func main() {
var configPath string
// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config")
// flag.StringVar(&configPath, "c", defaultConfigDir, "config path")
flag.StringVar(&configPath, "c", "", "config path")
flag.Parse()
if configPath == "" {
_, _ = fmt.Fprintln(os.Stderr, "config path is empty")
os.Exit(1)
return
}
fmt.Printf(" Config Path: %s\n", configPath)
share, apiConfig, err := initConfig(configPath)
if err != nil {
program.ExitWithError(err)
return
}
ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0]))
ctx, cancel := context.WithCancel(context.Background())
// ch := make(chan struct{})
st := &StressTest{
Conf: &conf{
Share: *share,
Api: *apiConfig,
},
AdminUserID: share.IMAdminUserID[0],
Ctx: ctx,
Cancel: cancel,
HttpClient: &http.Client{
Timeout: 50 * time.Second,
},
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived stop signal, stopping...")
go func() {
// time.Sleep(5 * time.Second)
fmt.Println("Force exit")
os.Exit(0)
}()
st.Cancel()
}()
token, err := st.GetAdminToken(st.Ctx)
if err != nil {
log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID)
}
st.AdminToken = token
fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress)
for i := range MaxUser {
userID := fmt.Sprintf("v2_StressTest_User_%d", i)
st.CreatedUsers = append(st.CreatedUsers, userID)
st.CreateUserCounter++
}
// err = st.CreateUserBatch(st.Ctx, st.CreatedUsers)
// if err != nil {
// log.ZError(ctx, "Create user failed.", err)
// }
const batchSize = 1000
totalUsers := len(st.CreatedUsers)
successCount := 0
if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 {
st.DefaultUserID = st.CreatedUsers[0]
}
for i := 0; i < totalUsers; i += batchSize {
end := min(i+batchSize, totalUsers)
userBatch := st.CreatedUsers[i:end]
log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch))
err = st.CreateUserBatch(st.Ctx, userBatch)
if err != nil {
log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1)
} else {
successCount += len(userBatch)
log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1,
"progress", fmt.Sprintf("%d/%d", successCount, totalUsers))
}
}
// Execute create 100k group
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create100kGroupTicker := time.NewTicker(Create100KGroupTicker)
defer create100kGroupTicker.Stop()
for i := range Max100KGroup {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 100K Group")
return
case <-create100kGroupTicker.C:
// Create 100K groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create100kGroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// create 999 groups
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create999GroupTicker := time.NewTicker(Create999GroupTicker)
defer create999GroupTicker.Stop()
for i := range Max999Group {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 999 Group")
return
case <-create999GroupTicker.C:
// Create 999 groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create999GroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// Send message to 100K groups
st.Wg.Wait()
fmt.Println("All groups created successfully, starting to send messages...")
log.ZInfo(ctx, "All groups created successfully, starting to send messages...")
var groups100K []string
var groups999 []string
for i := range Max100KGroup {
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i)
groups100K = append(groups100K, groupID)
}
for i := range Max999Group {
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i)
groups999 = append(groups999, groupID)
}
send100kGroupLimiter := make(chan struct{}, 20)
send999GroupLimiter := make(chan struct{}, 100)
// execute Send message to 100K groups
go func() {
ticker := time.NewTicker(SendMsgTo100KGroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 100K Group")
return
case <-ticker.C:
// Send message to 100K groups
for _, groupID := range groups100K {
send100kGroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send100kGroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 100K group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 100K groups successfully.")
}
}
}()
// execute Send message to 999 groups
go func() {
ticker := time.NewTicker(SendMsgTo999GroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 999 Group")
return
case <-ticker.C:
// Send message to 999 groups
for _, groupID := range groups999 {
send999GroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send999GroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 999 group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 999 groups successfully.")
}
}
}()
<-st.Ctx.Done()
fmt.Println("Received signal to exit, shutting down...")
}

View File

@ -0,0 +1,25 @@
# Stress Test
## Usage
You need set `TestTargetUserList` and `DefaultGroupID` variables.
### Build
```bash
go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go
# or
go build -o tools/stress-test/stress-test tools/stress-test/main.go
```
### Excute
```bash
_output/bin/tools/linux/amd64/stress-test -c config/
#or
tools/stress-test/stress-test -c config/
```

459
tools/stress-test/main.go Executable file
View File

@ -0,0 +1,459 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
)
/*
1. Create one user every minute
2. Import target users as friends
3. Add users to the default group
4. Send a message to the default group every second, containing index and current timestamp
5. Create a new group every minute and invite target users to join
*/
// !!! ATTENTION: This variable is must be added!
var (
// Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{
"<need-update-it>",
}
DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
)
var (
ApiAddress string
// API method
GetAdminToken = "/auth/get_admin_token"
CreateUser = "/user/user_register"
ImportFriend = "/friend/import_friend"
InviteToGroup = "/group/invite_user_to_group"
SendMsg = "/msg/send_msg"
CreateGroup = "/group/create_group"
GetUserToken = "/auth/user_token"
)
const (
MaxUser = 10000
MaxGroup = 1000
CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user
SendMessageTicker = 1 * time.Second // Ticker is 1s in send message
CreateGroupTicker = 1 * time.Minute
)
type BaseResp struct {
ErrCode int `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data json.RawMessage `json:"data"`
}
type StressTest struct {
Conf *conf
AdminUserID string
AdminToken string
DefaultGroupID string
DefaultUserID string
UserCounter int
GroupCounter int
MsgCounter int
CreatedUsers []string
CreatedGroups []string
Mutex sync.Mutex
Ctx context.Context
Cancel context.CancelFunc
HttpClient *http.Client
Wg sync.WaitGroup
Once sync.Once
}
type conf struct {
Share config.Share
Api config.API
}
func initConfig(configDir string) (*config.Share, *config.API, error) {
var (
share = &config.Share{}
apiConfig = &config.API{}
)
err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share)
if err != nil {
return nil, nil, err
}
err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig)
if err != nil {
return nil, nil, err
}
return share, apiConfig, nil
}
// Post Request
func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) {
// Marshal body
jsonBody, err := json.Marshal(reqbody)
if err != nil {
log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody)
return nil, err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("operationID", st.AdminUserID)
if st.AdminToken != "" {
req.Header.Set("token", st.AdminToken)
}
// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken)
resp, err := st.HttpClient.Do(req)
if err != nil {
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody)
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
log.ZError(ctx, "Failed to read response body", err, "url", url)
return nil, err
}
var baseResp BaseResp
if err := json.Unmarshal(respBody, &baseResp); err != nil {
log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody))
return nil, err
}
if baseResp.ErrCode != 0 {
err = fmt.Errorf(baseResp.ErrMsg)
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp)
return nil, err
}
return baseResp.Data, nil
}
func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) {
req := auth.GetAdminTokenReq{
Secret: st.Conf.Share.Secret,
UserID: st.AdminUserID,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req)
if err != nil {
return "", err
}
data := &auth.GetAdminTokenResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return "", err
}
return data.Token, nil
}
func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
req := pbuser.UserRegisterReq{
Users: []*sdkws.UserInfo{user},
}
_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return "", err
}
st.UserCounter++
return userID, nil
}
func (st *StressTest) ImportFriend(ctx context.Context, userID string) error {
req := relation.ImportFriendReq{
OwnerUserID: userID,
FriendUserIDs: TestTargetUserList,
}
_, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error {
req := group.InviteUserToGroupReq{
GroupID: st.DefaultGroupID,
InvitedUserIDs: []string{userID},
}
_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) SendMsg(ctx context.Context, userID string) error {
contentObj := map[string]any{
"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
}
req := &apistruct.SendMsgReq{
SendMsg: apistruct.SendMsg{
SendID: userID,
SenderNickname: userID,
GroupID: st.DefaultGroupID,
ContentType: constant.Text,
SessionType: constant.ReadGroupChatType,
Content: contentObj,
},
}
_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req)
if err != nil {
log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req)
return err
}
st.MsgCounter++
return nil
}
func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) {
groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405"))
groupInfo := &sdkws.GroupInfo{
GroupID: groupID,
GroupName: groupID,
GroupType: constant.WorkingGroup,
}
req := group.CreateGroupReq{
OwnerUserID: userID,
MemberUserIDs: TestTargetUserList,
GroupInfo: groupInfo,
}
resp := group.CreateGroupResp{}
response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req)
if err != nil {
return "", err
}
if err := json.Unmarshal(response, &resp); err != nil {
return "", err
}
st.GroupCounter++
return resp.GroupInfo.GroupID, nil
}
func main() {
var configPath string
// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config")
// flag.StringVar(&configPath, "c", defaultConfigDir, "config path")
flag.StringVar(&configPath, "c", "", "config path")
flag.Parse()
if configPath == "" {
_, _ = fmt.Fprintln(os.Stderr, "config path is empty")
os.Exit(1)
return
}
fmt.Printf(" Config Path: %s\n", configPath)
share, apiConfig, err := initConfig(configPath)
if err != nil {
program.ExitWithError(err)
return
}
ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0]))
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
defer cancel()
st := &StressTest{
Conf: &conf{
Share: *share,
Api: *apiConfig,
},
AdminUserID: share.IMAdminUserID[0],
Ctx: ctx,
Cancel: cancel,
HttpClient: &http.Client{
Timeout: 50 * time.Second,
},
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived stop signal, stopping...")
select {
case <-ch:
default:
close(ch)
}
st.Cancel()
}()
token, err := st.GetAdminToken(st.Ctx)
if err != nil {
log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID)
}
st.AdminToken = token
fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress)
st.DefaultGroupID = DefaultGroupID
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(CreateUserTicker)
defer ticker.Stop()
for st.UserCounter < MaxUser {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done")
return
case <-ticker.C:
// Create User
userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405"))
userCreatedID, err := st.CreateUser(st.Ctx, userID)
if err != nil {
log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID)
os.Exit(1)
return
}
// fmt.Println("User Created ID:", userCreatedID)
// Import Friend
if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil {
log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID)
os.Exit(1)
return
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID)
os.Exit(1)
return
}
st.Once.Do(func() {
st.DefaultUserID = userCreatedID
fmt.Println("Default Send User Created ID:", userCreatedID)
close(ch)
})
}
}
}()
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(SendMessageTicker)
defer ticker.Stop()
<-ch
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done")
return
case <-ticker.C:
// Send Message
if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil {
log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID)
continue
}
}
}
}()
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
ticker := time.NewTicker(CreateGroupTicker)
defer ticker.Stop()
<-ch
for st.GroupCounter < MaxGroup {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done")
return
case <-ticker.C:
// Create Group
_, err := st.CreateGroup(st.Ctx, st.DefaultUserID)
if err != nil {
log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID)
os.Exit(1)
return
}
// fmt.Println("Group Created ID:", groupID)
}
}
}()
st.Wg.Wait()
}

View File

@ -1,6 +1,14 @@
package version
import _ "embed"
import (
_ "embed"
"strings"
)
//go:embed version
var Version string
func init() {
Version = strings.Trim(Version, "\n")
Version = strings.TrimSpace(Version)
}