feat: add trainer cmd and trainer service (#2479)
Signed-off-by: XZ <834756128@qq.com>
This commit is contained in:
parent
71e323f59d
commit
e20bb9743e
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"d7y.io/dragonfly/v2/cmd/dependency"
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
"d7y.io/dragonfly/v2/pkg/dfpath"
|
||||
"d7y.io/dragonfly/v2/pkg/types"
|
||||
"d7y.io/dragonfly/v2/trainer"
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
"d7y.io/dragonfly/v2/version"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg *config.Config
|
||||
)
|
||||
|
||||
// rootCmd represents the commonv1 command when called without any subcommands.
|
||||
var rootCmd = &cobra.Command{
|
||||
Use: "trainer",
|
||||
Short: "the trainer of dragonfly",
|
||||
Long: `Trainer is a long-running process and is mainly responsible for receiving historical download and network topology records,
|
||||
preprocessing original record data, establing datasets and training machine learning and AI models that support scheduler peer-scheduling decisions.`,
|
||||
Args: cobra.NoArgs,
|
||||
DisableAutoGenTag: true,
|
||||
SilenceUsage: true,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Convert config.
|
||||
if err := cfg.Convert(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate config.
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize dfpath.
|
||||
d, err := initDfpath(&cfg.Server)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize logger.
|
||||
if err := logger.InitTrainer(cfg.Verbose, cfg.Console, d.LogDir()); err != nil {
|
||||
return fmt.Errorf("init trainer logger: %w", err)
|
||||
}
|
||||
logger.RedirectStdoutAndStderr(cfg.Console, path.Join(d.LogDir(), types.SchedulerName))
|
||||
|
||||
return runTrainer(ctx, d)
|
||||
},
|
||||
}
|
||||
|
||||
// Execute adds all child commands to the root command and sets flags appropriately.
|
||||
// This is called by main.main(). It only needs to happen once to the rootCmd.
|
||||
func Execute() {
|
||||
if err := rootCmd.Execute(); err != nil {
|
||||
logger.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Initialize default scheduler config.
|
||||
cfg = config.New()
|
||||
// Initialize command and config.
|
||||
dependency.InitCommandAndConfig(rootCmd, true, cfg)
|
||||
}
|
||||
|
||||
func initDfpath(cfg *config.ServerConfig) (dfpath.Dfpath, error) {
|
||||
var options []dfpath.Option
|
||||
if cfg.LogDir != "" {
|
||||
options = append(options, dfpath.WithLogDir(cfg.LogDir))
|
||||
}
|
||||
|
||||
dataDir := dfpath.DefaultDataDir
|
||||
if cfg.DataDir != "" {
|
||||
dataDir = cfg.DataDir
|
||||
}
|
||||
options = append(options, dfpath.WithDataDir(dataDir))
|
||||
|
||||
return dfpath.New(options...)
|
||||
}
|
||||
|
||||
func runTrainer(ctx context.Context, d dfpath.Dfpath) error {
|
||||
logger.Infof("version:\n%s", version.Version())
|
||||
|
||||
ff := dependency.InitMonitor(cfg.PProfPort, cfg.Telemetry)
|
||||
defer ff()
|
||||
|
||||
svr, err := trainer.New(ctx, cfg, d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dependency.SetupQuitSignalHandler(func() { svr.Stop() })
|
||||
return svr.Serve()
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import "d7y.io/dragonfly/v2/cmd/trainer/cmd"
|
||||
|
||||
func main() {
|
||||
cmd.Execute()
|
||||
}
|
||||
|
|
@ -208,3 +208,24 @@ func InitDfcache(console bool, dir string) error {
|
|||
|
||||
return createFileLogger(console, meta, logDir)
|
||||
}
|
||||
|
||||
func InitTrainer(verbose, console bool, dir string) error {
|
||||
if console {
|
||||
return createConsoleLogger(verbose)
|
||||
}
|
||||
|
||||
logDir := filepath.Join(dir, types.TrainerName)
|
||||
|
||||
var meta = []logInitMeta{
|
||||
{
|
||||
fileName: CoreLogFileName,
|
||||
setSugaredLoggerFunc: SetCoreLogger,
|
||||
},
|
||||
{
|
||||
fileName: GrpcLogFileName,
|
||||
setSugaredLoggerFunc: SetGrpcLogger,
|
||||
},
|
||||
}
|
||||
|
||||
return createFileLogger(console, meta, logDir)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"d7y.io/dragonfly/v2/cmd/dependency/base"
|
||||
"d7y.io/dragonfly/v2/pkg/net/ip"
|
||||
"d7y.io/dragonfly/v2/pkg/rpc"
|
||||
"d7y.io/dragonfly/v2/pkg/slices"
|
||||
|
|
@ -28,6 +29,9 @@ import (
|
|||
)
|
||||
|
||||
type Config struct {
|
||||
// Base options.
|
||||
base.Options `yaml:",inline" mapstructure:",squash"`
|
||||
|
||||
// Network configuration.
|
||||
Network NetworkConfig `yaml:"network" mapstructure:"network"`
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package rpcserver
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"d7y.io/dragonfly/v2/pkg/rpc/trainer/server"
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
"d7y.io/dragonfly/v2/trainer/storage"
|
||||
)
|
||||
|
||||
func New(
|
||||
cfg *config.Config,
|
||||
storage storage.Storage,
|
||||
opts ...grpc.ServerOption,
|
||||
) *grpc.Server {
|
||||
return server.New(
|
||||
newTrainerServerV1(cfg, storage),
|
||||
opts...)
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package rpcserver
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
storagemocks "d7y.io/dragonfly/v2/trainer/storage/mocks"
|
||||
)
|
||||
|
||||
func TestRPCServer_New(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expect func(t *testing.T, s any)
|
||||
}{
|
||||
{
|
||||
name: "new server",
|
||||
expect: func(t *testing.T, s any) {
|
||||
assert := assert.New(t)
|
||||
assert.Equal(reflect.TypeOf(s).Elem().Name(), "Server")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctl := gomock.NewController(t)
|
||||
defer ctl.Finish()
|
||||
storage := storagemocks.NewMockStorage(ctl)
|
||||
|
||||
svr := New(&config.Config{}, storage)
|
||||
tc.expect(t, svr)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package rpcserver
|
||||
|
||||
import (
|
||||
trainerv1 "d7y.io/api/pkg/apis/trainer/v1"
|
||||
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
"d7y.io/dragonfly/v2/trainer/service"
|
||||
storage "d7y.io/dragonfly/v2/trainer/storage"
|
||||
)
|
||||
|
||||
// schedulerServerV1 is v1 version of the scheduler grpc server.
|
||||
type trainerServerV1 struct {
|
||||
// Service interface.
|
||||
service *service.V1
|
||||
}
|
||||
|
||||
func newTrainerServerV1(cfg *config.Config, st storage.Storage) trainerv1.TrainerServer {
|
||||
return &trainerServerV1{service.NewV1(cfg, st)}
|
||||
}
|
||||
|
||||
// TODO (fyx): implement Train method.
|
||||
// Train handles the training request from scheduler.
|
||||
func (t *trainerServerV1) Train(stream trainerv1.Trainer_TrainServer) error {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
"d7y.io/dragonfly/v2/trainer/storage"
|
||||
)
|
||||
|
||||
// V1 is the interface for v1 version of the service.
|
||||
type V1 struct {
|
||||
// Trainer service config.
|
||||
config *config.Config
|
||||
|
||||
// Storage Interface.
|
||||
storage storage.Storage
|
||||
}
|
||||
|
||||
// New v1 version of service instance.
|
||||
func NewV1(
|
||||
cfg *config.Config,
|
||||
storage storage.Storage,
|
||||
|
||||
) *V1 {
|
||||
return &V1{
|
||||
config: cfg,
|
||||
storage: storage,
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
* Copyright 2023 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package trainer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
"d7y.io/dragonfly/v2/pkg/dfpath"
|
||||
"d7y.io/dragonfly/v2/pkg/net/ip"
|
||||
"d7y.io/dragonfly/v2/trainer/config"
|
||||
"d7y.io/dragonfly/v2/trainer/metrics"
|
||||
"d7y.io/dragonfly/v2/trainer/rpcserver"
|
||||
"d7y.io/dragonfly/v2/trainer/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
// gracefulStopTimeout specifies a time limit for
|
||||
// grpc server to complete a graceful shutdown.
|
||||
gracefulStopTimeout = 10 * time.Minute
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
// Server configuration.
|
||||
config *config.Config
|
||||
|
||||
// GRPC server.
|
||||
grpcServer *grpc.Server
|
||||
|
||||
// Metrics server.
|
||||
metricsServer *http.Server
|
||||
|
||||
// Storage interface.
|
||||
storage storage.Storage
|
||||
}
|
||||
|
||||
func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
|
||||
s := &Server{config: cfg}
|
||||
|
||||
// Initialize Storage.
|
||||
s.storage = storage.New(d.DataDir())
|
||||
|
||||
// Initialize trainer grpc server.
|
||||
s.grpcServer = rpcserver.New(cfg, s.storage)
|
||||
|
||||
// Initialize metrics.
|
||||
if cfg.Metrics.Enable {
|
||||
s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) Serve() error {
|
||||
// Started metrics server.
|
||||
if s.metricsServer != nil {
|
||||
go func() {
|
||||
logger.Infof("started metrics server at %s", s.metricsServer.Addr)
|
||||
if err := s.metricsServer.ListenAndServe(); err != nil {
|
||||
if err == http.ErrServerClosed {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Fatalf("metrics server closed unexpect: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Generate GRPC limit listener.
|
||||
ip, ok := ip.FormatIP(s.config.Server.ListenIP.String())
|
||||
if !ok {
|
||||
return errors.New("format ip failed")
|
||||
}
|
||||
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, s.config.Server.Port))
|
||||
if err != nil {
|
||||
logger.Fatalf("net listener failed to start: %s", err.Error())
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
// Started GRPC server.
|
||||
logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())
|
||||
if err := s.grpcServer.Serve(listener); err != nil {
|
||||
logger.Errorf("stoped grpc server: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) Stop() {
|
||||
// Clean storage file.
|
||||
if err := s.storage.Clear(); err != nil {
|
||||
logger.Errorf("clean storage file failed %s", err.Error())
|
||||
} else {
|
||||
logger.Info("clean storage file completed")
|
||||
}
|
||||
|
||||
// Stop metrics server.
|
||||
if s.metricsServer != nil {
|
||||
if err := s.metricsServer.Shutdown(context.Background()); err != nil {
|
||||
logger.Errorf("metrics server failed to stop: %s", err.Error())
|
||||
} else {
|
||||
logger.Info("metrics server closed under request")
|
||||
}
|
||||
}
|
||||
|
||||
// Stop GRPC server.
|
||||
stopped := make(chan struct{})
|
||||
go func() {
|
||||
s.grpcServer.GracefulStop()
|
||||
logger.Info("grpc server closed under request")
|
||||
close(stopped)
|
||||
}()
|
||||
|
||||
t := time.NewTimer(gracefulStopTimeout)
|
||||
select {
|
||||
case <-t.C:
|
||||
s.grpcServer.Stop()
|
||||
case <-stopped:
|
||||
t.Stop()
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue