diff --git a/cmd/rpc/auth/main.go b/cmd/rpc/auth/main.go index 4625fa2d0..a89ff6845 100644 --- a/cmd/rpc/auth/main.go +++ b/cmd/rpc/auth/main.go @@ -7,5 +7,5 @@ import ( ) func main() { - startrpc.Start(config.Config.RpcPort.OpenImAuthPort[0], config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort[0], auth.Start) + startrpc.Start(config.Config.RpcPort.OpenImAuthPort, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Prometheus.AuthPrometheusPort, auth.Start) } diff --git a/cmd/rpc/msg/main.go b/cmd/rpc/msg/main.go index c9bcecf9c..7900b1d4e 100644 --- a/cmd/rpc/msg/main.go +++ b/cmd/rpc/msg/main.go @@ -2,25 +2,10 @@ package main import ( "Open_IM/internal/rpc/msg" + "Open_IM/internal/startrpc" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" - promePkg "Open_IM/pkg/common/prometheus" - "flag" - "fmt" ) func main() { - defaultPorts := config.Config.RpcPort.OpenImMessagePort - rpcPort := flag.Int("port", defaultPorts[0], "rpc listening port") - prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessagePrometheusPort[0], "msgPrometheusPort default listen port") - flag.Parse() - fmt.Println("start msg rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - rpcServer := msg.NewRpcChatServer(*rpcPort) - go func() { - err := promePkg.StartPromeSrv(*prometheusPort) - if err != nil { - panic(err) - } - }() - rpcServer.Run() + startrpc.Start(config.Config.RpcPort.OpenImMessagePort, config.Config.RpcRegisterName.OpenImMsgName, config.Config.Prometheus.AuthPrometheusPort, msg.Start) } diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index eefcb419e..949e30c3f 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -1,7 +1,7 @@ package msg import ( - cbApi "Open_IM/pkg/call_back_struct" + cb "Open_IM/pkg/callbackstruct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/http" @@ -11,10 +11,10 @@ import ( http2 "net/http" ) -func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cbApi.CallbackBeforeSetMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cb.CallbackBeforeSetMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) - req := cbApi.CallbackBeforeSetMessageReactionExtReq{ + req := cb.CallbackBeforeSetMessageReactionExtReq{ OperationID: setReq.OperationID, CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand, SourceID: setReq.SourceID, @@ -26,7 +26,7 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio IsExternalExtensions: setReq.IsExternalExtensions, MsgFirstModifyTime: setReq.MsgFirstModifyTime, } - resp := &cbApi.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp} + resp := &cb.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp} defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { callbackResp.ErrCode = http2.StatusInternalServerError @@ -36,8 +36,8 @@ func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensio } -func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cbApi.CallbackDeleteMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cb.CallbackDeleteMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) req := cbApi.CallbackDeleteMessageReactionExtReq{ OperationID: setReq.OperationID, @@ -58,8 +58,8 @@ func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti } return resp } -func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: getReq.OperationID} +func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cb.CallbackGetMessageListReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: getReq.OperationID} log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String()) req := cbApi.CallbackGetMessageListReactionExtReq{ OperationID: getReq.OperationID, @@ -78,8 +78,8 @@ func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReaction } return resp } -func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cbApi.CallbackAddMessageReactionExtResp { - callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID} +func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cb.CallbackAddMessageReactionExtResp { + callbackResp := cb.CommonCallbackResp{OperationID: setReq.OperationID} log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String()) req := cbApi.CallbackAddMessageReactionExtReq{ OperationID: setReq.OperationID, @@ -93,7 +93,7 @@ func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensio IsExternalExtensions: setReq.IsExternalExtensions, MsgFirstModifyTime: setReq.MsgFirstModifyTime, } - resp := &cbApi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} + resp := &cb.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp} defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime) if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { callbackResp.ErrCode = http2.StatusInternalServerError diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index ae416ccf3..28d5c14a2 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -2,33 +2,23 @@ package msg import ( "Open_IM/internal/common/check" - "Open_IM/internal/common/notification" - "Open_IM/internal/common/rpcserver" - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" + "Open_IM/pkg/common/db/relation" + tablerelation "Open_IM/pkg/common/db/table/relation" + discoveryRegistry "Open_IM/pkg/discoveryregistry" + "github.com/OpenIMSDK/openKeeper" - "Open_IM/pkg/common/kafka" - "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/proto/msg" - "Open_IM/pkg/utils" - "github.com/OpenIMSDK/getcdv3" - "net" - "strconv" - "strings" - - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "google.golang.org/grpc" ) type msgServer struct { - *rpcserver.RpcServer - MsgInterface controller.MsgInterface - Group *check.GroupChecker - User *check.UserCheck - Conversation *check.ConversationChecker + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry + MsgInterface controller.MsgInterface + Group *check.GroupChecker + User *check.UserCheck + Conversation *check.ConversationChecker } type deleteMsg struct { @@ -38,39 +28,34 @@ type deleteMsg struct { OperationID string } -func NewRpcChatServer(port int) *msgServer { - log.NewPrivateLog(constant.LogFileName) - rc := msgServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, - dMessageLocker: NewLockerMessage(), +func Start(client *openKeeper.ZkClient, server *grpc.Server) error { + mysql, err := relation.NewGormDB() + if err != nil { + return err } - rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) - //rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) - rc.delMsgCh = make(chan deleteMsg, 1000) - return &rc + if err := mysql.AutoMigrate(&tablerelation.UserModel{}); err != nil { + return err + } + s := &msgServer{ + Conversation: check.NewConversationChecker(client), + User: check.NewUserCheck(client), + Group: check.NewGroupChecker(client), + //MsgInterface: controller.MsgInterface(), + RegisterCenter: client, + } + s.initPrometheus() + msg.RegisterMsgServer(server, s) + return nil } -func (rpc *rpcChat) initPrometheus() { - //sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ - // Name: "send_msg_success", - // Help: "The number of send msg success", - //}) - //sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ - // Name: "send_msg_failed", - // Help: "The number of send msg failed", - //}) +func (m *msgServer) initPrometheus() { promePkg.NewMsgPullFromRedisSuccessCounter() promePkg.NewMsgPullFromRedisFailedCounter() promePkg.NewMsgPullFromMongoSuccessCounter() promePkg.NewMsgPullFromMongoFailedCounter() - promePkg.NewSingleChatMsgRecvSuccessCounter() promePkg.NewGroupChatMsgRecvSuccessCounter() promePkg.NewWorkSuperGroupChatMsgRecvSuccessCounter() - promePkg.NewSingleChatMsgProcessSuccessCounter() promePkg.NewSingleChatMsgProcessFailedCounter() promePkg.NewGroupChatMsgProcessSuccessCounter() @@ -78,78 +63,3 @@ func (rpc *rpcChat) initPrometheus() { promePkg.NewWorkSuperGroupChatMsgProcessSuccessCounter() promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter() } - -func (m *msgServer) Run() { - log.Info("", "rpcChat init...") - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(m.rpcPort) - listener, err := net.Listen("tcp", address) - if err != nil { - panic("listening err:" + err.Error() + m.rpcRegisterName) - } - log.Info("", "listen network success, address ", address) - recvSize := 1024 * 1024 * 30 - sendSize := 1024 * 1024 * 30 - var grpcOpts = []grpc.ServerOption{ - grpc.MaxRecvMsgSize(recvSize), - grpc.MaxSendMsgSize(sendSize), - } - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - - rpcRegisterIP := config.Config.RpcRegisterIP - msg.RegisterMsgServer(srv, m) - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - err = getcdv3.RegisterEtcd(m.etcdSchema, strings.Join(m.etcdAddr, ","), rpcRegisterIP, m.rpcPort, m.rpcRegisterName, 10, "") - if err != nil { - log.Error("", "register rpcChat to etcd failed ", err.Error()) - panic(utils.Wrap(err, "register chat module m to etcd err")) - } - go m.runCh() - m.initPrometheus() - err = srv.Serve(listener) - if err != nil { - log.Error("", "m rpcChat failed ", err.Error()) - return - } - log.Info("", "m rpcChat init success") -} - -func (rpc *rpcChat) runCh() { - log.NewInfo("", "start del msg chan ") - for { - select { - case msg := <-rpc.delMsgCh: - log.NewInfo(msg.OperationID, utils.GetSelfFuncName(), "delmsgch recv new: ", msg) - db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID) - unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID) - if err != nil { - log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error()) - continue - } - if len(unexistSeqList) > 0 { - notification.DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID) - } - } - } -} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 34c5ee94b..27f4bec13 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -95,6 +95,10 @@ type RedisClient struct { rdb redis.UniversalClient } +func NewRedisClient(rdb redis.UniversalClient) *RedisClient { + return &RedisClient{rdb: rdb} +} + //func (r *RedisClient) InitRedis() { // var rdb redis.UniversalClient // var err error diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index eba88fd7d..ad485f117 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -19,7 +19,6 @@ type AuthController struct { } func NewAuthController(rdb redis.UniversalClient, accessSecret string, accessExpire int64) *AuthController { - cache.NewRedisClient(rdb) return &AuthController{database: cache.NewTokenRedis(cache.NewRedisClient(rdb), accessSecret, accessExpire)} } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 093249940..863026909 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -142,7 +142,7 @@ func NewConversationDataBase(db relation.Conversation, cache cache.ConversationC } func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { - + panic("implement me") } func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {