diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 5fea9b10f..3debd6bd7 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -57,7 +57,7 @@ type pieceTaskSynchronizer struct { ctxCancel context.CancelFunc span trace.Span syncPiecesStream dfdaemonv1.Daemon_SyncPieceTasksClient - grpcClient dfdaemonclient.Client + grpcClient dfdaemonclient.V1 dstPeer *schedulerv1.PeerPacket_DestPeer error atomic.Value grpcInitialized *atomic.Bool @@ -264,7 +264,7 @@ func (s *pieceTaskSynchronizer) start(request *commonv1.PieceTaskRequest, dstPee credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials) dialCtx, cancel := context.WithTimeout(s.ctx, s.peerTaskConductor.GRPCDialTimeout) - grpcClient, err := dfdaemonclient.GetClient(dialCtx, netAddr.String(), credentialOpt) + grpcClient, err := dfdaemonclient.GetV1(dialCtx, netAddr.String(), credentialOpt) cancel() if err != nil { diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index 51b23b734..1a45b7cbd 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -1142,7 +1142,7 @@ func TestServer_SyncPieceTasks(t *testing.T) { } } -func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.Client { +func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.V1 { srv.peerServer = dfdaemonserver.New(srv) port, err := freeport.GetFreePort() if err != nil { @@ -1161,7 +1161,7 @@ func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.A Type: dfnet.TCP, Addr: fmt.Sprintf(":%d", port), } - client, err := dfdaemonclient.GetInsecureClient(context.Background(), netAddr.String()) + client, err := dfdaemonclient.GetInsecureV1(context.Background(), netAddr.String()) assert.Nil(err, "grpc dial should be ok") return client } diff --git a/client/dfcache/dfcache.go b/client/dfcache/dfcache.go index 4debf9212..ce2ebc44b 100644 --- a/client/dfcache/dfcache.go +++ b/client/dfcache/dfcache.go @@ -43,7 +43,7 @@ func newCid(cid string) string { // Stat checks if the given cache entry exists in local storage and/or in P2P network, and returns // os.ErrNotExist if cache is not found. -func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { +func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error { var ( ctx = context.Background() cancel context.CancelFunc @@ -76,7 +76,7 @@ func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { return statError } -func statTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { +func statTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { if client == nil { return errors.New("stat has no daemon client") } @@ -109,7 +109,7 @@ func newStatRequest(cfg *config.DfcacheConfig) *dfdaemonv1.StatTaskRequest { } // Import imports the given cache into P2P network. -func Import(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { +func Import(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error { var ( ctx = context.Background() cancel context.CancelFunc @@ -142,7 +142,7 @@ func Import(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { return importError } -func importTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { +func importTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { if client == nil { return errors.New("import has no daemon client") } @@ -171,7 +171,7 @@ func newImportRequest(cfg *config.DfcacheConfig) *dfdaemonv1.ImportTaskRequest { // Export exports or downloads the given cache from P2P network, and return os.ErrNotExist if cache // doesn't exist. -func Export(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { +func Export(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error { var ( ctx = context.Background() cancel context.CancelFunc @@ -204,7 +204,7 @@ func Export(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { return exportError } -func exportTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { +func exportTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { if client == nil { return errors.New("export has no daemon client") } @@ -241,7 +241,7 @@ func newExportRequest(cfg *config.DfcacheConfig) *dfdaemonv1.ExportTaskRequest { } } -func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { +func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error { var ( ctx = context.Background() cancel context.CancelFunc @@ -274,7 +274,7 @@ func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { return deleteError } -func deleteTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { +func deleteTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { if client == nil { return errors.New("delete has no daemon client") } diff --git a/client/dfget/dfget.go b/client/dfget/dfget.go index f3ad5df13..91968850f 100644 --- a/client/dfget/dfget.go +++ b/client/dfget/dfget.go @@ -44,7 +44,7 @@ import ( pkgstrings "d7y.io/dragonfly/v2/pkg/strings" ) -func Download(cfg *config.DfgetConfig, client dfdaemonclient.Client) error { +func Download(cfg *config.DfgetConfig, client dfdaemonclient.V1) error { var ( ctx = context.Background() cancel context.CancelFunc @@ -74,14 +74,14 @@ func Download(cfg *config.DfgetConfig, client dfdaemonclient.Client) error { return downError } -func download(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { +func download(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { if cfg.Recursive { return recursiveDownload(ctx, client, cfg) } return singleDownload(ctx, client, cfg, wLog) } -func singleDownload(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { +func singleDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { hdr := parseHeader(cfg.Header) if client == nil { @@ -293,7 +293,7 @@ func rejectRegex(u string, reject string) bool { } // recursiveDownload breadth-first download all resources -func recursiveDownload(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig) error { +func recursiveDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig) error { // if recursive level is 0, skip recursive level check var skipLevel bool if cfg.RecursiveLevel == 0 { diff --git a/cmd/dfcache/cmd/delete.go b/cmd/dfcache/cmd/delete.go index 5a21791e2..ec55e0e49 100644 --- a/cmd/dfcache/cmd/delete.go +++ b/cmd/dfcache/cmd/delete.go @@ -45,6 +45,6 @@ func initDelete() { rootCmd.AddCommand(deleteCmd) } -func runDelete(cfg *config.DfcacheConfig, client client.Client) error { +func runDelete(cfg *config.DfcacheConfig, client client.V1) error { return dfcache.Delete(cfg, client) } diff --git a/cmd/dfcache/cmd/export.go b/cmd/dfcache/cmd/export.go index 4bbb62352..2b3922045 100644 --- a/cmd/dfcache/cmd/export.go +++ b/cmd/dfcache/cmd/export.go @@ -55,6 +55,6 @@ func initExport() { } } -func runExport(cfg *config.DfcacheConfig, client client.Client) error { +func runExport(cfg *config.DfcacheConfig, client client.V1) error { return dfcache.Export(cfg, client) } diff --git a/cmd/dfcache/cmd/import.go b/cmd/dfcache/cmd/import.go index 6822a00a6..cd4118f29 100644 --- a/cmd/dfcache/cmd/import.go +++ b/cmd/dfcache/cmd/import.go @@ -54,6 +54,6 @@ func initImport() { } } -func runImport(cfg *config.DfcacheConfig, client client.Client) error { +func runImport(cfg *config.DfcacheConfig, client client.V1) error { return dfcache.Import(cfg, client) } diff --git a/cmd/dfcache/cmd/root.go b/cmd/dfcache/cmd/root.go index 72b59a8ff..d62e66d19 100644 --- a/cmd/dfcache/cmd/root.go +++ b/cmd/dfcache/cmd/root.go @@ -122,7 +122,7 @@ func runDfcacheSubcmd(cmdName string, args []string) error { } var ( - dfdaemonClient client.Client + dfdaemonClient client.V1 err error ) @@ -170,9 +170,9 @@ func runDfcacheSubcmd(cmdName string, args []string) error { } // checkDaemon checks if daemon is running -func checkDaemon(daemonSockPath string) (client.Client, error) { +func checkDaemon(daemonSockPath string) (client.V1, error) { netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} - dfdaemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) + dfdaemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String()) if err != nil { return nil, err } diff --git a/cmd/dfcache/cmd/stat.go b/cmd/dfcache/cmd/stat.go index 6cec8c22b..84ebe3fe1 100644 --- a/cmd/dfcache/cmd/stat.go +++ b/cmd/dfcache/cmd/stat.go @@ -54,6 +54,6 @@ func initStat() { } } -func runStat(cfg *config.DfcacheConfig, client client.Client) error { +func runStat(cfg *config.DfcacheConfig, client client.V1) error { return dfcache.Stat(cfg, client) } diff --git a/cmd/dfget/cmd/daemon.go b/cmd/dfget/cmd/daemon.go index a68b0c644..a6d8e4864 100644 --- a/cmd/dfget/cmd/daemon.go +++ b/cmd/dfget/cmd/daemon.go @@ -133,7 +133,7 @@ func initDaemonDfpath(cfg *config.DaemonOption) (dfpath.Dfpath, error) { func runDaemon(d dfpath.Dfpath) error { logger.Infof("Version:\n%s", version.Version()) netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()} - daemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) + daemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String()) if err != nil { return err } diff --git a/cmd/dfget/cmd/root.go b/cmd/dfget/cmd/root.go index 916dabeba..2c5784de5 100644 --- a/cmd/dfget/cmd/root.go +++ b/cmd/dfget/cmd/root.go @@ -227,7 +227,7 @@ func runDfget(cmd *cobra.Command, dfgetLockPath, daemonSockPath string) error { defer ff() var ( - dfdaemonClient client.Client + dfdaemonClient client.V1 err error ) @@ -269,9 +269,9 @@ func loadSourceClients(cmd *cobra.Command) error { } // checkAndSpawnDaemon do checking at three checkpoints -func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) { +func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.V1, error) { netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} - dfdaemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) + dfdaemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String()) if err != nil { return nil, err } diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client_v1.go similarity index 63% rename from pkg/rpc/dfdaemon/client/client.go rename to pkg/rpc/dfdaemon/client/client_v1.go index bf4c089c1..75388a587 100644 --- a/pkg/rpc/dfdaemon/client/client.go +++ b/pkg/rpc/dfdaemon/client/client_v1.go @@ -1,5 +1,5 @@ /* - * Copyright 2020 The Dragonfly Authors + * Copyright 2023 The Dragonfly Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,13 +14,12 @@ * limitations under the License. */ -//go:generate mockgen -destination mocks/client_mock.go -source client.go -package mocks +//go:generate mockgen -destination mocks/client_v1_mock.go -source client_v1.go -package mocks package client import ( "context" - "time" "github.com/google/uuid" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -39,20 +38,8 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc" ) -const ( - // contextTimeout is timeout of grpc invoke. - contextTimeout = 2 * time.Minute - - // maxRetries is maximum number of retries. - maxRetries = 3 - - // backoffWaitBetween is waiting for a fixed period of - // time between calls in backoff linear. - backoffWaitBetween = 500 * time.Millisecond -) - -// GetClient returns dfdaemon client. -func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) { +// GetV1 returns v1 version of the dfdaemon client. +func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) { if rpc.IsVsock(target) { opts = append(opts, grpc.WithContextDialer(rpc.VsockDialer)) } @@ -83,20 +70,20 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli return nil, err } - return &client{ + return &v1{ DaemonClient: dfdaemonv1.NewDaemonClient(conn), ClientConn: conn, }, nil } -// GetInsecureClient returns dfdaemon client. -// FIXME use GetClient + insecure.NewCredentials instead of this function -func GetInsecureClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) { - return GetClient(ctx, target, append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) +// GetInsecureV1 returns v1 version of the dfdaemon client. +// FIXME use GetV1 and insecure.NewCredentials instead of this function +func GetInsecureV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) { + return GetV1(ctx, target, append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) } -// Client is the interface for grpc client. -type Client interface { +// V1 is the interface for v1 version of the grpc client. +type V1 interface { // Trigger client to download file. Download(context.Context, *dfdaemonv1.DownRequest, ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) @@ -125,29 +112,29 @@ type Client interface { Close() error } -// client provides dfdaemon grpc function. -type client struct { +// v1 provides v1 version of the dfdaemon grpc function. +type v1 struct { dfdaemonv1.DaemonClient *grpc.ClientConn } // Trigger client to download file. -func (c *client) Download(ctx context.Context, req *dfdaemonv1.DownRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) { +func (v *v1) Download(ctx context.Context, req *dfdaemonv1.DownRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) { req.Uuid = uuid.New().String() - return c.DaemonClient.Download(ctx, req, opts...) + return v.DaemonClient.Download(ctx, req, opts...) } // Get piece tasks from other peers. -func (c *client) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (*commonv1.PiecePacket, error) { +func (v *v1) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (*commonv1.PiecePacket, error) { ctx, cancel := context.WithTimeout(ctx, contextTimeout) defer cancel() - return c.DaemonClient.GetPieceTasks(ctx, req, opts...) + return v.DaemonClient.GetPieceTasks(ctx, req, opts...) } // Sync piece tasks with other peers. -func (c *client) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_SyncPieceTasksClient, error) { - stream, err := c.DaemonClient.SyncPieceTasks(ctx, opts...) +func (v *v1) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_SyncPieceTasksClient, error) { + stream, err := v.DaemonClient.SyncPieceTasks(ctx, opts...) if err != nil { return nil, err } @@ -156,40 +143,40 @@ func (c *client) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequ } // Check if given task exists in P2P cache system. -func (c *client) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest, opts ...grpc.CallOption) error { +func (v *v1) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest, opts ...grpc.CallOption) error { ctx, cancel := context.WithTimeout(ctx, contextTimeout) defer cancel() - _, err := c.DaemonClient.StatTask(ctx, req, opts...) + _, err := v.DaemonClient.StatTask(ctx, req, opts...) return err } // Import the given file into P2P cache system. -func (c *client) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest, opts ...grpc.CallOption) error { - _, err := c.DaemonClient.ImportTask(ctx, req, opts...) +func (v *v1) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest, opts ...grpc.CallOption) error { + _, err := v.DaemonClient.ImportTask(ctx, req, opts...) return err } // Export or download file from P2P cache system. -func (c *client) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest, opts ...grpc.CallOption) error { - _, err := c.DaemonClient.ExportTask(ctx, req, opts...) +func (v *v1) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest, opts ...grpc.CallOption) error { + _, err := v.DaemonClient.ExportTask(ctx, req, opts...) return err } // Delete file from P2P cache system. -func (c *client) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest, opts ...grpc.CallOption) error { +func (v *v1) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest, opts ...grpc.CallOption) error { ctx, cancel := context.WithTimeout(ctx, contextTimeout) defer cancel() - _, err := c.DaemonClient.DeleteTask(ctx, req, opts...) + _, err := v.DaemonClient.DeleteTask(ctx, req, opts...) return err } // Check daemon health. -func (c *client) CheckHealth(ctx context.Context, opts ...grpc.CallOption) error { +func (v *v1) CheckHealth(ctx context.Context, opts ...grpc.CallOption) error { ctx, cancel := context.WithTimeout(ctx, contextTimeout) defer cancel() - _, err := c.DaemonClient.CheckHealth(ctx, new(emptypb.Empty), opts...) + _, err := v.DaemonClient.CheckHealth(ctx, new(emptypb.Empty), opts...) return err } diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go new file mode 100644 index 000000000..76e18ca56 --- /dev/null +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -0,0 +1,173 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination mocks/client_v2_mock.go -source client_v2.go -package mocks + +package client + +import ( + "context" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + + commonv2 "d7y.io/api/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/pkg/apis/dfdaemon/v2" + + logger "d7y.io/dragonfly/v2/internal/dflog" +) + +// GetV2 returns v2 version of the dfdaemon client. +func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) { + conn, err := grpc.DialContext( + ctx, + target, + append([]grpc.DialOption{ + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), + }, opts...)..., + ) + if err != nil { + return nil, err + } + + return &v2{ + DfdaemonClient: dfdaemonv2.NewDfdaemonClient(conn), + ClientConn: conn, + }, nil +} + +// V2 is the interface for v2 version of the grpc client. +type V2 interface { + // SyncPieces syncs pieces from the other peers. + SyncPieces(context.Context, ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) + + // DownloadTask downloads task back-to-source. + DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error + + // StatTask stats task information. + StatTask(context.Context, *dfdaemonv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error) + + // ImportTask imports task to p2p network. + ImportTask(context.Context, *dfdaemonv2.ImportTaskRequest, ...grpc.CallOption) error + + // ExportTask exports task from p2p network. + ExportTask(context.Context, *dfdaemonv2.ExportTaskRequest, ...grpc.CallOption) error + + // DeleteTask deletes task from p2p network. + DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error + + // Close tears down the ClientConn and all underlying connections. + Close() error +} + +// v2 provides v2 version of the dfdaemon grpc function. +type v2 struct { + dfdaemonv2.DfdaemonClient + *grpc.ClientConn +} + +// Trigger client to download file. +func (v *v2) SyncPieces(ctx context.Context, opts ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) { + return v.DfdaemonClient.SyncPieces( + ctx, + opts..., + ) +} + +// DownloadTask downloads task back-to-source. +func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + _, err := v.DfdaemonClient.DownloadTask( + ctx, + req, + opts..., + ) + + return err +} + +// StatTask stats task information. +func (v *v2) StatTask(ctx context.Context, req *dfdaemonv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + return v.DfdaemonClient.StatTask( + ctx, + req, + opts..., + ) +} + +// ImportTask imports task to p2p network. +func (v *v2) ImportTask(ctx context.Context, req *dfdaemonv2.ImportTaskRequest, opts ...grpc.CallOption) error { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + _, err := v.DfdaemonClient.ImportTask( + ctx, + req, + opts..., + ) + + return err +} + +// ExportTask exports task from p2p network. +func (v *v2) ExportTask(ctx context.Context, req *dfdaemonv2.ExportTaskRequest, opts ...grpc.CallOption) error { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + _, err := v.DfdaemonClient.ExportTask( + ctx, + req, + opts..., + ) + + return err +} + +// DeleteTask deletes task from p2p network. +func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest, opts ...grpc.CallOption) error { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + _, err := v.DfdaemonClient.DeleteTask( + ctx, + req, + opts..., + ) + + return err +} diff --git a/pkg/rpc/dfdaemon/client/constants.go b/pkg/rpc/dfdaemon/client/constants.go new file mode 100644 index 000000000..fbaf9282f --- /dev/null +++ b/pkg/rpc/dfdaemon/client/constants.go @@ -0,0 +1,33 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "time" +) + +const ( + // contextTimeout is timeout of grpc invoke. + contextTimeout = 2 * time.Minute + + // maxRetries is maximum number of retries. + maxRetries = 3 + + // backoffWaitBetween is waiting for a fixed period of + // time between calls in backoff linear. + backoffWaitBetween = 500 * time.Millisecond +) diff --git a/pkg/rpc/dfdaemon/client/mocks/client_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go similarity index 60% rename from pkg/rpc/dfdaemon/client/mocks/client_mock.go rename to pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go index eda0136df..7692a0bc6 100644 --- a/pkg/rpc/dfdaemon/client/mocks/client_mock.go +++ b/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: client.go +// Source: client_v1.go // Package mocks is a generated GoMock package. package mocks @@ -14,31 +14,31 @@ import ( grpc "google.golang.org/grpc" ) -// MockClient is a mock of Client interface. -type MockClient struct { +// MockV1 is a mock of V1 interface. +type MockV1 struct { ctrl *gomock.Controller - recorder *MockClientMockRecorder + recorder *MockV1MockRecorder } -// MockClientMockRecorder is the mock recorder for MockClient. -type MockClientMockRecorder struct { - mock *MockClient +// MockV1MockRecorder is the mock recorder for MockV1. +type MockV1MockRecorder struct { + mock *MockV1 } -// NewMockClient creates a new mock instance. -func NewMockClient(ctrl *gomock.Controller) *MockClient { - mock := &MockClient{ctrl: ctrl} - mock.recorder = &MockClientMockRecorder{mock} +// NewMockV1 creates a new mock instance. +func NewMockV1(ctrl *gomock.Controller) *MockV1 { + mock := &MockV1{ctrl: ctrl} + mock.recorder = &MockV1MockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockClient) EXPECT() *MockClientMockRecorder { +func (m *MockV1) EXPECT() *MockV1MockRecorder { return m.recorder } // CheckHealth mocks base method. -func (m *MockClient) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption) error { +func (m *MockV1) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption) error { m.ctrl.T.Helper() varargs := []interface{}{arg0} for _, a := range arg1 { @@ -50,14 +50,14 @@ func (m *MockClient) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption) } // CheckHealth indicates an expected call of CheckHealth. -func (mr *MockClientMockRecorder) CheckHealth(arg0 interface{}, arg1 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) CheckHealth(arg0 interface{}, arg1 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockClient)(nil).CheckHealth), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockV1)(nil).CheckHealth), varargs...) } // Close mocks base method. -func (m *MockClient) Close() error { +func (m *MockV1) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) @@ -65,13 +65,13 @@ func (m *MockClient) Close() error { } // Close indicates an expected call of Close. -func (mr *MockClientMockRecorder) Close() *gomock.Call { +func (mr *MockV1MockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockV1)(nil).Close)) } // DeleteTask mocks base method. -func (m *MockClient) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error { +func (m *MockV1) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -83,14 +83,14 @@ func (m *MockClient) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskR } // DeleteTask indicates an expected call of DeleteTask. -func (mr *MockClientMockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockClient)(nil).DeleteTask), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV1)(nil).DeleteTask), varargs...) } // Download mocks base method. -func (m *MockClient) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_DownloadClient, error) { +func (m *MockV1) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_DownloadClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -103,14 +103,14 @@ func (m *MockClient) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest, } // Download indicates an expected call of Download. -func (mr *MockClientMockRecorder) Download(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) Download(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockClient)(nil).Download), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockV1)(nil).Download), varargs...) } // ExportTask mocks base method. -func (m *MockClient) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error { +func (m *MockV1) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -122,14 +122,14 @@ func (m *MockClient) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskR } // ExportTask indicates an expected call of ExportTask. -func (mr *MockClientMockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockClient)(nil).ExportTask), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockV1)(nil).ExportTask), varargs...) } // GetPieceTasks mocks base method. -func (m *MockClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) { +func (m *MockV1) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -142,14 +142,14 @@ func (m *MockClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskR } // GetPieceTasks indicates an expected call of GetPieceTasks. -func (mr *MockClientMockRecorder) GetPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) GetPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockClient)(nil).GetPieceTasks), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockV1)(nil).GetPieceTasks), varargs...) } // ImportTask mocks base method. -func (m *MockClient) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error { +func (m *MockV1) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -161,14 +161,14 @@ func (m *MockClient) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskR } // ImportTask indicates an expected call of ImportTask. -func (mr *MockClientMockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockClient)(nil).ImportTask), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockV1)(nil).ImportTask), varargs...) } // StatTask mocks base method. -func (m *MockClient) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) error { +func (m *MockV1) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) error { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -180,14 +180,14 @@ func (m *MockClient) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskReque } // StatTask indicates an expected call of StatTask. -func (mr *MockClientMockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockClient)(nil).StatTask), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockV1)(nil).StatTask), varargs...) } // SyncPieceTasks mocks base method. -func (m *MockClient) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_SyncPieceTasksClient, error) { +func (m *MockV1) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_SyncPieceTasksClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -200,8 +200,8 @@ func (m *MockClient) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTask } // SyncPieceTasks indicates an expected call of SyncPieceTasks. -func (mr *MockClientMockRecorder) SyncPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockV1MockRecorder) SyncPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockClient)(nil).SyncPieceTasks), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockV1)(nil).SyncPieceTasks), varargs...) } diff --git a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go new file mode 100644 index 000000000..b4bb55a38 --- /dev/null +++ b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go @@ -0,0 +1,168 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: client_v2.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + common "d7y.io/api/pkg/apis/common/v2" + dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v2" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockV2 is a mock of V2 interface. +type MockV2 struct { + ctrl *gomock.Controller + recorder *MockV2MockRecorder +} + +// MockV2MockRecorder is the mock recorder for MockV2. +type MockV2MockRecorder struct { + mock *MockV2 +} + +// NewMockV2 creates a new mock instance. +func NewMockV2(ctrl *gomock.Controller) *MockV2 { + mock := &MockV2{ctrl: ctrl} + mock.recorder = &MockV2MockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockV2) EXPECT() *MockV2MockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockV2) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockV2MockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockV2)(nil).Close)) +} + +// DeleteTask mocks base method. +func (m *MockV2) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteTask", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTask indicates an expected call of DeleteTask. +func (mr *MockV2MockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV2)(nil).DeleteTask), varargs...) +} + +// DownloadTask mocks base method. +func (m *MockV2) DownloadTask(arg0 context.Context, arg1 *dfdaemon.DownloadTaskRequest, arg2 ...grpc.CallOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DownloadTask", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// DownloadTask indicates an expected call of DownloadTask. +func (mr *MockV2MockRecorder) DownloadTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockV2)(nil).DownloadTask), varargs...) +} + +// ExportTask mocks base method. +func (m *MockV2) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ExportTask", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// ExportTask indicates an expected call of ExportTask. +func (mr *MockV2MockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockV2)(nil).ExportTask), varargs...) +} + +// ImportTask mocks base method. +func (m *MockV2) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ImportTask", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// ImportTask indicates an expected call of ImportTask. +func (mr *MockV2MockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockV2)(nil).ImportTask), varargs...) +} + +// StatTask mocks base method. +func (m *MockV2) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) (*common.Task, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "StatTask", varargs...) + ret0, _ := ret[0].(*common.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StatTask indicates an expected call of StatTask. +func (mr *MockV2MockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockV2)(nil).StatTask), varargs...) +} + +// SyncPieces mocks base method. +func (m *MockV2) SyncPieces(arg0 context.Context, arg1 ...grpc.CallOption) (dfdaemon.Dfdaemon_SyncPiecesClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SyncPieces", varargs...) + ret0, _ := ret[0].(dfdaemon.Dfdaemon_SyncPiecesClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncPieces indicates an expected call of SyncPieces. +func (mr *MockV2MockRecorder) SyncPieces(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockV2)(nil).SyncPieces), varargs...) +} diff --git a/pkg/rpc/scheduler/client/client_v2.go b/pkg/rpc/scheduler/client/client_v2.go index ba233fc5d..37bc64754 100644 --- a/pkg/rpc/scheduler/client/client_v2.go +++ b/pkg/rpc/scheduler/client/client_v2.go @@ -88,7 +88,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt // V2 is the interface for v1 version of the grpc client. type V2 interface { // AnnouncePeer announces peer to scheduler. - AnnouncePeer(context.Context, *schedulerv2.AnnouncePeerRequest, ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) + AnnouncePeer(context.Context, string, ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) // Checks information of peer. StatPeer(context.Context, *schedulerv2.StatPeerRequest, ...grpc.CallOption) (*commonv2.Peer, error) @@ -123,9 +123,9 @@ type v2 struct { } // AnnouncePeer announces peer to scheduler. -func (v *v2) AnnouncePeer(ctx context.Context, req *schedulerv2.AnnouncePeerRequest, opts ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) { +func (v *v2) AnnouncePeer(ctx context.Context, taskID string, opts ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) { return v.SchedulerClient.AnnouncePeer( - context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, taskID), opts..., ) } diff --git a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go index 2c29d5b23..9d99bacf1 100644 --- a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go +++ b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go @@ -57,7 +57,7 @@ func (mr *MockV2MockRecorder) AnnounceHost(arg0, arg1 interface{}, arg2 ...inter } // AnnouncePeer mocks base method. -func (m *MockV2) AnnouncePeer(arg0 context.Context, arg1 *scheduler.AnnouncePeerRequest, arg2 ...grpc.CallOption) (scheduler.Scheduler_AnnouncePeerClient, error) { +func (m *MockV2) AnnouncePeer(arg0 context.Context, arg1 string, arg2 ...grpc.CallOption) (scheduler.Scheduler_AnnouncePeerClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 {