feat: add v2 version of dfdaemon client (#2050)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-02-07 14:09:00 +08:00
parent 29e57d1129
commit 7dc3c826f2
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
18 changed files with 473 additions and 112 deletions

View File

@ -57,7 +57,7 @@ type pieceTaskSynchronizer struct {
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
span trace.Span span trace.Span
syncPiecesStream dfdaemonv1.Daemon_SyncPieceTasksClient syncPiecesStream dfdaemonv1.Daemon_SyncPieceTasksClient
grpcClient dfdaemonclient.Client grpcClient dfdaemonclient.V1
dstPeer *schedulerv1.PeerPacket_DestPeer dstPeer *schedulerv1.PeerPacket_DestPeer
error atomic.Value error atomic.Value
grpcInitialized *atomic.Bool grpcInitialized *atomic.Bool
@ -264,7 +264,7 @@ func (s *pieceTaskSynchronizer) start(request *commonv1.PieceTaskRequest, dstPee
credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials) credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials)
dialCtx, cancel := context.WithTimeout(s.ctx, s.peerTaskConductor.GRPCDialTimeout) dialCtx, cancel := context.WithTimeout(s.ctx, s.peerTaskConductor.GRPCDialTimeout)
grpcClient, err := dfdaemonclient.GetClient(dialCtx, netAddr.String(), credentialOpt) grpcClient, err := dfdaemonclient.GetV1(dialCtx, netAddr.String(), credentialOpt)
cancel() cancel()
if err != nil { if err != nil {

View File

@ -1142,7 +1142,7 @@ func TestServer_SyncPieceTasks(t *testing.T) {
} }
} }
func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.Client { func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.V1 {
srv.peerServer = dfdaemonserver.New(srv) srv.peerServer = dfdaemonserver.New(srv)
port, err := freeport.GetFreePort() port, err := freeport.GetFreePort()
if err != nil { if err != nil {
@ -1161,7 +1161,7 @@ func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.A
Type: dfnet.TCP, Type: dfnet.TCP,
Addr: fmt.Sprintf(":%d", port), Addr: fmt.Sprintf(":%d", port),
} }
client, err := dfdaemonclient.GetInsecureClient(context.Background(), netAddr.String()) client, err := dfdaemonclient.GetInsecureV1(context.Background(), netAddr.String())
assert.Nil(err, "grpc dial should be ok") assert.Nil(err, "grpc dial should be ok")
return client return client
} }

View File

