mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-25 20:52:11 +08:00 
			
		
		
		
	fix: merge conflicts
This commit is contained in:
		
							parent
							
								
									5079b99e97
								
							
						
					
					
						commit
						6750e40022
					
				| @ -379,7 +379,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { | ||||
| 				IsSendMsg:        req.SendMsg, | ||||
| 				ReliabilityLevel: *req.ReliabilityLevel, | ||||
| 				UnreadCount:      false, | ||||
| 			}), | ||||
| 			}, nil), | ||||
| 		}, | ||||
| 	} | ||||
| 	respPb, err := m.Client.SendMsg(c, &sendMsgReq) | ||||
| @ -524,6 +524,7 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) { | ||||
| 		apiresp.GinError(c, err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	m.ginRespSendMsg(c, sendReq, respPb) | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -12,7 +12,6 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||
| 	"github.com/openimsdk/tools/mcontext" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	clientv3 "go.etcd.io/etcd/client/v3" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
|  | ||||
| @ -28,7 +28,6 @@ import ( | ||||
| 	"github.com/go-redis/redis" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
|  | ||||
| @ -19,7 +19,6 @@ import ( | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	pbmsg "github.com/openimsdk/protocol/msg" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"google.golang.org/protobuf/proto" | ||||
|  | ||||
| @ -6,7 +6,6 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	pbpush "github.com/openimsdk/protocol/push" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
|  | ||||
| @ -9,7 +9,6 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpccache" | ||||
|  | ||||
| @ -16,21 +16,22 @@ package conversation | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/notification" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 	"github.com/openimsdk/protocol/msg" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/notification" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
| ) | ||||
| 
 | ||||
| type ConversationNotificationSender struct { | ||||
| 	*rpcclient.NotificationSender | ||||
| 	*notification.NotificationSender | ||||
| } | ||||
| 
 | ||||
| func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender { | ||||
| 	return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 	return &ConversationNotificationSender{notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 		return msgClient.SendMsg(ctx, req) | ||||
| 	}))} | ||||
| } | ||||
|  | ||||
| @ -16,6 +16,7 @@ package group | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 
 | ||||
| 	pbgroup "github.com/openimsdk/protocol/group" | ||||
| @ -55,41 +56,52 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s | ||||
| 	return m | ||||
| } | ||||
| 
 | ||||
