refactor: dfnet package (#1578)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-08-18 20:12:36 +08:00
parent a6a44269b4
commit f631263d25
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
8 changed files with 46 additions and 42 deletions

View File

@ -167,10 +167,11 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second) ctx, cancel := context.WithTimeout(ptc.ctx, 4*time.Second)
defer cancel() defer cancel()
client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ netAddr := &dfnet.NetAddr{
Type: dfnet.TCP, Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort), Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort),
}.GetEndpoint()) }
client, err := dfdaemonclient.GetClient(netAddr.String())
if err != nil { if err != nil {
ptc.Errorf("get dfdaemon client error: %s", err) ptc.Errorf("get dfdaemon client error: %s", err)
span.RecordError(err) span.RecordError(err)

View File

@ -162,10 +162,11 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
delete(s.workers, dstPeer.PeerId) delete(s.workers, dstPeer.PeerId)
} }
client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ netAddr := &dfnet.NetAddr{
Type: dfnet.TCP, Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", dstPeer.Ip, dstPeer.RpcPort), Addr: fmt.Sprintf("%s:%d", dstPeer.Ip, dstPeer.RpcPort),
}.GetEndpoint()) }
client, err := dfdaemonclient.GetClient(netAddr.String())
if err != nil { if err != nil {
s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId) s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId)
return err return err

View File

@ -562,10 +562,11 @@ func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.A
} }
}() }()
client, err := dfdaemonclient.GetClient(dfnet.NetAddr{ netAddr := &dfnet.NetAddr{
Type: dfnet.TCP, Type: dfnet.TCP,
Addr: fmt.Sprintf(":%d", port), Addr: fmt.Sprintf(":%d", port),
}.GetEndpoint()) }
client, err := dfdaemonclient.GetClient(netAddr.String())
assert.Nil(err, "grpc dial should be ok") assert.Nil(err, "grpc dial should be ok")
return client return client
} }

View File

