diff --git a/cmd/rpc/auth/main.go b/cmd/rpc/auth/main.go index b9485454d..a89ff6845 100644 --- a/cmd/rpc/auth/main.go +++ b/cmd/rpc/auth/main.go @@ -1,26 +1,11 @@ package main import ( - rpcAuth "Open_IM/internal/rpc/auth" + "Open_IM/internal/rpc/auth" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImAuthPort - rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port") - flag.Parse() - fmt.Println("start auth rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImAuthPort, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort, auth.Start) } diff --git a/cmd/rpc/friend/main.go b/cmd/rpc/friend/main.go index 3b88d6964..907ee7dbd 100644 --- a/cmd/rpc/friend/main.go +++ b/cmd/rpc/friend/main.go @@ -2,25 +2,10 @@ package main import ( "Open_IM/internal/rpc/friend" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImFriendPort - rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port") - flag.Parse() - fmt.Println("start friend rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := friend.NewFriendServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImFriendPort[0], config.Config.RpcRegisterName.OpenImFriendName, config.Config.Prometheus.FriendPrometheusPort[0], friend.Start) } diff --git a/cmd/rpc/group/main.go b/cmd/rpc/group/main.go index 2ccae9e6b..89451b6ef 100644 --- a/cmd/rpc/group/main.go +++ b/cmd/rpc/group/main.go @@ -7,5 +7,5 @@ import ( ) func main() { - startrpc.Start(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start) + startrpc.Start(config.Config.RpcPort.OpenImGroupPort, config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort, group.Start) } diff --git a/cmd/rpc/msg/main.go b/cmd/rpc/msg/main.go index c9bcecf9c..7900b1d4e 100644 --- a/cmd/rpc/msg/main.go +++ b/cmd/rpc/msg/main.go @@ -2,25 +2,10 @@ package main import ( "Open_IM/internal/rpc/msg" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImMessagePort - rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port") - flag.Parse() - fmt.Println("start msg rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := msg.NewRpcChatServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImMessagePort, config.Config.RpcRegisterName.OpenImMsgName, config.Config.Prometheus.AuthPrometheusPort, msg.Start) } diff --git a/cmd/rpc/user/main.go b/cmd/rpc/user/main.go index f6191280f..331daa8f4 100644 --- a/cmd/rpc/user/main.go +++ b/cmd/rpc/user/main.go @@ -2,25 +2,12 @@ package main import ( "Open_IM/internal/rpc/user" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImUserPort - rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port") - flag.Parse() - fmt.Println("start user rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := user.NewUserServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + + startrpc.Start(config.Config.RpcPort.OpenImUserPort[0], config.Config.RpcRegisterName.OpenImUserName, config.Config.Prometheus.UserPrometheusPort[0], user.Start) + } diff --git a/go.mod b/go.mod index 5781ef0f5..238c77914 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe github.com/swaggo/gin-swagger v1.5.0 - github.com/swaggo/swag v1.8.3 + github.com/swaggo/swag v1.8.3 // indirect github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca go.etcd.io/etcd/client/v3 v3.5.6 // indirect go.mongodb.org/mongo-driver v1.8.3 diff --git a/internal/common/convert/convert.go b/internal/common/convert/convert.go index 5b344770e..861027167 100644 --- a/internal/common/convert/convert.go +++ b/internal/common/convert/convert.go @@ -46,27 +46,27 @@ func (db *DBFriend) DB2PB(ctx context.Context, friends []*relation.FriendModel) return nil, err } for _, v := range friends { - pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} - utils.CopyStructFields(pbFriend, users[v.OwnerUserID]) - utils.CopyStructFields(pbFriend.FriendUser, users[v.FriendUserID]) - pbFriend.CreateTime = v.CreateTime.Unix() - pbFriend.FriendUser.CreateTime = v.CreateTime.Unix() + pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} + utils.CopyStructFields(pbfriend, users[v.OwnerUserID]) + utils.CopyStructFields(pbfriend.FriendUser, users[v.FriendUserID]) + pbfriend.CreateTime = v.CreateTime.Unix() + pbfriend.FriendUser.CreateTime = v.CreateTime.Unix() } return } func (db *DBFriend) Convert(ctx context.Context) (*sdk.FriendInfo, error) { - pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} - utils.CopyStructFields(pbFriend, db) + pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}} + utils.CopyStructFields(pbfriend, db) user, err := db.userCheck.GetUsersInfo(ctx, db.FriendUserID) if err != nil { return nil, err } - utils.CopyStructFields(pbFriend.FriendUser, user) - pbFriend.CreateTime = db.CreateTime.Unix() + utils.CopyStructFields(pbfriend.FriendUser, user) + pbfriend.CreateTime = db.CreateTime.Unix() - pbFriend.FriendUser.CreateTime = db.CreateTime.Unix() - return pbFriend, nil + pbfriend.FriendUser.CreateTime = db.CreateTime.Unix() + return pbfriend, nil } func (pb *PBFriend) Convert() (*relation.FriendModel, error) { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 603f60c22..e3a3bea04 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -2,68 +2,45 @@ package auth import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/rpcserver" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" + "Open_IM/pkg/common/db/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/log" - promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discoveryregistry" pbAuth "Open_IM/pkg/proto/auth" pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) -func NewRpcAuthServer(port int) *rpcAuth { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - var redis cache.RedisClient - redis.InitRedis() - return &rpcAuth{ - RpcServer: r, - AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), + if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil { + return err } + redis, err := cache.NewRedis() + if err != nil { + return err + } + pbAuth.RegisterAuthServer(server, &authServer{ + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), + }) + return nil } -func (s *rpcAuth) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc auth start...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "listen network success ", listener, address) - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - promePkg.NewUserRegisterCounter() - promePkg.NewUserLoginCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - pbAuth.RegisterAuthServer(srv, s) - err = srv.Serve(listener) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "rpc auth ok") -} - -func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { +func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { resp := pbAuth.UserTokenResp{} if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil { return nil, err @@ -77,7 +54,7 @@ func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbA return &resp, nil } -func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { +func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString) if err != nil { return nil, utils.Wrap(err, "") @@ -102,7 +79,7 @@ func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims * return nil, constant.ErrTokenNotExist.Wrap() } -func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) { +func (s *authServer) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) { resp = &pbAuth.ParseTokenResp{} claims, err := s.parseToken(ctx, req.Token) if err != nil { @@ -114,7 +91,7 @@ func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (re return resp, nil } -func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) { +func (s *authServer) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) { resp := pbAuth.ForceLogoutResp{} if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err @@ -125,7 +102,7 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) ( return &resp, nil } -func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { +func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName) if err != nil { return err @@ -140,8 +117,8 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in return constant.ErrInternalServer.Wrap() } -type rpcAuth struct { - *rpcserver.RpcServer +type authServer struct { controller.AuthInterface - userCheck *check.UserCheck + userCheck *check.UserCheck + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 7a350283a..734e1d7ed 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -4,100 +4,49 @@ import ( "Open_IM/internal/common/check" "Open_IM/internal/common/convert" "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" relationTb "Open_IM/pkg/common/db/table/relation" - "Open_IM/pkg/common/log" - "Open_IM/pkg/common/middleware" - promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" - pbFriend "Open_IM/pkg/proto/friend" + discoveryRegistry "Open_IM/pkg/discoveryregistry" + pbfriend "Open_IM/pkg/proto/friend" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) type friendServer struct { - *rpcserver.RpcServer controller.FriendInterface controller.BlackInterface - notification *notification.Check - userCheck *check.UserCheck + notification *notification.Check + userCheck *check.UserCheck + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } -func NewFriendServer(port int) *friendServer { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - //mysql init - var mysql relation.Mysql - var model relation.FriendGorm - err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendRequestModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{}) - if err != nil { - panic("db init err:" + err.Error()) - } - if mysql.GormConn() != nil { - model.DB = mysql.GormConn() - } else { - panic("db init err:" + "conn is nil") - } - return &friendServer{ - RpcServer: r, - FriendInterface: controller.NewFriendController(model.DB), - BlackInterface: controller.NewBlackController(model.DB), - } -} - -func (s *friendServer) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "friendServer run...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - - log.NewInfo(operationID, "listen ok ", address) - defer listener.Close() - //grpc server - var grpcOpts []grpc.ServerOption - grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(middleware.RpcServerInterceptor)) - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - pbFriend.RegisterFriendServer(srv, s) - err = srv.Serve(listener) - if err != nil { - log.NewError(operationID, "Serve failed ", err.Error(), listener) - return + if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil { + return err } + pbfriend.RegisterFriendServer(server, &friendServer{ + FriendInterface: controller.NewFriendController(mysql), + BlackInterface: controller.NewBlackController(mysql), + notification: notification.NewCheck(client), + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + }) + return nil } // ok -func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.ApplyToAddFriendReq) (resp *pbFriend.ApplyToAddFriendResp, err error) { - resp = &pbFriend.ApplyToAddFriendResp{} +func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) { + resp = &pbfriend.ApplyToAddFriendResp{} if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil { return nil, err } @@ -125,8 +74,8 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.Apply } // ok -func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFriendReq) (resp *pbFriend.ImportFriendResp, err error) { - resp = &pbFriend.ImportFriendResp{} +func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) (resp *pbfriend.ImportFriendResp, err error) { + resp = &pbfriend.ImportFriendResp{} if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } @@ -148,8 +97,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFr } // ok -func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.RespondFriendApplyReq) (resp *pbFriend.RespondFriendApplyResp, err error) { - resp = &pbFriend.RespondFriendApplyResp{} +func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.RespondFriendApplyReq) (resp *pbfriend.RespondFriendApplyResp, err error) { + resp = &pbfriend.RespondFriendApplyResp{} if err := s.userCheck.Access(ctx, req.ToUserID); err != nil { return nil, err } @@ -174,8 +123,8 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.Res } // ok -func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFriendReq) (resp *pbFriend.DeleteFriendResp, err error) { - resp = &pbFriend.DeleteFriendResp{} +func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) (resp *pbfriend.DeleteFriendResp, err error) { + resp = &pbfriend.DeleteFriendResp{} if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil { return nil, err } @@ -191,8 +140,8 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri } // ok -func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFriendRemarkReq) (resp *pbFriend.SetFriendRemarkResp, err error) { - resp = &pbFriend.SetFriendRemarkResp{} +func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) { + resp = &pbfriend.SetFriendRemarkResp{} if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil { return nil, err } @@ -208,8 +157,8 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri } // ok -func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { - resp = &pbFriend.GetDesignatedFriendsResp{} +func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) { + resp = &pbfriend.GetDesignatedFriendsResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -226,8 +175,8 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.G } // ok 获取接收到的好友申请(即别人主动申请的) -func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyToReq) (resp *pbFriend.GetPaginationFriendsApplyToResp, err error) { - resp = &pbFriend.GetPaginationFriendsApplyToResp{} +func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) { + resp = &pbfriend.GetPaginationFriendsApplyToResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -244,8 +193,8 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbF } // ok 获取主动发出去的好友申请列表 -func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyFromReq) (resp *pbFriend.GetPaginationFriendsApplyFromResp, err error) { - resp = &pbFriend.GetPaginationFriendsApplyFromResp{} +func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) { + resp = &pbfriend.GetPaginationFriendsApplyFromResp{} if err := s.userCheck.Access(ctx, req.UserID); err != nil { return nil, err } @@ -262,8 +211,8 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p } // ok -func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) (resp *pbFriend.IsFriendResp, err error) { - resp = &pbFriend.IsFriendResp{} +func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq) (resp *pbfriend.IsFriendResp, err error) { + resp = &pbfriend.IsFriendResp{} resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2) if err != nil { return nil, err @@ -272,8 +221,8 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) } // ok -func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbFriend.GetPaginationFriendsReq) (resp *pbFriend.GetPaginationFriendsResp, err error) { - resp = &pbFriend.GetPaginationFriendsResp{} +func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) { + resp = &pbfriend.GetPaginationFriendsResp{} if utils.Duplicate(req.FriendUserIDs) { return nil, constant.ErrArgs.Wrap("friend userID repeated") } diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index 0c174b8bf..47fe90da4 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -12,6 +12,7 @@ import ( "Open_IM/pkg/utils" "context" "google.golang.org/protobuf/types/known/wrapperspb" + "time" ) func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) { @@ -21,72 +22,44 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( defer func() { tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", req) }() - operationID := tracelog.GetOperationID(ctx) - commonCallbackReq := &callbackstruct.CallbackBeforeCreateGroupReq{ + cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{ CallbackCommand: constant.CallbackBeforeCreateGroupCommand, - OperationID: operationID, + OperationID: tracelog.GetOperationID(ctx), GroupInfo: *req.GroupInfo, } - commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ UserID: req.OwnerUserID, RoleLevel: constant.GroupOwner, }) for _, userID := range req.AdminUserIDs { - commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ UserID: userID, RoleLevel: constant.GroupAdmin, }) } for _, userID := range req.AdminUserIDs { - commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{ + cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{ UserID: userID, RoleLevel: constant.GroupOrdinaryUsers, }) } - resp := &callbackstruct.CallbackBeforeCreateGroupResp{ - CommonCallbackResp: &callbackstruct.CommonCallbackResp{OperationID: operationID}, - } - err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeCreateGroupCommand, commonCallbackReq, resp, config.Config.Callback.CallbackBeforeCreateGroup) + resp := &callbackstruct.CallbackBeforeCreateGroupResp{} + err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup) if err != nil { return err } - - if resp.GroupID != nil { - req.GroupInfo.GroupID = *resp.GroupID - } - if resp.GroupName != nil { - req.GroupInfo.GroupName = *resp.GroupName - } - if resp.Notification != nil { - req.GroupInfo.Notification = *resp.Notification - } - if resp.Introduction != nil { - req.GroupInfo.Introduction = *resp.Introduction - } - if resp.FaceURL != nil { - req.GroupInfo.FaceURL = *resp.FaceURL - } - if resp.OwnerUserID != nil { - req.GroupInfo.OwnerUserID = *resp.OwnerUserID - } - if resp.Ex != nil { - req.GroupInfo.Ex = *resp.Ex - } - if resp.Status != nil { - req.GroupInfo.Status = *resp.Status - } - if resp.CreatorUserID != nil { - req.GroupInfo.CreatorUserID = *resp.CreatorUserID - } - if resp.GroupType != nil { - req.GroupInfo.GroupType = *resp.GroupType - } - if resp.NeedVerification != nil { - req.GroupInfo.NeedVerification = *resp.NeedVerification - } - if resp.LookMemberInfo != nil { - req.GroupInfo.LookMemberInfo = *resp.LookMemberInfo - } + utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) + utils.NotNilReplace(&req.GroupInfo.GroupName, resp.GroupName) + utils.NotNilReplace(&req.GroupInfo.Notification, resp.Notification) + utils.NotNilReplace(&req.GroupInfo.Introduction, resp.Introduction) + utils.NotNilReplace(&req.GroupInfo.FaceURL, resp.FaceURL) + utils.NotNilReplace(&req.GroupInfo.OwnerUserID, resp.OwnerUserID) + utils.NotNilReplace(&req.GroupInfo.Ex, resp.Ex) + utils.NotNilReplace(&req.GroupInfo.Status, resp.Status) + utils.NotNilReplace(&req.GroupInfo.CreatorUserID, resp.CreatorUserID) + utils.NotNilReplace(&req.GroupInfo.GroupType, resp.GroupType) + utils.NotNilReplace(&req.GroupInfo.NeedVerification, resp.NeedVerification) + utils.NotNilReplace(&req.GroupInfo.LookMemberInfo, resp.LookMemberInfo) return nil } @@ -97,39 +70,26 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, groupMember *relation.Gr defer func() { tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "groupMember", *groupMember, "groupEx", groupEx) }() - operationID := tracelog.GetOperationID(ctx) - callbackResp := callbackstruct.CommonCallbackResp{OperationID: operationID} - callbackReq := callbackstruct.CallbackBeforeMemberJoinGroupReq{ + callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand, - OperationID: operationID, + OperationID: tracelog.GetOperationID(ctx), GroupID: groupMember.GroupID, UserID: groupMember.UserID, Ex: groupMember.Ex, GroupEx: groupEx, } - resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{ - CommonCallbackResp: &callbackResp, - } - err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeMemberJoinGroupCommand, callbackReq, - resp, config.Config.Callback.CallbackBeforeMemberJoinGroup) + resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{} + err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeMemberJoinGroup) if err != nil { return err } if resp.MuteEndTime != nil { - groupMember.MuteEndTime = utils.UnixSecondToTime(*resp.MuteEndTime) - } - if resp.FaceURL != nil { - groupMember.FaceURL = *resp.FaceURL - } - if resp.Ex != nil { - groupMember.Ex = *resp.Ex - } - if resp.NickName != nil { - groupMember.Nickname = *resp.NickName - } - if resp.RoleLevel != nil { - groupMember.RoleLevel = *resp.RoleLevel + groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime) } + utils.NotNilReplace(&groupMember.FaceURL, resp.FaceURL) + utils.NotNilReplace(&groupMember.Ex, resp.Ex) + utils.NotNilReplace(&groupMember.Nickname, resp.Nickname) + utils.NotNilReplace(&groupMember.RoleLevel, resp.RoleLevel) return nil } @@ -140,44 +100,40 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe defer func() { tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", *req) }() - operationID := tracelog.GetOperationID(ctx) - callbackResp := callbackstruct.CommonCallbackResp{OperationID: operationID} callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand, - OperationID: operationID, + OperationID: tracelog.GetOperationID(ctx), GroupID: req.GroupID, UserID: req.UserID, } if req.Nickname != nil { - callbackReq.Nickname = req.Nickname.Value + callbackReq.Nickname = &req.Nickname.Value } if req.FaceURL != nil { - callbackReq.FaceURL = req.FaceURL.Value + callbackReq.FaceURL = &req.FaceURL.Value } if req.RoleLevel != nil { - callbackReq.RoleLevel = req.RoleLevel.Value + callbackReq.RoleLevel = &req.RoleLevel.Value } if req.Ex != nil { - callbackReq.Ex = req.Ex.Value + callbackReq.Ex = &req.Ex.Value } - resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{ - CommonCallbackResp: &callbackResp, - } - err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetGroupMemberInfoCommand, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo) + resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{} + err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo) if err != nil { return err } if resp.FaceURL != nil { - req.FaceURL = &wrapperspb.StringValue{Value: *resp.FaceURL} + req.FaceURL = wrapperspb.String(*resp.FaceURL) } if resp.Nickname != nil { - req.Nickname = &wrapperspb.StringValue{Value: *resp.Nickname} + req.Nickname = wrapperspb.String(*resp.Nickname) } if resp.RoleLevel != nil { - req.RoleLevel = &wrapperspb.Int32Value{Value: *resp.RoleLevel} + req.RoleLevel = wrapperspb.Int32(*resp.RoleLevel) } if resp.Ex != nil { - req.Ex = &wrapperspb.StringValue{Value: *resp.Ex} + req.Ex = wrapperspb.String(*resp.Ex) } - return err + return nil } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 55e6d90fd..dfa58f7ef 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -28,11 +28,11 @@ import ( ) func Start(client *openKeeper.ZkClient, server *grpc.Server) error { - mysql, err := relation.NewGormDB() + db, err := relation.NewGormDB() if err != nil { return err } - if err := mysql.AutoMigrate(&relationTb.GroupModel{}, &relationTb.GroupMemberModel{}, &relationTb.GroupRequestModel{}); err != nil { + if err := db.AutoMigrate(&relationTb.GroupModel{}, &relationTb.GroupMemberModel{}, &relationTb.GroupRequestModel{}); err != nil { return err } redis, err := cache.NewRedis() @@ -44,7 +44,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error { return err } pbGroup.RegisterGroupServer(server, &groupServer{ - GroupInterface: controller.NewGroupInterface(mysql, redis.GetClient(), mongo.GetClient()), + GroupInterface: controller.NewGroupInterface(controller.NewGroupDatabase(db, redis.GetClient(), mongo.GetClient())), UserCheck: check.NewUserCheck(client), ConversationChecker: check.NewConversationChecker(client), }) diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index eefcb419e..949e30c3f 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -1,7 +1,7 @@ package msg import ( - cbApi "Open_IM/pkg/call_back_struct" + cb "Open_IM/pkg/callbackstruct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/http" @@ -11,10 +11,10 @@ import ( http2 "net/http" ) -func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cbApi.CallbackBeforeSetMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cb.CallbackBeforeSetMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) - req := cbApi.CallbackBeforeSetMessageReactionExtReq{ + req := cb.CallbackBeforeSetMessageReactionExtReq{ OperationID: setReq.OperationID, CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand, SourceID: setReq.SourceID, @@ -26,7 +26,7 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio IsExternalExtensions: setReq.IsExternalExtensions, MsgFirstModifyTime: setReq.MsgFirstModifyTime, } - resp := &cbApi.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp} + resp := &cb.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp} defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { callbackResp.ErrCode = http2.StatusInternalServerError @@ -36,8 +36,8 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio } -func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cbApi.CallbackDeleteMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cb.CallbackDeleteMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) req := cbApi.CallbackDeleteMessageReactionExtReq{ OperationID: setReq.OperationID, @@ -58,8 +58,8 @@ func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti } return resp } -func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: getReq.OperationID} +func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cb.CallbackGetMessageListReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: getReq.OperationID} log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String()) req := cbApi.CallbackGetMessageListReactionExtReq{ OperationID: getReq.OperationID, @@ -78,8 +78,8 @@ func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReaction } return resp } -func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cbApi.CallbackAddMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) req := cbApi.CallbackAddMessageReactionExtReq{ OperationID: setReq.OperationID, @@ -93,7 +93,7 @@ func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensio IsExternalExtensions: setReq.IsExternalExtensions, MsgFirstModifyTime: setReq.MsgFirstModifyTime, } - resp := &cbApi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} + resp := &cb.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime) if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { callbackResp.ErrCode = http2.StatusInternalServerError diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index ae416ccf3..28d5c14a2 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -2,33 +2,23 @@ package msg import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" + "Open_IM/pkg/common/db/relation" + tablerelation "Open_IM/pkg/common/db/table/relation" + discoveryRegistry "Open_IM/pkg/discoveryregistry" + "github.com/OpenIMSDK/openKeeper" - "Open_IM/pkg/common/kafka" - "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/proto/msg" - "Open_IM/pkg/utils" - "github.com/OpenIMSDK/getcdv3" - "net" - "strconv" - "strings" - - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "google.golang.org/grpc" ) type msgServer struct { - *rpcserver.RpcServer - MsgInterface controller.MsgInterface - Group *check.GroupChecker - User *check.UserCheck - Conversation *check.ConversationChecker + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry + MsgInterface controller.MsgInterface + Group *check.GroupChecker + User *check.UserCheck + Conversation *check.ConversationChecker } type deleteMsg struct { @@ -38,39 +28,34 @@ type deleteMsg struct { OperationID string } -func NewRpcChatServer(port int) *msgServer { - log.NewPrivateLog(constant.LogFileName) - rc := msgServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, - dMessageLocker: NewLockerMessage(), +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() + if err != nil { + return err } - rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) - //rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) - rc.delMsgCh = make(chan deleteMsg, 1000) - return &rc + if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil { + return err + } + s := &msgServer{ + Conversation: check.NewConversationChecker(client), + User: check.NewUserCheck(client), + Group: check.NewGroupChecker(client), + //MsgInterface: controller.MsgInterface(), + RegisterCenter: client, + } + s.initPrometheus() + msg.RegisterMsgServer(server, s) + return nil } -func (rpc *rpcChat) initPrometheus() { - //sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - // Name: "send_msg_success", - // Help: "The number of send msg success", - //}) - //sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - // Name: "send_msg_failed", - // Help: "The number of send msg failed", - //}) +func (m *msgServer) initPrometheus() { promePkg.NewMsgPullFromRedisSuccessCounter() promePkg.NewMsgPullFromRedisFailedCounter() promePkg.NewMsgPullFromMongoSuccessCounter() promePkg.NewMsgPullFromMongoFailedCounter() - promePkg.NewSingleChatMsgRecvSuccessCounter() promePkg.NewGroupChatMsgRecvSuccessCounter() promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() - promePkg.NewSingleChatMsgProcessSuccessCounter() promePkg.NewSingleChatMsgProcessFailedCounter() promePkg.NewGroupChatMsgProcessSuccessCounter() @@ -78,78 +63,3 @@ func (rpc *rpcChat) initPrometheus() { promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter() promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter() } - -func (m *msgServer) Run() { - log.Info("", "rpcChat init...") - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(m.rpcPort) - listener, err := net.Listen("tcp", address) - if err != nil { - panic("listening err:" + err.Error() + m.rpcRegisterName) - } - log.Info("", "listen network success, address ", address) - recvSize := 1024 * 1024 * 30 - sendSize := 1024 * 1024 * 30 - var grpcOpts = []grpc.ServerOption{ - grpc.MaxRecvMsgSize(recvSize), - grpc.MaxSendMsgSize(sendSize), - } - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - - rpcRegisterIP := config.Config.RpcRegisterIP - msg.RegisterMsgServer(srv, m) - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - err = getcdv3.RegisterEtcd(m.etcdSchema, strings.Join(m.etcdAddr, ","), rpcRegisterIP, m.rpcPort, m.rpcRegisterName, 10, "") - if err != nil { - log.Error("", "register rpcChat to etcd failed ", err.Error()) - panic(utils.Wrap(err, "register chat module m to etcd err")) - } - go m.runCh() - m.initPrometheus() - err = srv.Serve(listener) - if err != nil { - log.Error("", "m rpcChat failed ", err.Error()) - return - } - log.Info("", "m rpcChat init success") -} - -func (rpc *rpcChat) runCh() { - log.NewInfo("", "start del msg chan ") - for { - select { - case msg := <-rpc.delMsgCh: - log.NewInfo(msg.OperationID, utils.GetSelfFuncName(), "delmsgch recv new: ", msg) - db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID) - unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID) - if err != nil { - log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) - continue - } - if len(unexistSeqList) > 0 { - notification.DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID) - } - } - } -} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 658de155d..72ed92759 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -4,184 +4,46 @@ import ( "Open_IM/internal/common/check" "Open_IM/internal/common/convert" "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" tablerelation "Open_IM/pkg/common/db/table/relation" - "Open_IM/pkg/common/log" - prome "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/tokenverify" "Open_IM/pkg/common/tracelog" + discoveryRegistry "Open_IM/pkg/discoveryregistry" "Open_IM/pkg/proto/sdkws" pbuser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - + "github.com/OpenIMSDK/openKeeper" "google.golang.org/grpc" ) type userServer struct { - *rpcserver.RpcServer controller.UserInterface - notification *notification.Check - userCheck *check.UserCheck + notification *notification.Check + userCheck *check.UserCheck + ConversationChecker *check.ConversationChecker + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry } -func NewUserServer(port int) *userServer { - r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() if err != nil { - panic(err) + return err } - //mysql init - var mysql relation.Mysql - var model relation.UserGorm - err = mysql.InitConn().AutoMigrateModel(&model) - if err != nil { - panic("db init err:" + err.Error()) + if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil { + return err } - if mysql.GormConn() != nil { - model.DB = mysql.GormConn() - } else { - panic("db init err:" + "conn is nil") - } - return &userServer{RpcServer: r, UserInterface: controller.NewUserController(model.DB)} + pbuser.RegisterUserServer(server, &userServer{ + UserInterface: controller.NewUserController(mysql), + notification: notification.NewCheck(client), + userCheck: check.NewUserCheck(client), + RegisterCenter: client, + }) + return nil } -func (s *userServer) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc user start...") - listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "listen ok ", address) - defer listener.Close() - //grpc server - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - prome.NewGrpcRequestCounter() - prome.NewGrpcRequestFailedCounter() - prome.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - //Service registers with etcd - pbuser.RegisterUserServer(srv, s) - - err = srv.Serve(listener) - if err != nil { - panic(err) - } - log.NewInfo(operationID, "rpc user success") -} - -// ok -//func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID string, faceURL string, operationID string, opUserID string) { -// members, err := s.GetJoinedGroupMembers(ctx, userID) -// if err != nil { -// return -// } -// groupIDs := make([]string, 0) -// for _, v := range members { -// groupIDs = append(groupIDs, v.GroupID) -// } -// if s.SetGroupMemberInfo(ctx, "", faceURL, "", 0, groupIDs, userID) != nil { -// return -// } -// for _, v := range groupIDs { -// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) -// } -//} - -// ok -//func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID string, newNickname, oldNickname string, operationID string, opUserID string) { -// members, err := s.GetJoinedGroupMembers(ctx, userID) -// if err != nil { -// return -// } -// groupIDs := make([]string, 0) -// for _, v := range members { -// if v.Nickname == oldNickname { -// groupIDs = append(groupIDs, v.GroupID) -// } -// } -// s.SetGroupMemberInfo(ctx, newNickname, "", "", 0, groupIDs, userID) -// for _, v := range groupIDs { -// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) -// } -//} - -// 设置群头像 -//func (s *userServer) SetGroupMemberInfo(ctx context.Context, nickname, faceURL, ex string, roleLevel int32, groupIDs []string, userID string) (err error) { -// -// req := pbgroup.SetGroupMemberInfo{UserID: userID} -// if nickname != "" { -// req.Nickname = &wrappers.StringValue{Value: nickname} -// } -// if faceURL != "" { -// req.FaceURL = &wrappers.StringValue{Value: faceURL} -// } -// if ex != "" { -// req.Ex = &wrappers.StringValue{Value: ex} -// } -// if roleLevel != 0 { -// req.RoleLevel = &wrappers.Int32Value{Value: roleLevel} -// } -// -// setGroupMemberInfoReq := &pbgroup.SetGroupMemberInfoReq{} -// for _, v := range groupIDs { -// req.GroupID = v -// setGroupMemberInfoReq.Members = append(setGroupMemberInfoReq.Members, &req) -// } -// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName) -// if err != nil { -// return err -// } -// client := group.NewGroupClient(conn) -// _, err = client.SetGroupMemberInfo(ctx, setGroupMemberInfoReq) -// return -//} - -// 获取加入的群成员信息 -//func (s *userServer) GetJoinedGroupMembers(ctx context.Context, userID string) (members []*sdkws.GroupMemberFullInfo, err error) { -// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName) -// if err != nil { -// return nil, err -// } -// -// client := group.NewGroupClient(conn) -// for { -// idx := int32(0) -// req := pbgroup.GetJoinedGroupListReq{FromUserID: userID, Pagination: &sdkws.RequestPagination{PageNumber: idx, ShowNumber: constant.ShowNumber}} -// resp, err := client.GetJoinedGroupList(ctx, &req) -// if err != nil { -// return nil, err -// } -// groupIDs := make([]string, 0) -// -// for _, v := range resp.Groups { -// groupIDs = append(groupIDs, v.GroupID) -// } -// -// client.GetGroupMembersInfo() -// -// if len(resp.Groups) < constant.ShowNumber { -// break -// } -// idx++ -// } -// -// return -//} - // ok func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) { resp = &pbuser.GetDesignateUsersResp{} diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go index 0bdc2a51e..1a487d2b7 100644 --- a/internal/startrpc/start.go +++ b/internal/startrpc/start.go @@ -15,15 +15,13 @@ import ( "net" ) -func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options []grpc.ServerOption) error { - flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port") - flagPrometheusPort := flag.Int("prometheus_port", prometheusPort, "groupPrometheusPort default listen port") +func start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options []grpc.ServerOption) error { + flagRpcPort := flag.Int("port", rpcPorts[0], "get RpcGroupPort from cmd,default 16000 as port") + flagPrometheusPort := flag.Int("prometheus_port", prometheusPorts[0], "groupPrometheusPort default listen port") flag.Parse() - rpcPort = *flagRpcPort - prometheusPort = *flagPrometheusPort - fmt.Println("start group rpc server, port: ", rpcPort, ", OpenIM version: ", constant.CurrentVersion) + fmt.Println("start group rpc server, port: ", *flagRpcPort, ", OpenIM version: ", constant.CurrentVersion) log.NewPrivateLog(constant.LogFileName) - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, rpcPort)) + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, *flagRpcPort)) if err != nil { return err } @@ -50,12 +48,12 @@ func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c } srv := grpc.NewServer(options...) defer srv.GracefulStop() - err = zkClient.Register(rpcRegisterName, registerIP, rpcPort) + err = zkClient.Register(rpcRegisterName, registerIP, *flagRpcPort) if err != nil { return err } if config.Config.Prometheus.Enable { - err := promePkg.StartPromeSrv(prometheusPort) + err := promePkg.StartPromeSrv(*flagPrometheusPort) if err != nil { return err } @@ -63,7 +61,7 @@ func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c return rpcFn(zkClient, srv) } -func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options ...grpc.ServerOption) { - err := start(rpcPort, rpcRegisterName, prometheusPort, rpcFn, options) +func Start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options ...grpc.ServerOption) { + err := start(rpcPorts, rpcRegisterName, prometheusPorts, rpcFn, options) fmt.Println("end", err) } diff --git a/pkg/callbackstruct/common.go b/pkg/callbackstruct/common.go index 2f589fc91..098dc8135 100644 --- a/pkg/callbackstruct/common.go +++ b/pkg/callbackstruct/common.go @@ -41,13 +41,9 @@ type CommonCallbackResp struct { ActionCode int `json:"actionCode"` ErrCode int32 `json:"errCode"` ErrMsg string `json:"errMsg"` - //OperationID string `json:"operationID"` } -func (c *CommonCallbackResp) Parse() error { - if c == nil { - return constant.ErrData.Wrap("callback common is nil") - } +func (c CommonCallbackResp) Parse() error { if c.ActionCode != constant.NoError || c.ErrCode != constant.NoError { newErr := constant.ErrCallback newErr.ErrCode = c.ErrCode diff --git a/pkg/callbackstruct/friend.go b/pkg/callbackstruct/friend.go index fa1cffe28..9967707f3 100644 --- a/pkg/callbackstruct/friend.go +++ b/pkg/callbackstruct/friend.go @@ -9,5 +9,5 @@ type CallbackBeforeAddFriendReq struct { } type CallbackBeforeAddFriendResp struct { - *CommonCallbackResp + CommonCallbackResp } diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index a84753326..136386297 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -5,15 +5,21 @@ import ( common "Open_IM/pkg/proto/sdkws" ) +type CallbackCommand string + +func (c CallbackCommand) GetCallbackCommand() string { + return string(c) +} + type CallbackBeforeCreateGroupReq struct { - CallbackCommand string `json:"callbackCommand"` OperationID string `json:"operationID"` + CallbackCommand `json:"callbackCommand"` common.GroupInfo InitMemberList []*apistruct.GroupAddMemberInfo `json:"initMemberList"` } type CallbackBeforeCreateGroupResp struct { - *CommonCallbackResp + CommonCallbackResp GroupID *string `json:"groupID"` GroupName *string `json:"groupName"` Notification *string `json:"notification"` @@ -30,7 +36,7 @@ type CallbackBeforeCreateGroupResp struct { } type CallbackBeforeMemberJoinGroupReq struct { - CallbackCommand string `json:"callbackCommand"` + CallbackCommand `json:"callbackCommand"` OperationID string `json:"operationID"` GroupID string `json:"groupID"` UserID string `json:"userID"` @@ -39,8 +45,8 @@ type CallbackBeforeMemberJoinGroupReq struct { } type CallbackBeforeMemberJoinGroupResp struct { - *CommonCallbackResp - NickName *string `json:"nickName"` + CommonCallbackResp + Nickname *string `json:"nickname"` FaceURL *string `json:"faceURL"` RoleLevel *int32 `json:"roleLevel"` MuteEndTime *int64 `json:"muteEndTime"` @@ -48,18 +54,18 @@ type CallbackBeforeMemberJoinGroupResp struct { } type CallbackBeforeSetGroupMemberInfoReq struct { - CallbackCommand string `json:"callbackCommand"` - OperationID string `json:"operationID"` - GroupID string `json:"groupID"` - UserID string `json:"userID"` - Nickname string `json:"nickName"` - FaceURL string `json:"faceURL"` - RoleLevel int32 `json:"roleLevel"` - Ex string `json:"ex"` + CallbackCommand `json:"callbackCommand"` + OperationID string `json:"operationID"` + GroupID string `json:"groupID"` + UserID string `json:"userID"` + Nickname *string `json:"nickName"` + FaceURL *string `json:"faceURL"` + RoleLevel *int32 `json:"roleLevel"` + Ex *string `json:"ex"` } type CallbackBeforeSetGroupMemberInfoResp struct { - *CommonCallbackResp + CommonCallbackResp Ex *string `json:"ex"` Nickname *string `json:"nickName"` FaceURL *string `json:"faceURL"` diff --git a/pkg/callbackstruct/message.go b/pkg/callbackstruct/message.go index efb26f4a3..39276cdb1 100644 --- a/pkg/callbackstruct/message.go +++ b/pkg/callbackstruct/message.go @@ -11,7 +11,7 @@ type CallbackBeforeSendSingleMsgReq struct { } type CallbackBeforeSendSingleMsgResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackAfterSendSingleMsgReq struct { @@ -20,7 +20,7 @@ type CallbackAfterSendSingleMsgReq struct { } type CallbackAfterSendSingleMsgResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackBeforeSendGroupMsgReq struct { @@ -29,7 +29,7 @@ type CallbackBeforeSendGroupMsgReq struct { } type CallbackBeforeSendGroupMsgResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackAfterSendGroupMsgReq struct { @@ -38,7 +38,7 @@ type CallbackAfterSendGroupMsgReq struct { } type CallbackAfterSendGroupMsgResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackMsgModifyCommandReq struct { @@ -46,7 +46,7 @@ type CallbackMsgModifyCommandReq struct { } type CallbackMsgModifyCommandResp struct { - *CommonCallbackResp + CommonCallbackResp Content *string `json:"content"` RecvID *string `json:"recvID"` GroupID *string `json:"groupID"` @@ -79,7 +79,7 @@ type CallbackBeforeSetMessageReactionExtReq struct { MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } type CallbackBeforeSetMessageReactionExtResp struct { - *CommonCallbackResp + CommonCallbackResp ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } @@ -95,7 +95,7 @@ type CallbackDeleteMessageReactionExtReq struct { MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } type CallbackDeleteMessageReactionExtResp struct { - *CommonCallbackResp + CommonCallbackResp ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` } diff --git a/pkg/callbackstruct/msg_gateway.go b/pkg/callbackstruct/msg_gateway.go index 02509ac3f..34bb14e69 100644 --- a/pkg/callbackstruct/msg_gateway.go +++ b/pkg/callbackstruct/msg_gateway.go @@ -9,7 +9,7 @@ type CallbackUserOnlineReq struct { } type CallbackUserOnlineResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackUserOfflineReq struct { @@ -19,7 +19,7 @@ type CallbackUserOfflineReq struct { } type CallbackUserOfflineResp struct { - *CommonCallbackResp + CommonCallbackResp } type CallbackUserKickOffReq struct { @@ -28,5 +28,5 @@ type CallbackUserKickOffReq struct { } type CallbackUserKickOffResp struct { - *CommonCallbackResp + CommonCallbackResp } diff --git a/pkg/callbackstruct/push.go b/pkg/callbackstruct/push.go index 483a2dffb..9a686b011 100644 --- a/pkg/callbackstruct/push.go +++ b/pkg/callbackstruct/push.go @@ -15,7 +15,7 @@ type CallbackBeforePushReq struct { } type CallbackBeforePushResp struct { - *CommonCallbackResp + CommonCallbackResp UserIDList []string `json:"userIDList"` OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"` } @@ -34,7 +34,7 @@ type CallbackBeforeSuperGroupOnlinePushReq struct { } type CallbackBeforeSuperGroupOnlinePushResp struct { - *CommonCallbackResp + CommonCallbackResp UserIDList []string `json:"userIDList"` OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"` } diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 34c5ee94b..27f4bec13 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -95,6 +95,10 @@ type RedisClient struct { rdb redis.UniversalClient } +func NewRedisClient(rdb redis.UniversalClient) *RedisClient { + return &RedisClient{rdb: rdb} +} + //func (r *RedisClient) InitRedis() { // var rdb redis.UniversalClient // var err error diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index eba88fd7d..ad485f117 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -19,7 +19,6 @@ type AuthController struct { } func NewAuthController(rdb redis.UniversalClient, accessSecret string, accessExpire int64) *AuthController { - cache.NewRedisClient(rdb) return &AuthController{database: cache.NewTokenRedis(cache.NewRedisClient(rdb), accessSecret, accessExpire)} } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 093249940..863026909 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -142,7 +142,7 @@ func NewConversationDataBase(db relation.Conversation, cache cache.ConversationC } func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { - + panic("implement me") } func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 8a39442c8..2fb6a8551 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -55,8 +55,8 @@ type GroupInterface interface { var _ GroupInterface = (*GroupController)(nil) -func NewGroupInterface(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupInterface { - return &GroupController{database: NewGroupDatabase(db, rdb, mgoClient)} +func NewGroupInterface(database GroupDataBaseInterface) GroupInterface { + return &GroupController{database: database} } type GroupController struct { @@ -72,7 +72,7 @@ func (g *GroupController) CreateGroup(ctx context.Context, groups []*relationTb. } func (g *GroupController) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) { - return g.TakeGroup(ctx, groupID) + return g.database.TakeGroup(ctx, groupID) } func (g *GroupController) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) { @@ -175,6 +175,53 @@ func (g *GroupController) CreateSuperGroupMember(ctx context.Context, groupID st return g.database.CreateSuperGroupMember(ctx, groupID, userIDs) } +type Group interface { + CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error + TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) + FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) + SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error) + UpdateGroup(ctx context.Context, groupID string, data map[string]any) error + DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员 +} + +type GroupMember interface { + TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) + TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error) + FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error) + FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error) + PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) + SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error) + HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error + DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error + MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) + MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error) + TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群 + UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error + UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error +} + +type GroupRequest interface { + CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error + TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error) + PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error) +} + +type SuperGroup interface { + FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) + FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error) + CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error + DeleteSuperGroup(ctx context.Context, groupID string) error + DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error + CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error +} + +type GroupDataBase1 interface { + Group + GroupMember + GroupRequest + SuperGroup +} + type GroupDataBaseInterface interface { CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) @@ -291,7 +338,10 @@ func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relationTb.Gr } func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) { - return g.cache.GetGroupInfo(ctx, groupID) + //return g.cache.GetGroupInfo(ctx, groupID) + return cache.GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) { + return g.group.Take(ctx, groupID) + }) } func (g *GroupDataBase) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) { diff --git a/pkg/common/db/relation/group_member_model.go b/pkg/common/db/relation/group_member_model.go index 14373119a..06b41b43a 100644 --- a/pkg/common/db/relation/group_member_model.go +++ b/pkg/common/db/relation/group_member_model.go @@ -130,5 +130,5 @@ func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string, defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userIDs", userIDs) }() - return userIDs, utils.Wrap(getDBConn(g.DB, tx).Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs), "") + return userIDs, utils.Wrap(getDBConn(g.DB, tx).Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "") }