Error code standardization

This commit is contained in:
skiffer-git 2023-02-14 19:34:19 +08:00
parent c63b543a4a
commit dea9a4c8fa
8 changed files with 105 additions and 360 deletions

View File

@ -1,26 +1,11 @@
package main
import (
rpcAuth "Open_IM/internal/rpc/auth"
"Open_IM/internal/rpc/auth"
"Open_IM/internal/startrpc"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
func main() {
defaultPorts := config.Config.RpcPort.OpenImAuthPort
rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port")
flag.Parse()
fmt.Println("start auth rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
startrpc.Start(config.Config.RpcPort.OpenImAuthPort[0], config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort[0], auth.Start)
}

View File

@ -2,25 +2,10 @@ package main
import (
"Open_IM/internal/rpc/friend"
"Open_IM/internal/startrpc"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
func main() {
defaultPorts := config.Config.RpcPort.OpenImFriendPort
rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port")
flag.Parse()
fmt.Println("start friend rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
rpcServer := friend.NewFriendServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
startrpc.Start(config.Config.RpcPort.OpenImFriendPort[0], config.Config.RpcRegisterName.OpenImFriendName, config.Config.Prometheus.FriendPrometheusPort[0], friend.Start)
}

View File

@ -2,25 +2,12 @@ package main
import (
"Open_IM/internal/rpc/user"
"Open_IM/internal/startrpc"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus"
"flag"
"fmt"
)
func main() {
defaultPorts := config.Config.RpcPort.OpenImUserPort
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port")
flag.Parse()
fmt.Println("start user rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
rpcServer := user.NewUserServer(*rpcPort)
go func() {
err := promePkg.StartPromeSrv(*prometheusPort)
if err != nil {
panic(err)
}
}()
rpcServer.Run()
startrpc.Start(config.Config.RpcPort.OpenImUserPort[0], config.Config.RpcRegisterName.OpenImUserName, config.Config.Prometheus.UserPrometheusPort[0], user.Start)
}

2
go.mod
View File

@ -41,7 +41,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe
github.com/swaggo/gin-swagger v1.5.0
github.com/swaggo/swag v1.8.3
github.com/swaggo/swag v1.8.3 // indirect
github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca
go.etcd.io/etcd/client/v3 v3.5.6 // indirect
go.mongodb.org/mongo-driver v1.8.3

View File

@ -46,27 +46,27 @@ func (db *DBFriend) DB2PB(ctx context.Context, friends []*relation.FriendModel)
return nil, err
}
for _, v := range friends {
pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
utils.CopyStructFields(pbFriend, users[v.OwnerUserID])
utils.CopyStructFields(pbFriend.FriendUser, users[v.FriendUserID])
pbFriend.CreateTime = v.CreateTime.Unix()
pbFriend.FriendUser.CreateTime = v.CreateTime.Unix()
pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
utils.CopyStructFields(pbfriend, users[v.OwnerUserID])
utils.CopyStructFields(pbfriend.FriendUser, users[v.FriendUserID])
pbfriend.CreateTime = v.CreateTime.Unix()
pbfriend.FriendUser.CreateTime = v.CreateTime.Unix()
}
return
}
func (db *DBFriend) Convert(ctx context.Context) (*sdk.FriendInfo, error) {
pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
utils.CopyStructFields(pbFriend, db)
pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
utils.CopyStructFields(pbfriend, db)
user, err := db.userCheck.GetUsersInfo(ctx, db.FriendUserID)
if err != nil {
return nil, err
}
utils.CopyStructFields(pbFriend.FriendUser, user)
pbFriend.CreateTime = db.CreateTime.Unix()
utils.CopyStructFields(pbfriend.FriendUser, user)
pbfriend.CreateTime = db.CreateTime.Unix()
pbFriend.FriendUser.CreateTime = db.CreateTime.Unix()
return pbFriend, nil
pbfriend.FriendUser.CreateTime = db.CreateTime.Unix()
return pbfriend, nil
}
func (pb *PBFriend) Convert() (*relation.FriendModel, error) {

View File

@ -2,68 +2,45 @@ package auth
import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/rpcserver"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
pbAuth "Open_IM/pkg/proto/auth"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"
"context"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/OpenIMSDK/openKeeper"
"google.golang.org/grpc"
)
func NewRpcAuthServer(port int) *rpcAuth {
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
mysql, err := relation.NewGormDB()
if err != nil {
panic(err)
return err
}
var redis cache.RedisClient
redis.InitRedis()
return &rpcAuth{
RpcServer: r,
AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
return err
}
redis, err := cache.NewRedis()
if err != nil {
return err
}
pbAuth.RegisterAuthServer(server, &authServer{
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
})
return nil
}
func (s *rpcAuth) Run() {
operationID := utils.OperationIDGenerator()
log.NewInfo(operationID, "rpc auth start...")
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
if err != nil {
panic(err)
}
log.NewInfo(operationID, "listen network success ", listener, address)
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
promePkg.NewUserRegisterCounter()
promePkg.NewUserLoginCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbAuth.RegisterAuthServer(srv, s)
err = srv.Serve(listener)
if err != nil {
panic(err)
}
log.NewInfo(operationID, "rpc auth ok")
}
func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
resp := pbAuth.UserTokenResp{}
if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil {
return nil, err
@ -77,7 +54,7 @@ func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbA
return &resp, nil
}
func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString)
if err != nil {
return nil, utils.Wrap(err, "")
@ -102,7 +79,7 @@ func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *
return nil, constant.ErrTokenNotExist.Wrap()
}
func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) {
func (s *authServer) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) {
resp = &pbAuth.ParseTokenResp{}
claims, err := s.parseToken(ctx, req.Token)
if err != nil {
@ -114,7 +91,7 @@ func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (re
return resp, nil
}
func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) {
func (s *authServer) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) {
resp := pbAuth.ForceLogoutResp{}
if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err
@ -125,7 +102,7 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (
return &resp, nil
}
func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName)
if err != nil {
return err
@ -140,8 +117,8 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in
return constant.ErrInternalServer.Wrap()
}
type rpcAuth struct {
*rpcserver.RpcServer
type authServer struct {
controller.AuthInterface
userCheck *check.UserCheck
userCheck *check.UserCheck
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
}

View File

@ -4,100 +4,49 @@ import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/convert"
"Open_IM/internal/common/notification"
"Open_IM/internal/common/rpcserver"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
pbFriend "Open_IM/pkg/proto/friend"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
pbfriend "Open_IM/pkg/proto/friend"
"Open_IM/pkg/utils"
"context"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/OpenIMSDK/openKeeper"
"google.golang.org/grpc"
)
type friendServer struct {
*rpcserver.RpcServer
controller.FriendInterface
controller.BlackInterface
notification *notification.Check
userCheck *check.UserCheck
notification *notification.Check
userCheck *check.UserCheck
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
}
func NewFriendServer(port int) *friendServer {
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
mysql, err := relation.NewGormDB()
if err != nil {
panic(err)
return err
}
//mysql init
var mysql relation.Mysql
var model relation.FriendGorm
err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendModel{})
if err != nil {
panic("db init err:" + err.Error())
}
err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendRequestModel{})
if err != nil {
panic("db init err:" + err.Error())
}
err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{})
if err != nil {
panic("db init err:" + err.Error())
}
if mysql.GormConn() != nil {
model.DB = mysql.GormConn()
} else {
panic("db init err:" + "conn is nil")
}
return &friendServer{
RpcServer: r,
FriendInterface: controller.NewFriendController(model.DB),
BlackInterface: controller.NewBlackController(model.DB),
}
}
func (s *friendServer) Run() {
operationID := utils.OperationIDGenerator()
log.NewInfo(operationID, "friendServer run...")
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
if err != nil {
panic(err)
}
log.NewInfo(operationID, "listen ok ", address)
defer listener.Close()
//grpc server
var grpcOpts []grpc.ServerOption
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(middleware.RpcServerInterceptor))
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbFriend.RegisterFriendServer(srv, s)
err = srv.Serve(listener)
if err != nil {
log.NewError(operationID, "Serve failed ", err.Error(), listener)
return
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
return err
}
pbfriend.RegisterFriendServer(server, &friendServer{
FriendInterface: controller.NewFriendController(mysql),
BlackInterface: controller.NewBlackController(mysql),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
})
return nil
}
// ok
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.ApplyToAddFriendReq) (resp *pbFriend.ApplyToAddFriendResp, err error) {
resp = &pbFriend.ApplyToAddFriendResp{}
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) {
resp = &pbfriend.ApplyToAddFriendResp{}
if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
return nil, err
}
@ -125,8 +74,8 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.Apply
}
// ok
func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFriendReq) (resp *pbFriend.ImportFriendResp, err error) {
resp = &pbFriend.ImportFriendResp{}
func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) (resp *pbfriend.ImportFriendResp, err error) {
resp = &pbfriend.ImportFriendResp{}
if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err
}
@ -148,8 +97,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFr
}
// ok
func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.RespondFriendApplyReq) (resp *pbFriend.RespondFriendApplyResp, err error) {
resp = &pbFriend.RespondFriendApplyResp{}
func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.RespondFriendApplyReq) (resp *pbfriend.RespondFriendApplyResp, err error) {
resp = &pbfriend.RespondFriendApplyResp{}
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
return nil, err
}
@ -174,8 +123,8 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.Res
}
// ok
func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFriendReq) (resp *pbFriend.DeleteFriendResp, err error) {
resp = &pbFriend.DeleteFriendResp{}
func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) (resp *pbfriend.DeleteFriendResp, err error) {
resp = &pbfriend.DeleteFriendResp{}
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
return nil, err
}
@ -191,8 +140,8 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri
}
// ok
func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFriendRemarkReq) (resp *pbFriend.SetFriendRemarkResp, err error) {
resp = &pbFriend.SetFriendRemarkResp{}
func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) {
resp = &pbfriend.SetFriendRemarkResp{}
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
return nil, err
}
@ -208,8 +157,8 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri
}
// ok
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) {
resp = &pbFriend.GetDesignatedFriendsResp{}
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
resp = &pbfriend.GetDesignatedFriendsResp{}
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
@ -226,8 +175,8 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.G
}
// ok 获取接收到的好友申请(即别人主动申请的)
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyToReq) (resp *pbFriend.GetPaginationFriendsApplyToResp, err error) {
resp = &pbFriend.GetPaginationFriendsApplyToResp{}
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) {
resp = &pbfriend.GetPaginationFriendsApplyToResp{}
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
@ -244,8 +193,8 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbF
}
// ok 获取主动发出去的好友申请列表
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyFromReq) (resp *pbFriend.GetPaginationFriendsApplyFromResp, err error) {
resp = &pbFriend.GetPaginationFriendsApplyFromResp{}
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) {
resp = &pbfriend.GetPaginationFriendsApplyFromResp{}
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
@ -262,8 +211,8 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p
}
// ok
func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) (resp *pbFriend.IsFriendResp, err error) {
resp = &pbFriend.IsFriendResp{}
func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq) (resp *pbfriend.IsFriendResp, err error) {
resp = &pbfriend.IsFriendResp{}
resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2)
if err != nil {
return nil, err
@ -272,8 +221,8 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq)
}
// ok
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbFriend.GetPaginationFriendsReq) (resp *pbFriend.GetPaginationFriendsResp, err error) {
resp = &pbFriend.GetPaginationFriendsResp{}
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
resp = &pbfriend.GetPaginationFriendsResp{}
if utils.Duplicate(req.FriendUserIDs) {
return nil, constant.ErrArgs.Wrap("friend userID repeated")
}

View File

@ -4,184 +4,46 @@ import (
"Open_IM/internal/common/check"
"Open_IM/internal/common/convert"
"Open_IM/internal/common/notification"
"Open_IM/internal/common/rpcserver"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
tablerelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/log"
prome "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/sdkws"
pbuser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/OpenIMSDK/openKeeper"
"google.golang.org/grpc"
)
type userServer struct {
*rpcserver.RpcServer
controller.UserInterface
notification *notification.Check
userCheck *check.UserCheck
notification *notification.Check
userCheck *check.UserCheck
ConversationChecker *check.ConversationChecker
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
}
func NewUserServer(port int) *userServer {
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
mysql, err := relation.NewGormDB()
if err != nil {
panic(err)
return err
}
//mysql init
var mysql relation.Mysql
var model relation.UserGorm
err = mysql.InitConn().AutoMigrateModel(&model)
if err != nil {
panic("db init err:" + err.Error())
if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil {
return err
}
if mysql.GormConn() != nil {
model.DB = mysql.GormConn()
} else {
panic("db init err:" + "conn is nil")
}
return &userServer{RpcServer: r, UserInterface: controller.NewUserController(model.DB)}
pbuser.RegisterUserServer(server, &userServer{
UserInterface: controller.NewUserController(mysql),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
})
return nil
}
func (s *userServer) Run() {
operationID := utils.OperationIDGenerator()
log.NewInfo(operationID, "rpc user start...")
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
if err != nil {
panic(err)
}
log.NewInfo(operationID, "listen ok ", address)
defer listener.Close()
//grpc server
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
prome.NewGrpcRequestCounter()
prome.NewGrpcRequestFailedCounter()
prome.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
pbuser.RegisterUserServer(srv, s)
err = srv.Serve(listener)
if err != nil {
panic(err)
}
log.NewInfo(operationID, "rpc user success")
}
// ok
//func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID string, faceURL string, operationID string, opUserID string) {
// members, err := s.GetJoinedGroupMembers(ctx, userID)
// if err != nil {
// return
// }
// groupIDs := make([]string, 0)
// for _, v := range members {
// groupIDs = append(groupIDs, v.GroupID)
// }
// if s.SetGroupMemberInfo(ctx, "", faceURL, "", 0, groupIDs, userID) != nil {
// return
// }
// for _, v := range groupIDs {
// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID)
// }
//}
// ok
//func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID string, newNickname, oldNickname string, operationID string, opUserID string) {
// members, err := s.GetJoinedGroupMembers(ctx, userID)
// if err != nil {
// return
// }
// groupIDs := make([]string, 0)
// for _, v := range members {
// if v.Nickname == oldNickname {
// groupIDs = append(groupIDs, v.GroupID)
// }
// }
// s.SetGroupMemberInfo(ctx, newNickname, "", "", 0, groupIDs, userID)
// for _, v := range groupIDs {
// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID)
// }
//}
// 设置群头像
//func (s *userServer) SetGroupMemberInfo(ctx context.Context, nickname, faceURL, ex string, roleLevel int32, groupIDs []string, userID string) (err error) {
//
// req := pbgroup.SetGroupMemberInfo{UserID: userID}
// if nickname != "" {
// req.Nickname = &wrappers.StringValue{Value: nickname}
// }
// if faceURL != "" {
// req.FaceURL = &wrappers.StringValue{Value: faceURL}
// }
// if ex != "" {
// req.Ex = &wrappers.StringValue{Value: ex}
// }
// if roleLevel != 0 {
// req.RoleLevel = &wrappers.Int32Value{Value: roleLevel}
// }
//
// setGroupMemberInfoReq := &pbgroup.SetGroupMemberInfoReq{}
// for _, v := range groupIDs {
// req.GroupID = v
// setGroupMemberInfoReq.Members = append(setGroupMemberInfoReq.Members, &req)
// }
// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName)
// if err != nil {
// return err
// }
// client := group.NewGroupClient(conn)
// _, err = client.SetGroupMemberInfo(ctx, setGroupMemberInfoReq)
// return
//}
// 获取加入的群成员信息
//func (s *userServer) GetJoinedGroupMembers(ctx context.Context, userID string) (members []*sdkws.GroupMemberFullInfo, err error) {
// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName)
// if err != nil {
// return nil, err
// }
//
// client := group.NewGroupClient(conn)
// for {
// idx := int32(0)
// req := pbgroup.GetJoinedGroupListReq{FromUserID: userID, Pagination: &sdkws.RequestPagination{PageNumber: idx, ShowNumber: constant.ShowNumber}}
// resp, err := client.GetJoinedGroupList(ctx, &req)
// if err != nil {
// return nil, err
// }
// groupIDs := make([]string, 0)
//
// for _, v := range resp.Groups {
// groupIDs = append(groupIDs, v.GroupID)
// }
//
// client.GetGroupMembersInfo()
//
// if len(resp.Groups) < constant.ShowNumber {
// break
// }
// idx++
// }
//
// return
//}
// ok
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
resp = &pbuser.GetDesignateUsersResp{}