summaryrefslogtreecommitdiff
path: root/lib/IO/Async/Stream.pm
diff options
context:
space:
mode:
authorgregor herrmann <gregoa@debian.org>2014-10-20 20:23:54 +0200
committergregor herrmann <gregoa@debian.org>2014-10-20 20:23:54 +0200
commit64c71ff58dc2df52647d2afa92266c1f9beac7e3 (patch)
tree717047bde92d95250f4b38505d10f6f495e0ff5e /lib/IO/Async/Stream.pm
parent623ccb4a00986087bc950975f0ecf3821464f920 (diff)
Imported Upstream version 0.64
Diffstat (limited to 'lib/IO/Async/Stream.pm')
-rw-r--r--lib/IO/Async/Stream.pm178
1 files changed, 100 insertions, 78 deletions
diff --git a/lib/IO/Async/Stream.pm b/lib/IO/Async/Stream.pm
index 4762845..f0c0e55 100644
--- a/lib/IO/Async/Stream.pm
+++ b/lib/IO/Async/Stream.pm
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2006-2013 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2006-2014 -- leonerd@leonerd.org.uk
package IO::Async::Stream;
@@ -9,7 +9,7 @@ use strict;
use warnings;
use 5.010; # //
-our $VERSION = '0.63';
+our $VERSION = '0.64';
use base qw( IO::Async::Handle );
@@ -25,15 +25,14 @@ use Scalar::Util qw( blessed );
our $READLEN = 8192;
our $WRITELEN = 8192;
-# Indicies in writequeue elements
-use constant WQ_DATA => 0;
-use constant WQ_WRITELEN => 1;
-use constant WQ_ON_WRITE => 2;
-use constant WQ_ON_FLUSH => 3;
-use constant WQ_WATCHING => 4;
-# Indicies into readqueue elements
-use constant RQ_ONREAD => 0;
-use constant RQ_FUTURE => 1;
+use Struct::Dumb;
+
+# Element of the writequeue
+struct Writer => [qw( data writelen on_write on_flush on_error watching )];
+
+# Element of the readqueue
+struct Reader => [qw( on_read future )];
+
# Bitfields in the want flags
use constant WANT_READ_FOR_READ => 0x01;
use constant WANT_READ_FOR_WRITE => 0x02;
@@ -84,9 +83,11 @@ a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.
-For implementing real network protocols that are based on messages sent over a
-byte-stream (such as a TCP socket), it may be more appropriate to use a
-subclass of L<IO::Async::Protocol::Stream>.
+This class is suitable for any kind of filehandle that provides a
+possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
+C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
+datagram or raw message-based sockets (such as UDP) see instead
+L<IO::Async::Socket>.
=cut
@@ -199,8 +200,8 @@ sub _init
{
my $self = shift;
- $self->{writequeue} = []; # Queue of ARRAYs of [ $data, $on_write, $on_flush ]
- $self->{readqueue} = []; # Queue of ARRAYs of [ CODE, $readfuture ]
+ $self->{writequeue} = []; # Queue of Writers
+ $self->{readqueue} = []; # Queue of Readers
$self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
$self->{readbuff} = "";
@@ -219,35 +220,33 @@ sub _init
The following named parameters may be passed to C<new> or C<configure>:
-=over 8
-
-=item read_handle => IO
+=head2 read_handle => IO
The IO handle to read from. Must implement C<fileno> and C<sysread> methods.
-=item write_handle => IO
+=head2 write_handle => IO
The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.
-=item handle => IO
+=head2 handle => IO
Shortcut to specifying the same IO handle for both of the above.
-=item on_read => CODE
+=head2 on_read => CODE
-=item on_read_error => CODE
+=head2 on_read_error => CODE
-=item on_outgoing_empty => CODE
+=head2 on_outgoing_empty => CODE
-=item on_write_error => CODE
+=head2 on_write_error => CODE
-=item on_writeable_start => CODE
+=head2 on_writeable_start => CODE
-=item on_writeable_stop => CODE
+=head2 on_writeable_stop => CODE
CODE references for event handlers.
-=item autoflush => BOOL
+=head2 autoflush => BOOL
Optional. If true, the C<write> method will attempt to write data to the
operating system immediately, without waiting for the loop to indicate the
@@ -257,11 +256,11 @@ contain up-to-date logging or console information.
It currently defaults to false for any file handle, but future versions of
C<IO::Async> may enable this by default on STDOUT and STDERR.
-=item read_len => INT
+=head2 read_len => INT
Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.
-=item read_all => BOOL
+=head2 read_all => BOOL
Optional. If true, attempt to read as much data from the kernel as possible
when the handle becomes readable. By default this is turned off, meaning at
@@ -275,19 +274,19 @@ filehandles of processing time. Turning this option on may improve bulk data
transfer rate, at the risk of delaying or stalling processing on other
filehandles.
-=item write_len => INT
+=head2 write_len => INT
Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.
-=item write_all => BOOL
+=head2 write_all => BOOL
Optional. Analogous to the C<read_all> option, but for writing. When
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.
-=item read_high_watermark => INT
+=head2 read_high_watermark => INT
-=item read_low_watermark => INT
+=head2 read_low_watermark => INT
Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.
@@ -313,9 +312,9 @@ If these options are used with the default event handlers, be careful not to
cause deadlocks by having a high watermark sufficiently low that a single
C<on_read> invocation might not consider it finished yet.
-=item reader => STRING|CODE
+=head2 reader => STRING|CODE
-=item writer => STRING|CODE
+=head2 writer => STRING|CODE
Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
@@ -330,14 +329,14 @@ value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implenentations using C<sysread> and
C<syswrite> on the underlying handle, respectively.
-=item close_on_read_eof => BOOL
+=head2 close_on_read_eof => BOOL
Optional. Usually true, but if set to a false value then the stream will not
be C<close>d when an EOF condition occurs on read. This is normally not useful
as at that point the underlying stream filehandle is no longer useable, but it
may be useful for reading regular files, or interacting with TTY devices.
-=item encoding => STRING
+=head2 encoding => STRING
If supplied, sets the name of encoding of the underlying stream. If an
encoding is set, then the C<write> method will expect to receive Unicode
@@ -358,8 +357,6 @@ believe it to be reasonably stable.
This note applies only to the C<on_read> event; data written using the
C<write> method does not rely on any undocumented features of C<Encode>.
-=back
-
If a read handle is given, it is required that either an C<on_read> callback
reference is configured, or that the object provides an C<on_read> method. It
is optional whether either is true for C<on_outgoing_empty>; if neither is
@@ -437,6 +434,9 @@ sub _add_to_loop
=head1 METHODS
+The following methods documented with a trailing call to C<< ->get >> return
+L<Future> instances.
+
=cut
=head2 $stream->want_readready_for_read( $set )
@@ -575,6 +575,10 @@ sub close_now
{
my $self = shift;
+ foreach ( @{ $self->{writequeue} } ) {
+ $_->on_error->( "stream closing" ) if $_->on_error;
+ }
+
undef @{ $self->{writequeue} };
undef $self->{stream_closing};
@@ -666,6 +670,13 @@ yet empty; if more data has been queued since the call.
$on_flush->( $stream )
+=item on_error => CODE
+
+A CODE reference which will be invoked if a C<syswrite> error happens while
+performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.
+
+ $on_error->( $stream, $errno )
+
=back
If the object is not yet a member of a loop and doesn't yet have a
@@ -677,7 +688,7 @@ C<on_flush> continuation will be invoked, if supplied. This can be used to
obtain a marker, to invoke some code once the output queue has been flushed up
to this point.
-=head2 $stream->write( ... ) ==> ()
+=head2 $stream->write( ... )->get
If called in non-void context, this method returns a L<Future> which will
complete (with no value) when the write operation has been flushed. This may
@@ -704,57 +715,58 @@ sub _flush_one_write
my $writequeue = $self->{writequeue};
my $head;
- while( $head = $writequeue->[0] and ref $head->[WQ_DATA] ) {
- if( ref $head->[WQ_DATA] eq "CODE" ) {
- my $data = $head->[WQ_DATA]->( $self );
+ while( $head = $writequeue->[0] and ref $head->data ) {
+ if( ref $head->data eq "CODE" ) {
+ my $data = $head->data->( $self );
if( !defined $data ) {
- $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH];
+ $head->on_flush->( $self ) if $head->on_flush;
shift @$writequeue;
return 1;
}
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
- unshift @$writequeue, my $new = [ $data ];
- $new->[$_] = $head->[$_] for WQ_WRITELEN, WQ_ON_WRITE; # not ON_FLUSH
+ unshift @$writequeue, my $new = Writer(
+ $data, $head->writelen, $head->on_write, undef, undef, 0
+ );
next;
}
- elsif( blessed $head->[WQ_DATA] and $head->[WQ_DATA]->isa( "Future" ) ) {
- my $f = $head->[WQ_DATA];
+ elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
+ my $f = $head->data;
if( !$f->is_ready ) {
- return 0 if $head->[WQ_WATCHING];
+ return 0 if $head->watching;
$f->on_ready( sub { $self->_flush_one_write } );
- $head->[WQ_WATCHING]++;
+ $head->watching++;
return 0;
}
my $data = $f->get;
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
- $head->[WQ_DATA] = $data;
+ $head->data = $data;
next;
}
else {
- die "Unsure what to do with reference ".ref($head->[WQ_DATA])." in write queue";
+ die "Unsure what to do with reference ".ref($head->data)." in write queue";
}
}
my $second;
while( $second = $writequeue->[1] and
- !ref $second->[WQ_DATA] and
- $head->[WQ_WRITELEN] == $second->[WQ_WRITELEN] and
- !$head->[WQ_ON_WRITE] and !$second->[WQ_ON_WRITE] and
- !$head->[WQ_ON_FLUSH] ) {
- $head->[WQ_DATA] .= $second->[WQ_DATA];
- $head->[WQ_ON_WRITE] = $second->[WQ_ON_WRITE];
- $head->[WQ_ON_FLUSH] = $second->[WQ_ON_FLUSH];
+ !ref $second->data and
+ $head->writelen == $second->writelen and
+ !$head->on_write and !$second->on_write and
+ !$head->on_flush ) {
+ $head->data .= $second->data;
+ $head->on_write = $second->on_write;
+ $head->on_flush = $second->on_flush;
splice @$writequeue, 1, 1, ();
}
- die "TODO: head data does not contain a plain string" if ref $head->[WQ_DATA];
+ die "TODO: head data does not contain a plain string" if ref $head->data;
my $writer = $self->{writer};
- my $len = $self->$writer( $self->write_handle, $head->[WQ_DATA], $head->[WQ_WRITELEN] );
+ my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
if( !defined $len ) {
my $errno = $!;
@@ -771,18 +783,19 @@ sub _flush_one_write
$self->maybe_invoke_event( on_write_eof => );
}
+ $head->on_error->( $self, $errno ) if $head->on_error;
$self->maybe_invoke_event( on_write_error => $errno )
or $self->close_now;
return 0;
}
- if( my $on_write = $head->[WQ_ON_WRITE] ) {
+ if( my $on_write = $head->on_write ) {
$on_write->( $self, $len );
}
- if( !length $head->[WQ_DATA] ) {
- $head->[WQ_ON_FLUSH]->( $self ) if $head->[WQ_ON_FLUSH];
+ if( !length $head->data ) {
+ $head->on_flush->( $self ) if $head->on_flush;
shift @{ $self->{writequeue} };
}
@@ -808,18 +821,30 @@ sub write
my $on_write = delete $params{on_write};
my $on_flush = delete $params{on_flush};
+ my $on_error = delete $params{on_error};
my $f;
if( defined wantarray ) {
my $orig_on_flush = $on_flush;
+ my $orig_on_error = $on_error;
$f = $self->loop->new_future;
$on_flush = sub {
$f->done;
$orig_on_flush->( @_ ) if $orig_on_flush;
};
+ $on_error = sub {
+ my $self = shift;
+ my ( $errno ) = @_;
+
+ $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
+
+ $orig_on_error->( $self, @_ ) if $orig_on_error;
+ };
}
- push @{ $self->{writequeue} }, [ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush ];
+ push @{ $self->{writequeue} }, Writer(
+ $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush, $on_error, 0
+ );
keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
@@ -875,7 +900,7 @@ sub _flush_one_read
my $readqueue = $self->{readqueue};
my $ret;
- if( $readqueue->[0] and my $on_read = $readqueue->[0][RQ_ONREAD] ) {
+ if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
$ret = $on_read->( $self, \$self->{readbuff}, $eof );
}
else {
@@ -890,7 +915,7 @@ sub _flush_one_read
if( ref $ret eq "CODE" ) {
# Replace the top CODE, or add it if there was none
- $readqueue->[0] = [ $ret ];
+ $readqueue->[0] = Reader( $ret, undef );
return 1;
}
elsif( @$readqueue and !defined $ret ) {
@@ -937,7 +962,7 @@ sub _do_read
or $self->close_now;
foreach ( @{ $self->{readqueue} } ) {
- $_->[RQ_FUTURE]->fail( "read failed: $errno", sysread => $errno ) if $_->[RQ_FUTURE];
+ $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
}
undef @{ $self->{readqueue} };
@@ -960,7 +985,7 @@ sub _do_read
$self->maybe_invoke_event( on_read_eof => );
$self->close_now if $self->{close_on_read_eof};
foreach ( @{ $self->{readqueue} } ) {
- $_->[RQ_FUTURE]->done( undef ) if $_->[RQ_FUTURE];
+ $_->future->done( undef ) if $_->future;
}
undef @{ $self->{readqueue} };
return;
@@ -1012,7 +1037,7 @@ sub push_on_read
my ( $on_read, %args ) = @_;
# %args undocumented for internal use
- push @{ $self->{readqueue} }, [ $on_read, $args{future} ];
+ push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );
# TODO: Should this always defer?
1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
@@ -1075,9 +1100,9 @@ sub _read_future
return $f;
}
-=head2 $stream->read_atmost( $len ) ==> ( $string, $eof )
+=head2 ( $string, $eof ) = $stream->read_atmost( $len )->get
-=head2 $stream->read_exactly( $len ) ==> ( $string, $eof )
+=head2 ( $string, $eof ) = $stream->read_exactly( $len )->get
Completes the C<Future> when the read buffer contains C<$len> or more
characters of input. C<read_atmost> will also complete after the first
@@ -1117,7 +1142,7 @@ sub read_exactly
return $f;
}
-=head2 $stream->read_until( $end ) ==> ( $string, $eof )
+=head2 ( $string, $eof ) = $stream->read_until( $end )->get
Completes the C<Future> when the read buffer contains a match for C<$end>,
which may either be a plain string or a compiled C<Regexp> reference. Yields
@@ -1151,7 +1176,7 @@ sub read_until
return $f;
}
-=head2 $stream->read_until_eof ==> ( $string, $eof )
+=head2 ( $string, $eof ) = $stream->read_until_eof->get
Completes the C<Future> when the stream is eventually closed at EOF, and
yields all of the data that was available.
@@ -1233,9 +1258,6 @@ available then C<0> is returned, to indicate it should not be tried again. If
a line was successfully extracted, then C<1> is returned, to indicate it
should try again in case more lines exist in the buffer.
-For implementing real network protocols that are based on lines of text it may
-be more appropriate to use a subclass of L<IO::Async::Protocol::LineStream>.
-
=head2 Reading binary data
This C<on_read> method accepts incoming records in 16-byte chunks, printing