@ -43,7 +43,7 @@ func newCid(cid string) string {
// Stat checks if the given cache entry exists in local storage and/or in P2P network, and returns // Stat checks if the given cache entry exists in local storage and/or in P2P network, and returns
// os.ErrNotExist if cache is not found. // os.ErrNotExist if cache is not found.
func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error {
var ( var (
ctx = context.Background() ctx = context.Background()
cancel context.CancelFunc cancel context.CancelFunc
@ -76,7 +76,7 @@ func Stat(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error {
return statError return statError
} }
func statTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { func statTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error {
if client == nil { if client == nil {
return errors.New("stat has no daemon client") return errors.New("stat has no daemon client")
} }
@ -109,7 +109,7 @@ func newStatRequest(cfg *config.DfcacheConfig) *dfdaemonv1.StatTaskRequest {
} }
// Import imports the given cache into P2P network. // Import imports the given cache into P2P network.
func Import(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { func Import(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error {
var ( var (
ctx = context.Background() ctx = context.Background()
cancel context.CancelFunc cancel context.CancelFunc
@ -142,7 +142,7 @@ func Import(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error {
return importError return importError
} }
func importTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { func importTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error {
if client == nil { if client == nil {
return errors.New("import has no daemon client") return errors.New("import has no daemon client")
} }
@ -171,7 +171,7 @@ func newImportRequest(cfg *config.DfcacheConfig) *dfdaemonv1.ImportTaskRequest {
// Export exports or downloads the given cache from P2P network, and return os.ErrNotExist if cache // Export exports or downloads the given cache from P2P network, and return os.ErrNotExist if cache
// doesn't exist. // doesn't exist.
func Export(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { func Export(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error {
var ( var (
ctx = context.Background() ctx = context.Background()
cancel context.CancelFunc cancel context.CancelFunc
@ -204,7 +204,7 @@ func Export(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error {
return exportError return exportError
} }
func exportTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { func exportTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error {
if client == nil { if client == nil {
return errors.New("export has no daemon client") return errors.New("export has no daemon client")
} }
@ -241,7 +241,7 @@ func newExportRequest(cfg *config.DfcacheConfig) *dfdaemonv1.ExportTaskRequest {
} }
} }
func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error { func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.V1) error {
var ( var (
ctx = context.Background() ctx = context.Background()
cancel context.CancelFunc cancel context.CancelFunc
@ -274,7 +274,7 @@ func Delete(cfg *config.DfcacheConfig, client dfdaemonclient.Client) error {
return deleteError return deleteError
} }
func deleteTask(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error { func deleteTask(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfcacheConfig, wLog *logger.SugaredLoggerOnWith) error {
if client == nil { if client == nil {
return errors.New("delete has no daemon client") return errors.New("delete has no daemon client")
} }

View File

@ -44,7 +44,7 @@ import (
pkgstrings "d7y.io/dragonfly/v2/pkg/strings" pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
) )
func Download(cfg *config.DfgetConfig, client dfdaemonclient.Client) error { func Download(cfg *config.DfgetConfig, client dfdaemonclient.V1) error {
var ( var (
ctx = context.Background() ctx = context.Background()
cancel context.CancelFunc cancel context.CancelFunc
@ -74,14 +74,14 @@ func Download(cfg *config.DfgetConfig, client dfdaemonclient.Client) error {
return downError return downError
} }
func download(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { func download(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
if cfg.Recursive { if cfg.Recursive {
return recursiveDownload(ctx, client, cfg) return recursiveDownload(ctx, client, cfg)
} }
return singleDownload(ctx, client, cfg, wLog) return singleDownload(ctx, client, cfg, wLog)
} }
func singleDownload(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error { func singleDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
hdr := parseHeader(cfg.Header) hdr := parseHeader(cfg.Header)
if client == nil { if client == nil {
@ -293,7 +293,7 @@ func rejectRegex(u string, reject string) bool {
} }
// recursiveDownload breadth-first download all resources // recursiveDownload breadth-first download all resources
func recursiveDownload(ctx context.Context, client dfdaemonclient.Client, cfg *config.DfgetConfig) error { func recursiveDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig) error {
// if recursive level is 0, skip recursive level check // if recursive level is 0, skip recursive level check
var skipLevel bool var skipLevel bool
if cfg.RecursiveLevel == 0 { if cfg.RecursiveLevel == 0 {

View File

@ -45,6 +45,6 @@ func initDelete() {
rootCmd.AddCommand(deleteCmd) rootCmd.AddCommand(deleteCmd)
} }
func runDelete(cfg *config.DfcacheConfig, client client.Client) error { func runDelete(cfg *config.DfcacheConfig, client client.V1) error {
return dfcache.Delete(cfg, client) return dfcache.Delete(cfg, client)
} }

View File

@ -55,6 +55,6 @@ func initExport() {
} }
} }
func runExport(cfg *config.DfcacheConfig, client client.Client) error { func runExport(cfg *config.DfcacheConfig, client client.V1) error {
return dfcache.Export(cfg, client) return dfcache.Export(cfg, client)
} }

View File

@ -54,6 +54,6 @@ func initImport() {
} }
} }
func runImport(cfg *config.DfcacheConfig, client client.Client) error { func runImport(cfg *config.DfcacheConfig, client client.V1) error {
return dfcache.Import(cfg, client) return dfcache.Import(cfg, client)
} }

View File

@ -122,7 +122,7 @@ func runDfcacheSubcmd(cmdName string, args []string) error {
} }
var ( var (
dfdaemonClient client.Client dfdaemonClient client.V1
err error err error
) )
@ -170,9 +170,9 @@ func runDfcacheSubcmd(cmdName string, args []string) error {
} }
// checkDaemon checks if daemon is running // checkDaemon checks if daemon is running
func checkDaemon(daemonSockPath string) (client.Client, error) { func checkDaemon(daemonSockPath string) (client.V1, error) {
netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}
dfdaemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) dfdaemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -54,6 +54,6 @@ func initStat() {
} }
} }
func runStat(cfg *config.DfcacheConfig, client client.Client) error { func runStat(cfg *config.DfcacheConfig, client client.V1) error {
return dfcache.Stat(cfg, client) return dfcache.Stat(cfg, client)
} }

View File

@ -133,7 +133,7 @@ func initDaemonDfpath(cfg *config.DaemonOption) (dfpath.Dfpath, error) {
func runDaemon(d dfpath.Dfpath) error { func runDaemon(d dfpath.Dfpath) error {
logger.Infof("Version:\n%s", version.Version()) logger.Infof("Version:\n%s", version.Version())
netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()} netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()}
daemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) daemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String())
if err != nil { if err != nil {
return err return err
} }

View File

@ -227,7 +227,7 @@ func runDfget(cmd *cobra.Command, dfgetLockPath, daemonSockPath string) error {
defer ff() defer ff()
var ( var (
dfdaemonClient client.Client dfdaemonClient client.V1
err error err error
) )
@ -269,9 +269,9 @@ func loadSourceClients(cmd *cobra.Command) error {
} }
// checkAndSpawnDaemon do checking at three checkpoints // checkAndSpawnDaemon do checking at three checkpoints
func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) { func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.V1, error) {
netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}
dfdaemonClient, err := client.GetInsecureClient(context.Background(), netAddr.String()) dfdaemonClient, err := client.GetInsecureV1(context.Background(), netAddr.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 The Dragonfly Authors * Copyright 2023 The Dragonfly Authors
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -14,13 +14,12 @@
* limitations under the License. * limitations under the License.
*/ */
//go:generate mockgen -destination mocks/client_mock.go -source client.go -package mocks //go:generate mockgen -destination mocks/client_v1_mock.go -source client_v1.go -package mocks
package client package client
import ( import (
"context" "context"
"time"
"github.com/google/uuid" "github.com/google/uuid"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -39,20 +38,8 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc"
) )
const ( // GetV1 returns v1 version of the dfdaemon client.
// contextTimeout is timeout of grpc invoke. func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
contextTimeout = 2 * time.Minute
// maxRetries is maximum number of retries.
maxRetries = 3
// backoffWaitBetween is waiting for a fixed period of
// time between calls in backoff linear.
backoffWaitBetween = 500 * time.Millisecond
)
// GetClient returns dfdaemon client.
func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) {
if rpc.IsVsock(target) { if rpc.IsVsock(target) {
opts = append(opts, grpc.WithContextDialer(rpc.VsockDialer)) opts = append(opts, grpc.WithContextDialer(rpc.VsockDialer))
} }
@ -83,20 +70,20 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli
return nil, err return nil, err
} }
return &client{ return &v1{
DaemonClient: dfdaemonv1.NewDaemonClient(conn), DaemonClient: dfdaemonv1.NewDaemonClient(conn),
ClientConn: conn, ClientConn: conn,
}, nil }, nil
} }
// GetInsecureClient returns dfdaemon client. // GetInsecureV1 returns v1 version of the dfdaemon client.
// FIXME use GetClient + insecure.NewCredentials instead of this function // FIXME use GetV1 and insecure.NewCredentials instead of this function
func GetInsecureClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) { func GetInsecureV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
return GetClient(ctx, target, append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) return GetV1(ctx, target, append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
} }
// Client is the interface for grpc client. // V1 is the interface for v1 version of the grpc client.
type Client interface { type V1 interface {
// Trigger client to download file. // Trigger client to download file.
Download(context.Context, *dfdaemonv1.DownRequest, ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) Download(context.Context, *dfdaemonv1.DownRequest, ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error)
@ -125,29 +112,29 @@ type Client interface {
Close() error Close() error
} }
// client provides dfdaemon grpc function. // v1 provides v1 version of the dfdaemon grpc function.
type client struct { type v1 struct {
dfdaemonv1.DaemonClient dfdaemonv1.DaemonClient
*grpc.ClientConn *grpc.ClientConn
} }
// Trigger client to download file. // Trigger client to download file.
func (c *client) Download(ctx context.Context, req *dfdaemonv1.DownRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) { func (v *v1) Download(ctx context.Context, req *dfdaemonv1.DownRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_DownloadClient, error) {
req.Uuid = uuid.New().String() req.Uuid = uuid.New().String()
return c.DaemonClient.Download(ctx, req, opts...) return v.DaemonClient.Download(ctx, req, opts...)
} }
// Get piece tasks from other peers. // Get piece tasks from other peers.
func (c *client) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (*commonv1.PiecePacket, error) { func (v *v1) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (*commonv1.PiecePacket, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout) ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel() defer cancel()
return c.DaemonClient.GetPieceTasks(ctx, req, opts...) return v.DaemonClient.GetPieceTasks(ctx, req, opts...)
} }
// Sync piece tasks with other peers. // Sync piece tasks with other peers.
func (c *client) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_SyncPieceTasksClient, error) { func (v *v1) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, opts ...grpc.CallOption) (dfdaemonv1.Daemon_SyncPieceTasksClient, error) {
stream, err := c.DaemonClient.SyncPieceTasks(ctx, opts...) stream, err := v.DaemonClient.SyncPieceTasks(ctx, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -156,40 +143,40 @@ func (c *client) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequ
} }
// Check if given task exists in P2P cache system. // Check if given task exists in P2P cache system.
func (c *client) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest, opts ...grpc.CallOption) error { func (v *v1) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout) ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel() defer cancel()
_, err := c.DaemonClient.StatTask(ctx, req, opts...) _, err := v.DaemonClient.StatTask(ctx, req, opts...)
return err return err
} }
// Import the given file into P2P cache system. // Import the given file into P2P cache system.
func (c *client) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest, opts ...grpc.CallOption) error { func (v *v1) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest, opts ...grpc.CallOption) error {
_, err := c.DaemonClient.ImportTask(ctx, req, opts...) _, err := v.DaemonClient.ImportTask(ctx, req, opts...)
return err return err
} }
// Export or download file from P2P cache system. // Export or download file from P2P cache system.
func (c *client) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest, opts ...grpc.CallOption) error { func (v *v1) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest, opts ...grpc.CallOption) error {
_, err := c.DaemonClient.ExportTask(ctx, req, opts...) _, err := v.DaemonClient.ExportTask(ctx, req, opts...)
return err return err
} }
// Delete file from P2P cache system. // Delete file from P2P cache system.
func (c *client) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest, opts ...grpc.CallOption) error { func (v *v1) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout) ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel() defer cancel()
_, err := c.DaemonClient.DeleteTask(ctx, req, opts...) _, err := v.DaemonClient.DeleteTask(ctx, req, opts...)
return err return err
} }
// Check daemon health. // Check daemon health.
func (c *client) CheckHealth(ctx context.Context, opts ...grpc.CallOption) error { func (v *v1) CheckHealth(ctx context.Context, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout) ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel() defer cancel()
_, err := c.DaemonClient.CheckHealth(ctx, new(emptypb.Empty), opts...) _, err := v.DaemonClient.CheckHealth(ctx, new(emptypb.Empty), opts...)
return err return err
} }

View File

@ -0,0 +1,173 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/client_v2_mock.go -source client_v2.go -package mocks
package client
import (
"context"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
commonv2 "d7y.io/api/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/pkg/apis/dfdaemon/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
)
// GetV2 returns v2 version of the dfdaemon client.
func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
conn, err := grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(maxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
}, opts...)...,
)
if err != nil {
return nil, err
}
return &v2{
DfdaemonClient: dfdaemonv2.NewDfdaemonClient(conn),
ClientConn: conn,
}, nil
}
// V2 is the interface for v2 version of the grpc client.
type V2 interface {
// SyncPieces syncs pieces from the other peers.
SyncPieces(context.Context, ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error)
// DownloadTask downloads task back-to-source.
DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error
// StatTask stats task information.
StatTask(context.Context, *dfdaemonv2.StatTaskRequest, ...grpc.CallOption) (*commonv2.Task, error)
// ImportTask imports task to p2p network.
ImportTask(context.Context, *dfdaemonv2.ImportTaskRequest, ...grpc.CallOption) error
// ExportTask exports task from p2p network.
ExportTask(context.Context, *dfdaemonv2.ExportTaskRequest, ...grpc.CallOption) error
// DeleteTask deletes task from p2p network.
DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error
// Close tears down the ClientConn and all underlying connections.
Close() error
}
// v2 provides v2 version of the dfdaemon grpc function.
type v2 struct {
dfdaemonv2.DfdaemonClient
*grpc.ClientConn
}
// Trigger client to download file.
func (v *v2) SyncPieces(ctx context.Context, opts ...grpc.CallOption) (dfdaemonv2.Dfdaemon_SyncPiecesClient, error) {
return v.DfdaemonClient.SyncPieces(
ctx,
opts...,
)
}
// DownloadTask downloads task back-to-source.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := v.DfdaemonClient.DownloadTask(
ctx,
req,
opts...,
)
return err
}
// StatTask stats task information.
func (v *v2) StatTask(ctx context.Context, req *dfdaemonv2.StatTaskRequest, opts ...grpc.CallOption) (*commonv2.Task, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
return v.DfdaemonClient.StatTask(
ctx,
req,
opts...,
)
}
// ImportTask imports task to p2p network.
func (v *v2) ImportTask(ctx context.Context, req *dfdaemonv2.ImportTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := v.DfdaemonClient.ImportTask(
ctx,
req,
opts...,
)
return err
}
// ExportTask exports task from p2p network.
func (v *v2) ExportTask(ctx context.Context, req *dfdaemonv2.ExportTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := v.DfdaemonClient.ExportTask(
ctx,
req,
opts...,
)
return err
}
// DeleteTask deletes task from p2p network.
func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := v.DfdaemonClient.DeleteTask(
ctx,
req,
opts...,
)
return err
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"time"
)
const (
// contextTimeout is timeout of grpc invoke.
contextTimeout = 2 * time.Minute
// maxRetries is maximum number of retries.
maxRetries = 3
// backoffWaitBetween is waiting for a fixed period of
// time between calls in backoff linear.
backoffWaitBetween = 500 * time.Millisecond
)

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: client.go // Source: client_v1.go
// Package mocks is a generated GoMock package. // Package mocks is a generated GoMock package.
package mocks package mocks
@ -14,31 +14,31 @@ import (
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
) )
// MockClient is a mock of Client interface. // MockV1 is a mock of V1 interface.
type MockClient struct { type MockV1 struct {
ctrl *gomock.Controller ctrl *gomock.Controller
recorder *MockClientMockRecorder recorder *MockV1MockRecorder
} }
// MockClientMockRecorder is the mock recorder for MockClient. // MockV1MockRecorder is the mock recorder for MockV1.
type MockClientMockRecorder struct { type MockV1MockRecorder struct {
mock *MockClient mock *MockV1
} }
// NewMockClient creates a new mock instance. // NewMockV1 creates a new mock instance.
func NewMockClient(ctrl *gomock.Controller) *MockClient { func NewMockV1(ctrl *gomock.Controller) *MockV1 {
mock := &MockClient{ctrl: ctrl} mock := &MockV1{ctrl: ctrl}
mock.recorder = &MockClientMockRecorder{mock} mock.recorder = &MockV1MockRecorder{mock}
return mock return mock
} }
// EXPECT returns an object that allows the caller to indicate expected use. // EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockClient) EXPECT() *MockClientMockRecorder { func (m *MockV1) EXPECT() *MockV1MockRecorder {
return m.recorder return m.recorder
} }
// CheckHealth mocks base method. // CheckHealth mocks base method.
func (m *MockClient) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption) error { func (m *MockV1) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0} varargs := []interface{}{arg0}
for _, a := range arg1 { for _, a := range arg1 {
@ -50,14 +50,14 @@ func (m *MockClient) CheckHealth(arg0 context.Context, arg1 ...grpc.CallOption)
} }
// CheckHealth indicates an expected call of CheckHealth. // CheckHealth indicates an expected call of CheckHealth.
func (mr *MockClientMockRecorder) CheckHealth(arg0 interface{}, arg1 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) CheckHealth(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...) varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockClient)(nil).CheckHealth), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockV1)(nil).CheckHealth), varargs...)
} }
// Close mocks base method. // Close mocks base method.
func (m *MockClient) Close() error { func (m *MockV1) Close() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close") ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
@ -65,13 +65,13 @@ func (m *MockClient) Close() error {
} }
// Close indicates an expected call of Close. // Close indicates an expected call of Close.
func (mr *MockClientMockRecorder) Close() *gomock.Call { func (mr *MockV1MockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockV1)(nil).Close))
} }
// DeleteTask mocks base method. // DeleteTask mocks base method.
func (m *MockClient) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error { func (m *MockV1) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -83,14 +83,14 @@ func (m *MockClient) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskR
} }
// DeleteTask indicates an expected call of DeleteTask. // DeleteTask indicates an expected call of DeleteTask.
func (mr *MockClientMockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockClient)(nil).DeleteTask), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV1)(nil).DeleteTask), varargs...)
} }
// Download mocks base method. // Download mocks base method.
func (m *MockClient) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_DownloadClient, error) { func (m *MockV1) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_DownloadClient, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -103,14 +103,14 @@ func (m *MockClient) Download(arg0 context.Context, arg1 *dfdaemon.DownRequest,
} }
// Download indicates an expected call of Download. // Download indicates an expected call of Download.
func (mr *MockClientMockRecorder) Download(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) Download(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockClient)(nil).Download), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockV1)(nil).Download), varargs...)
} }
// ExportTask mocks base method. // ExportTask mocks base method.
func (m *MockClient) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error { func (m *MockV1) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -122,14 +122,14 @@ func (m *MockClient) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskR
} }
// ExportTask indicates an expected call of ExportTask. // ExportTask indicates an expected call of ExportTask.
func (mr *MockClientMockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockClient)(nil).ExportTask), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockV1)(nil).ExportTask), varargs...)
} }
// GetPieceTasks mocks base method. // GetPieceTasks mocks base method.
func (m *MockClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) { func (m *MockV1) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -142,14 +142,14 @@ func (m *MockClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskR
} }
// GetPieceTasks indicates an expected call of GetPieceTasks. // GetPieceTasks indicates an expected call of GetPieceTasks.
func (mr *MockClientMockRecorder) GetPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) GetPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockClient)(nil).GetPieceTasks), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockV1)(nil).GetPieceTasks), varargs...)
} }
// ImportTask mocks base method. // ImportTask mocks base method.
func (m *MockClient) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error { func (m *MockV1) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -161,14 +161,14 @@ func (m *MockClient) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskR
} }
// ImportTask indicates an expected call of ImportTask. // ImportTask indicates an expected call of ImportTask.
func (mr *MockClientMockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockClient)(nil).ImportTask), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockV1)(nil).ImportTask), varargs...)
} }
// StatTask mocks base method. // StatTask mocks base method.
func (m *MockClient) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) error { func (m *MockV1) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -180,14 +180,14 @@ func (m *MockClient) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskReque
} }
// StatTask indicates an expected call of StatTask. // StatTask indicates an expected call of StatTask.
func (mr *MockClientMockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockClient)(nil).StatTask), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockV1)(nil).StatTask), varargs...)
} }
// SyncPieceTasks mocks base method. // SyncPieceTasks mocks base method.
func (m *MockClient) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_SyncPieceTasksClient, error) { func (m *MockV1) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (dfdaemon.Daemon_SyncPieceTasksClient, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {
@ -200,8 +200,8 @@ func (m *MockClient) SyncPieceTasks(arg0 context.Context, arg1 *common.PieceTask
} }
// SyncPieceTasks indicates an expected call of SyncPieceTasks. // SyncPieceTasks indicates an expected call of SyncPieceTasks.
func (mr *MockClientMockRecorder) SyncPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { func (mr *MockV1MockRecorder) SyncPieceTasks(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...) varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockClient)(nil).SyncPieceTasks), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockV1)(nil).SyncPieceTasks), varargs...)
} }