| func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (map[string]any, error) { | ||||
| 	m := make(map[string]any) | ||||
| func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (m map[string]any, normalFlag, groupNameFlag, notificationFlag bool, err error) { | ||||
| 	m = make(map[string]any) | ||||
| 
 | ||||
| 	if group.GroupName != nil { | ||||
| 		if group.GroupName.Value != "" { | ||||
| 		if strings.TrimSpace(group.GroupName.Value) != "" { | ||||
| 			m["group_name"] = group.GroupName.Value | ||||
| 			groupNameFlag = true | ||||
| 		} else { | ||||
| 			return nil, errs.ErrArgs.WrapMsg("group name is empty") | ||||
| 			return nil, normalFlag, notificationFlag, groupNameFlag, errs.ErrArgs.WrapMsg("group name is empty") | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if group.Notification != nil { | ||||
| 		notificationFlag = true | ||||
| 		group.Notification.Value = strings.TrimSpace(group.Notification.Value) // if Notification only contains spaces, set it to empty string | ||||
| 
 | ||||
| 		m["notification"] = group.Notification.Value | ||||
| 		m["notification_update_time"] = time.Now() | ||||
| 		m["notification_user_id"] = mcontext.GetOpUserID(ctx) | ||||
| 		m["notification_update_time"] = time.Now() | ||||
| 	} | ||||
| 	if group.Introduction != nil { | ||||
| 		m["introduction"] = group.Introduction.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 	if group.FaceURL != nil { | ||||
| 		m["face_url"] = group.FaceURL.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 	if group.NeedVerification != nil { | ||||
| 		m["need_verification"] = group.NeedVerification.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 	if group.LookMemberInfo != nil { | ||||
| 		m["look_member_info"] = group.LookMemberInfo.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 	if group.ApplyMemberFriend != nil { | ||||
| 		m["apply_member_friend"] = group.ApplyMemberFriend.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 	if group.Ex != nil { | ||||
| 		m["ex"] = group.Ex.Value | ||||
| 		normalFlag = true | ||||
| 	} | ||||
| 
 | ||||
| 	return m, nil | ||||
| 	return m, normalFlag, groupNameFlag, notificationFlag, nil | ||||
| } | ||||
| 
 | ||||
| func UpdateGroupStatusMap(status int) map[string]any { | ||||
|  | ||||
| @ -26,6 +26,8 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 
 | ||||
| 	"google.golang.org/grpc" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| @ -51,7 +53,6 @@ import ( | ||||
| 	"github.com/openimsdk/tools/mw/specialerror" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"github.com/openimsdk/tools/utils/encrypt" | ||||
| 	"google.golang.org/grpc" | ||||
| ) | ||||
| 
 | ||||
| type groupServer struct { | ||||
| @ -284,13 +285,14 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	g.notification.GroupCreatedNotification(ctx, tips) | ||||
| 	g.notification.GroupCreatedNotification(ctx, tips, req.SendMessage) | ||||
| 
 | ||||
| 	if req.GroupInfo.Notification != "" { | ||||
| 		notificationFlag := true | ||||
| 		g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{ | ||||
| 			Group:  tips.Group, | ||||
| 			OpUser: tips.OpUser, | ||||
| 		}) | ||||
| 		}, ¬ificationFlag) | ||||
| 	} | ||||
| 
 | ||||
| 	reqCallBackAfter := &pbgroup.CreateGroupReq{ | ||||
| @ -613,7 +615,7 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou | ||||
| 	for _, userID := range req.KickedUserIDs { | ||||
| 		tips.KickedUserList = append(tips.KickedUserList, convert.Db2PbGroupMember(memberMap[userID])) | ||||
| 	} | ||||
| 	g.notification.MemberKickedNotification(ctx, tips) | ||||
| 	g.notification.MemberKickedNotification(ctx, tips, req.SendMessage) | ||||
| 	if err := g.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -822,8 +824,14 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup | ||||
| 		if member == nil { | ||||
| 			log.ZDebug(ctx, "GroupApplicationResponse", "member is nil") | ||||
| 		} else { | ||||
| 			if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, groupRequest.InviterUserID, req.FromUserID); err != nil { | ||||
| 				return nil, err | ||||
| 			if groupRequest.InviterUserID == "" { | ||||
| 				if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 			} else { | ||||
| 				if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, nil, groupRequest.InviterUserID, req.FromUserID); err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	case constant.GroupResponseRefuse: | ||||
| @ -1025,7 +1033,8 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf | ||||
| 				log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) | ||||
| 			} | ||||
| 		}() | ||||
| 		g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) | ||||
| 		notficationFlag := true | ||||
| 		g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ficationFlag) | ||||
| 	} | ||||
| 	if req.GroupInfoForSet.GroupName != "" { | ||||
| 		num-- | ||||
| @ -1086,7 +1095,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	updatedData, err := UpdateGroupInfoExMap(ctx, req) | ||||
| 	updatedData, normalFlag, groupNameFlag, notificationFlag, err := UpdateGroupInfoExMap(ctx, req) | ||||
| 	if len(updatedData) == 0 { | ||||
| 		return &pbgroup.SetGroupInfoExResp{}, nil | ||||
| 	} | ||||
| @ -1114,41 +1123,38 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI | ||||
| 		tips.OpUser = g.groupMemberDB2PB(opMember, 0) | ||||
| 	} | ||||
| 
 | ||||
| 	num := len(updatedData) | ||||
| 
 | ||||
| 	if req.Notification != nil { | ||||
| 		num -= 3 | ||||
| 
 | ||||
| 	if notificationFlag { | ||||
| 		if req.Notification.Value != "" { | ||||
| 			func() { | ||||
| 				conversation := &pbconv.ConversationReq{ | ||||
| 					ConversationID:   msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), | ||||
| 					ConversationType: constant.ReadGroupChatType, | ||||
| 					GroupID:          req.GroupID, | ||||
| 				} | ||||
| 			conversation := &pbconv.ConversationReq{ | ||||
| 				ConversationID:   msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), | ||||
| 				ConversationType: constant.ReadGroupChatType, | ||||
| 				GroupID:          req.GroupID, | ||||
| 			} | ||||
| 
 | ||||
| 				resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) | ||||
| 				if err != nil { | ||||
| 					log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) | ||||
| 					return | ||||
| 				} | ||||
| 			resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) | ||||
| 			if err != nil { | ||||
| 				log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) | ||||
| 				return nil, err | ||||
| 			} | ||||
| 
 | ||||
| 				conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} | ||||
| 				if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { | ||||
| 					log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) | ||||
| 				} | ||||
| 			}() | ||||
| 			conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} | ||||
| 			if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { | ||||
| 				log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) | ||||
| 			} | ||||
| 
 | ||||