@ -172,7 +172,8 @@ func runDfcacheSubcmd(cmdName string, args []string) error {
// checkDaemon checks if daemon is running // checkDaemon checks if daemon is running
func checkDaemon(daemonSockPath string) (client.Client, error) { func checkDaemon(daemonSockPath string) (client.Client, error) {
dfdaemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}.GetEndpoint()) netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}
dfdaemonClient, err := client.GetClient(netAddr.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -129,7 +129,8 @@ func initDaemonDfpath(cfg *config.DaemonOption) (dfpath.Dfpath, error) {
func runDaemon(d dfpath.Dfpath) error { func runDaemon(d dfpath.Dfpath) error {
logger.Infof("Version:\n%s", version.Version()) logger.Infof("Version:\n%s", version.Version())
daemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()}.GetEndpoint()) netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()}
daemonClient, err := client.GetClient(netAddr.String())
if err != nil { if err != nil {
return err return err
} }

View File

@ -244,7 +244,8 @@ func runDfget(dfgetLockPath, daemonSockPath string) error {
// checkAndSpawnDaemon do checking at three checkpoints // checkAndSpawnDaemon do checking at three checkpoints
func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) { func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) {
dfdaemonClient, err := client.GetClient(dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}.GetEndpoint()) netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath}
dfdaemonClient, err := client.GetClient(netAddr.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -19,6 +19,7 @@ package dfnet
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
@ -26,36 +27,39 @@ import (
type NetworkType string type NetworkType string
const ( const (
// TCP represents protocol of tcp.
TCP NetworkType = "tcp" TCP NetworkType = "tcp"
UNIX NetworkType = "unix"
VSOCK NetworkType = "vsock"
TCPEndpointPrefix string = "dns:///" // TCP represents protocol of unix.
UnixEndpointPrefix string = "unix://" UNIX NetworkType = "unix"
VsockEndpointPrefix string = "vsock://"
// TCP represents protocol of vsock.
VSOCK NetworkType = "vsock"
) )
// NetAddr is the definition structure of grpc address,
// refer to https://github.com/grpc/grpc/blob/master/doc/naming.md.
type NetAddr struct { type NetAddr struct {
// Type is the type of network.
Type NetworkType `mapstructure:"type" yaml:"type"` Type NetworkType `mapstructure:"type" yaml:"type"`
// see https://github.com/grpc/grpc/blob/master/doc/naming.md
// Addr is the address of network.
Addr string `mapstructure:"addr" yaml:"addr"` Addr string `mapstructure:"addr" yaml:"addr"`
} }
func (n NetAddr) GetEndpoint() string { // String returns the endpoint of network address.
func (n *NetAddr) String() string {
switch n.Type { switch n.Type {
case UNIX: case UNIX:
return UnixEndpointPrefix + n.Addr return fmt.Sprintf("unix://%s", n.Addr)
case VSOCK: case VSOCK:
return VsockEndpointPrefix + n.Addr return fmt.Sprintf("vsock://%s", n.Addr)
default: default:
return TCPEndpointPrefix + n.Addr return fmt.Sprintf("dns:///%s", n.Addr)
} }
} }
func (n NetAddr) String() string { // UnmarshalJSON parses the JSON-encoded data and stores the result in NetAddr.
return n.GetEndpoint()
}
func (n *NetAddr) UnmarshalJSON(b []byte) error { func (n *NetAddr) UnmarshalJSON(b []byte) error {
var v any var v any
if err := json.Unmarshal(b, &v); err != nil { if err := json.Unmarshal(b, &v); err != nil {
@ -77,6 +81,7 @@ func (n *NetAddr) UnmarshalJSON(b []byte) error {
} }
} }
// UnmarshalYAML parses the YAML-encoded data and stores the result in NetAddr.
func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error { func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error {
switch node.Kind { switch node.Kind {
case yaml.ScalarNode: case yaml.ScalarNode:
@ -84,6 +89,7 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error {
if err := node.Decode(&addr); err != nil { if err := node.Decode(&addr); err != nil {
return err return err
} }
n.Type = TCP n.Type = TCP
n.Addr = addr n.Addr = addr
return nil return nil
@ -97,9 +103,11 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error {
if err := node.Content[i].Decode(&key); err != nil { if err := node.Content[i].Decode(&key); err != nil {
return err return err
} }
if err := node.Content[i+1].Decode(&value); err != nil { if err := node.Content[i+1].Decode(&value); err != nil {
return err return err
} }
m[key] = value m[key] = value
} }
@ -111,35 +119,25 @@ func (n *NetAddr) UnmarshalYAML(node *yaml.Node) error {
if err := n.unmarshal(yaml.Unmarshal, b); err != nil { if err := n.unmarshal(yaml.Unmarshal, b); err != nil {
return err return err
} }
return nil return nil
default: default:
return errors.New("invalid net addr") return errors.New("invalid net addr")
} }
} }
// unmarshal parses the encoded data and stores the result.
func (n *NetAddr) unmarshal(unmarshal func(in []byte, out any) (err error), b []byte) error { func (n *NetAddr) unmarshal(unmarshal func(in []byte, out any) (err error), b []byte) error {
nt := struct { netAddr := struct {
Type NetworkType `json:"type" yaml:"type"` Type NetworkType `json:"type" yaml:"type"`
Addr string `json:"addr" yaml:"addr"` // see https://github.com/grpc/grpc/blob/master/doc/naming.md Addr string `json:"addr" yaml:"addr"`
}{} }{}
if err := unmarshal(b, &nt); err != nil { if err := unmarshal(b, &netAddr); err != nil {
return err return err
} }
n.Type = nt.Type n.Type = netAddr.Type
n.Addr = nt.Addr n.Addr = netAddr.Addr
return nil return nil
} }
func Convert2NetAddr(addrs []string) []NetAddr {
netAddrs := make([]NetAddr, 0, len(addrs))
for i := range addrs {
netAddrs = append(netAddrs, NetAddr{
Type: TCP,
Addr: addrs[i],
})
}
return netAddrs
}

View File

@ -55,5 +55,5 @@ func VsockDialer(ctx context.Context, address string) (net.Conn, error) {
// IsVsock returns whether the address is vsock. // IsVsock returns whether the address is vsock.
func IsVsock(target string) bool { func IsVsock(target string) bool {
return strings.HasPrefix(target, dfnet.VsockEndpointPrefix) return strings.HasPrefix(target, string(dfnet.VSOCK))
} }