diff --git a/config/share.yml b/config/share.yml index a5fbeac75..0913c1e88 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,9 +1,9 @@ secret: openIM123 -imAdminUserID: [ imAdmin ] +imAdminUserID: [imAdmin] # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time multiLogin: policy: 1 # max num of tokens in one end - maxNumOneEnd: 30 \ No newline at end of file + maxNumOneEnd: 30 diff --git a/tools/stress-test/README.md b/tools/stress-test/README.md new file mode 100644 index 000000000..531233a20 --- /dev/null +++ b/tools/stress-test/README.md @@ -0,0 +1,25 @@ +# Stress Test + +## Usage + +You need set `TestTargetUserList` and `DefaultGroupID` variables. + +### Build + +```bash +go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go + +# or + +go build -o tools/stress-test/stress-test tools/stress-test/main.go +``` + +### Excute + +```bash +_output/bin/tools/linux/amd64/stress-test -c config/ + +#or + +tools/stress-test/stress-test -c config/ +``` diff --git a/tools/stress-test/main.go b/tools/stress-test/main.go new file mode 100755 index 000000000..ee58c9749 --- /dev/null +++ b/tools/stress-test/main.go @@ -0,0 +1,452 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/relation" + "github.com/openimsdk/protocol/sdkws" + pbuser "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/system/program" +) + +/* + 1. Create one user every minute + 2. Import target users as friends + 3. Add users to the default group + 4. Send a message to the default group every second, containing index and current timestamp + 5. Create a new group every minute and invite target users to join +*/ + +// !!! ATTENTION: This variable is must be added! +var ( + // Use default userIDs List for testing, need to be created. + TestTargetUserList = []string{ + "", + } + DefaultGroupID = "" // Use default group ID for testing, need to be created. +) + +var ( + ApiAddress string + + // API method + GetAdminToken = "/auth/get_admin_token" + CreateUser = "/user/user_register" + ImportFriend = "/friend/import_friend" + InviteToGroup = "/group/invite_user_to_group" + SendMsg = "/msg/send_msg" + CreateGroup = "/group/create_group" + GetUserToken = "/auth/user_token" +) + +const ( + MaxUser = 10000 + MaxGroup = 1000 + + CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user + SendMessageTicker = 1 * time.Second // Ticker is 1s in send message + CreateGroupTicker = 1 * time.Minute +) + +type BaseResp struct { + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + Data json.RawMessage `json:"data"` +} + +type StressTest struct { + Conf *conf + AdminUserID string + AdminToken string + DefaultGroupID string + DefaultSendUserID string + UserCounter int + GroupCounter int + MsgCounter int + CreatedUsers []string + CreatedGroups []string + Mutex sync.Mutex + Ctx context.Context + Cancel context.CancelFunc + HttpClient *http.Client + Wg sync.WaitGroup + Once sync.Once +} + +type conf struct { + Share config.Share + Api config.API +} + +func initConfig(configDir string) (*config.Share, *config.API, error) { + var ( + share = &config.Share{} + apiConfig = &config.API{} + ) + + err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) + if err != nil { + return nil, nil, err + } + + err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) + if err != nil { + return nil, nil, err + } + + return share, apiConfig, nil +} + +// Post Request +func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { + // Marshal body + jsonBody, err := json.Marshal(reqbody) + if err != nil { + log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("operationID", st.AdminUserID) + if st.AdminToken != "" { + req.Header.Set("token", st.AdminToken) + } + + // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) + + resp, err := st.HttpClient.Do(req) + if err != nil { + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) + return nil, err + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + log.ZError(ctx, "Failed to read response body", err, "url", url) + return nil, err + } + + var baseResp BaseResp + if err := json.Unmarshal(respBody, &baseResp); err != nil { + log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) + return nil, err + } + + if baseResp.ErrCode != 0 { + err = fmt.Errorf(baseResp.ErrMsg) + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) + return nil, err + } + + return baseResp.Data, nil +} + +func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { + req := auth.GetAdminTokenReq{ + Secret: st.Conf.Share.Secret, + UserID: st.AdminUserID, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) + if err != nil { + return "", err + } + + data := &auth.GetAdminTokenResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return "", err + } + + return data.Token, nil +} + +func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + + req := pbuser.UserRegisterReq{ + Users: []*sdkws.UserInfo{user}, + } + + _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return "", err + } + + st.UserCounter++ + return userID, nil +} + +func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { + req := relation.ImportFriendReq{ + OwnerUserID: userID, + FriendUserIDs: TestTargetUserList, + } + + _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { + req := group.InviteUserToGroupReq{ + GroupID: st.DefaultGroupID, + InvitedUserIDs: []string{userID}, + } + _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) SendMsg(ctx context.Context, userID string) error { + contentObj := map[string]any{ + "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + } + + req := map[string]any{ + "sendID": userID, + "groupID": st.DefaultGroupID, + "contentType": constant.Text, + "sessionType": constant.ReadGroupChatType, + "content": contentObj, + } + + _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) + if err != nil { + log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) + return err + } + + st.MsgCounter++ + + return nil +} + +func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { + groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) + + req := map[string]any{ + "memberUserIDs": TestTargetUserList, + "ownerUserID": userID, + "groupInfo": map[string]any{ + "groupID": groupID, + "groupName": groupID, + "groupType": constant.WorkingGroup, + }, + } + resp := group.CreateGroupResp{} + + response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) + if err != nil { + return "", err + } + + if err := json.Unmarshal(response, &resp); err != nil { + return "", err + } + + st.GroupCounter++ + + return resp.GroupInfo.GroupID, nil +} + +func main() { + var configPath string + // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") + // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") + flag.StringVar(&configPath, "c", "", "config path") + flag.Parse() + + if configPath == "" { + _, _ = fmt.Fprintln(os.Stderr, "config path is empty") + os.Exit(1) + return + } + + fmt.Printf(" Config Path: %s\n", configPath) + + share, apiConfig, err := initConfig(configPath) + if err != nil { + program.ExitWithError(err) + return + } + + ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + + defer cancel() + + st := &StressTest{ + Conf: &conf{ + Share: *share, + Api: *apiConfig, + }, + AdminUserID: share.IMAdminUserID[0], + Ctx: ctx, + Cancel: cancel, + HttpClient: &http.Client{ + Timeout: 50 * time.Second, + }, + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nReceived stop signal, stopping...") + + select { + case <-ch: + default: + close(ch) + } + + st.Cancel() + }() + + token, err := st.GetAdminToken(st.Ctx) + if err != nil { + log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) + } + + st.AdminToken = token + fmt.Println("Admin Token:", st.AdminToken) + fmt.Println("ApiAddress:", ApiAddress) + + st.DefaultGroupID = DefaultGroupID + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(CreateUserTicker) + defer ticker.Stop() + + for st.UserCounter < MaxUser { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") + return + + case <-ticker.C: + // Create User + userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) + + userCreatedID, err := st.CreateUser(st.Ctx, userID) + if err != nil { + log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) + os.Exit(1) + return + } + // fmt.Println("User Created ID:", userCreatedID) + + // Import Friend + if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { + log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) + os.Exit(1) + return + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) + os.Exit(1) + return + } + + st.Once.Do(func() { + st.DefaultSendUserID = userCreatedID + fmt.Println("Default Send User Created ID:", userCreatedID) + close(ch) + }) + } + } + }() + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(SendMessageTicker) + defer ticker.Stop() + <-ch + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") + return + + case <-ticker.C: + // Send Message + if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { + log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) + continue + } + } + } + }() + + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + ticker := time.NewTicker(CreateGroupTicker) + defer ticker.Stop() + <-ch + + for st.GroupCounter < MaxGroup { + + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") + return + + case <-ticker.C: + + // Create Group + _, err := st.CreateGroup(st.Ctx, st.DefaultSendUserID) + if err != nil { + log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultSendUserID) + os.Exit(1) + return + } + + // fmt.Println("Group Created ID:", groupID) + } + } + }() + + st.Wg.Wait() +}