| 			g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) | ||||
| 			g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) | ||||
| 		} else { | ||||
| 			notificationFlag = false | ||||
| 			g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if req.GroupName != nil { | ||||
| 		num-- | ||||
| 	if groupNameFlag { | ||||
| 		g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) | ||||
| 	} | ||||
| 
 | ||||
| 	if num > 0 { | ||||
| 	// if updatedData > 0, send the normal notification | ||||
| 	if normalFlag { | ||||
| 		g.notification.GroupInfoSetNotification(ctx, tips) | ||||
| 	} | ||||
| 
 | ||||
| @ -1369,7 +1375,7 @@ func (g *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou | ||||
| 		if mcontext.GetOpUserID(ctx) == owner.UserID { | ||||
| 			tips.OpUser = g.groupMemberDB2PB(owner, 0) | ||||
| 		} | ||||
| 		g.notification.GroupDismissedNotification(ctx, tips) | ||||
| 		g.notification.GroupDismissedNotification(ctx, tips, req.SendMessage) | ||||
| 	} | ||||
| 	membersID, err := g.db.FindGroupMemberUserID(ctx, group.GroupID) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -52,7 +52,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID) | ||||
| 	groupIDs, err := g.db.FindJoinGroupID(ctx, req.UserID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -68,8 +68,8 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { | ||||
| 	group, err := s.db.TakeGroup(ctx, req.GroupID) | ||||
| func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { | ||||
| 	group, err := g.db.TakeGroup(ctx, req.GroupID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -89,7 +89,7 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou | ||||
| 		VersionID:     req.VersionID, | ||||
| 		VersionNumber: req.Version, | ||||
| 		Version: func(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { | ||||
| 			vl, err := s.db.FindMemberIncrVersion(ctx, groupID, version, limit) | ||||
| 			vl, err := g.db.FindMemberIncrVersion(ctx, groupID, version, limit) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| @ -112,9 +112,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou | ||||
| 			} | ||||
| 			return vl, nil | ||||
| 		}, | ||||
| 		CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, | ||||
| 		CacheMaxVersion: g.db.FindMaxGroupMemberVersionCache, | ||||
| 		Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { | ||||
| 			return s.getGroupMembersInfo(ctx, req.GroupID, ids) | ||||
| 			return g.getGroupMembersInfo(ctx, req.GroupID, ids) | ||||
| 		}, | ||||
| 		Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { | ||||
| 			return &pbgroup.GetIncrementalGroupMemberResp{ | ||||
| @ -133,15 +133,15 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if resp.Full || hasGroupUpdate { | ||||
| 		count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) | ||||
| 		count, err := g.db.FindGroupMemberNum(ctx, group.GroupID) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		owner, err := s.db.TakeGroupOwner(ctx, group.GroupID) | ||||
| 		owner, err := g.db.TakeGroupOwner(ctx, group.GroupID) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		resp.Group = s.groupDB2PB(group, owner.UserID, count) | ||||
| 		resp.Group = g.groupDB2PB(group, owner.UserID, count) | ||||
| 	} | ||||
| 	return resp, nil | ||||
| } | ||||
| @ -155,9 +155,9 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. | ||||
| 		VersionKey:      req.UserID, | ||||
| 		VersionID:       req.VersionID, | ||||
| 		VersionNumber:   req.Version, | ||||
| 		Version:         s.db.FindJoinIncrVersion, | ||||
| 		CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, | ||||
| 		Find:            s.getGroupsInfo, | ||||
| 		Version:         g.db.FindJoinIncrVersion, | ||||
| 		CacheMaxVersion: g.db.FindMaxJoinGroupVersionCache, | ||||
| 		Find:            g.getGroupsInfo, | ||||
| 		Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { | ||||
| 			return &pbgroup.GetIncrementalJoinGroupResp{ | ||||
| 				VersionID: version.ID.Hex(), | ||||
| @ -171,3 +171,23 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. | ||||
| 	} | ||||
| 	return opt.Build() | ||||
| } | ||||
| 
 | ||||
| func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { | ||||
| 	var num int | ||||
| 	resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) | ||||
| 	for _, memberReq := range req.ReqList { | ||||
| 		if _, ok := resp[memberReq.GroupID]; ok { | ||||
| 			continue | ||||
| 		} | ||||
| 		memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		resp[memberReq.GroupID] = memberResp | ||||
| 		num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) | ||||
| 		if num >= versionSyncLimit { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil | ||||
| } | ||||
|  | ||||
| @ -23,11 +23,11 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| type MsgNotificationSender struct { | ||||
| 	*rpcclient.NotificationSender | ||||
| 	*notification.NotificationSender | ||||
| } | ||||
| 
 | ||||
| func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender { | ||||
| 	return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)} | ||||
| func NewMsgNotificationSender(config *Config, opts ...notification.NotificationSenderOptions) *MsgNotificationSender { | ||||
| 	return &MsgNotificationSender{notification.NewNotificationSender(&config.NotificationConfig, opts...)} | ||||
| } | ||||
| 
 | ||||
| func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) { | ||||
|  | ||||
| @ -58,15 +58,14 @@ type Config struct { | ||||
| // MsgServer encapsulates dependencies required for message handling. | ||||
| type msgServer struct { | ||||
| 	msg.UnimplementedMsgServer | ||||
| 	RegisterCenter         discovery.Conn               // Service discovery registry for service registration. | ||||
| 	MsgDatabase            controller.CommonMsgDatabase // Interface for message database operations. | ||||
| 	StreamMsgDatabase      controller.StreamMsgDatabase | ||||
| 	RegisterCenter         discovery.Conn                   // Service discovery registry for service registration. | ||||
| 	MsgDatabase            controller.CommonMsgDatabase     // Interface for message database operations. | ||||
| 	UserLocalCache         *rpccache.UserLocalCache         // Local cache for user data. | ||||
| 	FriendLocalCache       *rpccache.FriendLocalCache       // Local cache for friend data. | ||||
| 	GroupLocalCache        *rpccache.GroupLocalCache        // Local cache for group data. | ||||
| 	ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. | ||||
| 	Handlers               MessageInterceptorChain          // Chain of handlers for processing messages. | ||||
| 	notificationSender     *rpcclient.NotificationSender    // RPC client for sending notifications. | ||||
| 	notificationSender     *notification.NotificationSender // RPC client for sending notifications. | ||||
| 	msgNotificationSender  *MsgNotificationSender           // RPC client for sending msg notifications. | ||||
| 	config                 *Config                          // Global configuration settings. | ||||
| 	webhookClient          *webhook.Client | ||||
| @ -147,8 +146,8 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr | ||||
| 		conversationClient:     conversationClient, | ||||
| 	} | ||||
| 
 | ||||
| 	s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) | ||||
| 	s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) | ||||
| 	s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg)) | ||||
| 	s.msgNotificationSender = NewMsgNotificationSender(config, notification.WithLocalSendMsg(s.SendMsg)) | ||||
| 
 | ||||
| 	msg.RegisterMsgServer(server, s) | ||||
| 
 | ||||
|  | ||||
| @ -16,6 +16,7 @@ package relation | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 	"github.com/openimsdk/protocol/msg" | ||||
| 
 | ||||
| @ -36,7 +37,7 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| type FriendNotificationSender struct { | ||||
| 	*rpcclient.NotificationSender | ||||
| 	*notification.NotificationSender | ||||
| 	// Target not found err | ||||
| 	getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) | ||||
| 	// db controller | ||||
| @ -89,7 +90,7 @@ func WithRpcFunc( | ||||
| 
 | ||||
| func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender { | ||||
| 	f := &FriendNotificationSender{ | ||||
| 		NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 		NotificationSender: notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 			return msgClient.SendMsg(ctx, req) | ||||
| 		})), | ||||
| 	} | ||||
|  | ||||
| @ -16,6 +16,7 @@ package user | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 	"github.com/openimsdk/protocol/msg" | ||||
| 
 | ||||