View File

@ -0,0 +1,168 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: client_v2.go
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
common "d7y.io/api/pkg/apis/common/v2"
dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v2"
gomock "github.com/golang/mock/gomock"
grpc "google.golang.org/grpc"
)
// MockV2 is a mock of V2 interface.
type MockV2 struct {
ctrl *gomock.Controller
recorder *MockV2MockRecorder
}
// MockV2MockRecorder is the mock recorder for MockV2.
type MockV2MockRecorder struct {
mock *MockV2
}
// NewMockV2 creates a new mock instance.
func NewMockV2(ctrl *gomock.Controller) *MockV2 {
mock := &MockV2{ctrl: ctrl}
mock.recorder = &MockV2MockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockV2) EXPECT() *MockV2MockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockV2) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockV2MockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockV2)(nil).Close))
}
// DeleteTask mocks base method.
func (m *MockV2) DeleteTask(arg0 context.Context, arg1 *dfdaemon.DeleteTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DeleteTask", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteTask indicates an expected call of DeleteTask.
func (mr *MockV2MockRecorder) DeleteTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV2)(nil).DeleteTask), varargs...)
}
// DownloadTask mocks base method.
func (m *MockV2) DownloadTask(arg0 context.Context, arg1 *dfdaemon.DownloadTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DownloadTask", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// DownloadTask indicates an expected call of DownloadTask.
func (mr *MockV2MockRecorder) DownloadTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockV2)(nil).DownloadTask), varargs...)
}
// ExportTask mocks base method.
func (m *MockV2) ExportTask(arg0 context.Context, arg1 *dfdaemon.ExportTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ExportTask", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// ExportTask indicates an expected call of ExportTask.
func (mr *MockV2MockRecorder) ExportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockV2)(nil).ExportTask), varargs...)
}
// ImportTask mocks base method.
func (m *MockV2) ImportTask(arg0 context.Context, arg1 *dfdaemon.ImportTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ImportTask", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// ImportTask indicates an expected call of ImportTask.
func (mr *MockV2MockRecorder) ImportTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockV2)(nil).ImportTask), varargs...)
}
// StatTask mocks base method.
func (m *MockV2) StatTask(arg0 context.Context, arg1 *dfdaemon.StatTaskRequest, arg2 ...grpc.CallOption) (*common.Task, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "StatTask", varargs...)
ret0, _ := ret[0].(*common.Task)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StatTask indicates an expected call of StatTask.
func (mr *MockV2MockRecorder) StatTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockV2)(nil).StatTask), varargs...)
}
// SyncPieces mocks base method.
func (m *MockV2) SyncPieces(arg0 context.Context, arg1 ...grpc.CallOption) (dfdaemon.Dfdaemon_SyncPiecesClient, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "SyncPieces", varargs...)
ret0, _ := ret[0].(dfdaemon.Dfdaemon_SyncPiecesClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SyncPieces indicates an expected call of SyncPieces.
func (mr *MockV2MockRecorder) SyncPieces(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockV2)(nil).SyncPieces), varargs...)
}

