From 6750e4002276153da8847acc8daca065629b43a8 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 15 May 2025 14:23:27 +0800 Subject: [PATCH] fix: merge conflicts --- internal/api/msg.go | 3 +- internal/api/router.go | 1 - .../msgtransfer/online_history_msg_handler.go | 1 - .../online_msg_to_mongo_handler.go | 1 - internal/push/offlinepush_handler.go | 1 - internal/push/push_handler.go | 1 - internal/rpc/conversation/notification.go | 7 +- internal/rpc/group/db_map.go | 24 +- internal/rpc/group/group.go | 74 +-- internal/rpc/group/sync.go | 44 +- internal/rpc/msg/notification.go | 6 +- internal/rpc/msg/server.go | 11 +- internal/rpc/relation/notification.go | 5 +- internal/rpc/user/notification.go | 5 +- pkg/common/storage/controller/auth.go | 24 +- pkg/common/storage/controller/msg.go | 1 - pkg/common/storage/controller/msg_transfer.go | 1 - pkg/common/storage/controller/push.go | 1 - tools/check-component/main.go | 2 +- tools/stress-test/README.md | 25 - tools/stress-test/main.go | 459 ------------------ 21 files changed, 125 insertions(+), 572 deletions(-) delete mode 100644 tools/stress-test/README.md delete mode 100755 tools/stress-test/main.go diff --git a/internal/api/msg.go b/internal/api/msg.go index 8be4832e6..5349faf87 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -379,7 +379,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { IsSendMsg: req.SendMsg, ReliabilityLevel: *req.ReliabilityLevel, UnreadCount: false, - }), + }, nil), }, } respPb, err := m.Client.SendMsg(c, &sendMsgReq) @@ -524,6 +524,7 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) { apiresp.GinError(c, err) return } + m.ginRespSendMsg(c, sendReq, respPb) } diff --git a/internal/api/router.go b/internal/api/router.go index 7afe61fec..31c535e53 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -12,7 +12,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - clientv3 "go.etcd.io/etcd/client/v3" "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/pkg/common/config" diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 41bbe081c..a2d0cca67 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -28,7 +28,6 @@ import ( "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 407a4ae28..ae14d02a1 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,7 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index d99f2647d..eaf6b8ed8 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -6,7 +6,6 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 92eec3cb1..418c4c7f2 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -9,7 +9,6 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index c6368b916..370865c1a 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -16,21 +16,22 @@ package conversation import ( "context" + + "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" ) type ConversationNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender } func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender { - return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return &ConversationNotificationSender{notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) }))} } diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index e99f9e772..7504bc851 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -16,6 +16,7 @@ package group import ( "context" + "strings" "time" pbgroup "github.com/openimsdk/protocol/group" @@ -55,41 +56,52 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s return m } -func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (map[string]any, error) { - m := make(map[string]any) +func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (m map[string]any, normalFlag, groupNameFlag, notificationFlag bool, err error) { + m = make(map[string]any) if group.GroupName != nil { - if group.GroupName.Value != "" { + if strings.TrimSpace(group.GroupName.Value) != "" { m["group_name"] = group.GroupName.Value + groupNameFlag = true } else { - return nil, errs.ErrArgs.WrapMsg("group name is empty") + return nil, normalFlag, notificationFlag, groupNameFlag, errs.ErrArgs.WrapMsg("group name is empty") } } + if group.Notification != nil { + notificationFlag = true + group.Notification.Value = strings.TrimSpace(group.Notification.Value) // if Notification only contains spaces, set it to empty string + m["notification"] = group.Notification.Value - m["notification_update_time"] = time.Now() m["notification_user_id"] = mcontext.GetOpUserID(ctx) + m["notification_update_time"] = time.Now() } if group.Introduction != nil { m["introduction"] = group.Introduction.Value + normalFlag = true } if group.FaceURL != nil { m["face_url"] = group.FaceURL.Value + normalFlag = true } if group.NeedVerification != nil { m["need_verification"] = group.NeedVerification.Value + normalFlag = true } if group.LookMemberInfo != nil { m["look_member_info"] = group.LookMemberInfo.Value + normalFlag = true } if group.ApplyMemberFriend != nil { m["apply_member_friend"] = group.ApplyMemberFriend.Value + normalFlag = true } if group.Ex != nil { m["ex"] = group.Ex.Value + normalFlag = true } - return m, nil + return m, normalFlag, groupNameFlag, notificationFlag, nil } func UpdateGroupStatusMap(status int) map[string]any { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 1255e4eba..10cdc2546 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -26,6 +26,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -51,7 +53,6 @@ import ( "github.com/openimsdk/tools/mw/specialerror" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" - "google.golang.org/grpc" ) type groupServer struct { @@ -284,13 +285,14 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR break } } - g.notification.GroupCreatedNotification(ctx, tips) + g.notification.GroupCreatedNotification(ctx, tips, req.SendMessage) if req.GroupInfo.Notification != "" { + notificationFlag := true g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{ Group: tips.Group, OpUser: tips.OpUser, - }) + }, ¬ificationFlag) } reqCallBackAfter := &pbgroup.CreateGroupReq{ @@ -613,7 +615,7 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou for _, userID := range req.KickedUserIDs { tips.KickedUserList = append(tips.KickedUserList, convert.Db2PbGroupMember(memberMap[userID])) } - g.notification.MemberKickedNotification(ctx, tips) + g.notification.MemberKickedNotification(ctx, tips, req.SendMessage) if err := g.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { return nil, err } @@ -822,8 +824,14 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup if member == nil { log.ZDebug(ctx, "GroupApplicationResponse", "member is nil") } else { - if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, groupRequest.InviterUserID, req.FromUserID); err != nil { - return nil, err + if groupRequest.InviterUserID == "" { + if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil { + return nil, err + } + } else { + if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, nil, groupRequest.InviterUserID, req.FromUserID); err != nil { + return nil, err + } } } case constant.GroupResponseRefuse: @@ -1025,7 +1033,8 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) } }() - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + notficationFlag := true + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ficationFlag) } if req.GroupInfoForSet.GroupName != "" { num-- @@ -1086,7 +1095,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI return nil, err } - updatedData, err := UpdateGroupInfoExMap(ctx, req) + updatedData, normalFlag, groupNameFlag, notificationFlag, err := UpdateGroupInfoExMap(ctx, req) if len(updatedData) == 0 { return &pbgroup.SetGroupInfoExResp{}, nil } @@ -1114,41 +1123,38 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI tips.OpUser = g.groupMemberDB2PB(opMember, 0) } - num := len(updatedData) - - if req.Notification != nil { - num -= 3 - + if notificationFlag { if req.Notification.Value != "" { - func() { - conversation := &pbconv.ConversationReq{ - ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), - ConversationType: constant.ReadGroupChatType, - GroupID: req.GroupID, - } + conversation := &pbconv.ConversationReq{ + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), + ConversationType: constant.ReadGroupChatType, + GroupID: req.GroupID, + } - resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) - if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) - return - } + resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) + return nil, err + } - conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} - if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { - log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) - } - }() + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} + if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { + log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) + } - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) + } else { + notificationFlag = false + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) } } - if req.GroupName != nil { - num-- + if groupNameFlag { g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) } - if num > 0 { + // if updatedData > 0, send the normal notification + if normalFlag { g.notification.GroupInfoSetNotification(ctx, tips) } @@ -1369,7 +1375,7 @@ func (g *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou if mcontext.GetOpUserID(ctx) == owner.UserID { tips.OpUser = g.groupMemberDB2PB(owner, 0) } - g.notification.GroupDismissedNotification(ctx, tips) + g.notification.GroupDismissedNotification(ctx, tips, req.SendMessage) } membersID, err := g.db.FindGroupMemberUserID(ctx, group.GroupID) if err != nil { diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index ed608dea3..822b15307 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -52,7 +52,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF if err != nil { return nil, err } - groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID) + groupIDs, err := g.db.FindJoinGroupID(ctx, req.UserID) if err != nil { return nil, err } @@ -68,8 +68,8 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF }, nil } -func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { - group, err := s.db.TakeGroup(ctx, req.GroupID) +func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { + group, err := g.db.TakeGroup(ctx, req.GroupID) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou VersionID: req.VersionID, VersionNumber: req.Version, Version: func(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { - vl, err := s.db.FindMemberIncrVersion(ctx, groupID, version, limit) + vl, err := g.db.FindMemberIncrVersion(ctx, groupID, version, limit) if err != nil { return nil, err } @@ -112,9 +112,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou } return vl, nil }, - CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, + CacheMaxVersion: g.db.FindMaxGroupMemberVersionCache, Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { - return s.getGroupMembersInfo(ctx, req.GroupID, ids) + return g.getGroupMembersInfo(ctx, req.GroupID, ids) }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ @@ -133,15 +133,15 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou return nil, err } if resp.Full || hasGroupUpdate { - count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) + count, err := g.db.FindGroupMemberNum(ctx, group.GroupID) if err != nil { return nil, err } - owner, err := s.db.TakeGroupOwner(ctx, group.GroupID) + owner, err := g.db.TakeGroupOwner(ctx, group.GroupID) if err != nil { return nil, err } - resp.Group = s.groupDB2PB(group, owner.UserID, count) + resp.Group = g.groupDB2PB(group, owner.UserID, count) } return resp, nil } @@ -155,9 +155,9 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. VersionKey: req.UserID, VersionID: req.VersionID, VersionNumber: req.Version, - Version: s.db.FindJoinIncrVersion, - CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, - Find: s.getGroupsInfo, + Version: g.db.FindJoinIncrVersion, + CacheMaxVersion: g.db.FindMaxJoinGroupVersionCache, + Find: g.getGroupsInfo, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { return &pbgroup.GetIncrementalJoinGroupResp{ VersionID: version.ID.Hex(), @@ -171,3 +171,23 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. } return opt.Build() } + +func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { + var num int + resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) + for _, memberReq := range req.ReqList { + if _, ok := resp[memberReq.GroupID]; ok { + continue + } + memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) + if err != nil { + return nil, err + } + resp[memberReq.GroupID] = memberResp + num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) + if num >= versionSyncLimit { + break + } + } + return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil +} diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index d5604286a..0418823d6 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -23,11 +23,11 @@ import ( ) type MsgNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender } -func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender { - return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)} +func NewMsgNotificationSender(config *Config, opts ...notification.NotificationSenderOptions) *MsgNotificationSender { + return &MsgNotificationSender{notification.NewNotificationSender(&config.NotificationConfig, opts...)} } func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index fd360295f..5ebe79dd9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -58,15 +58,14 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { msg.UnimplementedMsgServer - RegisterCenter discovery.Conn // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. - StreamMsgDatabase controller.StreamMsgDatabase + RegisterCenter discovery.Conn // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. - notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + notificationSender *notification.NotificationSender // RPC client for sending notifications. msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client @@ -147,8 +146,8 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr conversationClient: conversationClient, } - s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) - s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) + s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg)) + s.msgNotificationSender = NewMsgNotificationSender(config, notification.WithLocalSendMsg(s.SendMsg)) msg.RegisterMsgServer(server, s) diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go index a34a4d322..caf2dafe1 100644 --- a/internal/rpc/relation/notification.go +++ b/internal/rpc/relation/notification.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" @@ -36,7 +37,7 @@ import ( ) type FriendNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender // Target not found err getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) // db controller @@ -89,7 +90,7 @@ func WithRpcFunc( func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender { f := &FriendNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + NotificationSender: notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) })), } diff --git a/internal/rpc/user/notification.go b/internal/rpc/user/notification.go index 03fdf95bd..4fb214f74 100644 --- a/internal/rpc/user/notification.go +++ b/internal/rpc/user/notification.go @@ -16,6 +16,7 @@ package user import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" @@ -29,7 +30,7 @@ import ( ) type UserNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) // db controller db controller.UserDatabase @@ -63,7 +64,7 @@ func WithUserFunc( func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender { f := &UserNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + NotificationSender: notification.NewNotificationSender(&config.NotificationConfig, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) })), } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index a3260d1bc..488a116c3 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -44,7 +44,8 @@ func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire i return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLogin: multiLoginConfig{ Policy: multiLogin.Policy, MaxNumOneEnd: multiLogin.MaxNumOneEnd, - }, adminUserIDs: adminUserIDs, + }, + adminUserIDs: adminUserIDs, } } @@ -90,23 +91,25 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", err } - deleteTokenKey, kickedTokenKey, err := a.checkToken(ctx, tokens, platformID) + deleteTokenKey, kickedTokenKey, adminTokens, err := a.checkToken(ctx, tokens, platformID) if err != nil { return "", err } if len(deleteTokenKey) != 0 { - err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) + err = a.cache.DeleteTokenByTokenMap(ctx, userID, deleteTokenKey) if err != nil { return "", err } } if len(kickedTokenKey) != 0 { - for _, k := range kickedTokenKey { - err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) - if err != nil { - return "", err + for plt, ks := range kickedTokenKey { + for _, k := range ks { + err := a.cache.SetTokenFlagEx(ctx, userID, plt, k, constant.KickedToken) + if err != nil { + return "", err + } + log.ZDebug(ctx, "kicked token in create token", "token", k) } - log.ZDebug(ctx, "kicked token in create token", "token", k) } } if len(adminTokens) != 0 { @@ -242,8 +245,9 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string //if l > adminTokenMaxNum { // kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) //} + var deleteAdminToken []string if platformID == constant.AdminPlatformID { - kickToken = append(kickToken, adminToken...) + deleteAdminToken = adminToken } - return deleteToken, kickToken, nil + return deleteToken, kickToken, deleteAdminToken, nil } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 117d6492d..53dd7f13d 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -35,7 +35,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 011182b10..28392d66e 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -11,7 +11,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index d792346e8..ce62a7258 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -18,7 +18,6 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" diff --git a/tools/check-component/main.go b/tools/check-component/main.go index cce2e7c95..993f549be 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -25,11 +25,11 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/system/program" ) diff --git a/tools/stress-test/README.md b/tools/stress-test/README.md deleted file mode 100644 index 531233a20..000000000 --- a/tools/stress-test/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# Stress Test - -## Usage - -You need set `TestTargetUserList` and `DefaultGroupID` variables. - -### Build - -```bash -go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go - -# or - -go build -o tools/stress-test/stress-test tools/stress-test/main.go -``` - -### Excute - -```bash -_output/bin/tools/linux/amd64/stress-test -c config/ - -#or - -tools/stress-test/stress-test -c config/ -``` diff --git a/tools/stress-test/main.go b/tools/stress-test/main.go deleted file mode 100755 index f845b5e93..000000000 --- a/tools/stress-test/main.go +++ /dev/null @@ -1,459 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "flag" - "fmt" - "io" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/apistruct" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/relation" - "github.com/openimsdk/protocol/sdkws" - pbuser "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/system/program" -) - -/* - 1. Create one user every minute - 2. Import target users as friends - 3. Add users to the default group - 4. Send a message to the default group every second, containing index and current timestamp - 5. Create a new group every minute and invite target users to join -*/ - -// !!! ATTENTION: This variable is must be added! -var ( - // Use default userIDs List for testing, need to be created. - TestTargetUserList = []string{ - "", - } - DefaultGroupID = "" // Use default group ID for testing, need to be created. -) - -var ( - ApiAddress string - - // API method - GetAdminToken = "/auth/get_admin_token" - CreateUser = "/user/user_register" - ImportFriend = "/friend/import_friend" - InviteToGroup = "/group/invite_user_to_group" - SendMsg = "/msg/send_msg" - CreateGroup = "/group/create_group" - GetUserToken = "/auth/user_token" -) - -const ( - MaxUser = 10000 - MaxGroup = 1000 - - CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user - SendMessageTicker = 1 * time.Second // Ticker is 1s in send message - CreateGroupTicker = 1 * time.Minute -) - -type BaseResp struct { - ErrCode int `json:"errCode"` - ErrMsg string `json:"errMsg"` - Data json.RawMessage `json:"data"` -} - -type StressTest struct { - Conf *conf - AdminUserID string - AdminToken string - DefaultGroupID string - DefaultUserID string - UserCounter int - GroupCounter int - MsgCounter int - CreatedUsers []string - CreatedGroups []string - Mutex sync.Mutex - Ctx context.Context - Cancel context.CancelFunc - HttpClient *http.Client - Wg sync.WaitGroup - Once sync.Once -} - -type conf struct { - Share config.Share - Api config.API -} - -func initConfig(configDir string) (*config.Share, *config.API, error) { - var ( - share = &config.Share{} - apiConfig = &config.API{} - ) - - err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) - if err != nil { - return nil, nil, err - } - - err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) - if err != nil { - return nil, nil, err - } - - return share, apiConfig, nil -} - -// Post Request -func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { - // Marshal body - jsonBody, err := json.Marshal(reqbody) - if err != nil { - log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) - return nil, err - } - - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("operationID", st.AdminUserID) - if st.AdminToken != "" { - req.Header.Set("token", st.AdminToken) - } - - // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) - - resp, err := st.HttpClient.Do(req) - if err != nil { - log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) - return nil, err - } - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) - if err != nil { - log.ZError(ctx, "Failed to read response body", err, "url", url) - return nil, err - } - - var baseResp BaseResp - if err := json.Unmarshal(respBody, &baseResp); err != nil { - log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) - return nil, err - } - - if baseResp.ErrCode != 0 { - err = fmt.Errorf(baseResp.ErrMsg) - log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) - return nil, err - } - - return baseResp.Data, nil -} - -func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { - req := auth.GetAdminTokenReq{ - Secret: st.Conf.Share.Secret, - UserID: st.AdminUserID, - } - - resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) - if err != nil { - return "", err - } - - data := &auth.GetAdminTokenResp{} - if err := json.Unmarshal(resp, &data); err != nil { - return "", err - } - - return data.Token, nil -} - -func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { - user := &sdkws.UserInfo{ - UserID: userID, - Nickname: userID, - } - - req := pbuser.UserRegisterReq{ - Users: []*sdkws.UserInfo{user}, - } - - _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) - if err != nil { - return "", err - } - - st.UserCounter++ - return userID, nil -} - -func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { - req := relation.ImportFriendReq{ - OwnerUserID: userID, - FriendUserIDs: TestTargetUserList, - } - - _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) - if err != nil { - return err - } - - return nil -} - -func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { - req := group.InviteUserToGroupReq{ - GroupID: st.DefaultGroupID, - InvitedUserIDs: []string{userID}, - } - _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) - if err != nil { - return err - } - - return nil -} - -func (st *StressTest) SendMsg(ctx context.Context, userID string) error { - contentObj := map[string]any{ - "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), - } - - req := &apistruct.SendMsgReq{ - SendMsg: apistruct.SendMsg{ - SendID: userID, - SenderNickname: userID, - GroupID: st.DefaultGroupID, - ContentType: constant.Text, - SessionType: constant.ReadGroupChatType, - Content: contentObj, - }, - } - - _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) - if err != nil { - log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) - return err - } - - st.MsgCounter++ - - return nil -} - -func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { - groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) - - groupInfo := &sdkws.GroupInfo{ - GroupID: groupID, - GroupName: groupID, - GroupType: constant.WorkingGroup, - } - - req := group.CreateGroupReq{ - OwnerUserID: userID, - MemberUserIDs: TestTargetUserList, - GroupInfo: groupInfo, - } - - resp := group.CreateGroupResp{} - - response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) - if err != nil { - return "", err - } - - if err := json.Unmarshal(response, &resp); err != nil { - return "", err - } - - st.GroupCounter++ - - return resp.GroupInfo.GroupID, nil -} - -func main() { - var configPath string - // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") - // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") - flag.StringVar(&configPath, "c", "", "config path") - flag.Parse() - - if configPath == "" { - _, _ = fmt.Fprintln(os.Stderr, "config path is empty") - os.Exit(1) - return - } - - fmt.Printf(" Config Path: %s\n", configPath) - - share, apiConfig, err := initConfig(configPath) - if err != nil { - program.ExitWithError(err) - return - } - - ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) - - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan struct{}) - - defer cancel() - - st := &StressTest{ - Conf: &conf{ - Share: *share, - Api: *apiConfig, - }, - AdminUserID: share.IMAdminUserID[0], - Ctx: ctx, - Cancel: cancel, - HttpClient: &http.Client{ - Timeout: 50 * time.Second, - }, - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - fmt.Println("\nReceived stop signal, stopping...") - - select { - case <-ch: - default: - close(ch) - } - - st.Cancel() - }() - - token, err := st.GetAdminToken(st.Ctx) - if err != nil { - log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) - } - - st.AdminToken = token - fmt.Println("Admin Token:", st.AdminToken) - fmt.Println("ApiAddress:", ApiAddress) - - st.DefaultGroupID = DefaultGroupID - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(CreateUserTicker) - defer ticker.Stop() - - for st.UserCounter < MaxUser { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") - return - - case <-ticker.C: - // Create User - userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) - - userCreatedID, err := st.CreateUser(st.Ctx, userID) - if err != nil { - log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) - os.Exit(1) - return - } - // fmt.Println("User Created ID:", userCreatedID) - - // Import Friend - if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { - log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) - os.Exit(1) - return - } - - // Invite To Group - if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { - log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) - os.Exit(1) - return - } - - st.Once.Do(func() { - st.DefaultUserID = userCreatedID - fmt.Println("Default Send User Created ID:", userCreatedID) - close(ch) - }) - } - } - }() - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(SendMessageTicker) - defer ticker.Stop() - <-ch - - for { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") - return - - case <-ticker.C: - // Send Message - if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { - log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) - continue - } - } - } - }() - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(CreateGroupTicker) - defer ticker.Stop() - <-ch - - for st.GroupCounter < MaxGroup { - - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") - return - - case <-ticker.C: - - // Create Group - _, err := st.CreateGroup(st.Ctx, st.DefaultUserID) - if err != nil { - log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID) - os.Exit(1) - return - } - - // fmt.Println("Group Created ID:", groupID) - } - } - }() - - st.Wg.Wait() -}