From f631263d2573f9a865c6ac9927d671f6b34ad4bf Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 18 Aug 2022 20:12:36 +0800 Subject: [PATCH] refactor: dfnet package (#1578) Signed-off-by: Gaius --- .../daemon/peer/peertask_piecetask_poller.go | 5 +- .../peer/peertask_piecetask_synchronizer.go | 5 +- client/daemon/rpcserver/rpcserver_test.go | 5 +- cmd/dfcache/cmd/root.go | 3 +- cmd/dfget/cmd/daemon.go | 3 +- cmd/dfget/cmd/root.go | 3 +- pkg/dfnet/dfnet.go | 62 +++++++++---------- pkg/rpc/vsock.go | 2 +- 8 files changed, 46 insertions(+), 42 deletions(-) diff --git a/client/daemon/peer/peertask_piecetask_poller.go b/client/daemon/peer/peertask_piecetask_poller.go index 3e69064f3..430325e86 100644 --- a/client/daemon/peer/peertask_piecetask_poller.go +++ b/client/daemon/peer/peertask_piecetask_poller.go @@ -167,10 +167,11 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer( ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second) defer cancel() - client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ + netAddr := &dfnet.NetAddr{ Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort), - }.GetEndpoint()) + } + client, err := dfdaemonclient.GetClient(netAddr.String()) if err != nil { ptc.Errorf("get dfdaemon client error: %s", err) span.RecordError(err) diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 2c9f48ba0..d9d155cbc 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -162,10 +162,11 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( delete(s.workers, dstPeer.PeerId) } - client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ + netAddr := &dfnet.NetAddr{ Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", dstPeer.Ip, dstPeer.RpcPort), - }.GetEndpoint()) + } + client, err := dfdaemonclient.GetClient(netAddr.String()) if err != nil { s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId) return err diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index d701d1d2a..98bcb9192 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -562,10 +562,11 @@ func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.A } }() - client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ + netAddr := &dfnet.NetAddr{ Type: dfnet.TCP, Addr: fmt.Sprintf(":%d", port), - }.GetEndpoint()) + } + client, err := dfdaemonclient.GetClient(netAddr.String()) assert.Nil(err, "grpc dial should be ok") return client } diff --git a/cmd/dfcache/cmd/root.go b/cmd/dfcache/cmd/root.go index b3ca784b8..e89c9d8a5 100644 --- a/cmd/dfcache/cmd/root.go +++ b/cmd/dfcache/cmd/root.go @@ -172,7 +172,8 @@ func runDfcacheSubcmd(cmdName string, args []string) error { // checkDaemon checks if daemon is running func checkDaemon(daemonSockPath string) (client.Client, error) { - dfdaemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}.GetEndpoint()) + netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} + dfdaemonClient, err := client.GetClient(netAddr.String()) if err != nil { return nil, err } diff --git a/cmd/dfget/cmd/daemon.go b/cmd/dfget/cmd/daemon.go index f67616cad..70b78e496 100644 --- a/cmd/dfget/cmd/daemon.go +++ b/cmd/dfget/cmd/daemon.go @@ -129,7 +129,8 @@ func initDaemonDfpath(cfg *config.DaemonOption) (dfpath.Dfpath, error) { func runDaemon(d dfpath.Dfpath) error { logger.Infof("Version:\n%s", version.Version()) - daemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()}.GetEndpoint()) + netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()} + daemonClient, err := client.GetClient(netAddr.String()) if err != nil { return err } diff --git a/cmd/dfget/cmd/root.go b/cmd/dfget/cmd/root.go index 84f319b89..9257bdc2f 100644 --- a/cmd/dfget/cmd/root.go +++ b/cmd/dfget/cmd/root.go @@ -244,7 +244,8 @@ func runDfget(dfgetLockPath, daemonSockPath string) error { // checkAndSpawnDaemon do checking at three checkpoints func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) { - dfdaemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}.GetEndpoint()) + netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} + dfdaemonClient, err := client.GetClient(netAddr.String()) if err != nil { return nil, err } diff --git a/pkg/dfnet/dfnet.go b/pkg/dfnet/dfnet.go index 7a72b0b6d..5506d0ba6 100644 --- a/pkg/dfnet/dfnet.go +++ b/pkg/dfnet/dfnet.go @@ -19,6 +19,7 @@ package dfnet import ( "encoding/json" "errors" + "fmt" "gopkg.in/yaml.v3" ) @@ -26,36 +27,39 @@ import ( type NetworkType string const ( - TCP NetworkType = "tcp" - UNIX NetworkType = "unix" - VSOCK NetworkType = "vsock" + // TCP represents protocol of tcp. + TCP NetworkType = "tcp" - TCPEndpointPrefix string = "dns:///" - UnixEndpointPrefix string = "unix://" - VsockEndpointPrefix string = "vsock://" + // TCP represents protocol of unix. + UNIX NetworkType = "unix" + + // TCP represents protocol of vsock. + VSOCK NetworkType = "vsock" ) +// NetAddr is the definition structure of grpc address, +// refer to https://github.com/grpc/grpc/blob/master/doc/naming.md. type NetAddr struct { + // Type is the type of network. Type NetworkType `mapstructure:"type" yaml:"type"` - // see https://github.com/grpc/grpc/blob/master/doc/naming.md + + // Addr is the address of network. Addr string `mapstructure:"addr" yaml:"addr"` } -func (n NetAddr) GetEndpoint() string { +// String returns the endpoint of network address. +func (n *NetAddr) String() string { switch n.Type { case UNIX: - return UnixEndpointPrefix + n.Addr + return fmt.Sprintf("unix://%s", n.Addr) case VSOCK: - return VsockEndpointPrefix + n.Addr + return fmt.Sprintf("vsock://%s", n.Addr) default: - return TCPEndpointPrefix + n.Addr + return fmt.Sprintf("dns:///%s", n.Addr) } } -func (n NetAddr) String() string { - return n.GetEndpoint() -} - +// UnmarshalJSON parses the JSON-encoded data and stores the result in NetAddr. func (n *NetAddr) UnmarshalJSON(b []byte) error { var v any if err := json.Unmarshal(b, &v); err != nil { @@ -77,6 +81,7 @@ func (n *NetAddr) UnmarshalJSON(b []byte) error { } } +// UnmarshalYAML parses the YAML-encoded data and stores the result in NetAddr. func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error { switch node.Kind { case yaml.ScalarNode: @@ -84,6 +89,7 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error { if err := node.Decode(&addr); err != nil { return err } + n.Type = TCP n.Addr = addr return nil @@ -97,9 +103,11 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error { if err := node.Content[i].Decode(&key); err != nil { return err } + if err := node.Content[i+1].Decode(&value); err != nil { return err } + m[key] = value } @@ -111,35 +119,25 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error { if err := n.unmarshal(yaml.Unmarshal, b); err != nil { return err } + return nil default: return errors.New("invalid net addr") } } +// unmarshal parses the encoded data and stores the result. func (n *NetAddr) unmarshal(unmarshal func(in []byte, out any) (err error), b []byte) error { - nt := struct { + netAddr := struct { Type NetworkType `json:"type" yaml:"type"` - Addr string `json:"addr" yaml:"addr"` // see https://github.com/grpc/grpc/blob/master/doc/naming.md + Addr string `json:"addr" yaml:"addr"` }{} - if err := unmarshal(b, &nt); err != nil { + if err := unmarshal(b, &netAddr); err != nil { return err } - n.Type = nt.Type - n.Addr = nt.Addr - + n.Type = netAddr.Type + n.Addr = netAddr.Addr return nil } - -func Convert2NetAddr(addrs []string) []NetAddr { - netAddrs := make([]NetAddr, 0, len(addrs)) - for i := range addrs { - netAddrs = append(netAddrs, NetAddr{ - Type: TCP, - Addr: addrs[i], - }) - } - return netAddrs -} diff --git a/pkg/rpc/vsock.go b/pkg/rpc/vsock.go index 61beb6d26..f7395855c 100644 --- a/pkg/rpc/vsock.go +++ b/pkg/rpc/vsock.go @@ -55,5 +55,5 @@ func VsockDialer(ctx context.Context, address string) (net.Conn, error) { // IsVsock returns whether the address is vsock. func IsVsock(target string) bool { - return strings.HasPrefix(target, dfnet.VsockEndpointPrefix) + return strings.HasPrefix(target, string(dfnet.VSOCK)) }