From dea9a4c8fa87bc24cfc86edb5b7de0611ef9373f Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 14 Feb 2023 19:34:19 +0800 Subject: [PATCH] Error code standardization --- cmd/rpc/auth/main.go | 21 +--- cmd/rpc/friend/main.go | 19 +--- cmd/rpc/user/main.go | 21 +--- go.mod | 2 +- internal/common/convert/convert.go | 22 ++-- internal/rpc/auth/auth.go | 77 +++++-------- internal/rpc/friend/friend.go | 129 +++++++-------------- internal/rpc/user/user.go | 174 +++-------------------------- 8 files changed, 105 insertions(+), 360 deletions(-) diff --git a/cmd/rpc/auth/main.go b/cmd/rpc/auth/main.go index b9485454d..4625fa2d0 100644 --- a/cmd/rpc/auth/main.go +++ b/cmd/rpc/auth/main.go @@ -1,26 +1,11 @@ package main import ( - rpcAuth "Open_IM/internal/rpc/auth" + "Open_IM/internal/rpc/auth" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImAuthPort - rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port") - flag.Parse() - fmt.Println("start auth rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImAuthPort[0], config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort[0], auth.Start) } diff --git a/cmd/rpc/friend/main.go b/cmd/rpc/friend/main.go index 3b88d6964..907ee7dbd 100644 --- a/cmd/rpc/friend/main.go +++ b/cmd/rpc/friend/main.go @@ -2,25 +2,10 @@ package main import ( "Open_IM/internal/rpc/friend" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImFriendPort - rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port") - flag.Parse() - fmt.Println("start friend rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := friend.NewFriendServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImFriendPort[0], config.Config.RpcRegisterName.OpenImFriendName, config.Config.Prometheus.FriendPrometheusPort[0], friend.Start) } diff --git a/cmd/rpc/user/main.go b/cmd/rpc/user/main.go index f6191280f..331daa8f4 100644 --- a/cmd/rpc/user/main.go +++ b/cmd/rpc/user/main.go @@ -2,25 +2,12 @@ package main import ( "Open_IM/internal/rpc/user" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImUserPort - rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port") - flag.Parse() - fmt.Println("start user rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := user.NewUserServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + + startrpc.Start(config.Config.RpcPort.OpenImUserPort[0], config.Config.RpcRegisterName.OpenImUserName, config.Config.Prometheus.UserPrometheusPort[0], user.Start) + } diff --git a/go.mod b/go.mod index 5781ef0f5..238c77914 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe github.com/swaggo/gin-swagger v1.5.0 - github.com/swaggo/swag v1.8.3 + github.com/swaggo/swag v1.8.3 // indirect github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca go.etcd.io/etcd/client/v3 v3.5.6 // indirect go.mongodb.org/mongo-driver v1.8.3 diff --git a/internal/common/convert/convert.go b/internal/common/convert/convert.go index 5b344770e..861027167 100644 --- a/internal/common/convert/convert.go +++ b/internal/common/convert/convert.go @@ -46,27 +46,27 @@ func (db *DBFriend) DB2PB(ctx context.Context, friends []*relation.FriendModel) return nil, err } for _, v := range friends { - pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} - utils.CopyStructFields(pbFriend, users[v.OwnerUserID]) - utils.CopyStructFields(pbFriend.FriendUser, users[v.FriendUserID]) - pbFriend.CreateTime = v.CreateTime.Unix() - pbFriend.FriendUser.CreateTime = v.CreateTime.Unix() + pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} + utils.CopyStructFields(pbfriend, users[v.OwnerUserID]) + utils.CopyStructFields(pbfriend.FriendUser, users[v.FriendUserID]) + pbfriend.CreateTime = v.CreateTime.Unix() + pbfriend.FriendUser.CreateTime = v.CreateTime.Unix() } return } func (db *DBFriend) Convert(ctx context.Context) (*sdk.FriendInfo, error) { - pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} - utils.CopyStructFields(pbFriend, db) + pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} + utils.CopyStructFields(pbfriend, db) user, err := db.userCheck.GetUsersInfo(ctx, db.FriendUserID) if err != nil { return nil, err } - utils.CopyStructFields(pbFriend.FriendUser, user) - pbFriend.CreateTime = db.CreateTime.Unix() + utils.CopyStructFields(pbfriend.FriendUser, user) + pbfriend.CreateTime = db.CreateTime.Unix() - pbFriend.FriendUser.CreateTime = db.CreateTime.Unix() - return pbFriend, nil + pbfriend.FriendUser.CreateTime = db.CreateTime.Unix() + return pbfriend, nil } func (pb *PBFriend) Convert() (*relation.FriendModel, error) { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 603f60c22..e3a3bea04 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -2,68 +2,45 @@ package auth import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/rpcserver" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" + "Open_IM/pkg/common/db/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/log" - promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discoveryregistry" pbAuth "Open_IM/pkg/proto/auth" pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) -func NewRpcAuthServer(port int) *rpcAuth { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - var redis cache.RedisClient - redis.InitRedis() - return &rpcAuth{ - RpcServer: r, - AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), + if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil { + return err } + redis, err := cache.NewRedis() + if err != nil { + return err + } + pbAuth.RegisterAuthServer(server, &authServer{ + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), + }) + return nil } -func (s *rpcAuth) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc auth start...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "listen network success ", listener, address) - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - promePkg.NewUserRegisterCounter() - promePkg.NewUserLoginCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - pbAuth.RegisterAuthServer(srv, s) - err = srv.Serve(listener) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "rpc auth ok") -} - -func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { +func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { resp := pbAuth.UserTokenResp{} if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil { return nil, err @@ -77,7 +54,7 @@ func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbA return &resp, nil } -func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { +func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString) if err != nil { return nil, utils.Wrap(err, "") @@ -102,7 +79,7 @@ func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims * return nil, constant.ErrTokenNotExist.Wrap() } -func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) { +func (s *authServer) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) { resp = &pbAuth.ParseTokenResp{} claims, err := s.parseToken(ctx, req.Token) if err != nil { @@ -114,7 +91,7 @@ func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (re return resp, nil } -func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) { +func (s *authServer) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) { resp := pbAuth.ForceLogoutResp{} if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err @@ -125,7 +102,7 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) ( return &resp, nil } -func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { +func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName) if err != nil { return err @@ -140,8 +117,8 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in return constant.ErrInternalServer.Wrap() } -type rpcAuth struct { - *rpcserver.RpcServer +type authServer struct { controller.AuthInterface - userCheck *check.UserCheck + userCheck *check.UserCheck + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 7a350283a..734e1d7ed 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -4,100 +4,49 @@ import ( "Open_IM/internal/common/check" "Open_IM/internal/common/convert" "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" relationTb "Open_IM/pkg/common/db/table/relation" - "Open_IM/pkg/common/log" - "Open_IM/pkg/common/middleware" - promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" - pbFriend "Open_IM/pkg/proto/friend" + discoveryRegistry "Open_IM/pkg/discoveryregistry" + pbfriend "Open_IM/pkg/proto/friend" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) type friendServer struct { - *rpcserver.RpcServer controller.FriendInterface controller.BlackInterface - notification *notification.Check - userCheck *check.UserCheck + notification *notification.Check + userCheck *check.UserCheck + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } -func NewFriendServer(port int) *friendServer { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - //mysql init - var mysql relation.Mysql - var model relation.FriendGorm - err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendRequestModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - if mysql.GormConn() != nil { - model.DB = mysql.GormConn() - } else { - panic("db init err:" + "conn is nil") - } - return &friendServer{ - RpcServer: r, - FriendInterface: controller.NewFriendController(model.DB), - BlackInterface: controller.NewBlackController(model.DB), - } -} - -func (s *friendServer) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "friendServer run...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - - log.NewInfo(operationID, "listen ok ", address) - defer listener.Close() - //grpc server - var grpcOpts []grpc.ServerOption - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(middleware.RpcServerInterceptor)) - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - pbFriend.RegisterFriendServer(srv, s) - err = srv.Serve(listener) - if err != nil { - log.NewError(operationID, "Serve failed ", err.Error(), listener) - return + if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil { + return err } + pbfriend.RegisterFriendServer(server, &friendServer{ + FriendInterface: controller.NewFriendController(mysql), + BlackInterface: controller.NewBlackController(mysql), + notification: notification.NewCheck(client), + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + }) + return nil } // ok -func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.ApplyToAddFriendReq) (resp *pbFriend.ApplyToAddFriendResp, err error) { - resp = &pbFriend.ApplyToAddFriendResp{} +func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) { + resp = &pbfriend.ApplyToAddFriendResp{} if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil { return nil, err } @@ -125,8 +74,8 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.Apply } // ok -func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFriendReq) (resp *pbFriend.ImportFriendResp, err error) { - resp = &pbFriend.ImportFriendResp{} +func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) (resp *pbfriend.ImportFriendResp, err error) { + resp = &pbfriend.ImportFriendResp{} if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } @@ -148,8 +97,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFr } // ok -func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.RespondFriendApplyReq) (resp *pbFriend.RespondFriendApplyResp, err error) { - resp = &pbFriend.RespondFriendApplyResp{} +func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.RespondFriendApplyReq) (resp *pbfriend.RespondFriendApplyResp, err error) { + resp = &pbfriend.RespondFriendApplyResp{} if err := s.userCheck.Access(ctx, req.ToUserID); err != nil { return nil, err } @@ -174,8 +123,8 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.Res } // ok -func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFriendReq) (resp *pbFriend.DeleteFriendResp, err error) { - resp = &pbFriend.DeleteFriendResp{} +func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) (resp *pbfriend.DeleteFriendResp, err error) { + resp = &pbfriend.DeleteFriendResp{} if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil { return nil, err } @@ -191,8 +140,8 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri } // ok -func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFriendRemarkReq) (resp *pbFriend.SetFriendRemarkResp, err error) { - resp = &pbFriend.SetFriendRemarkResp{} +func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) { + resp = &pbfriend.SetFriendRemarkResp{} if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil { return nil, err } @@ -208,8 +157,8 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri } // ok -func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { - resp = &pbFriend.GetDesignatedFriendsResp{} +func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) { + resp = &pbfriend.GetDesignatedFriendsResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -226,8 +175,8 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.G } // ok 获取接收到的好友申请(即别人主动申请的) -func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyToReq) (resp *pbFriend.GetPaginationFriendsApplyToResp, err error) { - resp = &pbFriend.GetPaginationFriendsApplyToResp{} +func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) { + resp = &pbfriend.GetPaginationFriendsApplyToResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -244,8 +193,8 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbF } // ok 获取主动发出去的好友申请列表 -func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyFromReq) (resp *pbFriend.GetPaginationFriendsApplyFromResp, err error) { - resp = &pbFriend.GetPaginationFriendsApplyFromResp{} +func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) { + resp = &pbfriend.GetPaginationFriendsApplyFromResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -262,8 +211,8 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p } // ok -func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) (resp *pbFriend.IsFriendResp, err error) { - resp = &pbFriend.IsFriendResp{} +func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq) (resp *pbfriend.IsFriendResp, err error) { + resp = &pbfriend.IsFriendResp{} resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2) if err != nil { return nil, err @@ -272,8 +221,8 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) } // ok -func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbFriend.GetPaginationFriendsReq) (resp *pbFriend.GetPaginationFriendsResp, err error) { - resp = &pbFriend.GetPaginationFriendsResp{} +func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) { + resp = &pbfriend.GetPaginationFriendsResp{} if utils.Duplicate(req.FriendUserIDs) { return nil, constant.ErrArgs.Wrap("friend userID repeated") } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 658de155d..72ed92759 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -4,184 +4,46 @@ import ( "Open_IM/internal/common/check" "Open_IM/internal/common/convert" "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" tablerelation "Open_IM/pkg/common/db/table/relation" - "Open_IM/pkg/common/log" - prome "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discoveryregistry" "Open_IM/pkg/proto/sdkws" pbuser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) type userServer struct { - *rpcserver.RpcServer controller.UserInterface - notification *notification.Check - userCheck *check.UserCheck + notification *notification.Check + userCheck *check.UserCheck + ConversationChecker *check.ConversationChecker + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } -func NewUserServer(port int) *userServer { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - //mysql init - var mysql relation.Mysql - var model relation.UserGorm - err = mysql.InitConn().AutoMigrateModel(&model) - if err != nil { - panic("db init err:" + err.Error()) + if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil { + return err } - if mysql.GormConn() != nil { - model.DB = mysql.GormConn() - } else { - panic("db init err:" + "conn is nil") - } - return &userServer{RpcServer: r, UserInterface: controller.NewUserController(model.DB)} + pbuser.RegisterUserServer(server, &userServer{ + UserInterface: controller.NewUserController(mysql), + notification: notification.NewCheck(client), + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + }) + return nil } -func (s *userServer) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc user start...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "listen ok ", address) - defer listener.Close() - //grpc server - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - prome.NewGrpcRequestCounter() - prome.NewGrpcRequestFailedCounter() - prome.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - //Service registers with etcd - pbuser.RegisterUserServer(srv, s) - - err = srv.Serve(listener) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "rpc user success") -} - -// ok -//func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID string, faceURL string, operationID string, opUserID string) { -// members, err := s.GetJoinedGroupMembers(ctx, userID) -// if err != nil { -// return -// } -// groupIDs := make([]string, 0) -// for _, v := range members { -// groupIDs = append(groupIDs, v.GroupID) -// } -// if s.SetGroupMemberInfo(ctx, "", faceURL, "", 0, groupIDs, userID) != nil { -// return -// } -// for _, v := range groupIDs { -// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) -// } -//} - -// ok -//func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID string, newNickname, oldNickname string, operationID string, opUserID string) { -// members, err := s.GetJoinedGroupMembers(ctx, userID) -// if err != nil { -// return -// } -// groupIDs := make([]string, 0) -// for _, v := range members { -// if v.Nickname == oldNickname { -// groupIDs = append(groupIDs, v.GroupID) -// } -// } -// s.SetGroupMemberInfo(ctx, newNickname, "", "", 0, groupIDs, userID) -// for _, v := range groupIDs { -// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) -// } -//} - -// 设置群头像 -//func (s *userServer) SetGroupMemberInfo(ctx context.Context, nickname, faceURL, ex string, roleLevel int32, groupIDs []string, userID string) (err error) { -// -// req := pbgroup.SetGroupMemberInfo{UserID: userID} -// if nickname != "" { -// req.Nickname = &wrappers.StringValue{Value: nickname} -// } -// if faceURL != "" { -// req.FaceURL = &wrappers.StringValue{Value: faceURL} -// } -// if ex != "" { -// req.Ex = &wrappers.StringValue{Value: ex} -// } -// if roleLevel != 0 { -// req.RoleLevel = &wrappers.Int32Value{Value: roleLevel} -// } -// -// setGroupMemberInfoReq := &pbgroup.SetGroupMemberInfoReq{} -// for _, v := range groupIDs { -// req.GroupID = v -// setGroupMemberInfoReq.Members = append(setGroupMemberInfoReq.Members, &req) -// } -// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName) -// if err != nil { -// return err -// } -// client := group.NewGroupClient(conn) -// _, err = client.SetGroupMemberInfo(ctx, setGroupMemberInfoReq) -// return -//} - -// 获取加入的群成员信息 -//func (s *userServer) GetJoinedGroupMembers(ctx context.Context, userID string) (members []*sdkws.GroupMemberFullInfo, err error) { -// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName) -// if err != nil { -// return nil, err -// } -// -// client := group.NewGroupClient(conn) -// for { -// idx := int32(0) -// req := pbgroup.GetJoinedGroupListReq{FromUserID: userID, Pagination: &sdkws.RequestPagination{PageNumber: idx, ShowNumber: constant.ShowNumber}} -// resp, err := client.GetJoinedGroupList(ctx, &req) -// if err != nil { -// return nil, err -// } -// groupIDs := make([]string, 0) -// -// for _, v := range resp.Groups { -// groupIDs = append(groupIDs, v.GroupID) -// } -// -// client.GetGroupMembersInfo() -// -// if len(resp.Groups) < constant.ShowNumber { -// break -// } -// idx++ -// } -// -// return -//} - // ok func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) { resp = &pbuser.GetDesignateUsersResp{}