diff --git a/client.go b/client.go index 5ef73652..a738a91b 100644 --- a/client.go +++ b/client.go @@ -10,6 +10,7 @@ import ( "os/user" "sort" "strings" + "time" "github.com/colinmarc/hdfs/v2/hadoopconf" hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" @@ -67,6 +68,10 @@ type ClientOptions struct { // NamenodeDialFunc is used to connect to the namenodes. If nil, then // (&net.Dialer{}).DialContext is used. NamenodeDialFunc func(ctx context.Context, network, addr string) (net.Conn, error) + // NamenodeReadTimeout determines the deadline when reading from datanode connection + NamenodeReadTimeout time.Duration + // NamenodeWriteTimeout determines the deadline when writing to datanode connection + NamenodeWriteTimeout time.Duration // DatanodeDialFunc is used to connect to the datanodes. If nil, then // (&net.Dialer{}).DialContext is used. DatanodeDialFunc func(ctx context.Context, network, addr string) (net.Conn, error) @@ -185,6 +190,8 @@ func NewClient(options ClientOptions) (*Client, error) { Addresses: options.Addresses, User: options.User, DialFunc: options.NamenodeDialFunc, + ReadTimeout: options.NamenodeReadTimeout, + WriteTimeout: options.NamenodeWriteTimeout, KerberosClient: options.KerberosClient, KerberosServicePrincipleName: options.KerberosServicePrincipleName, }, diff --git a/internal/rpc/namenode.go b/internal/rpc/namenode.go index 708d92d4..feeb73be 100644 --- a/internal/rpc/namenode.go +++ b/internal/rpc/namenode.go @@ -42,11 +42,13 @@ type NamenodeConnection struct { kerberosServicePrincipleName string kerberosRealm string - dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) - conn net.Conn - host *namenodeHost - hostList []*namenodeHost - transport transport + dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) + readTimeout time.Duration + writeTimeout time.Duration + conn net.Conn + host *namenodeHost + hostList []*namenodeHost + transport transport reqLock sync.Mutex done chan struct{} @@ -64,6 +66,10 @@ type NamenodeConnectionOptions struct { // DialFunc is used to connect to the datanodes. If nil, then // (&net.Dialer{}).DialContext is used. DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) + // ReadTimeout determines the deadline when reading from datanode connection + ReadTimeout time.Duration + // WriteTimeout determines the deadline when writing to datanode connection + WriteTimeout time.Duration // KerberosClient is used to connect to kerberized HDFS clusters. If provided, // the NamenodeConnection will always mutually athenticate when connecting // to the namenode(s). @@ -116,9 +122,11 @@ func NewNamenodeConnection(options NamenodeConnectionOptions) (*NamenodeConnecti kerberosServicePrincipleName: options.KerberosServicePrincipleName, kerberosRealm: realm, - dialFunc: options.DialFunc, - hostList: hostList, - transport: &basicTransport{clientID: clientId}, + dialFunc: options.DialFunc, + readTimeout: options.ReadTimeout, + writeTimeout: options.WriteTimeout, + hostList: hostList, + transport: &basicTransport{clientID: clientId}, done: make(chan struct{}), } @@ -136,6 +144,9 @@ func NewNamenodeConnection(options NamenodeConnectionOptions) (*NamenodeConnecti func (c *NamenodeConnection) resolveConnection() error { if c.conn != nil { + if err := c.setTimeout(); err != nil { + return nil + } return nil } @@ -155,6 +166,10 @@ func (c *NamenodeConnection) resolveConnection() error { c.host = host c.conn, err = c.dialFunc(context.Background(), "tcp", host.address) + if err := c.setTimeout(); err != nil { + c.markFailure(err) + continue + } if err != nil { c.markFailure(err) continue @@ -185,6 +200,20 @@ func (c *NamenodeConnection) markFailure(err error) { c.host.lastErrorAt = time.Now() } +func (c *NamenodeConnection) setTimeout() error { + if c.readTimeout > 0 { + if err := c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil { + return err + } + } + if c.writeTimeout > 0 { + if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)); err != nil { + return err + } + } + return nil +} + // Execute performs an rpc call. It does this by sending req over the wire and // unmarshaling the result into resp. func (c *NamenodeConnection) Execute(method string, req proto.Message, resp proto.Message) error {