From 8158a03b2deb17c8472192ab0c8da46bc0106b2e Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Jun 2023 19:55:47 +0800 Subject: [PATCH 1/6] fix friend --- internal/rpc/friend/friend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 04c24f62d..c341336bc 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -113,7 +113,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.RespondFriendApplyReq) (resp *pbfriend.RespondFriendApplyResp, err error) { defer log.ZInfo(ctx, utils.GetFuncName()+" Return") resp = &pbfriend.RespondFriendApplyResp{} - if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil { + if err := tokenverify.CheckAccessV3(ctx, req.ToUserID); err != nil { return nil, err } From 1598c72794aefaf3d77f1855fef22cba063e98cb Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Jun 2023 20:30:59 +0800 Subject: [PATCH 2/6] conn --- pkg/discoveryregistry/zookeeper/discover.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index cc426c636..0b1a7629e 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -95,9 +95,20 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp return ret, nil } +// func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +// // newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) +// return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(s.options, opts...)...) +// } + func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - // newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) - return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(s.options, opts...)...) + conns, err := s.GetConns(ctx, serviceName, opts...) + if err != nil { + return nil, err + } + if len(conns) == 0 { + return nil, ErrConnIsNil + } + return conns[0], nil } func (s *ZkClient) GetFirstConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { From 1a9df45ef66c162079faf586af9f27ad4129b1ac Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Jun 2023 20:35:08 +0800 Subject: [PATCH 3/6] conn --- pkg/discoveryregistry/zookeeper/discover.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 0b1a7629e..cc426c636 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -95,20 +95,9 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp return ret, nil } -// func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { -// // newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) -// return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(s.options, opts...)...) -// } - func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - conns, err := s.GetConns(ctx, serviceName, opts...) - if err != nil { - return nil, err - } - if len(conns) == 0 { - return nil, ErrConnIsNil - } - return conns[0], nil + // newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) + return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(s.options, opts...)...) } func (s *ZkClient) GetFirstConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { From 7bf5a629af27b6222ae9290522b03cbc83b04911 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Jun 2023 21:01:30 +0800 Subject: [PATCH 4/6] discov --- pkg/discoveryregistry/zookeeper/discover.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index cc426c636..07a2bce17 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -96,8 +96,8 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp } func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - // newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) - return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(s.options, opts...)...) + newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) + return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) } func (s *ZkClient) GetFirstConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { From ff7b6e200cc4142fe611fdcf99de7cff429a7d9a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 1 Jun 2023 21:28:16 +0800 Subject: [PATCH 5/6] rpc --- pkg/common/db/localcache/conversation.go | 21 ++++---- pkg/rpcclient/black.go | 17 +++---- pkg/rpcclient/conversation.go | 62 +++++++++++++++++------- pkg/rpcclient/friend.go | 29 ++++++----- pkg/rpcclient/group.go | 44 ++++++++++++----- pkg/rpcclient/meta.go | 31 ++++++++++++ pkg/rpcclient/msg.go | 31 ++++++++---- pkg/rpcclient/push.go | 20 +++++--- pkg/rpcclient/user.go | 26 ++++++---- 9 files changed, 197 insertions(+), 84 deletions(-) create mode 100644 pkg/rpcclient/meta.go diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 9f49dc3b6..734fe2e6d 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -7,14 +7,13 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" - "google.golang.org/grpc" ) type ConversationLocalCache struct { lock sync.Mutex SuperGroupRecvMsgNotNotifyUserIDs map[string]Hash ConversationIDs map[string]Hash - conn *grpc.ClientConn + client discoveryregistry.SvcDiscoveryRegistry } type Hash struct { @@ -23,19 +22,19 @@ type Hash struct { } func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { - conn, err := client.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImConversationName) - if err != nil { - panic(err) - } return &ConversationLocalCache{ SuperGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), ConversationIDs: make(map[string]Hash), - conn: conn, + client: client, } } func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - client := conversation.NewConversationClient(g.conn) + conn, err := g.client.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + if err != nil { + return nil, err + } + client := conversation.NewConversationClient(conn) resp, err := client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{ GroupID: groupID, }) @@ -46,7 +45,11 @@ func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, } func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { - client := conversation.NewConversationClient(g.conn) + conn, err := g.client.GetConn(ctx, config.Config.RpcRegisterName.OpenImConversationName) + if err != nil { + return nil, err + } + client := conversation.NewConversationClient(conn) resp, err := client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{ OwnerUserID: userID, }) diff --git a/pkg/rpcclient/black.go b/pkg/rpcclient/black.go index aaab0b563..161adb40c 100644 --- a/pkg/rpcclient/black.go +++ b/pkg/rpcclient/black.go @@ -6,24 +6,23 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" discoveryRegistry "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend" - "google.golang.org/grpc" ) type BlackClient struct { - conn *grpc.ClientConn + *MetaClient } -func NewBlackClient(discov discoveryRegistry.SvcDiscoveryRegistry) *BlackClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImFriendName) - if err != nil { - panic(err) - } - return &BlackClient{conn: conn} +func NewBlackClient(zk discoveryRegistry.SvcDiscoveryRegistry) *BlackClient { + return &BlackClient{NewMetaClient(zk, config.Config.RpcRegisterName.OpenImFriendName)} } // possibleBlackUserID是否被userID拉黑,也就是是否在userID的黑名单中 func (b *BlackClient) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { - r, err := friend.NewFriendClient(b.conn).IsBlack(ctx, &friend.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}) + cc, err := b.getConn(ctx) + if err != nil { + return false, err + } + r, err := friend.NewFriendClient(cc).IsBlack(ctx, &friend.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}) if err != nil { return false, err } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 1a2e23818..4e3dcede9 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -5,32 +5,36 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" discoveryRegistry "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" - "google.golang.org/grpc" ) type ConversationClient struct { - conn *grpc.ClientConn + *MetaClient } -func NewConversationClient(discov discoveryRegistry.SvcDiscoveryRegistry) *ConversationClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImConversationName) - if err != nil { - panic(err) - } - return &ConversationClient{conn: conn} +func NewConversationClient(zk discoveryRegistry.SvcDiscoveryRegistry) *ConversationClient { + return &ConversationClient{NewMetaClient(zk, config.Config.RpcRegisterName.OpenImConversationName)} } func (c *ConversationClient) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) error { - _, err := pbConversation.NewConversationClient(c.conn).ModifyConversationField(ctx, req) + cc, err := c.getConn(ctx) + if err != nil { + return err + } + _, err = conversation.NewConversationClient(cc).ModifyConversationField(ctx, req) return err } func (c *ConversationClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { - var req pbConversation.GetConversationReq + cc, err := c.getConn(ctx) + if err != nil { + return 0, err + } + var req conversation.GetConversationReq req.OwnerUserID = userID req.ConversationID = conversationID - conversation, err := pbConversation.NewConversationClient(c.conn).GetConversation(ctx, &req) + conversation, err := conversation.NewConversationClient(cc).GetConversation(ctx, &req) if err != nil { return 0, err } @@ -38,31 +42,55 @@ func (c *ConversationClient) GetSingleConversationRecvMsgOpt(ctx context.Context } func (c *ConversationClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID string) error { - _, err := pbConversation.NewConversationClient(c.conn).CreateSingleChatConversations(ctx, &pbConversation.CreateSingleChatConversationsReq{RecvID: recvID, SendID: sendID}) + cc, err := c.getConn(ctx) + if err != nil { + return err + } + _, err = conversation.NewConversationClient(cc).CreateSingleChatConversations(ctx, &pbConversation.CreateSingleChatConversationsReq{RecvID: recvID, SendID: sendID}) return err } func (c *ConversationClient) GroupChatFirstCreateConversation(ctx context.Context, groupID string, userIDs []string) error { - _, err := pbConversation.NewConversationClient(c.conn).CreateGroupChatConversations(ctx, &pbConversation.CreateGroupChatConversationsReq{UserIDs: userIDs, GroupID: groupID}) + cc, err := c.getConn(ctx) + if err != nil { + return err + } + _, err = conversation.NewConversationClient(cc).CreateGroupChatConversations(ctx, &pbConversation.CreateGroupChatConversationsReq{UserIDs: userIDs, GroupID: groupID}) return err } func (c *ConversationClient) DelGroupChatConversations(ctx context.Context, ownerUserIDs []string, groupID string, maxSeq int64) error { - _, err := pbConversation.NewConversationClient(c.conn).DelGroupChatConversations(ctx, &pbConversation.DelGroupChatConversationsReq{OwnerUserID: ownerUserIDs, GroupID: groupID, MaxSeq: maxSeq}) + cc, err := c.getConn(ctx) + if err != nil { + return err + } + _, err = conversation.NewConversationClient(cc).DelGroupChatConversations(ctx, &pbConversation.DelGroupChatConversationsReq{OwnerUserID: ownerUserIDs, GroupID: groupID, MaxSeq: maxSeq}) return err } func (c *ConversationClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { - resp, err := pbConversation.NewConversationClient(c.conn).GetConversationIDs(ctx, &pbConversation.GetConversationIDsReq{UserID: ownerUserID}) + cc, err := c.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := conversation.NewConversationClient(cc).GetConversationIDs(ctx, &pbConversation.GetConversationIDsReq{UserID: ownerUserID}) return resp.ConversationIDs, err } func (c *ConversationClient) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*pbConversation.Conversation, error) { - resp, err := pbConversation.NewConversationClient(c.conn).GetConversation(ctx, &pbConversation.GetConversationReq{OwnerUserID: ownerUserID, ConversationID: conversationID}) + cc, err := c.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := pbConversation.NewConversationClient(cc).GetConversation(ctx, &pbConversation.GetConversationReq{OwnerUserID: ownerUserID, ConversationID: conversationID}) return resp.Conversation, err } func (c *ConversationClient) GetConversationByConversationID(ctx context.Context, conversationID string) (*pbConversation.Conversation, error) { - resp, err := pbConversation.NewConversationClient(c.conn).GetConversationByConversationID(ctx, &pbConversation.GetConversationByConversationIDReq{ConversationID: conversationID}) + cc, err := c.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := pbConversation.NewConversationClient(cc).GetConversationByConversationID(ctx, &pbConversation.GetConversationByConversationIDReq{ConversationID: conversationID}) return resp.Conversation, err } diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index d290cc958..44745e4c2 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -7,23 +7,22 @@ import ( discoveryRegistry "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "google.golang.org/grpc" ) type FriendClient struct { - conn *grpc.ClientConn + *MetaClient } -func NewFriendClient(discov discoveryRegistry.SvcDiscoveryRegistry) *FriendClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImFriendName) - if err != nil { - panic(err) - } - return &FriendClient{conn: conn} +func NewFriendClient(zk discoveryRegistry.SvcDiscoveryRegistry) *FriendClient { + return &FriendClient{NewMetaClient(zk, config.Config.RpcRegisterName.OpenImFriendName)} } func (f *FriendClient) GetFriendsInfo(ctx context.Context, ownerUserID, friendUserID string) (resp *sdkws.FriendInfo, err error) { - r, err := friend.NewFriendClient(f.conn).GetDesignatedFriends(ctx, &friend.GetDesignatedFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{friendUserID}}) + cc, err := f.getConn(ctx) + if err != nil { + return nil, err + } + r, err := friend.NewFriendClient(cc).GetDesignatedFriends(ctx, &friend.GetDesignatedFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{friendUserID}}) if err != nil { return nil, err } @@ -33,7 +32,11 @@ func (f *FriendClient) GetFriendsInfo(ctx context.Context, ownerUserID, friendUs // possibleFriendUserID是否在userID的好友中 func (f *FriendClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { - resp, err := friend.NewFriendClient(f.conn).IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}) + cc, err := f.getConn(ctx) + if err != nil { + return false, err + } + resp, err := friend.NewFriendClient(cc).IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}) if err != nil { return false, err } @@ -42,8 +45,12 @@ func (f *FriendClient) IsFriend(ctx context.Context, possibleFriendUserID, userI } func (f *FriendClient) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { + cc, err := f.getConn(ctx) + if err != nil { + return nil, err + } req := friend.GetFriendIDsReq{UserID: ownerUserID} - resp, err := friend.NewFriendClient(f.conn).GetFriendIDs(ctx, &req) + resp, err := friend.NewFriendClient(cc).GetFriendIDs(ctx, &req) if err != nil { return nil, err } diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 8be048d08..f480a95b4 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -11,23 +11,27 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "google.golang.org/grpc" ) type GroupClient struct { - conn *grpc.ClientConn + MetaClient } -func NewGroupClient(discov discoveryregistry.SvcDiscoveryRegistry) *GroupClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImGroupName) - if err != nil { - panic(err) +func NewGroupClient(client discoveryregistry.SvcDiscoveryRegistry) *GroupClient { + return &GroupClient{ + MetaClient: MetaClient{ + client: client, + rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, + }, } - return &GroupClient{conn: conn} } func (g *GroupClient) GetGroupInfos(ctx context.Context, groupIDs []string, complete bool) ([]*sdkws.GroupInfo, error) { - resp, err := group.NewGroupClient(g.conn).GetGroupsInfo(ctx, &group.GetGroupsInfoReq{ + cc, err := g.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := group.NewGroupClient(cc).GetGroupsInfo(ctx, &group.GetGroupsInfoReq{ GroupIDs: groupIDs, }) if err != nil { @@ -62,7 +66,11 @@ func (g *GroupClient) GetGroupInfoMap(ctx context.Context, groupIDs []string, co } func (g *GroupClient) GetGroupMemberInfos(ctx context.Context, groupID string, userIDs []string, complete bool) ([]*sdkws.GroupMemberFullInfo, error) { - resp, err := group.NewGroupClient(g.conn).GetGroupMembersInfo(ctx, &group.GetGroupMembersInfoReq{ + cc, err := g.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := group.NewGroupClient(cc).GetGroupMembersInfo(ctx, &group.GetGroupMembersInfoReq{ GroupID: groupID, UserIDs: userIDs, }) @@ -98,7 +106,11 @@ func (g *GroupClient) GetGroupMemberInfoMap(ctx context.Context, groupID string, } func (g *GroupClient) GetOwnerAndAdminInfos(ctx context.Context, groupID string) ([]*sdkws.GroupMemberFullInfo, error) { - resp, err := group.NewGroupClient(g.conn).GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ + cc, err := g.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := group.NewGroupClient(cc).GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ GroupID: groupID, RoleLevels: []int32{constant.GroupOwner, constant.GroupAdmin}, }) @@ -109,7 +121,11 @@ func (g *GroupClient) GetOwnerAndAdminInfos(ctx context.Context, groupID string) } func (g *GroupClient) GetOwnerInfo(ctx context.Context, groupID string) (*sdkws.GroupMemberFullInfo, error) { - resp, err := group.NewGroupClient(g.conn).GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ + cc, err := g.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := group.NewGroupClient(cc).GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ GroupID: groupID, RoleLevels: []int32{constant.GroupOwner}, }) @@ -117,7 +133,11 @@ func (g *GroupClient) GetOwnerInfo(ctx context.Context, groupID string) (*sdkws. } func (g *GroupClient) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := group.NewGroupClient(g.conn).GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ + cc, err := g.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := group.NewGroupClient(cc).GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ GroupID: groupID, }) if err != nil { diff --git a/pkg/rpcclient/meta.go b/pkg/rpcclient/meta.go new file mode 100644 index 000000000..00bd81737 --- /dev/null +++ b/pkg/rpcclient/meta.go @@ -0,0 +1,31 @@ +package rpcclient + +import ( + "context" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "google.golang.org/grpc" +) + +type MetaClient struct { + // contains filtered or unexported fields + client discoveryregistry.SvcDiscoveryRegistry + rpcRegisterName string +} + +func NewMetaClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string, opts ...MetaClientOptions) *MetaClient { + c := &MetaClient{ + client: client, + rpcRegisterName: rpcRegisterName, + } + for _, opt := range opts { + opt(c) + } + return c +} + +type MetaClientOptions func(*MetaClient) + +func (m *MetaClient) getConn(ctx context.Context) (*grpc.ClientConn, error) { + return m.client.GetConn(ctx, m.rpcRegisterName) +} diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index cf9dd557a..e94b0a0d5 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -100,33 +100,46 @@ func newSessionTypeConf() map[int32]int32 { type MsgClient struct { conn *grpc.ClientConn + *MetaClient } func NewMsgClient(discov discoveryregistry.SvcDiscoveryRegistry) *MsgClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImMsgName) - if err != nil { - panic(err) - } - return &MsgClient{conn: conn} + return &MsgClient{MetaClient: NewMetaClient(discov, config.Config.RpcRegisterName.OpenImMsgName)} } func (m *MsgClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { - resp, err := msg.NewMsgClient(m.conn).SendMsg(ctx, req) + cc, err := m.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := msg.NewMsgClient(cc).SendMsg(ctx, req) return resp, err } func (m *MsgClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { - resp, err := msg.NewMsgClient(m.conn).GetMaxSeq(ctx, req) + cc, err := m.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := msg.NewMsgClient(cc).GetMaxSeq(ctx, req) return resp, err } func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { - resp, err := msg.NewMsgClient(m.conn).PullMessageBySeqs(ctx, req) + cc, err := m.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req) return resp, err } func (m *MsgClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { - resp, err := msg.NewMsgClient(m.conn).GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}) + cc, err := m.getConn(ctx) + if err != nil { + return 0, err + } + resp, err := msg.NewMsgClient(cc).GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}) if err != nil { return 0, err } diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go index 2640535f6..53cc2bee8 100644 --- a/pkg/rpcclient/push.go +++ b/pkg/rpcclient/push.go @@ -6,23 +6,27 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" - "google.golang.org/grpc" ) type PushClient struct { - conn *grpc.ClientConn + MetaClient } -func NewPushClient(discov discoveryregistry.SvcDiscoveryRegistry) *PushClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImPushName) - if err != nil { - panic(err) +func NewPushClient(client discoveryregistry.SvcDiscoveryRegistry) *PushClient { + return &PushClient{ + MetaClient: MetaClient{ + client: client, + rpcRegisterName: config.Config.RpcRegisterName.OpenImPushName, + }, } - return &PushClient{conn: conn} } func (p *PushClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { - resp, err := push.NewPushMsgServiceClient(p.conn).DelUserPushToken(ctx, req) + cc, err := p.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := push.NewPushMsgServiceClient(cc).DelUserPushToken(ctx, req) if err != nil { return nil, err } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 3dcb95572..6324edd47 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -11,23 +11,27 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/user" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "google.golang.org/grpc" ) type UserClient struct { - conn *grpc.ClientConn + MetaClient } -func NewUserClient(discov discoveryregistry.SvcDiscoveryRegistry) *UserClient { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName) - if err != nil { - panic(err) +func NewUserClient(client discoveryregistry.SvcDiscoveryRegistry) *UserClient { + return &UserClient{ + MetaClient: MetaClient{ + client: client, + rpcRegisterName: config.Config.RpcRegisterName.OpenImUserName, + }, } - return &UserClient{conn: conn} } func (u *UserClient) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { - resp, err := user.NewUserClient(u.conn).GetDesignateUsers(ctx, &user.GetDesignateUsersReq{ + cc, err := u.getConn(ctx) + if err != nil { + return nil, err + } + resp, err := user.NewUserClient(cc).GetDesignateUsers(ctx, &user.GetDesignateUsersReq{ UserIDs: userIDs, }) if err != nil { @@ -93,7 +97,11 @@ func (u *UserClient) GetPublicUserInfoMap(ctx context.Context, userIDs []string, } func (u *UserClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (int32, error) { - resp, err := user.NewUserClient(u.conn).GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{ + cc, err := u.getConn(ctx) + if err != nil { + return 0, err + } + resp, err := user.NewUserClient(cc).GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{ UserID: userID, }) if err != nil { From 826b6113f8643233169a30f86aeca2622671c3b0 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 2 Jun 2023 14:40:22 +0800 Subject: [PATCH 6/6] refactor: remove limit --- internal/rpc/msg/revoke.go | 3 --- pkg/errs/coderr.go | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 111ac074a..b6b0daf9d 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -40,9 +40,6 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if len(msgs) == 0 || msgs[0] == nil { return nil, errs.ErrRecordNotFound.Wrap("msg not found") } - if msgs[0].SendID == "" || msgs[0].RecvID == "" { - return nil, errs.ErrRecordNotFound.Wrap("sendID or recvID is empty") - } // todo: 判断是否已经撤回 data, _ := json.Marshal(msgs[0]) log.ZInfo(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data)) diff --git a/pkg/errs/coderr.go b/pkg/errs/coderr.go index 445ee0ddd..b8371bd5b 100644 --- a/pkg/errs/coderr.go +++ b/pkg/errs/coderr.go @@ -82,7 +82,7 @@ func (e *codeError) Is(err error, loose ...bool) bool { } func (e *codeError) Error() string { - return fmt.Sprintf("[%d]%s", e.code, e.msg) + return fmt.Sprintf("%s", e.msg) } func Unwrap(err error) error {