diff --git a/cmd/trainer/cmd/root.go b/cmd/trainer/cmd/root.go
new file mode 100644
index 000000000..75babcd1c
--- /dev/null
+++ b/cmd/trainer/cmd/root.go
@@ -0,0 +1,130 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/spf13/cobra"
+
+	"d7y.io/dragonfly/v2/cmd/dependency"
+	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/dfpath"
+	"d7y.io/dragonfly/v2/pkg/types"
+	"d7y.io/dragonfly/v2/trainer"
+	"d7y.io/dragonfly/v2/trainer/config"
+	"d7y.io/dragonfly/v2/version"
+)
+
+var (
+	cfg *config.Config
+)
+
+// rootCmd represents the base command when called without any subcommands.
+var rootCmd = &cobra.Command{
+	Use:   "trainer",
+	Short: "the trainer of dragonfly",
+	Long: `Trainer is a long-running process and is mainly responsible for receiving historical download and network topology records,
+preprocessing original record data, establing datasets and training machine learning and AI models that support scheduler peer-scheduling decisions.`,
+	Args:              cobra.NoArgs,
+	DisableAutoGenTag: true,
+	SilenceUsage:      true,
+	RunE: func(cmd *cobra.Command, args []string) error {
+		// Convert config.
+		if err := cfg.Convert(); err != nil {
+			return err
+		}
+
+		// Validate config.
+		if err := cfg.Validate(); err != nil {
+			return err
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		// Initialize dfpath.
+		d, err := initDfpath(&cfg.Server)
+		if err != nil {
+			return err
+		}
+
+		// Initialize logger.
+		if err := logger.InitTrainer(cfg.Verbose, cfg.Console, d.LogDir()); err != nil {
+			return fmt.Errorf("init trainer logger: %w", err)
+		}
+		logger.RedirectStdoutAndStderr(cfg.Console, path.Join(d.LogDir(), types.TrainerName))
+
+		return runTrainer(ctx, d)
+	},
+}
+
+// Execute adds all child commands to the root command and sets flags appropriately.
+// This is called by main.main(). It only needs to happen once to the rootCmd.
+func Execute() {
+	if err := rootCmd.Execute(); err != nil {
+		logger.Error(err)
+		os.Exit(1)
+	}
+}
+
+// init creates the default trainer config and hands it, together with rootCmd,
+// to dependency.InitCommandAndConfig.
+func init() {
+	// Initialize default trainer config.
+	cfg = config.New()
+	// Initialize command and config.
+	dependency.InitCommandAndConfig(rootCmd, true, cfg)
+}
+
+// initDfpath builds the dfpath from the server config: the log directory is
+// only overridden when it is configured, while the data directory falls back
+// to dfpath.DefaultDataDir when it is unset.
+func initDfpath(cfg *config.ServerConfig) (dfpath.Dfpath, error) {
+	var options []dfpath.Option
+	if cfg.LogDir != "" {
+		options = append(options, dfpath.WithLogDir(cfg.LogDir))
+	}
+
+	dataDir := dfpath.DefaultDataDir
+	if cfg.DataDir != "" {
+		dataDir = cfg.DataDir
+	}
+	options = append(options, dfpath.WithDataDir(dataDir))
+
+	return dfpath.New(options...)
+}
+
+// runTrainer initializes monitoring, creates the trainer server, installs a quit
+// signal handler that stops it and then returns whatever svr.Serve returns.
+func runTrainer(ctx context.Context, d dfpath.Dfpath) error {
+	logger.Infof("version:\n%s", version.Version())
+
+	ff := dependency.InitMonitor(cfg.PProfPort, cfg.Telemetry)
+	defer ff()
+
+	svr, err := trainer.New(ctx, cfg, d)
+	if err != nil {
+		return err
+	}
+
+	dependency.SetupQuitSignalHandler(func() { svr.Stop() })
+	return svr.Serve()
+}
diff --git a/cmd/trainer/main.go b/cmd/trainer/main.go
new file mode 100644
index 000000000..5478f597a
--- /dev/null
+++ b/cmd/trainer/main.go
@@ -0,0 +1,23 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import "d7y.io/dragonfly/v2/cmd/trainer/cmd"
+
+func main() {
+	cmd.Execute()
+}
diff --git a/internal/dflog/loginit.go b/internal/dflog/loginit.go
index c8695a8d3..4b6540ad5 100644
--- a/internal/dflog/loginit.go
+++ b/internal/dflog/loginit.go
@@ -208,3 +208,26 @@ func InitDfcache(console bool, dir string) error {
 
 	return createFileLogger(console, meta, logDir)
 }
+
+// InitTrainer initializes the trainer loggers: a console logger when console is
+// true, otherwise core and grpc file loggers under dir joined with types.TrainerName.
+func InitTrainer(verbose, console bool, dir string) error {
+	if console {
+		return createConsoleLogger(verbose)
+	}
+
+	logDir := filepath.Join(dir, types.TrainerName)
+
+	var meta = []logInitMeta{
+		{
+			fileName:             CoreLogFileName,
+			setSugaredLoggerFunc: SetCoreLogger,
+		},
+		{
+			fileName:             GrpcLogFileName,
+			setSugaredLoggerFunc: SetGrpcLogger,
+		},
+	}
+
+	return createFileLogger(console, meta, logDir)
+}
diff --git a/trainer/config/config.go b/trainer/config/config.go
index bedbc68d6..45b8d2280 100644
--- a/trainer/config/config.go
+++ b/trainer/config/config.go
@@ -21,6 +21,7 @@ import (
 	"net"
 	"time"
 
+	"d7y.io/dragonfly/v2/cmd/dependency/base"
 	"d7y.io/dragonfly/v2/pkg/net/ip"
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/slices"
@@ -28,6 +29,9 @@ import (
 )
 
 type Config struct {
+	// Base options.
+	base.Options `yaml:",inline" mapstructure:",squash"`
+
 	// Network configuration.
 	Network NetworkConfig `yaml:"network" mapstructure:"network"`
 
diff --git a/trainer/rpcserver/rpcserver.go b/trainer/rpcserver/rpcserver.go
new file mode 100644
index 000000000..d64e5c82a
--- /dev/null
+++ b/trainer/rpcserver/rpcserver.go
@@ -0,0 +1,37 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpcserver
+
+import (
+	"google.golang.org/grpc"
+
+	"d7y.io/dragonfly/v2/pkg/rpc/trainer/server"
+	"d7y.io/dragonfly/v2/trainer/config"
+	"d7y.io/dragonfly/v2/trainer/storage"
+)
+
+// New creates the trainer grpc server by passing a new v1 trainer server and
+// the given grpc server options to server.New.
+func New(
+	cfg *config.Config,
+	storage storage.Storage,
+	opts ...grpc.ServerOption,
+) *grpc.Server {
+	return server.New(
+		newTrainerServerV1(cfg, storage),
+		opts...)
+}
diff --git a/trainer/rpcserver/rpcserver_test.go b/trainer/rpcserver/rpcserver_test.go
new file mode 100644
index 000000000..01d6748ed
--- /dev/null
+++ b/trainer/rpcserver/rpcserver_test.go
@@ -0,0 +1,54 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpcserver
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+
+	"d7y.io/dragonfly/v2/trainer/config"
+	storagemocks "d7y.io/dragonfly/v2/trainer/storage/mocks"
+)
+
+func TestRPCServer_New(t *testing.T) {
+	tests := []struct {
+		name   string
+		expect func(t *testing.T, s any)
+	}{
+		{
+			name: "new server",
+			expect: func(t *testing.T, s any) {
+				assert := assert.New(t)
+				assert.Equal("Server", reflect.TypeOf(s).Elem().Name())
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			storage := storagemocks.NewMockStorage(ctl)
+
+			svr := New(&config.Config{}, storage)
+			tc.expect(t, svr)
+		})
+	}
+}
diff --git a/trainer/rpcserver/trainer_server_v1.go b/trainer/rpcserver/trainer_server_v1.go
new file mode 100644
index 000000000..75c6c01ed
--- /dev/null
+++ b/trainer/rpcserver/trainer_server_v1.go
@@ -0,0 +1,43 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpcserver
+
+import (
+	trainerv1 "d7y.io/api/pkg/apis/trainer/v1"
+
+	"d7y.io/dragonfly/v2/trainer/config"
+	"d7y.io/dragonfly/v2/trainer/service"
+	storage "d7y.io/dragonfly/v2/trainer/storage"
+)
+
+// trainerServerV1 is v1 version of the trainer grpc server.
+type trainerServerV1 struct {
+	// V1 service.
+	service *service.V1
+}
+
+// newTrainerServerV1 returns a v1 trainer grpc server backed by a new v1 service.
+func newTrainerServerV1(cfg *config.Config, st storage.Storage) trainerv1.TrainerServer {
+	return &trainerServerV1{service: service.NewV1(cfg, st)}
+}
+
+// Train handles the training request from scheduler. It is currently a stub
+// that returns nil without reading from the stream.
+// TODO (fyx): implement Train method.
+func (t *trainerServerV1) Train(stream trainerv1.Trainer_TrainServer) error {
+	return nil
+}
diff --git a/trainer/service/service_v1.go b/trainer/service/service_v1.go
new file mode 100644
index 000000000..8e0540725
--- /dev/null
+++ b/trainer/service/service_v1.go
@@ -0,0 +1,42 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package service
+
+import (
+	"d7y.io/dragonfly/v2/trainer/config"
+	"d7y.io/dragonfly/v2/trainer/storage"
+)
+
+// V1 is the v1 version of the service.
+type V1 struct {
+	// Trainer service config.
+	config *config.Config
+
+	// Storage interface.
+	storage storage.Storage
+}
+
+// NewV1 returns a new v1 version of the service instance.
+func NewV1(
+	cfg *config.Config,
+	storage storage.Storage,
+) *V1 {
+	return &V1{
+		config:  cfg,
+		storage: storage,
+	}
+}
diff --git a/trainer/trainer.go b/trainer/trainer.go
new file mode 100644
index 000000000..773d9d006
--- /dev/null
+++ b/trainer/trainer.go
@@ -0,0 +1,156 @@
+/*
+ *     Copyright 2023 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package trainer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"time"
+
+	"google.golang.org/grpc"
+
+	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/dfpath"
+	"d7y.io/dragonfly/v2/pkg/net/ip"
+	"d7y.io/dragonfly/v2/trainer/config"
+	"d7y.io/dragonfly/v2/trainer/metrics"
+	"d7y.io/dragonfly/v2/trainer/rpcserver"
+	"d7y.io/dragonfly/v2/trainer/storage"
+)
+
+const (
+	// gracefulStopTimeout specifies a time limit for
+	// grpc server to complete a graceful shutdown.
+	gracefulStopTimeout = 10 * time.Minute
+)
+
+// Server is the trainer server, bundling the grpc server, the optional
+// metrics server and the storage.
+type Server struct {
+	// Server configuration.
+	config *config.Config
+
+	// GRPC server.
+	grpcServer *grpc.Server
+
+	// Metrics server.
+	metricsServer *http.Server
+
+	// Storage interface.
+	storage storage.Storage
+}
+
+// New creates a trainer server with storage created from d.DataDir() and a grpc
+// server using that storage; the metrics server is only created when enabled in
+// cfg. ctx is currently unused and the returned error is always nil.
+func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
+	s := &Server{config: cfg}
+
+	// Initialize Storage.
+	s.storage = storage.New(d.DataDir())
+
+	// Initialize trainer grpc server.
+	s.grpcServer = rpcserver.New(cfg, s.storage)
+
+	// Initialize metrics.
+	if cfg.Metrics.Enable {
+		s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)
+	}
+
+	return s, nil
+}
+
+// Serve starts the metrics server (if any) in a goroutine and then serves grpc
+// on the configured listen IP and port until the grpc server returns. It
+// returns an error when the listen IP cannot be formatted, the listener cannot
+// be created or the grpc server fails.
+func (s *Server) Serve() error {
+	// Start metrics server.
+	if s.metricsServer != nil {
+		go func() {
+			logger.Infof("started metrics server at %s", s.metricsServer.Addr)
+			if err := s.metricsServer.ListenAndServe(); err != nil {
+				if errors.Is(err, http.ErrServerClosed) {
+					return
+				}
+
+				logger.Fatalf("metrics server closed unexpect: %s", err.Error())
+			}
+		}()
+	}
+
+	// Create the GRPC listener.
+	listenIP, ok := ip.FormatIP(s.config.Server.ListenIP.String())
+	if !ok {
+		return errors.New("format ip failed")
+	}
+
+	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", listenIP, s.config.Server.Port))
+	if err != nil {
+		return fmt.Errorf("net listener failed to start: %w", err)
+	}
+	defer listener.Close()
+
+	// Start GRPC server.
+	logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())
+	if err := s.grpcServer.Serve(listener); err != nil {
+		logger.Errorf("stoped grpc server: %s", err.Error())
+		return err
+	}
+
+	return nil
+}
+
+// Stop clears the storage, shuts down the metrics server (if any) and stops
+// the grpc server gracefully, forcing a hard stop when the graceful stop
+// does not finish within gracefulStopTimeout.
+func (s *Server) Stop() {
+	// Clean storage file.
+	if err := s.storage.Clear(); err != nil {
+		logger.Errorf("clean storage file failed %s", err.Error())
+	} else {
+		logger.Info("clean storage file completed")
+	}
+
+	// Stop metrics server.
+	if s.metricsServer != nil {
+		if err := s.metricsServer.Shutdown(context.Background()); err != nil {
+			logger.Errorf("metrics server failed to stop: %s", err.Error())
+		} else {
+			logger.Info("metrics server closed under request")
+		}
+	}
+
+	// Stop GRPC server.
+	stopped := make(chan struct{})
+	go func() {
+		s.grpcServer.GracefulStop()
+		logger.Info("grpc server closed under request")
+		close(stopped)
+	}()
+
+	t := time.NewTimer(gracefulStopTimeout)
+	select {
+	case <-t.C:
+		s.grpcServer.Stop()
+	case <-stopped:
+		t.Stop()
+	}
+}