mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-10 04:59:49 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
83950263ad
@ -1,26 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
rpcAuth "Open_IM/internal/rpc/auth"
|
"Open_IM/internal/rpc/auth"
|
||||||
|
"Open_IM/internal/startrpc"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defaultPorts := config.Config.RpcPort.OpenImAuthPort
|
startrpc.Start(config.Config.RpcPort.OpenImAuthPort, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort, auth.Start)
|
||||||
rpcPort := flag.Int("port", defaultPorts[0], "RpcToken default listen port 10800")
|
|
||||||
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.AuthPrometheusPort[0], "authPrometheusPort default listen port")
|
|
||||||
flag.Parse()
|
|
||||||
fmt.Println("start auth rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
|
|
||||||
rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort)
|
|
||||||
go func() {
|
|
||||||
err := promePkg.StartPromeSrv(*prometheusPort)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
rpcServer.Run()
|
|
||||||
}
|
}
|
||||||
|
@ -2,25 +2,10 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/rpc/friend"
|
"Open_IM/internal/rpc/friend"
|
||||||
|
"Open_IM/internal/startrpc"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defaultPorts := config.Config.RpcPort.OpenImFriendPort
|
startrpc.Start(config.Config.RpcPort.OpenImFriendPort[0], config.Config.RpcRegisterName.OpenImFriendName, config.Config.Prometheus.FriendPrometheusPort[0], friend.Start)
|
||||||
rpcPort := flag.Int("port", defaultPorts[0], "get RpcFriendPort from cmd,default 12000 as port")
|
|
||||||
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.FriendPrometheusPort[0], "friendPrometheusPort default listen port")
|
|
||||||
flag.Parse()
|
|
||||||
fmt.Println("start friend rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
|
|
||||||
rpcServer := friend.NewFriendServer(*rpcPort)
|
|
||||||
go func() {
|
|
||||||
err := promePkg.StartPromeSrv(*prometheusPort)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
rpcServer.Run()
|
|
||||||
}
|
}
|
||||||
|
@ -7,5 +7,5 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
startrpc.Start(config.Config.RpcPort.OpenImGroupPort[0], config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort[0], group.Start)
|
startrpc.Start(config.Config.RpcPort.OpenImGroupPort, config.Config.RpcRegisterName.OpenImGroupName, config.Config.Prometheus.GroupPrometheusPort, group.Start)
|
||||||
}
|
}
|
||||||
|
@ -2,25 +2,10 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/rpc/msg"
|
"Open_IM/internal/rpc/msg"
|
||||||
|
"Open_IM/internal/startrpc"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defaultPorts := config.Config.RpcPort.OpenImMessagePort
|
startrpc.Start(config.Config.RpcPort.OpenImMessagePort, config.Config.RpcRegisterName.OpenImMsgName, config.Config.Prometheus.AuthPrometheusPort, msg.Start)
|
||||||
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
|
|
||||||
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port")
|
|
||||||
flag.Parse()
|
|
||||||
fmt.Println("start msg rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
|
|
||||||
rpcServer := msg.NewRpcChatServer(*rpcPort)
|
|
||||||
go func() {
|
|
||||||
err := promePkg.StartPromeSrv(*prometheusPort)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
rpcServer.Run()
|
|
||||||
}
|
}
|
||||||
|
@ -2,25 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/rpc/user"
|
"Open_IM/internal/rpc/user"
|
||||||
|
"Open_IM/internal/startrpc"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defaultPorts := config.Config.RpcPort.OpenImUserPort
|
|
||||||
rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port")
|
startrpc.Start(config.Config.RpcPort.OpenImUserPort[0], config.Config.RpcRegisterName.OpenImUserName, config.Config.Prometheus.UserPrometheusPort[0], user.Start)
|
||||||
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.UserPrometheusPort[0], "userPrometheusPort default listen port")
|
|
||||||
flag.Parse()
|
|
||||||
fmt.Println("start user rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
|
|
||||||
rpcServer := user.NewUserServer(*rpcPort)
|
|
||||||
go func() {
|
|
||||||
err := promePkg.StartPromeSrv(*prometheusPort)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
rpcServer.Run()
|
|
||||||
}
|
}
|
||||||
|
2
go.mod
2
go.mod
@ -41,7 +41,7 @@ require (
|
|||||||
github.com/stretchr/testify v1.8.1
|
github.com/stretchr/testify v1.8.1
|
||||||
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe
|
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe
|
||||||
github.com/swaggo/gin-swagger v1.5.0
|
github.com/swaggo/gin-swagger v1.5.0
|
||||||
github.com/swaggo/swag v1.8.3
|
github.com/swaggo/swag v1.8.3 // indirect
|
||||||
github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca
|
github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca
|
||||||
go.etcd.io/etcd/client/v3 v3.5.6 // indirect
|
go.etcd.io/etcd/client/v3 v3.5.6 // indirect
|
||||||
go.mongodb.org/mongo-driver v1.8.3
|
go.mongodb.org/mongo-driver v1.8.3
|
||||||
|
@ -46,27 +46,27 @@ func (db *DBFriend) DB2PB(ctx context.Context, friends []*relation.FriendModel)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, v := range friends {
|
for _, v := range friends {
|
||||||
pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
|
pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
|
||||||
utils.CopyStructFields(pbFriend, users[v.OwnerUserID])
|
utils.CopyStructFields(pbfriend, users[v.OwnerUserID])
|
||||||
utils.CopyStructFields(pbFriend.FriendUser, users[v.FriendUserID])
|
utils.CopyStructFields(pbfriend.FriendUser, users[v.FriendUserID])
|
||||||
pbFriend.CreateTime = v.CreateTime.Unix()
|
pbfriend.CreateTime = v.CreateTime.Unix()
|
||||||
pbFriend.FriendUser.CreateTime = v.CreateTime.Unix()
|
pbfriend.FriendUser.CreateTime = v.CreateTime.Unix()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DBFriend) Convert(ctx context.Context) (*sdk.FriendInfo, error) {
|
func (db *DBFriend) Convert(ctx context.Context) (*sdk.FriendInfo, error) {
|
||||||
pbFriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
|
pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
|
||||||
utils.CopyStructFields(pbFriend, db)
|
utils.CopyStructFields(pbfriend, db)
|
||||||
user, err := db.userCheck.GetUsersInfo(ctx, db.FriendUserID)
|
user, err := db.userCheck.GetUsersInfo(ctx, db.FriendUserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
utils.CopyStructFields(pbFriend.FriendUser, user)
|
utils.CopyStructFields(pbfriend.FriendUser, user)
|
||||||
pbFriend.CreateTime = db.CreateTime.Unix()
|
pbfriend.CreateTime = db.CreateTime.Unix()
|
||||||
|
|
||||||
pbFriend.FriendUser.CreateTime = db.CreateTime.Unix()
|
pbfriend.FriendUser.CreateTime = db.CreateTime.Unix()
|
||||||
return pbFriend, nil
|
return pbfriend, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *PBFriend) Convert() (*relation.FriendModel, error) {
|
func (pb *PBFriend) Convert() (*relation.FriendModel, error) {
|
||||||
|
@ -2,68 +2,45 @@ package auth
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/common/check"
|
"Open_IM/internal/common/check"
|
||||||
"Open_IM/internal/common/rpcserver"
|
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db/cache"
|
"Open_IM/pkg/common/db/cache"
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
|
"Open_IM/pkg/common/db/relation"
|
||||||
|
relationTb "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
|
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
||||||
pbAuth "Open_IM/pkg/proto/auth"
|
pbAuth "Open_IM/pkg/proto/auth"
|
||||||
pbRelay "Open_IM/pkg/proto/relay"
|
pbRelay "Open_IM/pkg/proto/relay"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
"github.com/OpenIMSDK/openKeeper"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewRpcAuthServer(port int) *rpcAuth {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
mysql, err := relation.NewGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
var redis cache.RedisClient
|
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
|
||||||
redis.InitRedis()
|
return err
|
||||||
return &rpcAuth{
|
|
||||||
RpcServer: r,
|
|
||||||
AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
|
|
||||||
}
|
}
|
||||||
|
redis, err := cache.NewRedis()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pbAuth.RegisterAuthServer(server, &authServer{
|
||||||
|
userCheck: check.NewUserCheck(client),
|
||||||
|
RegisterCenter: client,
|
||||||
|
AuthInterface: controller.NewAuthController(redis.GetClient(), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcAuth) Run() {
|
func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
|
||||||
operationID := utils.OperationIDGenerator()
|
|
||||||
log.NewInfo(operationID, "rpc auth start...")
|
|
||||||
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
log.NewInfo(operationID, "listen network success ", listener, address)
|
|
||||||
var grpcOpts []grpc.ServerOption
|
|
||||||
if config.Config.Prometheus.Enable {
|
|
||||||
promePkg.NewGrpcRequestCounter()
|
|
||||||
promePkg.NewGrpcRequestFailedCounter()
|
|
||||||
promePkg.NewGrpcRequestSuccessCounter()
|
|
||||||
promePkg.NewUserRegisterCounter()
|
|
||||||
promePkg.NewUserLoginCounter()
|
|
||||||
grpcOpts = append(grpcOpts, []grpc.ServerOption{
|
|
||||||
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
|
|
||||||
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
|
|
||||||
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
|
|
||||||
}...)
|
|
||||||
}
|
|
||||||
srv := grpc.NewServer(grpcOpts...)
|
|
||||||
defer srv.GracefulStop()
|
|
||||||
pbAuth.RegisterAuthServer(srv, s)
|
|
||||||
err = srv.Serve(listener)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
log.NewInfo(operationID, "rpc auth ok")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
|
|
||||||
resp := pbAuth.UserTokenResp{}
|
resp := pbAuth.UserTokenResp{}
|
||||||
if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil {
|
if _, err := s.userCheck.GetUsersInfo(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -77,7 +54,7 @@ func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbA
|
|||||||
return &resp, nil
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
|
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
|
||||||
claims, err = tokenverify.GetClaimFromToken(tokensString)
|
claims, err = tokenverify.GetClaimFromToken(tokensString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, utils.Wrap(err, "")
|
return nil, utils.Wrap(err, "")
|
||||||
@ -102,7 +79,7 @@ func (s *rpcAuth) parseToken(ctx context.Context, tokensString string) (claims *
|
|||||||
return nil, constant.ErrTokenNotExist.Wrap()
|
return nil, constant.ErrTokenNotExist.Wrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) {
|
func (s *authServer) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (resp *pbAuth.ParseTokenResp, err error) {
|
||||||
resp = &pbAuth.ParseTokenResp{}
|
resp = &pbAuth.ParseTokenResp{}
|
||||||
claims, err := s.parseToken(ctx, req.Token)
|
claims, err := s.parseToken(ctx, req.Token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -114,7 +91,7 @@ func (s *rpcAuth) ParseToken(ctx context.Context, req *pbAuth.ParseTokenReq) (re
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) {
|
func (s *authServer) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (*pbAuth.ForceLogoutResp, error) {
|
||||||
resp := pbAuth.ForceLogoutResp{}
|
resp := pbAuth.ForceLogoutResp{}
|
||||||
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -125,7 +102,7 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) (
|
|||||||
return &resp, nil
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
|
func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error {
|
||||||
grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName)
|
grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -140,8 +117,8 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in
|
|||||||
return constant.ErrInternalServer.Wrap()
|
return constant.ErrInternalServer.Wrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
type rpcAuth struct {
|
type authServer struct {
|
||||||
*rpcserver.RpcServer
|
|
||||||
controller.AuthInterface
|
controller.AuthInterface
|
||||||
userCheck *check.UserCheck
|
userCheck *check.UserCheck
|
||||||
|
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||||
}
|
}
|
||||||
|
@ -4,100 +4,49 @@ import (
|
|||||||
"Open_IM/internal/common/check"
|
"Open_IM/internal/common/check"
|
||||||
"Open_IM/internal/common/convert"
|
"Open_IM/internal/common/convert"
|
||||||
"Open_IM/internal/common/notification"
|
"Open_IM/internal/common/notification"
|
||||||
"Open_IM/internal/common/rpcserver"
|
|
||||||
"Open_IM/pkg/common/config"
|
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
"Open_IM/pkg/common/db/relation"
|
"Open_IM/pkg/common/db/relation"
|
||||||
relationTb "Open_IM/pkg/common/db/table/relation"
|
relationTb "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/log"
|
|
||||||
"Open_IM/pkg/common/middleware"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
|
||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
pbFriend "Open_IM/pkg/proto/friend"
|
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
||||||
|
pbfriend "Open_IM/pkg/proto/friend"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
"github.com/OpenIMSDK/openKeeper"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type friendServer struct {
|
type friendServer struct {
|
||||||
*rpcserver.RpcServer
|
|
||||||
controller.FriendInterface
|
controller.FriendInterface
|
||||||
controller.BlackInterface
|
controller.BlackInterface
|
||||||
notification *notification.Check
|
notification *notification.Check
|
||||||
userCheck *check.UserCheck
|
userCheck *check.UserCheck
|
||||||
|
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFriendServer(port int) *friendServer {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
mysql, err := relation.NewGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
//mysql init
|
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
|
||||||
var mysql relation.Mysql
|
return err
|
||||||
var model relation.FriendGorm
|
|
||||||
err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendModel{})
|
|
||||||
if err != nil {
|
|
||||||
panic("db init err:" + err.Error())
|
|
||||||
}
|
|
||||||
err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendRequestModel{})
|
|
||||||
if err != nil {
|
|
||||||
panic("db init err:" + err.Error())
|
|
||||||
}
|
|
||||||
err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{})
|
|
||||||
if err != nil {
|
|
||||||
panic("db init err:" + err.Error())
|
|
||||||
}
|
|
||||||
if mysql.GormConn() != nil {
|
|
||||||
model.DB = mysql.GormConn()
|
|
||||||
} else {
|
|
||||||
panic("db init err:" + "conn is nil")
|
|
||||||
}
|
|
||||||
return &friendServer{
|
|
||||||
RpcServer: r,
|
|
||||||
FriendInterface: controller.NewFriendController(model.DB),
|
|
||||||
BlackInterface: controller.NewBlackController(model.DB),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *friendServer) Run() {
|
|
||||||
operationID := utils.OperationIDGenerator()
|
|
||||||
log.NewInfo(operationID, "friendServer run...")
|
|
||||||
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.NewInfo(operationID, "listen ok ", address)
|
|
||||||
defer listener.Close()
|
|
||||||
//grpc server
|
|
||||||
var grpcOpts []grpc.ServerOption
|
|
||||||
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(middleware.RpcServerInterceptor))
|
|
||||||
if config.Config.Prometheus.Enable {
|
|
||||||
promePkg.NewGrpcRequestCounter()
|
|
||||||
promePkg.NewGrpcRequestFailedCounter()
|
|
||||||
promePkg.NewGrpcRequestSuccessCounter()
|
|
||||||
grpcOpts = append(grpcOpts, []grpc.ServerOption{
|
|
||||||
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
|
|
||||||
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
|
|
||||||
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
|
|
||||||
}...)
|
|
||||||
}
|
|
||||||
srv := grpc.NewServer(grpcOpts...)
|
|
||||||
defer srv.GracefulStop()
|
|
||||||
pbFriend.RegisterFriendServer(srv, s)
|
|
||||||
err = srv.Serve(listener)
|
|
||||||
if err != nil {
|
|
||||||
log.NewError(operationID, "Serve failed ", err.Error(), listener)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
pbfriend.RegisterFriendServer(server, &friendServer{
|
||||||
|
FriendInterface: controller.NewFriendController(mysql),
|
||||||
|
BlackInterface: controller.NewBlackController(mysql),
|
||||||
|
notification: notification.NewCheck(client),
|
||||||
|
userCheck: check.NewUserCheck(client),
|
||||||
|
RegisterCenter: client,
|
||||||
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.ApplyToAddFriendReq) (resp *pbFriend.ApplyToAddFriendResp, err error) {
|
func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) (resp *pbfriend.ApplyToAddFriendResp, err error) {
|
||||||
resp = &pbFriend.ApplyToAddFriendResp{}
|
resp = &pbfriend.ApplyToAddFriendResp{}
|
||||||
if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
|
if err := tokenverify.CheckAccessV3(ctx, req.FromUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -125,8 +74,8 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbFriend.Apply
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFriendReq) (resp *pbFriend.ImportFriendResp, err error) {
|
func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) (resp *pbfriend.ImportFriendResp, err error) {
|
||||||
resp = &pbFriend.ImportFriendResp{}
|
resp = &pbfriend.ImportFriendResp{}
|
||||||
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -148,8 +97,8 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFr
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.RespondFriendApplyReq) (resp *pbFriend.RespondFriendApplyResp, err error) {
|
func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.RespondFriendApplyReq) (resp *pbfriend.RespondFriendApplyResp, err error) {
|
||||||
resp = &pbFriend.RespondFriendApplyResp{}
|
resp = &pbfriend.RespondFriendApplyResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -174,8 +123,8 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.Res
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFriendReq) (resp *pbFriend.DeleteFriendResp, err error) {
|
func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) (resp *pbfriend.DeleteFriendResp, err error) {
|
||||||
resp = &pbFriend.DeleteFriendResp{}
|
resp = &pbfriend.DeleteFriendResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -191,8 +140,8 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFriendRemarkReq) (resp *pbFriend.SetFriendRemarkResp, err error) {
|
func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) {
|
||||||
resp = &pbFriend.SetFriendRemarkResp{}
|
resp = &pbfriend.SetFriendRemarkResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.OwnerUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -208,8 +157,8 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) {
|
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
|
||||||
resp = &pbFriend.GetDesignatedFriendsResp{}
|
resp = &pbfriend.GetDesignatedFriendsResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -226,8 +175,8 @@ func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.G
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok 获取接收到的好友申请(即别人主动申请的)
|
// ok 获取接收到的好友申请(即别人主动申请的)
|
||||||
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyToReq) (resp *pbFriend.GetPaginationFriendsApplyToResp, err error) {
|
func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyToReq) (resp *pbfriend.GetPaginationFriendsApplyToResp, err error) {
|
||||||
resp = &pbFriend.GetPaginationFriendsApplyToResp{}
|
resp = &pbfriend.GetPaginationFriendsApplyToResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -244,8 +193,8 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbF
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok 获取主动发出去的好友申请列表
|
// ok 获取主动发出去的好友申请列表
|
||||||
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbFriend.GetPaginationFriendsApplyFromReq) (resp *pbFriend.GetPaginationFriendsApplyFromResp, err error) {
|
func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *pbfriend.GetPaginationFriendsApplyFromReq) (resp *pbfriend.GetPaginationFriendsApplyFromResp, err error) {
|
||||||
resp = &pbFriend.GetPaginationFriendsApplyFromResp{}
|
resp = &pbfriend.GetPaginationFriendsApplyFromResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -262,8 +211,8 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) (resp *pbFriend.IsFriendResp, err error) {
|
func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq) (resp *pbfriend.IsFriendResp, err error) {
|
||||||
resp = &pbFriend.IsFriendResp{}
|
resp = &pbfriend.IsFriendResp{}
|
||||||
resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2)
|
resp.InUser1Friends, resp.InUser2Friends, err = s.FriendInterface.CheckIn(ctx, req.UserID1, req.UserID2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -272,8 +221,8 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbFriend.GetPaginationFriendsReq) (resp *pbFriend.GetPaginationFriendsResp, err error) {
|
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
|
||||||
resp = &pbFriend.GetPaginationFriendsResp{}
|
resp = &pbfriend.GetPaginationFriendsResp{}
|
||||||
if utils.Duplicate(req.FriendUserIDs) {
|
if utils.Duplicate(req.FriendUserIDs) {
|
||||||
return nil, constant.ErrArgs.Wrap("friend userID repeated")
|
return nil, constant.ErrArgs.Wrap("friend userID repeated")
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"google.golang.org/protobuf/types/known/wrapperspb"
|
"google.golang.org/protobuf/types/known/wrapperspb"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) {
|
func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) {
|
||||||
@ -21,72 +22,44 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (
|
|||||||
defer func() {
|
defer func() {
|
||||||
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", req)
|
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", req)
|
||||||
}()
|
}()
|
||||||
operationID := tracelog.GetOperationID(ctx)
|
cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{
|
||||||
commonCallbackReq := &callbackstruct.CallbackBeforeCreateGroupReq{
|
|
||||||
CallbackCommand: constant.CallbackBeforeCreateGroupCommand,
|
CallbackCommand: constant.CallbackBeforeCreateGroupCommand,
|
||||||
OperationID: operationID,
|
OperationID: tracelog.GetOperationID(ctx),
|
||||||
GroupInfo: *req.GroupInfo,
|
GroupInfo: *req.GroupInfo,
|
||||||
}
|
}
|
||||||
commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
||||||
UserID: req.OwnerUserID,
|
UserID: req.OwnerUserID,
|
||||||
RoleLevel: constant.GroupOwner,
|
RoleLevel: constant.GroupOwner,
|
||||||
})
|
})
|
||||||
for _, userID := range req.AdminUserIDs {
|
for _, userID := range req.AdminUserIDs {
|
||||||
commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
RoleLevel: constant.GroupAdmin,
|
RoleLevel: constant.GroupAdmin,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
for _, userID := range req.AdminUserIDs {
|
for _, userID := range req.AdminUserIDs {
|
||||||
commonCallbackReq.InitMemberList = append(commonCallbackReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
cbReq.InitMemberList = append(cbReq.InitMemberList, &apistruct.GroupAddMemberInfo{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
RoleLevel: constant.GroupOrdinaryUsers,
|
RoleLevel: constant.GroupOrdinaryUsers,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
resp := &callbackstruct.CallbackBeforeCreateGroupResp{
|
resp := &callbackstruct.CallbackBeforeCreateGroupResp{}
|
||||||
CommonCallbackResp: &callbackstruct.CommonCallbackResp{OperationID: operationID},
|
err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup)
|
||||||
}
|
|
||||||
err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeCreateGroupCommand, commonCallbackReq, resp, config.Config.Callback.CallbackBeforeCreateGroup)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID)
|
||||||
if resp.GroupID != nil {
|
utils.NotNilReplace(&req.GroupInfo.GroupName, resp.GroupName)
|
||||||
req.GroupInfo.GroupID = *resp.GroupID
|
utils.NotNilReplace(&req.GroupInfo.Notification, resp.Notification)
|
||||||
}
|
utils.NotNilReplace(&req.GroupInfo.Introduction, resp.Introduction)
|
||||||
if resp.GroupName != nil {
|
utils.NotNilReplace(&req.GroupInfo.FaceURL, resp.FaceURL)
|
||||||
req.GroupInfo.GroupName = *resp.GroupName
|
utils.NotNilReplace(&req.GroupInfo.OwnerUserID, resp.OwnerUserID)
|
||||||
}
|
utils.NotNilReplace(&req.GroupInfo.Ex, resp.Ex)
|
||||||
if resp.Notification != nil {
|
utils.NotNilReplace(&req.GroupInfo.Status, resp.Status)
|
||||||
req.GroupInfo.Notification = *resp.Notification
|
utils.NotNilReplace(&req.GroupInfo.CreatorUserID, resp.CreatorUserID)
|
||||||
}
|
utils.NotNilReplace(&req.GroupInfo.GroupType, resp.GroupType)
|
||||||
if resp.Introduction != nil {
|
utils.NotNilReplace(&req.GroupInfo.NeedVerification, resp.NeedVerification)
|
||||||
req.GroupInfo.Introduction = *resp.Introduction
|
utils.NotNilReplace(&req.GroupInfo.LookMemberInfo, resp.LookMemberInfo)
|
||||||
}
|
|
||||||
if resp.FaceURL != nil {
|
|
||||||
req.GroupInfo.FaceURL = *resp.FaceURL
|
|
||||||
}
|
|
||||||
if resp.OwnerUserID != nil {
|
|
||||||
req.GroupInfo.OwnerUserID = *resp.OwnerUserID
|
|
||||||
}
|
|
||||||
if resp.Ex != nil {
|
|
||||||
req.GroupInfo.Ex = *resp.Ex
|
|
||||||
}
|
|
||||||
if resp.Status != nil {
|
|
||||||
req.GroupInfo.Status = *resp.Status
|
|
||||||
}
|
|
||||||
if resp.CreatorUserID != nil {
|
|
||||||
req.GroupInfo.CreatorUserID = *resp.CreatorUserID
|
|
||||||
}
|
|
||||||
if resp.GroupType != nil {
|
|
||||||
req.GroupInfo.GroupType = *resp.GroupType
|
|
||||||
}
|
|
||||||
if resp.NeedVerification != nil {
|
|
||||||
req.GroupInfo.NeedVerification = *resp.NeedVerification
|
|
||||||
}
|
|
||||||
if resp.LookMemberInfo != nil {
|
|
||||||
req.GroupInfo.LookMemberInfo = *resp.LookMemberInfo
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,39 +70,26 @@ func CallbackBeforeMemberJoinGroup(ctx context.Context, groupMember *relation.Gr
|
|||||||
defer func() {
|
defer func() {
|
||||||
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "groupMember", *groupMember, "groupEx", groupEx)
|
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "groupMember", *groupMember, "groupEx", groupEx)
|
||||||
}()
|
}()
|
||||||
operationID := tracelog.GetOperationID(ctx)
|
callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
|
||||||
callbackResp := callbackstruct.CommonCallbackResp{OperationID: operationID}
|
|
||||||
callbackReq := callbackstruct.CallbackBeforeMemberJoinGroupReq{
|
|
||||||
CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand,
|
CallbackCommand: constant.CallbackBeforeMemberJoinGroupCommand,
|
||||||
OperationID: operationID,
|
OperationID: tracelog.GetOperationID(ctx),
|
||||||
GroupID: groupMember.GroupID,
|
GroupID: groupMember.GroupID,
|
||||||
UserID: groupMember.UserID,
|
UserID: groupMember.UserID,
|
||||||
Ex: groupMember.Ex,
|
Ex: groupMember.Ex,
|
||||||
GroupEx: groupEx,
|
GroupEx: groupEx,
|
||||||
}
|
}
|
||||||
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{
|
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
|
||||||
CommonCallbackResp: &callbackResp,
|
err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeMemberJoinGroup)
|
||||||
}
|
|
||||||
err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeMemberJoinGroupCommand, callbackReq,
|
|
||||||
resp, config.Config.Callback.CallbackBeforeMemberJoinGroup)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.MuteEndTime != nil {
|
if resp.MuteEndTime != nil {
|
||||||
groupMember.MuteEndTime = utils.UnixSecondToTime(*resp.MuteEndTime)
|
groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime)
|
||||||
}
|
|
||||||
if resp.FaceURL != nil {
|
|
||||||
groupMember.FaceURL = *resp.FaceURL
|
|
||||||
}
|
|
||||||
if resp.Ex != nil {
|
|
||||||
groupMember.Ex = *resp.Ex
|
|
||||||
}
|
|
||||||
if resp.NickName != nil {
|
|
||||||
groupMember.Nickname = *resp.NickName
|
|
||||||
}
|
|
||||||
if resp.RoleLevel != nil {
|
|
||||||
groupMember.RoleLevel = *resp.RoleLevel
|
|
||||||
}
|
}
|
||||||
|
utils.NotNilReplace(&groupMember.FaceURL, resp.FaceURL)
|
||||||
|
utils.NotNilReplace(&groupMember.Ex, resp.Ex)
|
||||||
|
utils.NotNilReplace(&groupMember.Nickname, resp.Nickname)
|
||||||
|
utils.NotNilReplace(&groupMember.RoleLevel, resp.RoleLevel)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,44 +100,40 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe
|
|||||||
defer func() {
|
defer func() {
|
||||||
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", *req)
|
tracelog.SetCtxInfo(ctx, utils.GetFuncName(1), err, "req", *req)
|
||||||
}()
|
}()
|
||||||
operationID := tracelog.GetOperationID(ctx)
|
|
||||||
callbackResp := callbackstruct.CommonCallbackResp{OperationID: operationID}
|
|
||||||
callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{
|
callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{
|
||||||
CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand,
|
CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand,
|
||||||
OperationID: operationID,
|
OperationID: tracelog.GetOperationID(ctx),
|
||||||
GroupID: req.GroupID,
|
GroupID: req.GroupID,
|
||||||
UserID: req.UserID,
|
UserID: req.UserID,
|
||||||
}
|
}
|
||||||
if req.Nickname != nil {
|
if req.Nickname != nil {
|
||||||
callbackReq.Nickname = req.Nickname.Value
|
callbackReq.Nickname = &req.Nickname.Value
|
||||||
}
|
}
|
||||||
if req.FaceURL != nil {
|
if req.FaceURL != nil {
|
||||||
callbackReq.FaceURL = req.FaceURL.Value
|
callbackReq.FaceURL = &req.FaceURL.Value
|
||||||
}
|
}
|
||||||
if req.RoleLevel != nil {
|
if req.RoleLevel != nil {
|
||||||
callbackReq.RoleLevel = req.RoleLevel.Value
|
callbackReq.RoleLevel = &req.RoleLevel.Value
|
||||||
}
|
}
|
||||||
if req.Ex != nil {
|
if req.Ex != nil {
|
||||||
callbackReq.Ex = req.Ex.Value
|
callbackReq.Ex = &req.Ex.Value
|
||||||
}
|
}
|
||||||
resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{
|
resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{}
|
||||||
CommonCallbackResp: &callbackResp,
|
err = http.CallBackPostReturnV2(config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo)
|
||||||
}
|
|
||||||
err = http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetGroupMemberInfoCommand, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.FaceURL != nil {
|
if resp.FaceURL != nil {
|
||||||
req.FaceURL = &wrapperspb.StringValue{Value: *resp.FaceURL}
|
req.FaceURL = wrapperspb.String(*resp.FaceURL)
|
||||||
}
|
}
|
||||||
if resp.Nickname != nil {
|
if resp.Nickname != nil {
|
||||||
req.Nickname = &wrapperspb.StringValue{Value: *resp.Nickname}
|
req.Nickname = wrapperspb.String(*resp.Nickname)
|
||||||
}
|
}
|
||||||
if resp.RoleLevel != nil {
|
if resp.RoleLevel != nil {
|
||||||
req.RoleLevel = &wrapperspb.Int32Value{Value: *resp.RoleLevel}
|
req.RoleLevel = wrapperspb.Int32(*resp.RoleLevel)
|
||||||
}
|
}
|
||||||
if resp.Ex != nil {
|
if resp.Ex != nil {
|
||||||
req.Ex = &wrapperspb.StringValue{Value: *resp.Ex}
|
req.Ex = wrapperspb.String(*resp.Ex)
|
||||||
}
|
}
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -28,11 +28,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
mysql, err := relation.NewGormDB()
|
db, err := relation.NewGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := mysql.AutoMigrate(&relationTb.GroupModel{}, &relationTb.GroupMemberModel{}, &relationTb.GroupRequestModel{}); err != nil {
|
if err := db.AutoMigrate(&relationTb.GroupModel{}, &relationTb.GroupMemberModel{}, &relationTb.GroupRequestModel{}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
redis, err := cache.NewRedis()
|
redis, err := cache.NewRedis()
|
||||||
@ -44,7 +44,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
pbGroup.RegisterGroupServer(server, &groupServer{
|
pbGroup.RegisterGroupServer(server, &groupServer{
|
||||||
GroupInterface: controller.NewGroupInterface(mysql, redis.GetClient(), mongo.GetClient()),
|
GroupInterface: controller.NewGroupInterface(controller.NewGroupDatabase(db, redis.GetClient(), mongo.GetClient())),
|
||||||
UserCheck: check.NewUserCheck(client),
|
UserCheck: check.NewUserCheck(client),
|
||||||
ConversationChecker: check.NewConversationChecker(client),
|
ConversationChecker: check.NewConversationChecker(client),
|
||||||
})
|
})
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package msg
|
package msg
|
||||||
|
|
||||||
import (
|
import (
|
||||||
cbApi "Open_IM/pkg/call_back_struct"
|
cb "Open_IM/pkg/callbackstruct"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/http"
|
"Open_IM/pkg/common/http"
|
||||||
@ -11,10 +11,10 @@ import (
|
|||||||
http2 "net/http"
|
http2 "net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cbApi.CallbackBeforeSetMessageReactionExtResp {
|
func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cb.CallbackBeforeSetMessageReactionExtResp {
|
||||||
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
|
callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID}
|
||||||
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
||||||
req := cbApi.CallbackBeforeSetMessageReactionExtReq{
|
req := cb.CallbackBeforeSetMessageReactionExtReq{
|
||||||
OperationID: setReq.OperationID,
|
OperationID: setReq.OperationID,
|
||||||
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
|
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
|
||||||
SourceID: setReq.SourceID,
|
SourceID: setReq.SourceID,
|
||||||
@ -26,7 +26,7 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio
|
|||||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||||
}
|
}
|
||||||
resp := &cbApi.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp}
|
resp := &cb.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp}
|
||||||
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
|
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||||
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
||||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||||
@ -36,8 +36,8 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cbApi.CallbackDeleteMessageReactionExtResp {
|
func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cb.CallbackDeleteMessageReactionExtResp {
|
||||||
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
|
callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID}
|
||||||
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
||||||
req := cbApi.CallbackDeleteMessageReactionExtReq{
|
req := cbApi.CallbackDeleteMessageReactionExtReq{
|
||||||
OperationID: setReq.OperationID,
|
OperationID: setReq.OperationID,
|
||||||
@ -58,8 +58,8 @@ func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
|
|||||||
}
|
}
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp {
|
func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cb.CallbackGetMessageListReactionExtResp {
|
||||||
callbackResp := cbApi.CommonCallbackResp{OperationID: getReq.OperationID}
|
callbackResp := cb.CommonCallbackResp{OperationID: getReq.OperationID}
|
||||||
log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String())
|
log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String())
|
||||||
req := cbApi.CallbackGetMessageListReactionExtReq{
|
req := cbApi.CallbackGetMessageListReactionExtReq{
|
||||||
OperationID: getReq.OperationID,
|
OperationID: getReq.OperationID,
|
||||||
@ -78,8 +78,8 @@ func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReaction
|
|||||||
}
|
}
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cbApi.CallbackAddMessageReactionExtResp {
|
func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp {
|
||||||
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
|
callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID}
|
||||||
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
||||||
req := cbApi.CallbackAddMessageReactionExtReq{
|
req := cbApi.CallbackAddMessageReactionExtReq{
|
||||||
OperationID: setReq.OperationID,
|
OperationID: setReq.OperationID,
|
||||||
@ -93,7 +93,7 @@ func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensio
|
|||||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||||
}
|
}
|
||||||
resp := &cbApi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
|
resp := &cb.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
|
||||||
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime)
|
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime)
|
||||||
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
||||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||||
|
@ -2,33 +2,23 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/common/check"
|
"Open_IM/internal/common/check"
|
||||||
"Open_IM/internal/common/notification"
|
|
||||||
"Open_IM/internal/common/rpcserver"
|
|
||||||
"Open_IM/pkg/common/config"
|
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
|
"Open_IM/pkg/common/db/relation"
|
||||||
|
tablerelation "Open_IM/pkg/common/db/table/relation"
|
||||||
|
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
||||||
|
"github.com/OpenIMSDK/openKeeper"
|
||||||
|
|
||||||
"Open_IM/pkg/common/kafka"
|
|
||||||
"Open_IM/pkg/common/log"
|
|
||||||
promePkg "Open_IM/pkg/common/prometheus"
|
promePkg "Open_IM/pkg/common/prometheus"
|
||||||
"Open_IM/pkg/proto/msg"
|
"Open_IM/pkg/proto/msg"
|
||||||
"Open_IM/pkg/utils"
|
|
||||||
"github.com/OpenIMSDK/getcdv3"
|
|
||||||
"net"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type msgServer struct {
|
type msgServer struct {
|
||||||
*rpcserver.RpcServer
|
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||||
MsgInterface controller.MsgInterface
|
MsgInterface controller.MsgInterface
|
||||||
Group *check.GroupChecker
|
Group *check.GroupChecker
|
||||||
User *check.UserCheck
|
User *check.UserCheck
|
||||||
Conversation *check.ConversationChecker
|
Conversation *check.ConversationChecker
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteMsg struct {
|
type deleteMsg struct {
|
||||||
@ -38,39 +28,34 @@ type deleteMsg struct {
|
|||||||
OperationID string
|
OperationID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRpcChatServer(port int) *msgServer {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
log.NewPrivateLog(constant.LogFileName)
|
mysql, err := relation.NewGormDB()
|
||||||
rc := msgServer{
|
if err != nil {
|
||||||
rpcPort: port,
|
return err
|
||||||
rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName,
|
|
||||||
etcdSchema: config.Config.Etcd.EtcdSchema,
|
|
||||||
etcdAddr: config.Config.Etcd.EtcdAddr,
|
|
||||||
dMessageLocker: NewLockerMessage(),
|
|
||||||
}
|
}
|
||||||
rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
|
if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil {
|
||||||
//rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic)
|
return err
|
||||||
rc.delMsgCh = make(chan deleteMsg, 1000)
|
}
|
||||||
return &rc
|
s := &msgServer{
|
||||||
|
Conversation: check.NewConversationChecker(client),
|
||||||
|
User: check.NewUserCheck(client),
|
||||||
|
Group: check.NewGroupChecker(client),
|
||||||
|
//MsgInterface: controller.MsgInterface(),
|
||||||
|
RegisterCenter: client,
|
||||||
|
}
|
||||||
|
s.initPrometheus()
|
||||||
|
msg.RegisterMsgServer(server, s)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) initPrometheus() {
|
func (m *msgServer) initPrometheus() {
|
||||||
//sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
|
|
||||||
// Name: "send_msg_success",
|
|
||||||
// Help: "The number of send msg success",
|
|
||||||
//})
|
|
||||||
//sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
|
|
||||||
// Name: "send_msg_failed",
|
|
||||||
// Help: "The number of send msg failed",
|
|
||||||
//})
|
|
||||||
promePkg.NewMsgPullFromRedisSuccessCounter()
|
promePkg.NewMsgPullFromRedisSuccessCounter()
|
||||||
promePkg.NewMsgPullFromRedisFailedCounter()
|
promePkg.NewMsgPullFromRedisFailedCounter()
|
||||||
promePkg.NewMsgPullFromMongoSuccessCounter()
|
promePkg.NewMsgPullFromMongoSuccessCounter()
|
||||||
promePkg.NewMsgPullFromMongoFailedCounter()
|
promePkg.NewMsgPullFromMongoFailedCounter()
|
||||||
|
|
||||||
promePkg.NewSingleChatMsgRecvSuccessCounter()
|
promePkg.NewSingleChatMsgRecvSuccessCounter()
|
||||||
promePkg.NewGroupChatMsgRecvSuccessCounter()
|
promePkg.NewGroupChatMsgRecvSuccessCounter()
|
||||||
promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter()
|
promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter()
|
||||||
|
|
||||||
promePkg.NewSingleChatMsgProcessSuccessCounter()
|
promePkg.NewSingleChatMsgProcessSuccessCounter()
|
||||||
promePkg.NewSingleChatMsgProcessFailedCounter()
|
promePkg.NewSingleChatMsgProcessFailedCounter()
|
||||||
promePkg.NewGroupChatMsgProcessSuccessCounter()
|
promePkg.NewGroupChatMsgProcessSuccessCounter()
|
||||||
@ -78,78 +63,3 @@ func (rpc *rpcChat) initPrometheus() {
|
|||||||
promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter()
|
promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter()
|
||||||
promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter()
|
promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *msgServer) Run() {
|
|
||||||
log.Info("", "rpcChat init...")
|
|
||||||
listenIP := ""
|
|
||||||
if config.Config.ListenIP == "" {
|
|
||||||
listenIP = "0.0.0.0"
|
|
||||||
} else {
|
|
||||||
listenIP = config.Config.ListenIP
|
|
||||||
}
|
|
||||||
address := listenIP + ":" + strconv.Itoa(m.rpcPort)
|
|
||||||
listener, err := net.Listen("tcp", address)
|
|
||||||
if err != nil {
|
|
||||||
panic("listening err:" + err.Error() + m.rpcRegisterName)
|
|
||||||
}
|
|
||||||
log.Info("", "listen network success, address ", address)
|
|
||||||
recvSize := 1024 * 1024 * 30
|
|
||||||
sendSize := 1024 * 1024 * 30
|
|
||||||
var grpcOpts = []grpc.ServerOption{
|
|
||||||
grpc.MaxRecvMsgSize(recvSize),
|
|
||||||
grpc.MaxSendMsgSize(sendSize),
|
|
||||||
}
|
|
||||||
if config.Config.Prometheus.Enable {
|
|
||||||
promePkg.NewGrpcRequestCounter()
|
|
||||||
promePkg.NewGrpcRequestFailedCounter()
|
|
||||||
promePkg.NewGrpcRequestSuccessCounter()
|
|
||||||
grpcOpts = append(grpcOpts, []grpc.ServerOption{
|
|
||||||
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
|
|
||||||
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
|
|
||||||
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
|
|
||||||
}...)
|
|
||||||
}
|
|
||||||
srv := grpc.NewServer(grpcOpts...)
|
|
||||||
defer srv.GracefulStop()
|
|
||||||
|
|
||||||
rpcRegisterIP := config.Config.RpcRegisterIP
|
|
||||||
msg.RegisterMsgServer(srv, m)
|
|
||||||
if config.Config.RpcRegisterIP == "" {
|
|
||||||
rpcRegisterIP, err = utils.GetLocalIP()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("", "GetLocalIP failed ", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = getcdv3.RegisterEtcd(m.etcdSchema, strings.Join(m.etcdAddr, ","), rpcRegisterIP, m.rpcPort, m.rpcRegisterName, 10, "")
|
|
||||||
if err != nil {
|
|
||||||
log.Error("", "register rpcChat to etcd failed ", err.Error())
|
|
||||||
panic(utils.Wrap(err, "register chat module m to etcd err"))
|
|
||||||
}
|
|
||||||
go m.runCh()
|
|
||||||
m.initPrometheus()
|
|
||||||
err = srv.Serve(listener)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("", "m rpcChat failed ", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Info("", "m rpcChat init success")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpc *rpcChat) runCh() {
|
|
||||||
log.NewInfo("", "start del msg chan ")
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg := <-rpc.delMsgCh:
|
|
||||||
log.NewInfo(msg.OperationID, utils.GetSelfFuncName(), "delmsgch recv new: ", msg)
|
|
||||||
db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
|
|
||||||
unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID)
|
|
||||||
if err != nil {
|
|
||||||
log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(unexistSeqList) > 0 {
|
|
||||||
notification.DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -4,184 +4,46 @@ import (
|
|||||||
"Open_IM/internal/common/check"
|
"Open_IM/internal/common/check"
|
||||||
"Open_IM/internal/common/convert"
|
"Open_IM/internal/common/convert"
|
||||||
"Open_IM/internal/common/notification"
|
"Open_IM/internal/common/notification"
|
||||||
"Open_IM/internal/common/rpcserver"
|
|
||||||
"Open_IM/pkg/common/config"
|
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
"Open_IM/pkg/common/db/relation"
|
"Open_IM/pkg/common/db/relation"
|
||||||
tablerelation "Open_IM/pkg/common/db/table/relation"
|
tablerelation "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/log"
|
|
||||||
prome "Open_IM/pkg/common/prometheus"
|
|
||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
|
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
||||||
"Open_IM/pkg/proto/sdkws"
|
"Open_IM/pkg/proto/sdkws"
|
||||||
pbuser "Open_IM/pkg/proto/user"
|
pbuser "Open_IM/pkg/proto/user"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
"github.com/OpenIMSDK/openKeeper"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type userServer struct {
|
type userServer struct {
|
||||||
*rpcserver.RpcServer
|
|
||||||
controller.UserInterface
|
controller.UserInterface
|
||||||
notification *notification.Check
|
notification *notification.Check
|
||||||
userCheck *check.UserCheck
|
userCheck *check.UserCheck
|
||||||
|
ConversationChecker *check.ConversationChecker
|
||||||
|
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUserServer(port int) *userServer {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
mysql, err := relation.NewGormDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return err
|
||||||
}
|
}
|
||||||
//mysql init
|
if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil {
|
||||||
var mysql relation.Mysql
|
return err
|
||||||
var model relation.UserGorm
|
|
||||||
err = mysql.InitConn().AutoMigrateModel(&model)
|
|
||||||
if err != nil {
|
|
||||||
panic("db init err:" + err.Error())
|
|
||||||
}
|
}
|
||||||
if mysql.GormConn() != nil {
|
pbuser.RegisterUserServer(server, &userServer{
|
||||||
model.DB = mysql.GormConn()
|
UserInterface: controller.NewUserController(mysql),
|
||||||
} else {
|
notification: notification.NewCheck(client),
|
||||||
panic("db init err:" + "conn is nil")
|
userCheck: check.NewUserCheck(client),
|
||||||
}
|
RegisterCenter: client,
|
||||||
return &userServer{RpcServer: r, UserInterface: controller.NewUserController(model.DB)}
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *userServer) Run() {
|
|
||||||
operationID := utils.OperationIDGenerator()
|
|
||||||
log.NewInfo(operationID, "rpc user start...")
|
|
||||||
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
log.NewInfo(operationID, "listen ok ", address)
|
|
||||||
defer listener.Close()
|
|
||||||
//grpc server
|
|
||||||
var grpcOpts []grpc.ServerOption
|
|
||||||
if config.Config.Prometheus.Enable {
|
|
||||||
prome.NewGrpcRequestCounter()
|
|
||||||
prome.NewGrpcRequestFailedCounter()
|
|
||||||
prome.NewGrpcRequestSuccessCounter()
|
|
||||||
grpcOpts = append(grpcOpts, []grpc.ServerOption{
|
|
||||||
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
|
|
||||||
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
|
|
||||||
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
|
|
||||||
}...)
|
|
||||||
}
|
|
||||||
srv := grpc.NewServer(grpcOpts...)
|
|
||||||
defer srv.GracefulStop()
|
|
||||||
//Service registers with etcd
|
|
||||||
pbuser.RegisterUserServer(srv, s)
|
|
||||||
|
|
||||||
err = srv.Serve(listener)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
log.NewInfo(operationID, "rpc user success")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ok
|
|
||||||
//func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID string, faceURL string, operationID string, opUserID string) {
|
|
||||||
// members, err := s.GetJoinedGroupMembers(ctx, userID)
|
|
||||||
// if err != nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// groupIDs := make([]string, 0)
|
|
||||||
// for _, v := range members {
|
|
||||||
// groupIDs = append(groupIDs, v.GroupID)
|
|
||||||
// }
|
|
||||||
// if s.SetGroupMemberInfo(ctx, "", faceURL, "", 0, groupIDs, userID) != nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// for _, v := range groupIDs {
|
|
||||||
// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID)
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
// ok
|
|
||||||
//func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID string, newNickname, oldNickname string, operationID string, opUserID string) {
|
|
||||||
// members, err := s.GetJoinedGroupMembers(ctx, userID)
|
|
||||||
// if err != nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// groupIDs := make([]string, 0)
|
|
||||||
// for _, v := range members {
|
|
||||||
// if v.Nickname == oldNickname {
|
|
||||||
// groupIDs = append(groupIDs, v.GroupID)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// s.SetGroupMemberInfo(ctx, newNickname, "", "", 0, groupIDs, userID)
|
|
||||||
// for _, v := range groupIDs {
|
|
||||||
// chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID)
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
// 设置群头像
|
|
||||||
//func (s *userServer) SetGroupMemberInfo(ctx context.Context, nickname, faceURL, ex string, roleLevel int32, groupIDs []string, userID string) (err error) {
|
|
||||||
//
|
|
||||||
// req := pbgroup.SetGroupMemberInfo{UserID: userID}
|
|
||||||
// if nickname != "" {
|
|
||||||
// req.Nickname = &wrappers.StringValue{Value: nickname}
|
|
||||||
// }
|
|
||||||
// if faceURL != "" {
|
|
||||||
// req.FaceURL = &wrappers.StringValue{Value: faceURL}
|
|
||||||
// }
|
|
||||||
// if ex != "" {
|
|
||||||
// req.Ex = &wrappers.StringValue{Value: ex}
|
|
||||||
// }
|
|
||||||
// if roleLevel != 0 {
|
|
||||||
// req.RoleLevel = &wrappers.Int32Value{Value: roleLevel}
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// setGroupMemberInfoReq := &pbgroup.SetGroupMemberInfoReq{}
|
|
||||||
// for _, v := range groupIDs {
|
|
||||||
// req.GroupID = v
|
|
||||||
// setGroupMemberInfoReq.Members = append(setGroupMemberInfoReq.Members, &req)
|
|
||||||
// }
|
|
||||||
// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName)
|
|
||||||
// if err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// client := group.NewGroupClient(conn)
|
|
||||||
// _, err = client.SetGroupMemberInfo(ctx, setGroupMemberInfoReq)
|
|
||||||
// return
|
|
||||||
//}
|
|
||||||
|
|
||||||
// 获取加入的群成员信息
|
|
||||||
//func (s *userServer) GetJoinedGroupMembers(ctx context.Context, userID string) (members []*sdkws.GroupMemberFullInfo, err error) {
|
|
||||||
// conn, err := s.RegisterCenter.GetConn(config.Config.RpcRegisterName.OpenImGroupName)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// client := group.NewGroupClient(conn)
|
|
||||||
// for {
|
|
||||||
// idx := int32(0)
|
|
||||||
// req := pbgroup.GetJoinedGroupListReq{FromUserID: userID, Pagination: &sdkws.RequestPagination{PageNumber: idx, ShowNumber: constant.ShowNumber}}
|
|
||||||
// resp, err := client.GetJoinedGroupList(ctx, &req)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
// groupIDs := make([]string, 0)
|
|
||||||
//
|
|
||||||
// for _, v := range resp.Groups {
|
|
||||||
// groupIDs = append(groupIDs, v.GroupID)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// client.GetGroupMembersInfo()
|
|
||||||
//
|
|
||||||
// if len(resp.Groups) < constant.ShowNumber {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// idx++
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return
|
|
||||||
//}
|
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
|
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
|
||||||
resp = &pbuser.GetDesignateUsersResp{}
|
resp = &pbuser.GetDesignateUsersResp{}
|
||||||
|
@ -15,15 +15,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options []grpc.ServerOption) error {
|
func start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options []grpc.ServerOption) error {
|
||||||
flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port")
|
flagRpcPort := flag.Int("port", rpcPorts[0], "get RpcGroupPort from cmd,default 16000 as port")
|
||||||
flagPrometheusPort := flag.Int("prometheus_port", prometheusPort, "groupPrometheusPort default listen port")
|
flagPrometheusPort := flag.Int("prometheus_port", prometheusPorts[0], "groupPrometheusPort default listen port")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
rpcPort = *flagRpcPort
|
fmt.Println("start group rpc server, port: ", *flagRpcPort, ", OpenIM version: ", constant.CurrentVersion)
|
||||||
prometheusPort = *flagPrometheusPort
|
|
||||||
fmt.Println("start group rpc server, port: ", rpcPort, ", OpenIM version: ", constant.CurrentVersion)
|
|
||||||
log.NewPrivateLog(constant.LogFileName)
|
log.NewPrivateLog(constant.LogFileName)
|
||||||
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, rpcPort))
|
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, *flagRpcPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -50,12 +48,12 @@ func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
|
|||||||
}
|
}
|
||||||
srv := grpc.NewServer(options...)
|
srv := grpc.NewServer(options...)
|
||||||
defer srv.GracefulStop()
|
defer srv.GracefulStop()
|
||||||
err = zkClient.Register(rpcRegisterName, registerIP, rpcPort)
|
err = zkClient.Register(rpcRegisterName, registerIP, *flagRpcPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if config.Config.Prometheus.Enable {
|
if config.Config.Prometheus.Enable {
|
||||||
err := promePkg.StartPromeSrv(prometheusPort)
|
err := promePkg.StartPromeSrv(*flagPrometheusPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -63,7 +61,7 @@ func start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
|
|||||||
return rpcFn(zkClient, srv)
|
return rpcFn(zkClient, srv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options ...grpc.ServerOption) {
|
func Start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn func(client *openKeeper.ZkClient, server *grpc.Server) error, options ...grpc.ServerOption) {
|
||||||
err := start(rpcPort, rpcRegisterName, prometheusPort, rpcFn, options)
|
err := start(rpcPorts, rpcRegisterName, prometheusPorts, rpcFn, options)
|
||||||
fmt.Println("end", err)
|
fmt.Println("end", err)
|
||||||
}
|
}
|
||||||
|
@ -41,13 +41,9 @@ type CommonCallbackResp struct {
|
|||||||
ActionCode int `json:"actionCode"`
|
ActionCode int `json:"actionCode"`
|
||||||
ErrCode int32 `json:"errCode"`
|
ErrCode int32 `json:"errCode"`
|
||||||
ErrMsg string `json:"errMsg"`
|
ErrMsg string `json:"errMsg"`
|
||||||
//OperationID string `json:"operationID"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CommonCallbackResp) Parse() error {
|
func (c CommonCallbackResp) Parse() error {
|
||||||
if c == nil {
|
|
||||||
return constant.ErrData.Wrap("callback common is nil")
|
|
||||||
}
|
|
||||||
if c.ActionCode != constant.NoError || c.ErrCode != constant.NoError {
|
if c.ActionCode != constant.NoError || c.ErrCode != constant.NoError {
|
||||||
newErr := constant.ErrCallback
|
newErr := constant.ErrCallback
|
||||||
newErr.ErrCode = c.ErrCode
|
newErr.ErrCode = c.ErrCode
|
||||||
|
@ -9,5 +9,5 @@ type CallbackBeforeAddFriendReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeAddFriendResp struct {
|
type CallbackBeforeAddFriendResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
@ -5,15 +5,21 @@ import (
|
|||||||
common "Open_IM/pkg/proto/sdkws"
|
common "Open_IM/pkg/proto/sdkws"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type CallbackCommand string
|
||||||
|
|
||||||
|
func (c CallbackCommand) GetCallbackCommand() string {
|
||||||
|
return string(c)
|
||||||
|
}
|
||||||
|
|
||||||
type CallbackBeforeCreateGroupReq struct {
|
type CallbackBeforeCreateGroupReq struct {
|
||||||
CallbackCommand string `json:"callbackCommand"`
|
|
||||||
OperationID string `json:"operationID"`
|
OperationID string `json:"operationID"`
|
||||||
|
CallbackCommand `json:"callbackCommand"`
|
||||||
common.GroupInfo
|
common.GroupInfo
|
||||||
InitMemberList []*apistruct.GroupAddMemberInfo `json:"initMemberList"`
|
InitMemberList []*apistruct.GroupAddMemberInfo `json:"initMemberList"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeCreateGroupResp struct {
|
type CallbackBeforeCreateGroupResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
GroupID *string `json:"groupID"`
|
GroupID *string `json:"groupID"`
|
||||||
GroupName *string `json:"groupName"`
|
GroupName *string `json:"groupName"`
|
||||||
Notification *string `json:"notification"`
|
Notification *string `json:"notification"`
|
||||||
@ -30,7 +36,7 @@ type CallbackBeforeCreateGroupResp struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeMemberJoinGroupReq struct {
|
type CallbackBeforeMemberJoinGroupReq struct {
|
||||||
CallbackCommand string `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OperationID string `json:"operationID"`
|
OperationID string `json:"operationID"`
|
||||||
GroupID string `json:"groupID"`
|
GroupID string `json:"groupID"`
|
||||||
UserID string `json:"userID"`
|
UserID string `json:"userID"`
|
||||||
@ -39,8 +45,8 @@ type CallbackBeforeMemberJoinGroupReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeMemberJoinGroupResp struct {
|
type CallbackBeforeMemberJoinGroupResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
NickName *string `json:"nickName"`
|
Nickname *string `json:"nickname"`
|
||||||
FaceURL *string `json:"faceURL"`
|
FaceURL *string `json:"faceURL"`
|
||||||
RoleLevel *int32 `json:"roleLevel"`
|
RoleLevel *int32 `json:"roleLevel"`
|
||||||
MuteEndTime *int64 `json:"muteEndTime"`
|
MuteEndTime *int64 `json:"muteEndTime"`
|
||||||
@ -48,18 +54,18 @@ type CallbackBeforeMemberJoinGroupResp struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSetGroupMemberInfoReq struct {
|
type CallbackBeforeSetGroupMemberInfoReq struct {
|
||||||
CallbackCommand string `json:"callbackCommand"`
|
CallbackCommand `json:"callbackCommand"`
|
||||||
OperationID string `json:"operationID"`
|
OperationID string `json:"operationID"`
|
||||||
GroupID string `json:"groupID"`
|
GroupID string `json:"groupID"`
|
||||||
UserID string `json:"userID"`
|
UserID string `json:"userID"`
|
||||||
Nickname string `json:"nickName"`
|
Nickname *string `json:"nickName"`
|
||||||
FaceURL string `json:"faceURL"`
|
FaceURL *string `json:"faceURL"`
|
||||||
RoleLevel int32 `json:"roleLevel"`
|
RoleLevel *int32 `json:"roleLevel"`
|
||||||
Ex string `json:"ex"`
|
Ex *string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSetGroupMemberInfoResp struct {
|
type CallbackBeforeSetGroupMemberInfoResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
Ex *string `json:"ex"`
|
Ex *string `json:"ex"`
|
||||||
Nickname *string `json:"nickName"`
|
Nickname *string `json:"nickName"`
|
||||||
FaceURL *string `json:"faceURL"`
|
FaceURL *string `json:"faceURL"`
|
||||||
|
@ -11,7 +11,7 @@ type CallbackBeforeSendSingleMsgReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSendSingleMsgResp struct {
|
type CallbackBeforeSendSingleMsgResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterSendSingleMsgReq struct {
|
type CallbackAfterSendSingleMsgReq struct {
|
||||||
@ -20,7 +20,7 @@ type CallbackAfterSendSingleMsgReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterSendSingleMsgResp struct {
|
type CallbackAfterSendSingleMsgResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSendGroupMsgReq struct {
|
type CallbackBeforeSendGroupMsgReq struct {
|
||||||
@ -29,7 +29,7 @@ type CallbackBeforeSendGroupMsgReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSendGroupMsgResp struct {
|
type CallbackBeforeSendGroupMsgResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterSendGroupMsgReq struct {
|
type CallbackAfterSendGroupMsgReq struct {
|
||||||
@ -38,7 +38,7 @@ type CallbackAfterSendGroupMsgReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackAfterSendGroupMsgResp struct {
|
type CallbackAfterSendGroupMsgResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackMsgModifyCommandReq struct {
|
type CallbackMsgModifyCommandReq struct {
|
||||||
@ -46,7 +46,7 @@ type CallbackMsgModifyCommandReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackMsgModifyCommandResp struct {
|
type CallbackMsgModifyCommandResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
Content *string `json:"content"`
|
Content *string `json:"content"`
|
||||||
RecvID *string `json:"recvID"`
|
RecvID *string `json:"recvID"`
|
||||||
GroupID *string `json:"groupID"`
|
GroupID *string `json:"groupID"`
|
||||||
@ -79,7 +79,7 @@ type CallbackBeforeSetMessageReactionExtReq struct {
|
|||||||
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
||||||
}
|
}
|
||||||
type CallbackBeforeSetMessageReactionExtResp struct {
|
type CallbackBeforeSetMessageReactionExtResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"`
|
ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"`
|
||||||
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
||||||
}
|
}
|
||||||
@ -95,7 +95,7 @@ type CallbackDeleteMessageReactionExtReq struct {
|
|||||||
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
||||||
}
|
}
|
||||||
type CallbackDeleteMessageReactionExtResp struct {
|
type CallbackDeleteMessageReactionExtResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"`
|
ResultReactionExtensionList []*msg.KeyValueResp `json:"resultReactionExtensionList"`
|
||||||
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ type CallbackUserOnlineReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackUserOnlineResp struct {
|
type CallbackUserOnlineResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackUserOfflineReq struct {
|
type CallbackUserOfflineReq struct {
|
||||||
@ -19,7 +19,7 @@ type CallbackUserOfflineReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackUserOfflineResp struct {
|
type CallbackUserOfflineResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
|
||||||
type CallbackUserKickOffReq struct {
|
type CallbackUserKickOffReq struct {
|
||||||
@ -28,5 +28,5 @@ type CallbackUserKickOffReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackUserKickOffResp struct {
|
type CallbackUserKickOffResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,7 @@ type CallbackBeforePushReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforePushResp struct {
|
type CallbackBeforePushResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
UserIDList []string `json:"userIDList"`
|
UserIDList []string `json:"userIDList"`
|
||||||
OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"`
|
OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"`
|
||||||
}
|
}
|
||||||
@ -34,7 +34,7 @@ type CallbackBeforeSuperGroupOnlinePushReq struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CallbackBeforeSuperGroupOnlinePushResp struct {
|
type CallbackBeforeSuperGroupOnlinePushResp struct {
|
||||||
*CommonCallbackResp
|
CommonCallbackResp
|
||||||
UserIDList []string `json:"userIDList"`
|
UserIDList []string `json:"userIDList"`
|
||||||
OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"`
|
OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"`
|
||||||
}
|
}
|
||||||
|
4
pkg/common/db/cache/redis.go
vendored
4
pkg/common/db/cache/redis.go
vendored
@ -95,6 +95,10 @@ type RedisClient struct {
|
|||||||
rdb redis.UniversalClient
|
rdb redis.UniversalClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
|
||||||
|
return &RedisClient{rdb: rdb}
|
||||||
|
}
|
||||||
|
|
||||||
//func (r *RedisClient) InitRedis() {
|
//func (r *RedisClient) InitRedis() {
|
||||||
// var rdb redis.UniversalClient
|
// var rdb redis.UniversalClient
|
||||||
// var err error
|
// var err error
|
||||||
|
@ -19,7 +19,6 @@ type AuthController struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewAuthController(rdb redis.UniversalClient, accessSecret string, accessExpire int64) *AuthController {
|
func NewAuthController(rdb redis.UniversalClient, accessSecret string, accessExpire int64) *AuthController {
|
||||||
cache.NewRedisClient(rdb)
|
|
||||||
return &AuthController{database: cache.NewTokenRedis(cache.NewRedisClient(rdb), accessSecret, accessExpire)}
|
return &AuthController{database: cache.NewTokenRedis(cache.NewRedisClient(rdb), accessSecret, accessExpire)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ func NewConversationDataBase(db relation.Conversation, cache cache.ConversationC
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
|
func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
|
||||||
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
|
func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
|
||||||
|
@ -55,8 +55,8 @@ type GroupInterface interface {
|
|||||||
|
|
||||||
var _ GroupInterface = (*GroupController)(nil)
|
var _ GroupInterface = (*GroupController)(nil)
|
||||||
|
|
||||||
func NewGroupInterface(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupInterface {
|
func NewGroupInterface(database GroupDataBaseInterface) GroupInterface {
|
||||||
return &GroupController{database: NewGroupDatabase(db, rdb, mgoClient)}
|
return &GroupController{database: database}
|
||||||
}
|
}
|
||||||
|
|
||||||
type GroupController struct {
|
type GroupController struct {
|
||||||
@ -72,7 +72,7 @@ func (g *GroupController) CreateGroup(ctx context.Context, groups []*relationTb.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupController) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
|
func (g *GroupController) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
|
||||||
return g.TakeGroup(ctx, groupID)
|
return g.database.TakeGroup(ctx, groupID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupController) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
|
func (g *GroupController) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
|
||||||
@ -175,6 +175,53 @@ func (g *GroupController) CreateSuperGroupMember(ctx context.Context, groupID st
|
|||||||
return g.database.CreateSuperGroupMember(ctx, groupID, userIDs)
|
return g.database.CreateSuperGroupMember(ctx, groupID, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Group interface {
|
||||||
|
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
|
||||||
|
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
||||||
|
FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)
|
||||||
|
SearchGroup(ctx context.Context, keyword string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupModel, error)
|
||||||
|
UpdateGroup(ctx context.Context, groupID string, data map[string]any) error
|
||||||
|
DismissGroup(ctx context.Context, groupID string) error // 解散群,并删除群成员
|
||||||
|
}
|
||||||
|
|
||||||
|
type GroupMember interface {
|
||||||
|
TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error)
|
||||||
|
TakeGroupOwner(ctx context.Context, groupID string) (*relationTb.GroupMemberModel, error)
|
||||||
|
FindGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32) ([]*relationTb.GroupMemberModel, error)
|
||||||
|
FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error)
|
||||||
|
PageGroupMember(ctx context.Context, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
||||||
|
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
||||||
|
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
|
||||||
|
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||||
|
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
|
||||||
|
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
|
||||||
|
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
|
||||||
|
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
|
||||||
|
UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type GroupRequest interface {
|
||||||
|
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
|
||||||
|
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
||||||
|
PageGroupRequestUser(ctx context.Context, userID string, pageNumber, showNumber int32) (uint32, []*relationTb.GroupRequestModel, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SuperGroup interface {
|
||||||
|
FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error)
|
||||||
|
FindJoinSuperGroup(ctx context.Context, userID string) (*unrelationTb.UserToSuperGroupModel, error)
|
||||||
|
CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error
|
||||||
|
DeleteSuperGroup(ctx context.Context, groupID string) error
|
||||||
|
DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||||
|
CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type GroupDataBase1 interface {
|
||||||
|
Group
|
||||||
|
GroupMember
|
||||||
|
GroupRequest
|
||||||
|
SuperGroup
|
||||||
|
}
|
||||||
|
|
||||||
type GroupDataBaseInterface interface {
|
type GroupDataBaseInterface interface {
|
||||||
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
|
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
|
||||||
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
||||||
@ -291,7 +338,10 @@ func (g *GroupDataBase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
|
func (g *GroupDataBase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
|
||||||
return g.cache.GetGroupInfo(ctx, groupID)
|
//return g.cache.GetGroupInfo(ctx, groupID)
|
||||||
|
return cache.GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
|
||||||
|
return g.group.Take(ctx, groupID)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
|
func (g *GroupDataBase) FindGroup(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
|
||||||
|
@ -130,5 +130,5 @@ func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string,
|
|||||||
defer func() {
|
defer func() {
|
||||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userIDs", userIDs)
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userIDs", userIDs)
|
||||||
}()
|
}()
|
||||||
return userIDs, utils.Wrap(getDBConn(g.DB, tx).Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs), "")
|
return userIDs, utils.Wrap(getDBConn(g.DB, tx).Model(&relation.GroupMemberModel{}).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "")
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user