mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 12:42:12 +08:00 
			
		
		
		
	feat: Implement stress test v2. (#3292)
* feat: improve stress test code. * feat: Implement stress test v2.
This commit is contained in:
		
							parent
							
								
									c29e2a9a28
								
							
						
					
					
						commit
						338600c3e0
					
				
							
								
								
									
										736
									
								
								tools/stress-test-v2/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										736
									
								
								tools/stress-test-v2/main.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,736 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"flag" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"sync" | ||||||
|  | 	"syscall" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
|  | 	"github.com/openimsdk/protocol/auth" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/protocol/group" | ||||||
|  | 	"github.com/openimsdk/protocol/sdkws" | ||||||
|  | 	pbuser "github.com/openimsdk/protocol/user" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
|  | 	"github.com/openimsdk/tools/system/program" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // 1. Create 100K New Users | ||||||
|  | // 2. Create 100 100K Groups | ||||||
|  | // 3. Create 1000 999 Groups | ||||||
|  | // 4. Send message to 100K Groups every second | ||||||
|  | // 5. Send message to 999 Groups every minute | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	//  Use default userIDs List for testing, need to be created. | ||||||
|  | 	TestTargetUserList = []string{ | ||||||
|  | 		// "<need-update-it>", | ||||||
|  | 	} | ||||||
|  | 	// DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created. | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	ApiAddress string | ||||||
|  | 
 | ||||||
|  | 	// API method | ||||||
|  | 	GetAdminToken      = "/auth/get_admin_token" | ||||||
|  | 	UserCheck          = "/user/account_check" | ||||||
|  | 	CreateUser         = "/user/user_register" | ||||||
|  | 	ImportFriend       = "/friend/import_friend" | ||||||
|  | 	InviteToGroup      = "/group/invite_user_to_group" | ||||||
|  | 	GetGroupMemberInfo = "/group/get_group_members_info" | ||||||
|  | 	SendMsg            = "/msg/send_msg" | ||||||
|  | 	CreateGroup        = "/group/create_group" | ||||||
|  | 	GetUserToken       = "/auth/user_token" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	MaxUser            = 100000 | ||||||
|  | 	Max100KGroup       = 100 | ||||||
|  | 	Max999Group        = 1000 | ||||||
|  | 	MaxInviteUserLimit = 999 | ||||||
|  | 
 | ||||||
|  | 	CreateUserTicker         = 1 * time.Second | ||||||
|  | 	CreateGroupTicker        = 1 * time.Second | ||||||
|  | 	Create100KGroupTicker    = 1 * time.Second | ||||||
|  | 	Create999GroupTicker     = 1 * time.Second | ||||||
|  | 	SendMsgTo100KGroupTicker = 1 * time.Second | ||||||
|  | 	SendMsgTo999GroupTicker  = 1 * time.Minute | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type BaseResp struct { | ||||||
|  | 	ErrCode int             `json:"errCode"` | ||||||
|  | 	ErrMsg  string          `json:"errMsg"` | ||||||
|  | 	Data    json.RawMessage `json:"data"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type StressTest struct { | ||||||
|  | 	Conf                   *conf | ||||||
|  | 	AdminUserID            string | ||||||
|  | 	AdminToken             string | ||||||
|  | 	DefaultGroupID         string | ||||||
|  | 	DefaultUserID          string | ||||||
|  | 	UserCounter            int | ||||||
|  | 	CreateUserCounter      int | ||||||
|  | 	Create100kGroupCounter int | ||||||
|  | 	Create999GroupCounter  int | ||||||
|  | 	MsgCounter             int | ||||||
|  | 	CreatedUsers           []string | ||||||
|  | 	CreatedGroups          []string | ||||||
|  | 	Mutex                  sync.Mutex | ||||||
|  | 	Ctx                    context.Context | ||||||
|  | 	Cancel                 context.CancelFunc | ||||||
|  | 	HttpClient             *http.Client | ||||||
|  | 	Wg                     sync.WaitGroup | ||||||
|  | 	Once                   sync.Once | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type conf struct { | ||||||
|  | 	Share config.Share | ||||||
|  | 	Api   config.API | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func initConfig(configDir string) (*config.Share, *config.API, error) { | ||||||
|  | 	var ( | ||||||
|  | 		share     = &config.Share{} | ||||||
|  | 		apiConfig = &config.API{} | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return share, apiConfig, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Post Request | ||||||
|  | func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { | ||||||
|  | 	// Marshal body | ||||||
|  | 	jsonBody, err := json.Marshal(reqbody) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	req.Header.Set("Content-Type", "application/json") | ||||||
|  | 	req.Header.Set("operationID", st.AdminUserID) | ||||||
|  | 	if st.AdminToken != "" { | ||||||
|  | 		req.Header.Set("token", st.AdminToken) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.HttpClient.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	defer resp.Body.Close() | ||||||
|  | 
 | ||||||
|  | 	respBody, err := io.ReadAll(resp.Body) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to read response body", err, "url", url) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var baseResp BaseResp | ||||||
|  | 	if err := json.Unmarshal(respBody, &baseResp); err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if baseResp.ErrCode != 0 { | ||||||
|  | 		err = fmt.Errorf(baseResp.ErrMsg) | ||||||
|  | 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return baseResp.Data, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { | ||||||
|  | 	req := auth.GetAdminTokenReq{ | ||||||
|  | 		Secret: st.Conf.Share.Secret, | ||||||
|  | 		UserID: st.AdminUserID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data := &auth.GetAdminTokenResp{} | ||||||
|  | 	if err := json.Unmarshal(resp, &data); err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return data.Token, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) { | ||||||
|  | 	req := pbuser.AccountCheckReq{ | ||||||
|  | 		CheckUserIDs: userIDs, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data := &pbuser.AccountCheckResp{} | ||||||
|  | 	if err := json.Unmarshal(resp, &data); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	unRegisteredUserIDs := make([]string, 0) | ||||||
|  | 
 | ||||||
|  | 	for _, res := range data.Results { | ||||||
|  | 		if res.AccountStatus == constant.UnRegistered { | ||||||
|  | 			unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return unRegisteredUserIDs, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { | ||||||
|  | 	user := &sdkws.UserInfo{ | ||||||
|  | 		UserID:   userID, | ||||||
|  | 		Nickname: userID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := pbuser.UserRegisterReq{ | ||||||
|  | 		Users: []*sdkws.UserInfo{user}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.UserCounter++ | ||||||
|  | 	return userID, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error { | ||||||
|  | 	// The method can import a large number of users at once. | ||||||
|  | 	var userList []*sdkws.UserInfo | ||||||
|  | 
 | ||||||
|  | 	defer st.Once.Do( | ||||||
|  | 		func() { | ||||||
|  | 			st.DefaultUserID = userIDs[0] | ||||||
|  | 			fmt.Println("Default Send User Created ID:", st.DefaultUserID) | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
|  | 	needUserIDs, err := st.CheckUser(ctx, userIDs) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, userID := range needUserIDs { | ||||||
|  | 		user := &sdkws.UserInfo{ | ||||||
|  | 			UserID:   userID, | ||||||
|  | 			Nickname: userID, | ||||||
|  | 		} | ||||||
|  | 		userList = append(userList, user) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := pbuser.UserRegisterReq{ | ||||||
|  | 		Users: userList, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.UserCounter += len(userList) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) { | ||||||
|  | 	needInviteUserIDs := make([]string, 0) | ||||||
|  | 
 | ||||||
|  | 	const maxBatchSize = 500 | ||||||
|  | 	if len(userIDs) > maxBatchSize { | ||||||
|  | 		for i := 0; i < len(userIDs); i += maxBatchSize { | ||||||
|  | 			end := min(i+maxBatchSize, len(userIDs)) | ||||||
|  | 			batchUserIDs := userIDs[i:end] | ||||||
|  | 
 | ||||||
|  | 			// log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1, | ||||||
|  | 			// 	"batchUserCount", len(batchUserIDs)) | ||||||
|  | 
 | ||||||
|  | 			// Process a single batch | ||||||
|  | 			batchReq := group.GetGroupMembersInfoReq{ | ||||||
|  | 				GroupID: groupID, | ||||||
|  | 				UserIDs: batchUserIDs, | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq) | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			data := &group.GetGroupMembersInfoResp{} | ||||||
|  | 			if err := json.Unmarshal(resp, &data); err != nil { | ||||||
|  | 				log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// Process the batch results | ||||||
|  | 			existingMembers := make(map[string]bool) | ||||||
|  | 			for _, member := range data.Members { | ||||||
|  | 				existingMembers[member.UserID] = true | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			for _, userID := range batchUserIDs { | ||||||
|  | 				if !existingMembers[userID] { | ||||||
|  | 					needInviteUserIDs = append(needInviteUserIDs, userID) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return needInviteUserIDs, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := group.GetGroupMembersInfoReq{ | ||||||
|  | 		GroupID: groupID, | ||||||
|  | 		UserIDs: userIDs, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data := &group.GetGroupMembersInfoResp{} | ||||||
|  | 	if err := json.Unmarshal(resp, &data); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	existingMembers := make(map[string]bool) | ||||||
|  | 	for _, member := range data.Members { | ||||||
|  | 		existingMembers[member.UserID] = true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, userID := range userIDs { | ||||||
|  | 		if !existingMembers[userID] { | ||||||
|  | 			needInviteUserIDs = append(needInviteUserIDs, userID) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return needInviteUserIDs, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error { | ||||||
|  | 	req := group.InviteUserToGroupReq{ | ||||||
|  | 		GroupID:        groupID, | ||||||
|  | 		InvitedUserIDs: userIDs, | ||||||
|  | 	} | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error { | ||||||
|  | 	contentObj := map[string]any{ | ||||||
|  | 		// "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), | ||||||
|  | 		"content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := &apistruct.SendMsgReq{ | ||||||
|  | 		SendMsg: apistruct.SendMsg{ | ||||||
|  | 			SendID:         userID, | ||||||
|  | 			SenderNickname: userID, | ||||||
|  | 			GroupID:        groupID, | ||||||
|  | 			ContentType:    constant.Text, | ||||||
|  | 			SessionType:    constant.ReadGroupChatType, | ||||||
|  | 			Content:        contentObj, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.MsgCounter++ | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Max userIDs number is 1000 | ||||||
|  | func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) { | ||||||
|  | 	groupInfo := &sdkws.GroupInfo{ | ||||||
|  | 		GroupID:   groupID, | ||||||
|  | 		GroupName: groupID, | ||||||
|  | 		GroupType: constant.WorkingGroup, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := group.CreateGroupReq{ | ||||||
|  | 		OwnerUserID:   userID, | ||||||
|  | 		MemberUserIDs: userIDsList, | ||||||
|  | 		GroupInfo:     groupInfo, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp := group.CreateGroupResp{} | ||||||
|  | 
 | ||||||
|  | 	response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := json.Unmarshal(response, &resp); err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// st.GroupCounter++ | ||||||
|  | 
 | ||||||
|  | 	return resp.GroupInfo.GroupID, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func main() { | ||||||
|  | 	var configPath string | ||||||
|  | 	// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") | ||||||
|  | 	// flag.StringVar(&configPath, "c", defaultConfigDir, "config path") | ||||||
|  | 	flag.StringVar(&configPath, "c", "", "config path") | ||||||
|  | 	flag.Parse() | ||||||
|  | 
 | ||||||
|  | 	if configPath == "" { | ||||||
|  | 		_, _ = fmt.Fprintln(os.Stderr, "config path is empty") | ||||||
|  | 		os.Exit(1) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fmt.Printf(" Config Path: %s\n", configPath) | ||||||
|  | 
 | ||||||
|  | 	share, apiConfig, err := initConfig(configPath) | ||||||
|  | 	if err != nil { | ||||||
|  | 		program.ExitWithError(err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) | ||||||
|  | 
 | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	// ch := make(chan struct{}) | ||||||
|  | 
 | ||||||
|  | 	st := &StressTest{ | ||||||
|  | 		Conf: &conf{ | ||||||
|  | 			Share: *share, | ||||||
|  | 			Api:   *apiConfig, | ||||||
|  | 		}, | ||||||
|  | 		AdminUserID: share.IMAdminUserID[0], | ||||||
|  | 		Ctx:         ctx, | ||||||
|  | 		Cancel:      cancel, | ||||||
|  | 		HttpClient: &http.Client{ | ||||||
|  | 			Timeout: 50 * time.Second, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c := make(chan os.Signal, 1) | ||||||
|  | 	signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||||||
|  | 	go func() { | ||||||
|  | 		<-c | ||||||
|  | 		fmt.Println("\nReceived stop signal, stopping...") | ||||||
|  | 
 | ||||||
|  | 		go func() { | ||||||
|  | 			// time.Sleep(5 * time.Second) | ||||||
|  | 			fmt.Println("Force exit") | ||||||
|  | 			os.Exit(0) | ||||||
|  | 		}() | ||||||
|  | 
 | ||||||
|  | 		st.Cancel() | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	token, err := st.GetAdminToken(st.Ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.AdminToken = token | ||||||
|  | 	fmt.Println("Admin Token:", st.AdminToken) | ||||||
|  | 	fmt.Println("ApiAddress:", ApiAddress) | ||||||
|  | 
 | ||||||
|  | 	for i := range MaxUser { | ||||||
|  | 		userID := fmt.Sprintf("v2_StressTest_User_%d", i) | ||||||
|  | 		st.CreatedUsers = append(st.CreatedUsers, userID) | ||||||
|  | 		st.CreateUserCounter++ | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// err = st.CreateUserBatch(st.Ctx, st.CreatedUsers) | ||||||
|  | 	// if err != nil { | ||||||
|  | 	// 	log.ZError(ctx, "Create user failed.", err) | ||||||
|  | 	// } | ||||||
|  | 
 | ||||||
|  | 	const batchSize = 1000 | ||||||
|  | 	totalUsers := len(st.CreatedUsers) | ||||||
|  | 	successCount := 0 | ||||||
|  | 
 | ||||||
|  | 	if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 { | ||||||
|  | 		st.DefaultUserID = st.CreatedUsers[0] | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < totalUsers; i += batchSize { | ||||||
|  | 		end := min(i+batchSize, totalUsers) | ||||||
|  | 
 | ||||||
|  | 		userBatch := st.CreatedUsers[i:end] | ||||||
|  | 		log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch)) | ||||||
|  | 
 | ||||||
|  | 		err = st.CreateUserBatch(st.Ctx, userBatch) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1) | ||||||
|  | 		} else { | ||||||
|  | 			successCount += len(userBatch) | ||||||
|  | 			log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1, | ||||||
|  | 				"progress", fmt.Sprintf("%d/%d", successCount, totalUsers)) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Execute create 100k group | ||||||
|  | 	st.Wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer st.Wg.Done() | ||||||
|  | 
 | ||||||
|  | 		create100kGroupTicker := time.NewTicker(Create100KGroupTicker) | ||||||
|  | 		defer create100kGroupTicker.Stop() | ||||||
|  | 
 | ||||||
|  | 		for i := range Max100KGroup { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Create 100K Group") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-create100kGroupTicker.C: | ||||||
|  | 				// Create 100K groups | ||||||
|  | 				st.Wg.Add(1) | ||||||
|  | 				go func(idx int) { | ||||||
|  | 					defer st.Wg.Done() | ||||||
|  | 					defer func() { | ||||||
|  | 						st.Create100kGroupCounter++ | ||||||
|  | 					}() | ||||||
|  | 
 | ||||||
|  | 					groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) | ||||||
|  | 
 | ||||||
|  | 					if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { | ||||||
|  | 						log.ZError(st.Ctx, "Create group failed.", err) | ||||||
|  | 						// continue | ||||||
|  | 					} | ||||||
|  | 
 | ||||||
|  | 					for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { | ||||||
|  | 						InviteUserIDs := make([]string, 0) | ||||||
|  | 						// ensure TargetUserList is in group | ||||||
|  | 						InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) | ||||||
|  | 
 | ||||||
|  | 						startIdx := max(i*MaxInviteUserLimit, 1) | ||||||
|  | 						endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) | ||||||
|  | 
 | ||||||
|  | 						for j := startIdx; j < endIdx; j++ { | ||||||
|  | 							userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) | ||||||
|  | 							InviteUserIDs = append(InviteUserIDs, userCreatedID) | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						if len(InviteUserIDs) == 0 { | ||||||
|  | 							log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) | ||||||
|  | 						if err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						if len(InviteUserIDs) == 0 { | ||||||
|  | 							log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						// Invite To Group | ||||||
|  | 						if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) | ||||||
|  | 							continue | ||||||
|  | 							// os.Exit(1) | ||||||
|  | 							// return | ||||||
|  | 						} | ||||||
|  | 					} | ||||||
|  | 				}(i) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// create 999 groups | ||||||
|  | 	st.Wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer st.Wg.Done() | ||||||
|  | 
 | ||||||
|  | 		create999GroupTicker := time.NewTicker(Create999GroupTicker) | ||||||
|  | 		defer create999GroupTicker.Stop() | ||||||
|  | 
 | ||||||
|  | 		for i := range Max999Group { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Create 999 Group") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-create999GroupTicker.C: | ||||||
|  | 				// Create 999 groups | ||||||
|  | 				st.Wg.Add(1) | ||||||
|  | 				go func(idx int) { | ||||||
|  | 					defer st.Wg.Done() | ||||||
|  | 					defer func() { | ||||||
|  | 						st.Create999GroupCounter++ | ||||||
|  | 					}() | ||||||
|  | 
 | ||||||
|  | 					groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx) | ||||||
|  | 
 | ||||||
|  | 					if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { | ||||||
|  | 						log.ZError(st.Ctx, "Create group failed.", err) | ||||||
|  | 						// continue | ||||||
|  | 					} | ||||||
|  | 					for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { | ||||||
|  | 						InviteUserIDs := make([]string, 0) | ||||||
|  | 						// ensure TargetUserList is in group | ||||||
|  | 						InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) | ||||||
|  | 
 | ||||||
|  | 						startIdx := max(i*MaxInviteUserLimit, 1) | ||||||
|  | 						endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) | ||||||
|  | 
 | ||||||
|  | 						for j := startIdx; j < endIdx; j++ { | ||||||
|  | 							userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) | ||||||
|  | 							InviteUserIDs = append(InviteUserIDs, userCreatedID) | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						if len(InviteUserIDs) == 0 { | ||||||
|  | 							log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) | ||||||
|  | 						if err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						if len(InviteUserIDs) == 0 { | ||||||
|  | 							log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) | ||||||
|  | 							continue | ||||||
|  | 						} | ||||||
|  | 
 | ||||||
|  | 						// Invite To Group | ||||||
|  | 						if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) | ||||||
|  | 							continue | ||||||
|  | 							// os.Exit(1) | ||||||
|  | 							// return | ||||||
|  | 						} | ||||||
|  | 					} | ||||||
|  | 				}(i) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// Send message to 100K groups | ||||||
|  | 	st.Wg.Wait() | ||||||
|  | 	fmt.Println("All groups created successfully, starting to send messages...") | ||||||
|  | 	log.ZInfo(ctx, "All groups created successfully, starting to send messages...") | ||||||
|  | 
 | ||||||
|  | 	var groups100K []string | ||||||
|  | 	var groups999 []string | ||||||
|  | 
 | ||||||
|  | 	for i := range Max100KGroup { | ||||||
|  | 		groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) | ||||||
|  | 		groups100K = append(groups100K, groupID) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for i := range Max999Group { | ||||||
|  | 		groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) | ||||||
|  | 		groups999 = append(groups999, groupID) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	send100kGroupLimiter := make(chan struct{}, 20) | ||||||
|  | 	send999GroupLimiter := make(chan struct{}, 100) | ||||||
|  | 
 | ||||||
|  | 	// execute Send message to 100K groups | ||||||
|  | 	go func() { | ||||||
|  | 		ticker := time.NewTicker(SendMsgTo100KGroupTicker) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 
 | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-ticker.C: | ||||||
|  | 				// Send message to 100K groups | ||||||
|  | 				for _, groupID := range groups100K { | ||||||
|  | 					send100kGroupLimiter <- struct{}{} | ||||||
|  | 					go func(groupID string) { | ||||||
|  | 						defer func() { <-send100kGroupLimiter }() | ||||||
|  | 						if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "Send message to 100K group failed.", err) | ||||||
|  | 						} | ||||||
|  | 					}(groupID) | ||||||
|  | 				} | ||||||
|  | 				// log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// execute Send message to 999 groups | ||||||
|  | 	go func() { | ||||||
|  | 		ticker := time.NewTicker(SendMsgTo999GroupTicker) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 
 | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-ticker.C: | ||||||
|  | 				// Send message to 999 groups | ||||||
|  | 				for _, groupID := range groups999 { | ||||||
|  | 					send999GroupLimiter <- struct{}{} | ||||||
|  | 					go func(groupID string) { | ||||||
|  | 						defer func() { <-send999GroupLimiter }() | ||||||
|  | 
 | ||||||
|  | 						if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { | ||||||
|  | 							log.ZError(st.Ctx, "Send message to 999 group failed.", err) | ||||||
|  | 						} | ||||||
|  | 					}(groupID) | ||||||
|  | 				} | ||||||
|  | 				// log.ZInfo(st.Ctx, "Send message to 999 groups successfully.") | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	<-st.Ctx.Done() | ||||||
|  | 	fmt.Println("Received signal to exit, shutting down...") | ||||||
|  | } | ||||||
							
								
								
									
										458
									
								
								tools/stress-test/main.go
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										458
									
								
								tools/stress-test/main.go
									
									
									
									
									
										Executable file
									
								
							| @ -0,0 +1,458 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"flag" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"sync" | ||||||
|  | 	"syscall" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
|  | 	"github.com/openimsdk/protocol/auth" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/protocol/group" | ||||||
|  | 	"github.com/openimsdk/protocol/relation" | ||||||
|  | 	"github.com/openimsdk/protocol/sdkws" | ||||||
|  | 	pbuser "github.com/openimsdk/protocol/user" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
|  | 	"github.com/openimsdk/tools/system/program" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | /* | ||||||
|  |  1. Create one user every minute | ||||||
|  |  2. Import target users as friends | ||||||
|  |  3. Add users to the default group | ||||||
|  |  4. Send a message to the default group every second, containing index and current timestamp | ||||||
|  |  5. Create a new group every minute and invite target users to join | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | // !!! ATTENTION: This variable is must be added! | ||||||
|  | var ( | ||||||
|  | 	//  Use default userIDs List for testing, need to be created. | ||||||
|  | 	TestTargetUserList = []string{ | ||||||
|  | 		"<need-update-it>", | ||||||
|  | 	} | ||||||
|  | 	DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created. | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	ApiAddress string | ||||||
|  | 
 | ||||||
|  | 	// API method | ||||||
|  | 	GetAdminToken = "/auth/get_admin_token" | ||||||
|  | 	CreateUser    = "/user/user_register" | ||||||
|  | 	ImportFriend  = "/friend/import_friend" | ||||||
|  | 	InviteToGroup = "/group/invite_user_to_group" | ||||||
|  | 	SendMsg       = "/msg/send_msg" | ||||||
|  | 	CreateGroup   = "/group/create_group" | ||||||
|  | 	GetUserToken  = "/auth/user_token" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	MaxUser  = 10000 | ||||||
|  | 	MaxGroup = 1000 | ||||||
|  | 
 | ||||||
|  | 	CreateUserTicker  = 1 * time.Minute // Ticker is 1min in create user | ||||||
|  | 	SendMessageTicker = 1 * time.Second // Ticker is 1s in send message | ||||||
|  | 	CreateGroupTicker = 1 * time.Minute | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type BaseResp struct { | ||||||
|  | 	ErrCode int             `json:"errCode"` | ||||||
|  | 	ErrMsg  string          `json:"errMsg"` | ||||||
|  | 	Data    json.RawMessage `json:"data"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type StressTest struct { | ||||||
|  | 	Conf           *conf | ||||||
|  | 	AdminUserID    string | ||||||
|  | 	AdminToken     string | ||||||
|  | 	DefaultGroupID string | ||||||
|  | 	DefaultUserID  string | ||||||
|  | 	UserCounter    int | ||||||
|  | 	GroupCounter   int | ||||||
|  | 	MsgCounter     int | ||||||
|  | 	CreatedUsers   []string | ||||||
|  | 	CreatedGroups  []string | ||||||
|  | 	Mutex          sync.Mutex | ||||||
|  | 	Ctx            context.Context | ||||||
|  | 	Cancel         context.CancelFunc | ||||||
|  | 	HttpClient     *http.Client | ||||||
|  | 	Wg             sync.WaitGroup | ||||||
|  | 	Once           sync.Once | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type conf struct { | ||||||
|  | 	Share config.Share | ||||||
|  | 	Api   config.API | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func initConfig(configDir string) (*config.Share, *config.API, error) { | ||||||
|  | 	var ( | ||||||
|  | 		share     = &config.Share{} | ||||||
|  | 		apiConfig = &config.API{} | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return share, apiConfig, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Post Request | ||||||
|  | func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { | ||||||
|  | 	// Marshal body | ||||||
|  | 	jsonBody, err := json.Marshal(reqbody) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	req.Header.Set("Content-Type", "application/json") | ||||||
|  | 	req.Header.Set("operationID", st.AdminUserID) | ||||||
|  | 	if st.AdminToken != "" { | ||||||
|  | 		req.Header.Set("token", st.AdminToken) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.HttpClient.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	defer resp.Body.Close() | ||||||
|  | 
 | ||||||
|  | 	respBody, err := io.ReadAll(resp.Body) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to read response body", err, "url", url) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var baseResp BaseResp | ||||||
|  | 	if err := json.Unmarshal(respBody, &baseResp); err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if baseResp.ErrCode != 0 { | ||||||
|  | 		err = fmt.Errorf(baseResp.ErrMsg) | ||||||
|  | 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return baseResp.Data, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { | ||||||
|  | 	req := auth.GetAdminTokenReq{ | ||||||
|  | 		Secret: st.Conf.Share.Secret, | ||||||
|  | 		UserID: st.AdminUserID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	data := &auth.GetAdminTokenResp{} | ||||||
|  | 	if err := json.Unmarshal(resp, &data); err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return data.Token, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { | ||||||
|  | 	user := &sdkws.UserInfo{ | ||||||
|  | 		UserID:   userID, | ||||||
|  | 		Nickname: userID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := pbuser.UserRegisterReq{ | ||||||
|  | 		Users: []*sdkws.UserInfo{user}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.UserCounter++ | ||||||
|  | 	return userID, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { | ||||||
|  | 	req := relation.ImportFriendReq{ | ||||||
|  | 		OwnerUserID:   userID, | ||||||
|  | 		FriendUserIDs: TestTargetUserList, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { | ||||||
|  | 	req := group.InviteUserToGroupReq{ | ||||||
|  | 		GroupID:        st.DefaultGroupID, | ||||||
|  | 		InvitedUserIDs: []string{userID}, | ||||||
|  | 	} | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) SendMsg(ctx context.Context, userID string) error { | ||||||
|  | 	contentObj := map[string]any{ | ||||||
|  | 		"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := &apistruct.SendMsgReq{ | ||||||
|  | 		SendMsg: apistruct.SendMsg{ | ||||||
|  | 			SendID:         userID, | ||||||
|  | 			SenderNickname: userID, | ||||||
|  | 			GroupID:        st.DefaultGroupID, | ||||||
|  | 			ContentType:    constant.Text, | ||||||
|  | 			SessionType:    constant.ReadGroupChatType, | ||||||
|  | 			Content:        contentObj, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.MsgCounter++ | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { | ||||||
|  | 	groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) | ||||||
|  | 
 | ||||||
|  | 	groupInfo := &sdkws.GroupInfo{ | ||||||
|  | 		GroupID:   groupID, | ||||||
|  | 		GroupName: groupID, | ||||||
|  | 		GroupType: constant.WorkingGroup, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req := group.CreateGroupReq{ | ||||||
|  | 		OwnerUserID:   userID, | ||||||
|  | 		MemberUserIDs: TestTargetUserList, | ||||||
|  | 		GroupInfo:     groupInfo, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	resp := group.CreateGroupResp{} | ||||||
|  | 
 | ||||||
|  | 	response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := json.Unmarshal(response, &resp); err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.GroupCounter++ | ||||||
|  | 
 | ||||||
|  | 	return resp.GroupInfo.GroupID, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func main() { | ||||||
|  | 	var configPath string | ||||||
|  | 	// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") | ||||||
|  | 	// flag.StringVar(&configPath, "c", defaultConfigDir, "config path") | ||||||
|  | 	flag.StringVar(&configPath, "c", "", "config path") | ||||||
|  | 	flag.Parse() | ||||||
|  | 
 | ||||||
|  | 	if configPath == "" { | ||||||
|  | 		_, _ = fmt.Fprintln(os.Stderr, "config path is empty") | ||||||
|  | 		os.Exit(1) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fmt.Printf(" Config Path: %s\n", configPath) | ||||||
|  | 
 | ||||||
|  | 	share, apiConfig, err := initConfig(configPath) | ||||||
|  | 	if err != nil { | ||||||
|  | 		program.ExitWithError(err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) | ||||||
|  | 
 | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	ch := make(chan struct{}) | ||||||
|  | 
 | ||||||
|  | 	defer cancel() | ||||||
|  | 
 | ||||||
|  | 	st := &StressTest{ | ||||||
|  | 		Conf: &conf{ | ||||||
|  | 			Share: *share, | ||||||
|  | 			Api:   *apiConfig, | ||||||
|  | 		}, | ||||||
|  | 		AdminUserID: share.IMAdminUserID[0], | ||||||
|  | 		Ctx:         ctx, | ||||||
|  | 		Cancel:      cancel, | ||||||
|  | 		HttpClient: &http.Client{ | ||||||
|  | 			Timeout: 50 * time.Second, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c := make(chan os.Signal, 1) | ||||||
|  | 	signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||||||
|  | 	go func() { | ||||||
|  | 		<-c | ||||||
|  | 		fmt.Println("\nReceived stop signal, stopping...") | ||||||
|  | 
 | ||||||
|  | 		select { | ||||||
|  | 		case <-ch: | ||||||
|  | 		default: | ||||||
|  | 			close(ch) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		st.Cancel() | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	token, err := st.GetAdminToken(st.Ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	st.AdminToken = token | ||||||
|  | 	fmt.Println("Admin Token:", st.AdminToken) | ||||||
|  | 	fmt.Println("ApiAddress:", ApiAddress) | ||||||
|  | 
 | ||||||
|  | 	st.DefaultGroupID = DefaultGroupID | ||||||
|  | 
 | ||||||
|  | 	st.Wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer st.Wg.Done() | ||||||
|  | 
 | ||||||
|  | 		ticker := time.NewTicker(CreateUserTicker) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 
 | ||||||
|  | 		for st.UserCounter < MaxUser { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-ticker.C: | ||||||
|  | 				// Create User | ||||||
|  | 				userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) | ||||||
|  | 
 | ||||||
|  | 				userCreatedID, err := st.CreateUser(st.Ctx, userID) | ||||||
|  | 				if err != nil { | ||||||
|  | 					log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) | ||||||
|  | 					os.Exit(1) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				// fmt.Println("User Created ID:", userCreatedID) | ||||||
|  | 
 | ||||||
|  | 				// Import Friend | ||||||
|  | 				if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { | ||||||
|  | 					log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) | ||||||
|  | 					os.Exit(1) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				// Invite To Group | ||||||
|  | 				if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { | ||||||
|  | 					log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) | ||||||
|  | 					os.Exit(1) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				st.Once.Do(func() { | ||||||
|  | 					st.DefaultUserID = userCreatedID | ||||||
|  | 					fmt.Println("Default Send User Created ID:", userCreatedID) | ||||||
|  | 					close(ch) | ||||||
|  | 				}) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	st.Wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer st.Wg.Done() | ||||||
|  | 
 | ||||||
|  | 		ticker := time.NewTicker(SendMessageTicker) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 		<-ch | ||||||
|  | 
 | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-ticker.C: | ||||||
|  | 				// Send Message | ||||||
|  | 				if err = st.SendMsg(st.Ctx, st.DefaultUserID); err != nil { | ||||||
|  | 					log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultUserID) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	st.Wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer st.Wg.Done() | ||||||
|  | 
 | ||||||
|  | 		ticker := time.NewTicker(CreateGroupTicker) | ||||||
|  | 		defer ticker.Stop() | ||||||
|  | 		<-ch | ||||||
|  | 
 | ||||||
|  | 		for st.GroupCounter < MaxGroup { | ||||||
|  | 
 | ||||||
|  | 			select { | ||||||
|  | 			case <-st.Ctx.Done(): | ||||||
|  | 				log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") | ||||||
|  | 				return | ||||||
|  | 
 | ||||||
|  | 			case <-ticker.C: | ||||||
|  | 
 | ||||||
|  | 				// Create Group | ||||||
|  | 				_, err := st.CreateGroup(st.Ctx, st.DefaultUserID) | ||||||
|  | 				if err != nil { | ||||||
|  | 					log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID) | ||||||
|  | 					os.Exit(1) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				// fmt.Println("Group Created ID:", groupID) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	st.Wg.Wait() | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user