| @ -29,7 +30,7 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| type UserNotificationSender struct { | ||||
| 	*rpcclient.NotificationSender | ||||
| 	*notification.NotificationSender | ||||
| 	getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) | ||||
| 	// db controller | ||||
| 	db controller.UserDatabase | ||||
| @ -63,7 +64,7 @@ func WithUserFunc( | ||||
| 
 | ||||
| func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender { | ||||
| 	f := &UserNotificationSender{ | ||||
| 		NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 		NotificationSender: notification.NewNotificationSender(&config.NotificationConfig, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { | ||||
| 			return msgClient.SendMsg(ctx, req) | ||||
| 		})), | ||||
| 	} | ||||
|  | ||||
| @ -44,7 +44,8 @@ func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire i | ||||
| 	return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLogin: multiLoginConfig{ | ||||
| 		Policy:       multiLogin.Policy, | ||||
| 		MaxNumOneEnd: multiLogin.MaxNumOneEnd, | ||||
| 	}, adminUserIDs: adminUserIDs, | ||||
| 	}, | ||||
| 		adminUserIDs: adminUserIDs, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| @ -90,23 +91,25 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	deleteTokenKey, kickedTokenKey, err := a.checkToken(ctx, tokens, platformID) | ||||
| 	deleteTokenKey, kickedTokenKey, adminTokens, err := a.checkToken(ctx, tokens, platformID) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 	if len(deleteTokenKey) != 0 { | ||||
| 		err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) | ||||
| 		err = a.cache.DeleteTokenByTokenMap(ctx, userID, deleteTokenKey) | ||||
| 		if err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
| 	} | ||||
| 	if len(kickedTokenKey) != 0 { | ||||
| 		for _, k := range kickedTokenKey { | ||||
| 			err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) | ||||
| 			if err != nil { | ||||
| 				return "", err | ||||
| 		for plt, ks := range kickedTokenKey { | ||||
| 			for _, k := range ks { | ||||
| 				err := a.cache.SetTokenFlagEx(ctx, userID, plt, k, constant.KickedToken) | ||||
| 				if err != nil { | ||||
| 					return "", err | ||||
| 				} | ||||
| 				log.ZDebug(ctx, "kicked token in create token", "token", k) | ||||
| 			} | ||||
| 			log.ZDebug(ctx, "kicked token in create token", "token", k) | ||||
| 		} | ||||
| 	} | ||||
| 	if len(adminTokens) != 0 { | ||||
| @ -242,8 +245,9 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string | ||||
| 	//if l > adminTokenMaxNum { | ||||
| 	//	kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) | ||||
| 	//} | ||||
| 	var deleteAdminToken []string | ||||
| 	if platformID == constant.AdminPlatformID { | ||||
| 		kickToken = append(kickToken, adminToken...) | ||||
| 		deleteAdminToken = adminToken | ||||
| 	} | ||||
| 	return deleteToken, kickToken, nil | ||||
| 	return deleteToken, kickToken, deleteAdminToken, nil | ||||
| } | ||||
|  | ||||
| @ -35,7 +35,6 @@ import ( | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	pbmsg "github.com/openimsdk/protocol/msg" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
|  | ||||
| @ -11,7 +11,6 @@ import ( | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 	pbmsg "github.com/openimsdk/protocol/msg" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
|  | ||||
| @ -18,7 +18,6 @@ import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/protocol/push" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
|  | ||||
| @ -25,11 +25,11 @@ import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" | ||||
| 	"github.com/openimsdk/tools/db/mongoutil" | ||||
| 	"github.com/openimsdk/tools/db/redisutil" | ||||
| 	"github.com/openimsdk/tools/discovery/etcd" | ||||
| 	"github.com/openimsdk/tools/discovery/zookeeper" | ||||
| 	"github.com/openimsdk/tools/mq/kafka" | ||||
| 	"github.com/openimsdk/tools/s3/minio" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| ) | ||||
|  | ||||
| @ -1,25 +0,0 @@ | ||||
| # Stress Test | ||||
| 
 | ||||
| ## Usage | ||||
| 
 | ||||
| You need set `TestTargetUserList` and `DefaultGroupID` variables. | ||||
| 
 | ||||
| ### Build | ||||
| 
 | ||||
| ```bash | ||||
| go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go | ||||
| 
 | ||||
| # or | ||||
| 
 | ||||
| go build -o tools/stress-test/stress-test tools/stress-test/main.go | ||||
| ``` | ||||
| 
 | ||||
| ### Excute | ||||
| 
 | ||||
| ```bash | ||||
| _output/bin/tools/linux/amd64/stress-test -c config/ | ||||
| 
 | ||||
| #or | ||||
| 
 | ||||
| tools/stress-test/stress-test -c config/ | ||||
| ``` | ||||
| @ -1,459 +0,0 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"flag" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"os/signal" | ||||
| 	"sync" | ||||
| 	"syscall" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/protocol/auth" | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/protocol/group" | ||||
| 	"github.com/openimsdk/protocol/relation" | ||||
| 	"github.com/openimsdk/protocol/sdkws" | ||||
| 	pbuser "github.com/openimsdk/protocol/user" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"github.com/openimsdk/tools/system/program" | ||||
| ) | ||||
| 
 | ||||
| /* | ||||
|  1. Create one user every minute | ||||
|  2. Import target users as friends | ||||
|  3. Add users to the default group | ||||
|  4. Send a message to the default group every second, containing index and current timestamp | ||||
|  5. Create a new group every minute and invite target users to join | ||||
| */ | ||||
| 
 | ||||
| // !!! ATTENTION: This variable is must be added! | ||||
| var ( | ||||
| 	//  Use default userIDs List for testing, need to be created. | ||||
| 	TestTargetUserList = []string{ | ||||
| 		"<need-update-it>", | ||||
| 	} | ||||
| 	DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created. | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	ApiAddress string | ||||
| 
 | ||||
| 	// API method | ||||
| 	GetAdminToken = "/auth/get_admin_token" | ||||
| 	CreateUser    = "/user/user_register" | ||||
| 	ImportFriend  = "/friend/import_friend" | ||||
| 	InviteToGroup = "/group/invite_user_to_group" | ||||
| 	SendMsg       = "/msg/send_msg" | ||||
| 	CreateGroup   = "/group/create_group" | ||||
| 	GetUserToken  = "/auth/user_token" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	MaxUser  = 10000 | ||||
| 	MaxGroup = 1000 | ||||
| 
 | ||||
| 	CreateUserTicker  = 1 * time.Minute // Ticker is 1min in create user | ||||
| 	SendMessageTicker = 1 * time.Second // Ticker is 1s in send message | ||||
| 	CreateGroupTicker = 1 * time.Minute | ||||
| ) | ||||
| 
 | ||||
| type BaseResp struct { | ||||
| 	ErrCode int             `json:"errCode"` | ||||
| 	ErrMsg  string          `json:"errMsg"` | ||||
| 	Data    json.RawMessage `json:"data"` | ||||
| } | ||||
| 
 | ||||
| type StressTest struct { | ||||
| 	Conf           *conf | ||||
| 	AdminUserID    string | ||||
| 	AdminToken     string | ||||
| 	DefaultGroupID string | ||||
| 	DefaultUserID  string | ||||
| 	UserCounter    int | ||||
| 	GroupCounter   int | ||||
| 	MsgCounter     int | ||||
| 	CreatedUsers   []string | ||||
| 	CreatedGroups  []string | ||||
| 	Mutex          sync.Mutex | ||||
| 	Ctx            context.Context | ||||
| 	Cancel         context.CancelFunc | ||||
| 	HttpClient     *http.Client | ||||
| 	Wg             sync.WaitGroup | ||||
| 	Once           sync.Once | ||||
| } | ||||
| 
 | ||||
| type conf struct { | ||||
| 	Share config.Share | ||||
| 	Api   config.API | ||||
| } | ||||
| 
 | ||||
| func initConfig(configDir string) (*config.Share, *config.API, error) { | ||||
| 	var ( | ||||
| 		share     = &config.Share{} | ||||
| 		apiConfig = &config.API{} | ||||
| 	) | ||||
| 
 | ||||
| 	err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return share, apiConfig, nil | ||||
| } | ||||
| 
 | ||||
| // Post Request | ||||
| func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { | ||||
| 	// Marshal body | ||||
| 	jsonBody, err := json.Marshal(reqbody) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	req.Header.Set("operationID", st.AdminUserID) | ||||
| 	if st.AdminToken != "" { | ||||
| 		req.Header.Set("token", st.AdminToken) | ||||
| 	} | ||||
| 
 | ||||
| 	// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) | ||||
| 
 | ||||
| 	resp, err := st.HttpClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 
 | ||||
| 	respBody, err := io.ReadAll(resp.Body) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to read response body", err, "url", url) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	var baseResp BaseResp | ||||
| 	if err := json.Unmarshal(respBody, &baseResp); err != nil { | ||||
| 		log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if baseResp.ErrCode != 0 { | ||||
| 		err = fmt.Errorf(baseResp.ErrMsg) | ||||
| 		log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return baseResp.Data, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { | ||||
| 	req := auth.GetAdminTokenReq{ | ||||
| 		Secret: st.Conf.Share.Secret, | ||||
| 		UserID: st.AdminUserID, | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	data := &auth.GetAdminTokenResp{} | ||||
| 	if err := json.Unmarshal(resp, &data); err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return data.Token, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { | ||||
| 	user := &sdkws.UserInfo{ | ||||
| 		UserID:   userID, | ||||
| 		Nickname: userID, | ||||
| 	} | ||||
| 
 | ||||
| 	req := pbuser.UserRegisterReq{ | ||||
| 		Users: []*sdkws.UserInfo{user}, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	st.UserCounter++ | ||||
| 	return userID, nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { | ||||
| 	req := relation.ImportFriendReq{ | ||||
| 		OwnerUserID:   userID, | ||||
| 		FriendUserIDs: TestTargetUserList, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { | ||||
| 	req := group.InviteUserToGroupReq{ | ||||
| 		GroupID:        st.DefaultGroupID, | ||||
| 		InvitedUserIDs: []string{userID}, | ||||
| 	} | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) SendMsg(ctx context.Context, userID string) error { | ||||
| 	contentObj := map[string]any{ | ||||
| 		"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), | ||||
| 	} | ||||
| 
 | ||||
| 	req := &apistruct.SendMsgReq{ | ||||
| 		SendMsg: apistruct.SendMsg{ | ||||
| 			SendID:         userID, | ||||
| 			SenderNickname: userID, | ||||
| 			GroupID:        st.DefaultGroupID, | ||||
| 			ContentType:    constant.Text, | ||||
| 			SessionType:    constant.ReadGroupChatType, | ||||
| 			Content:        contentObj, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	st.MsgCounter++ | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { | ||||
| 	groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) | ||||
| 
 | ||||
| 	groupInfo := &sdkws.GroupInfo{ | ||||
| 		GroupID:   groupID, | ||||
| 		GroupName: groupID, | ||||
| 		GroupType: constant.WorkingGroup, | ||||
| 	} | ||||
| 
 | ||||
| 	req := group.CreateGroupReq{ | ||||
| 		OwnerUserID:   userID, | ||||
| 		MemberUserIDs: TestTargetUserList, | ||||
| 		GroupInfo:     groupInfo, | ||||
| 	} | ||||
| 
 | ||||
| 	resp := group.CreateGroupResp{} | ||||
| 
 | ||||
| 	response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := json.Unmarshal(response, &resp); err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	st.GroupCounter++ | ||||
| 
 | ||||
| 	return resp.GroupInfo.GroupID, nil | ||||
| } | ||||
| 
 | ||||
| func main() { | ||||
| 	var configPath string | ||||
| 	// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") | ||||
| 	// flag.StringVar(&configPath, "c", defaultConfigDir, "config path") | ||||
| 	flag.StringVar(&configPath, "c", "", "config path") | ||||
| 	flag.Parse() | ||||
| 
 | ||||
| 	if configPath == "" { | ||||
| 		_, _ = fmt.Fprintln(os.Stderr, "config path is empty") | ||||
| 		os.Exit(1) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	fmt.Printf(" Config Path: %s\n", configPath) | ||||
| 
 | ||||
| 	share, apiConfig, err := initConfig(configPath) | ||||
| 	if err != nil { | ||||
| 		program.ExitWithError(err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) | ||||
| 
 | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	ch := make(chan struct{}) | ||||
| 
 | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	st := &StressTest{ | ||||
| 		Conf: &conf{ | ||||
| 			Share: *share, | ||||
| 			Api:   *apiConfig, | ||||
| 		}, | ||||
| 		AdminUserID: share.IMAdminUserID[0], | ||||
| 		Ctx:         ctx, | ||||
| 		Cancel:      cancel, | ||||
| 		HttpClient: &http.Client{ | ||||
| 			Timeout: 50 * time.Second, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	c := make(chan os.Signal, 1) | ||||
| 	signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||||
| 	go func() { | ||||
| 		<-c | ||||
| 		fmt.Println("\nReceived stop signal, stopping...") | ||||
| 
 | ||||
| 		select { | ||||
| 		case <-ch: | ||||
| 		default: | ||||
| 			close(ch) | ||||
| 		} | ||||
| 
 | ||||
| 		st.Cancel() | ||||
| 	}() | ||||
| 
 | ||||
| 	token, err := st.GetAdminToken(st.Ctx) | ||||
| 	if err != nil { | ||||
| 		log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) | ||||
| 	} | ||||
| 
 | ||||
| 	st.AdminToken = token | ||||
| 	fmt.Println("Admin Token:", st.AdminToken) | ||||
| 	fmt.Println("ApiAddress:", ApiAddress) | ||||
| 
 | ||||
| 	st.DefaultGroupID = DefaultGroupID | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(CreateUserTicker) | ||||
| 		defer ticker.Stop() | ||||
| 
 | ||||
| 		for st.UserCounter < MaxUser { | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 				// Create User | ||||
| 				userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) | ||||
| 
 | ||||
| 				userCreatedID, err := st.CreateUser(st.Ctx, userID) | ||||
| 				if err != nil { | ||||
| 					log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 				// fmt.Println("User Created ID:", userCreatedID) | ||||
| 
 | ||||
| 				// Import Friend | ||||
| 				if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				// Invite To Group | ||||
| 				if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				st.Once.Do(func() { | ||||
| 					st.DefaultUserID = userCreatedID | ||||
| 					fmt.Println("Default Send User Created ID:", userCreatedID) | ||||
| 					close(ch) | ||||
| 				}) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(SendMessageTicker) | ||||
| 		defer ticker.Stop() | ||||
| 		<-ch | ||||
| 
 | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 				// Send Message | ||||
| 				if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { | ||||
| 					log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer st.Wg.Done() | ||||
| 
 | ||||
| 		ticker := time.NewTicker(CreateGroupTicker) | ||||
| 		defer ticker.Stop() | ||||
| 		<-ch | ||||
| 
 | ||||
| 		for st.GroupCounter < MaxGroup { | ||||
| 
 | ||||
| 			select { | ||||
| 			case <-st.Ctx.Done(): | ||||
| 				log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") | ||||
| 				return | ||||
| 
 | ||||
| 			case <-ticker.C: | ||||
| 
 | ||||
| 				// Create Group | ||||
| 				_, err := st.CreateGroup(st.Ctx, st.DefaultUserID) | ||||
| 				if err != nil { | ||||
| 					log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID) | ||||
| 					os.Exit(1) | ||||
| 					return | ||||
| 				} | ||||
| 
 | ||||
| 				// fmt.Println("Group Created ID:", groupID) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	st.Wg.Wait() | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user