refactor: third update

This commit is contained in:
Gordon 2024-04-07 21:23:36 +08:00
parent 8730664dba
commit 8c10e4f532
10 changed files with 184 additions and 40 deletions

View File

@ -15,16 +15,12 @@
package main package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer, msg.Start) if err := cmd.NewMsgRpcCmd().Exec(); err != nil {
rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil {
program.ExitWithError(err) program.ExitWithError(err)
} }
} }

View File

@ -15,16 +15,12 @@
package main package main
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer, third.Start) if err := cmd.NewThirdRpcCmd().Exec(); err != nil {
rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil {
program.ExitWithError(err) program.ExitWithError(err)
} }
} }

View File

@ -16,10 +16,10 @@ package msg
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
@ -47,20 +47,21 @@ type (
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages. Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
config *config.GlobalConfig // Global configuration settings. config *cmd.MsgConfig // Global configuration settings.
} }
) )
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...) m.Handlers = append(m.Handlers, interceptorFunc...
} }
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil { if err != nil {
return err return err
} }
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil { if err != nil {
return err return err
} }
@ -71,11 +72,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
} }
msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis)
seqModel := cache.NewSeqCache(rdb) seqModel := cache.NewSeqCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig)
if err != nil { if err != nil {
return err return err
} }
@ -90,7 +91,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
config: config, config: config,
} }
s.notificationSender = rpcclient.NewNotificationSender(&config.Notification, rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)
msg.RegisterMsgServer(server, s) msg.RegisterMsgServer(server, s)
return nil return nil

View File

@ -17,10 +17,10 @@ package third
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"net/url" "net/url"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
@ -37,12 +37,21 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { type thirdServer struct {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) apiURL string
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration
config *cmd.ThirdConfig
}
func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil { if err != nil {
return err return err
} }
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil { if err != nil {
return err return err
} }
@ -54,11 +63,11 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
if err != nil { if err != nil {
return err return err
} }
apiURL := config.Object.ApiURL apiURL := config.MinioConfig.URL
if apiURL == "" { if apiURL == "" {
return errs.Wrap(fmt.Errorf("api is empty")) return errs.Wrap(fmt.Errorf("api is empty"))
} }
if _, err := url.Parse(config.Object.ApiURL); err != nil { if _, err := url.Parse(config.MinioConfig.URL); err != nil {
return err return err
} }
if apiURL[len(apiURL)-1] != '/' { if apiURL[len(apiURL)-1] != '/' {
@ -67,7 +76,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
apiURL += "object/" apiURL += "object/"
// Select the oss method according to the profile policy // Select the oss method according to the profile policy
enable := config.Object.Enable enable := config.RpcConfig.Object.Enable
var o s3.Interface var o s3.Interface
switch enable { switch enable {
case "minio": case "minio":
@ -93,15 +102,6 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
return nil return nil
} }
type thirdServer struct {
apiURL string
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
defaultExpire time.Duration
config *config.GlobalConfig
}
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime) err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil { if err != nil {

View File

@ -60,11 +60,15 @@ const (
logEnvPrefix = "openim-log" logEnvPrefix = "openim-log"
redisEnvPrefix = "openim-redis" redisEnvPrefix = "openim-redis"
mongodbEnvPrefix = "openim-mongodb" mongodbEnvPrefix = "openim-mongodb"
minioEnvPrefix = "openim-minio"
kafkaEnvPrefix = "openim-kafka"
zoopkeeperEnvPrefix = "openim-zookeeper" zoopkeeperEnvPrefix = "openim-zookeeper"
authEnvPrefix = "openim-auth" authEnvPrefix = "openim-auth"
conversationEnvPrefix = "openim-conversation" conversationEnvPrefix = "openim-conversation"
friendEnvPrefix = "openim-friend" friendEnvPrefix = "openim-friend"
groupEnvPrefix = "openim-group" groupEnvPrefix = "openim-group"
msgEnvPrefix = "openim-msg"
thridEnvPrefix = "openim-third"
) )
const ( const (

View File

@ -30,7 +30,7 @@ type FriendRpcCmd struct {
friendConfig FriendConfig friendConfig FriendConfig
} }
type FriendConfig struct { type FriendConfig struct {
RpcConfig config.Conversation RpcConfig config.Friend
RedisConfig config.Redis RedisConfig config.Redis
MongodbConfig config.Mongo MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper ZookeeperConfig config.ZooKeeper

View File

@ -30,7 +30,7 @@ type GroupRpcCmd struct {
groupConfig GroupConfig groupConfig GroupConfig
} }
type GroupConfig struct { type GroupConfig struct {
RpcConfig config.Conversation RpcConfig config.Group
RedisConfig config.Redis RedisConfig config.Redis
MongodbConfig config.Mongo MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper ZookeeperConfig config.ZooKeeper

72
pkg/common/cmd/msg.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type MsgRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
msgConfig MsgConfig
}
type MsgConfig struct {
RpcConfig config.Msg
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func NewMsgRpcCmd() *MsgRpcCmd {
var msgConfig MsgConfig
ret := &MsgRpcCmd{msgConfig: msgConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCMsgCfgFileName: {EnvPrefix: msgEnvPrefix, ConfigStruct: &msgConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &msgConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgConfig.WebhooksConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *MsgRpcCmd) Exec() error {
return a.Execute()
}
func (a *MsgRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Share.RpcRegisterName.Auth, &a.msgConfig, msg.Start)
}

70
pkg/common/cmd/third.go Normal file
View File

@ -0,0 +1,70 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type ThirdRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
thirdConfig ThirdConfig
}
type ThirdConfig struct {
RpcConfig config.Third
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
}
func NewThirdRpcCmd() *ThirdRpcCmd {
var thirdConfig ThirdConfig
ret := &ThirdRpcCmd{thirdConfig: thirdConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCThirdCfgFileName: {EnvPrefix: thridEnvPrefix, ConfigStruct: &thirdConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &thirdConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &thirdConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &thirdConfig.MongodbConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &thirdConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &thirdConfig.NotificationConfig},
MinioConfigFileName: {EnvPrefix: minioEnvPrefix, ConfigStruct: &thirdConfig.MinioConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *ThirdRpcCmd) Exec() error {
return a.Execute()
}
func (a *ThirdRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Share.RpcRegisterName.Auth, &a.thirdConfig, third.Start)
}

View File

@ -17,6 +17,7 @@ package config
import ( import (
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/mq/kafka"
"time" "time"
) )
@ -420,6 +421,10 @@ func (r *Redis) Build() *redisutil.Config {
} }
} }
func (k *Kafka) Build() *kafka.Config {
return &kafka.Config{}
}
func (l *CacheConfig) Failed() time.Duration { func (l *CacheConfig) Failed() time.Duration {
return time.Second * time.Duration(l.FailedExpire) return time.Second * time.Duration(l.FailedExpire)
} }