diff --git a/cmd/api/main.go b/cmd/api/main.go index 4358044c4..c1b0bb6de 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -20,7 +20,9 @@ func main() { ginPort := flag.Int("port", config.Config.Api.GinPort[0], "get ginServerPort from cmd,default 10002 as port") configPath := flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } address := "0.0.0.0:" + strconv.Itoa(*ginPort) if config.Config.Api.ListenIP != "" { address = config.Config.Api.ListenIP + ":" + strconv.Itoa(*ginPort) diff --git a/cmd/cmdutils/main.go b/cmd/cmdutils/main.go index ae6b8ef73..9957698a3 100644 --- a/cmd/cmdutils/main.go +++ b/cmd/cmdutils/main.go @@ -14,7 +14,9 @@ func main() { var fixAllSeq = flag.Bool("fix_all_seq", false, "fix seq") var configPath = flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } fmt.Println(time.Now(), "start cronTask", *userID, *superGroupID) task.FixSeq(*userID, *superGroupID, *fixAllSeq) } diff --git a/cmd/crontask/main.go b/cmd/crontask/main.go index 3c8939e89..1908e6d4c 100644 --- a/cmd/crontask/main.go +++ b/cmd/crontask/main.go @@ -12,7 +12,9 @@ func main() { fmt.Println(time.Now(), "start cronTask") var configPath = flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } if err := task.StartCronTask(); err != nil { panic(err.Error()) } diff --git a/cmd/msggateway/main.go b/cmd/msggateway/main.go index 21ac9f2cf..4c3ce7d94 100644 --- a/cmd/msggateway/main.go +++ b/cmd/msggateway/main.go @@ -20,7 +20,9 @@ func main() { prometheusPort := flag.Int("prometheus_port", defaultPromePorts[0], "PushrometheusPort default listen port") configPath := flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } var wg sync.WaitGroup wg.Add(1) fmt.Println("start rpc/msg_gateway server, port: ", *rpcPort, *wsPort, *prometheusPort, ", OpenIM version: ", constant.CurrentVersion, "\n") diff --git a/cmd/msgtransfer/main.go b/cmd/msgtransfer/main.go index 3828a56f6..1f57b2792 100644 --- a/cmd/msgtransfer/main.go +++ b/cmd/msgtransfer/main.go @@ -16,7 +16,9 @@ func main() { prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "MessageTransferPrometheusPort default listen port") configPath := flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } log.NewPrivateLog(constant.LogFileName) msgTransfer := msgtransfer.NewMsgTransfer() fmt.Println("start msg_transfer server ", ", OpenIM version: ", constant.CurrentVersion, "\n") diff --git a/cmd/push/main.go b/cmd/push/main.go index 45b0b7d1c..85be2d6a3 100644 --- a/cmd/push/main.go +++ b/cmd/push/main.go @@ -16,7 +16,9 @@ func main() { prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.MessageTransferPrometheusPort[0], "PushrometheusPort default listen port") configPath := flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + panic(err.Error()) + } var wg sync.WaitGroup wg.Add(1) log.NewPrivateLog(constant.LogFileName) diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go index e92f256f5..c6beb4880 100644 --- a/internal/startrpc/start.go +++ b/internal/startrpc/start.go @@ -20,7 +20,9 @@ func start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn flagPrometheusPort := flag.Int("prometheus_port", prometheusPorts[0], "groupPrometheusPort default listen port") configPath := flag.String("config_path", "../config/", "config folder") flag.Parse() - config.InitConfig(*configPath) + if err := config.InitConfig(*configPath); err != nil { + return err + } fmt.Println("start group rpc server, port: ", *flagRpcPort, ", OpenIM version: ", constant.CurrentVersion) log.NewPrivateLog(constant.LogFileName) listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, *flagRpcPort)) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 7032aa419..8590f1d0d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -502,17 +502,17 @@ type Notification struct { } `yaml:"signal"` } -func unmarshalConfig(config interface{}, configPath string) { +func unmarshalConfig(config interface{}, configPath string) error { bytes, err := ioutil.ReadFile(configPath) if err != nil { - panic(err.Error() + configPath) + return err } if err = yaml.Unmarshal(bytes, config); err != nil { - panic(err.Error()) + return err } } -func initConfig(config interface{}, configName, configPath string) { +func initConfig(config interface{}, configName, configPath string) error { if configPath == "" { var env = "CONFIG_NAME" if configName == "config.yaml" { @@ -533,11 +533,18 @@ func initConfig(config interface{}, configName, configPath string) { configPath = fmt.Sprintf("../config/%s", configName) } } - unmarshalConfig(config, configPath) + return unmarshalConfig(config, configPath) } -func InitConfig(configPath string) { - initConfig(&Config, "config.yaml", configPath) - initConfig(&NotificationConfig, "notification.yaml", configPath) +func InitConfig(configPath string) error { + err := initConfig(&Config, "config.yaml", configPath) + if err != nil { + return err + } + err = initConfig(&NotificationConfig, "notification.yaml", configPath) + if err != nil { + return err + } Config.Notification = NotificationConfig.Notification + return nil } diff --git a/pkg/common/db/table/relation/statistics.go b/pkg/common/db/table/relation/statistics.go deleted file mode 100644 index fd2af5f84..000000000 --- a/pkg/common/db/table/relation/statistics.go +++ /dev/null @@ -1,30 +0,0 @@ -package relation - -import "time" - -// these two is virtual table just for cms -type ActiveGroup struct { - Name string - ID string `gorm:"column:recv_id"` - MessageNum int `gorm:"column:message_num"` -} - -type ActiveUser struct { - Name string - ID string `gorm:"column:send_id"` - MessageNum int `gorm:"column:message_num"` -} - -type StatisticsInterface interface { - GetActiveUserNum(from, to time.Time) (num int64, err error) - GetIncreaseUserNum(from, to time.Time) (num int64, err error) - GetTotalUserNum() (num int64, err error) - GetTotalUserNumByDate(to time.Time) (num int64, err error) - GetSingleChatMessageNum(from, to time.Time) (num int64, err error) - GetGroupMessageNum(from, to time.Time) (num int64, err error) - GetIncreaseGroupNum(from, to time.Time) (num int64, err error) - GetTotalGroupNum() (num int64, err error) - GetGroupNum(to time.Time) (num int64, err error) - GetActiveGroups(from, to time.Time, limit int) ([]*ActiveGroup, error) - GetActiveUsers(from, to time.Time, limit int) (activeUsers []*ActiveUser, err error) -} diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index cd873c190..4ab702827 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -57,7 +57,6 @@ func (s *SuperGroupMongoDriver) FindSuperGroup(ctx context.Context, groupIDs []s return nil, err } defer cursor.Close(ctx) - if err := cursor.All(ctx, &groups); err != nil { return nil, utils.Wrap(err, "") } @@ -65,43 +64,32 @@ func (s *SuperGroupMongoDriver) FindSuperGroup(ctx context.Context, groupIDs []s } func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error { - opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return s.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { - _, err := s.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}}) + _, err := s.superGroupCollection.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}}) + if err != nil { + return err + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + for _, userID := range userIDs { + _, err = s.userToSuperGroupCollection.UpdateOne(ctx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) if err != nil { - _ = sCtx.AbortTransaction(ctx) - return err + return utils.Wrap(err, "transaction failed") } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - for _, userID := range userIDs { - _, err = s.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) - if err != nil { - _ = sCtx.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - } - return sCtx.CommitTransaction(ctx) - }) + } } func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error { - opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return s.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { - _, err := s.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}}) - if err != nil { - _ = sCtx.AbortTransaction(ctx) - return err - } - err = s.RemoveGroupFromUser(sCtx, groupID, userIDs) - if err != nil { - _ = sCtx.AbortTransaction(ctx) - return err - } - return sCtx.CommitTransaction(ctx) - }) + _, err := s.superGroupCollection.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}}) + if err != nil { + return err + } + err = s.RemoveGroupFromUser(ctx, groupID, userIDs) + if err != nil { + return err + } + return nil } func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(ctx context.Context, userID string) (*unrelation.UserToSuperGroupModel, error) {