View File

@ -88,7 +88,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
// V2 is the interface for v1 version of the grpc client. // V2 is the interface for v1 version of the grpc client.
type V2 interface { type V2 interface {
// AnnouncePeer announces peer to scheduler. // AnnouncePeer announces peer to scheduler.
AnnouncePeer(context.Context, *schedulerv2.AnnouncePeerRequest, ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) AnnouncePeer(context.Context, string, ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error)
// Checks information of peer. // Checks information of peer.
StatPeer(context.Context, *schedulerv2.StatPeerRequest, ...grpc.CallOption) (*commonv2.Peer, error) StatPeer(context.Context, *schedulerv2.StatPeerRequest, ...grpc.CallOption) (*commonv2.Peer, error)
@ -123,9 +123,9 @@ type v2 struct {
} }
// AnnouncePeer announces peer to scheduler. // AnnouncePeer announces peer to scheduler.
func (v *v2) AnnouncePeer(ctx context.Context, req *schedulerv2.AnnouncePeerRequest, opts ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) { func (v *v2) AnnouncePeer(ctx context.Context, taskID string, opts ...grpc.CallOption) (schedulerv2.Scheduler_AnnouncePeerClient, error) {
return v.SchedulerClient.AnnouncePeer( return v.SchedulerClient.AnnouncePeer(
context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), context.WithValue(ctx, pkgbalancer.ContextKey, taskID),
opts..., opts...,
) )
} }

View File

@ -57,7 +57,7 @@ func (mr *MockV2MockRecorder) AnnounceHost(arg0, arg1 interface{}, arg2 ...inter
} }
// AnnouncePeer mocks base method. // AnnouncePeer mocks base method.
func (m *MockV2) AnnouncePeer(arg0 context.Context, arg1 *scheduler.AnnouncePeerRequest, arg2 ...grpc.CallOption) (scheduler.Scheduler_AnnouncePeerClient, error) { func (m *MockV2) AnnouncePeer(arg0 context.Context, arg1 string, arg2 ...grpc.CallOption) (scheduler.Scheduler_AnnouncePeerClient, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1} varargs := []interface{}{arg0, arg1}
for _, a := range arg2 { for _, a := range arg2 {