diff options
author | Axel Beckert <abe@deuxchevaux.org> | 2015-06-07 14:37:43 +0200 |
---|---|---|
committer | Axel Beckert <abe@deuxchevaux.org> | 2015-06-07 14:37:43 +0200 |
commit | ad76cc8ae4fee79962ea2be7f170d3b43f63a7c7 (patch) | |
tree | f81c74f75429e829714029850f89ee4c7f13aa39 /lib/IO/Async | |
parent | 64c71ff58dc2df52647d2afa92266c1f9beac7e3 (diff) |
Imported Upstream version 0.67
Diffstat (limited to 'lib/IO/Async')
35 files changed, 617 insertions, 357 deletions
diff --git a/lib/IO/Async/Channel.pm b/lib/IO/Async/Channel.pm index f459ee7..6e638f9 100644 --- a/lib/IO/Async/Channel.pm +++ b/lib/IO/Async/Channel.pm @@ -1,18 +1,17 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2011-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk package IO::Async::Channel; use strict; use warnings; -use base qw( IO::Async::Notifier ); # just to get _capture_weakself +use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; -use Storable qw( freeze thaw ); use IO::Async::Stream; @@ -43,10 +42,23 @@ retrieved from it by the C<recv> method. Values inserted into the Channel are snapshot by the C<send> method. Any changes to referred variables will not be observed by the other end of the Channel after the C<send> method returns. -Since the channel uses L<Storable> to serialise values to write over the -communication filehandle only reference values may be passed. To pass a single -scalar value, C<send> a SCALAR reference to it, and dereference the result of -C<recv>. +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>: + +=head2 codec => STR + +Gives the name of the encoding method used to represent values over the +channel. + +By default this will be C<Storable>, to use the core L<Storable> module. As +this only supports references, to pass a single scalar value, C<send> a SCALAR +reference to it, and dereference the result of C<recv>. + +If the L<Sereal::Encoder> and L<Sereal::Decoder> modules are installed, this +can be set to C<Sereal> instead, and will use those to perform the encoding +and decoding. This optional dependency may give higher performance than using +C<Storable>. =cut @@ -61,21 +73,12 @@ should be shared by both sides of a C<fork()>ed process. After C<fork()> the two C<setup_*> methods may be used to configure the object for operation on either end. -While this object does in fact inherit from L<IO::Async::Notifier> for -implementation reasons it is not intended that this object be used as a -Notifier. It should not be added to a Loop object directly; event management -will be handled by its containing C<IO::Async::Routine> object. +While this object does in fact inherit from L<IO::Async::Notifier>, it should +not be added to a Loop object directly; event management will be handled by +its containing C<IO::Async::Routine> object. =cut -sub new -{ - my $class = shift; - return bless { - mode => "", - }, $class; -} - =head1 METHODS The following methods documented with a trailing call to C<< ->get >> return @@ -108,6 +111,16 @@ channel gets closed by the peer. =cut +sub _init +{ + my $self = shift; + my ( $params ) = @_; + + $params->{codec} //= "Storable"; + + $self->SUPER::_init( $params ); +} + sub configure { my $self = shift; @@ -122,9 +135,38 @@ sub configure $self->_build_stream; } + if( my $codec = delete $params{codec} ) { + ( $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'" ) + ->( $self ); + } + $self->SUPER::configure( %params ); } +sub _make_codec_Storable +{ + my $self = shift; + + require Storable; + + $self->{encode} = \&Storable::freeze; + $self->{decode} = \&Storable::thaw; +} + +sub _make_codec_Sereal +{ + my $self = shift; + + require Sereal::Encoder; + require Sereal::Decoder; + + my $encoder = Sereal::Encoder->new; + $self->{encode} = sub { $encoder->encode( $_[0] ) }; + + my $decoder = Sereal::Decoder->new; + $self->{decode} = sub { $decoder->decode( $_[0] ) }; +} + =head2 $channel->send( $data ) Pushes the data stored in the given Perl reference into the FIFO of the @@ -140,8 +182,7 @@ sub send my $self = shift; my ( $data ) = @_; - my $record = freeze $data; - $self->send_frozen( $record ); + $self->send_frozen( $self->{encode}->( $data ) ); } =head2 $channel->send_frozen( $record ) @@ -271,7 +312,7 @@ sub _recv_sync defined $n or die "Cannot read - $!"; length $n or return undef; - return thaw $record; + return $self->{decode}->( $record ); } sub _send_sync @@ -384,7 +425,7 @@ sub _on_stream_read my $len = unpack( "I", $$buffref ); return 0 unless length( $$buffref ) >= 4 + $len; - my $record = thaw( substr( $$buffref, 4, $len ) ); + my $record = $self->{decode}->( substr( $$buffref, 4, $len ) ); substr( $$buffref, 0, 4 + $len ) = ""; if( my $on_result = shift @{ $self->{on_result_queue} } ) { diff --git a/lib/IO/Async/ChildManager.pm b/lib/IO/Async/ChildManager.pm index c8d486d..48ac108 100644 --- a/lib/IO/Async/ChildManager.pm +++ b/lib/IO/Async/ChildManager.pm @@ -8,7 +8,7 @@ package IO::Async::ChildManager; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; # Not a notifier @@ -360,6 +360,16 @@ Shortcuts for C<fd0>, C<fd1> and C<fd2> respectively. A reference to a hash to set as the child process's environment. +Note that this will entirely set a new environment, completely replacing the +existing one. If you want to simply add new keys or change the values of some +keys without removing the other existing ones, you can simply copy C<%ENV> +into the hash before setting new keys: + + env => { + %ENV, + ANOTHER => "key here", + } + =item nice => INT Change the child process's scheduling priority using C<POSIX::nice>. diff --git a/lib/IO/Async/Debug.pm b/lib/IO/Async/Debug.pm new file mode 100644 index 0000000..b23db0e --- /dev/null +++ b/lib/IO/Async/Debug.pm @@ -0,0 +1,98 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2015 -- leonerd@leonerd.org.uk + +package IO::Async::Debug; + +use strict; +use warnings; + +our $VERSION = '0.67'; + +our $DEBUG = $ENV{IO_ASYNC_DEBUG} || 0; +our $DEBUG_FD = $ENV{IO_ASYNC_DEBUG_FD}; +our $DEBUG_FILE = $ENV{IO_ASYNC_DEBUG_FILE}; +our $DEBUG_FH; +our %DEBUG_FLAGS = map { $_ => 1 } split m/,/, $ENV{IO_ASYNC_DEBUG_FLAGS} // ""; + +=head1 NAME + +C<IO::Async::Debug> - debugging control and support for L<IO::Async> + +=head1 DESCRIPTION + +The following methods and behaviours are still experimental and may change or +even be removed in future. + +Debugging support is enabled by an environment variable called +C<IO_ASYNC_DEBUG> having a true value. + +When debugging is enabled, the C<make_event_cb> and C<invoke_event> methods +on L<IO::Async::Notifier> (and their C<maybe_> variants) are altered such that +when the event is fired, a debugging line is printed, using the C<debug_printf> +method. This identifes the name of the event. + +By default, the line is only printed if the caller of one of these methods is +the same package as the object is blessed into, allowing it to print the +events of the most-derived class, without the extra verbosity of the +lower-level events of its parent class used to create it. All calls regardless +of caller can be printed by setting a number greater than 1 as the value of +C<IO_ASYNC_DEBUG>. + +By default the debugging log goes to C<STDERR>, but two other environment +variables can redirect it. If C<IO_ASYNC_DEBUG_FILE> is set, it names a file +which will be opened for writing, and logging written into it. Otherwise, if +C<IO_ASYNC_DEBUG_FD> is set, it gives a file descriptor number that logging +should be written to. If opening the named file or file descriptor fails then +the log will be written to C<STDERR> as normal. + +Extra debugging flags can be set in a comma-separated list in an environment +variable called C<IO_ASYNC_DEBUG_FLAGS>. The presence of these flags can cause +extra information to be written to the log. Full details on these flags will +be documented by the implementing classes. Typically these flags take the form +of one or more capital letters indicating the class, followed by one or more +lowercase letters enabling some particular feature within that class. + +=cut + +sub logf +{ + my ( $fmt, @args ) = @_; + + $DEBUG_FH ||= do { + my $fh; + if( $DEBUG_FILE ) { + open $fh, ">", $DEBUG_FILE or undef $fh; + } + elsif( $DEBUG_FD ) { + $fh = IO::Handle->new; + $fh->fdopen( $DEBUG_FD, "w" ) or undef $fh; + } + $fh ||= \*STDERR; + $fh->autoflush; + $fh; + }; + + printf $DEBUG_FH $fmt, @args; +} + +sub log_hexdump +{ + my ( $bytes ) = @_; + + foreach my $chunk ( $bytes =~ m/(.{1,16})/sg ) { + my $chunk_hex = join " ", map { sprintf "%02X", ord $_ } split //, $chunk; + ( my $chunk_safe = $chunk ) =~ s/[^\x20-\x7e]/./g; + + logf " | %-48s | %-16s |\n", $chunk_hex, $chunk_safe; + } +} + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; diff --git a/lib/IO/Async/File.pm b/lib/IO/Async/File.pm index ccfcfe5..cbb3604 100644 --- a/lib/IO/Async/File.pm +++ b/lib/IO/Async/File.pm @@ -9,7 +9,7 @@ use 5.010; # // use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Timer::Periodic ); diff --git a/lib/IO/Async/FileStream.pm b/lib/IO/Async/FileStream.pm index 0a82f7b..96778c9 100644 --- a/lib/IO/Async/FileStream.pm +++ b/lib/IO/Async/FileStream.pm @@ -8,7 +8,7 @@ package IO::Async::FileStream; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Stream ); diff --git a/lib/IO/Async/Function.pm b/lib/IO/Async/Function.pm index 04b8be9..adaf396 100644 --- a/lib/IO/Async/Function.pm +++ b/lib/IO/Async/Function.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2011-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk package IO::Async::Function; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Notifier ); use IO::Async::Timer::Countdown; @@ -417,7 +417,7 @@ sub call # Caller is not going to keep hold of the Future, so we have to ensure it # stays alive somehow - $future->on_ready( sub { undef $future } ); # intentional cycle + $self->adopt_future( $future->else( sub { Future->done } ) ); } sub _worker_objects @@ -625,10 +625,10 @@ sub call $worker->{exit_on_die} && $type eq "e"; if( $type eq "r" ) { - return Future->new->done( @values ); + return Future->done( @values ); } elsif( $type eq "e" ) { - return Future->new->fail( @values ); + return Future->fail( @values ); } else { die "Unrecognised type from worker - $type\n"; @@ -640,7 +640,7 @@ sub call $worker->stop; - return Future->new->fail( "closed", "closed" ); + return Future->fail( "closed", "closed" ); } ) )->on_ready( $worker->_capture_weakself( sub { my ( $worker, $f ) = @_; diff --git a/lib/IO/Async/Future.pm b/lib/IO/Async/Future.pm index 4b5baee..5bf8395 100644 --- a/lib/IO/Async/Future.pm +++ b/lib/IO/Async/Future.pm @@ -8,7 +8,7 @@ package IO::Async::Future; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( Future ); Future->VERSION( '0.05' ); # to respect subclassing diff --git a/lib/IO/Async/Handle.pm b/lib/IO/Async/Handle.pm index 902fe0d..201d900 100644 --- a/lib/IO/Async/Handle.pm +++ b/lib/IO/Async/Handle.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2006-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk package IO::Async::Handle; @@ -9,13 +9,14 @@ use strict; use warnings; use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; use IO::Handle; # give methods to bare IO handles use Future; +use Future::Utils qw( try_repeat ); use IO::Async::OS; @@ -603,29 +604,51 @@ sub socket $self->set_handle( $sock ); } -=head2 $handle->bind( $ai ) +=head2 $handle = $handle->bind( %args )->get -Convenient shortcut to creating a socket handle and C<bind()>ing it to the -address as given by an addrinfo structure, and setting it as the read and -write handle for the object. +Performs a C<getaddrinfo> resolver operation with the C<passive> flag set, +and then attempts to bind a socket handle of any of the return values. + +=head2 $handle = $handle->bind( $ai )->get + +When invoked with a single argument, this method is a convenient shortcut to +creating a socket handle and C<bind()>ing it to the address as given by an +addrinfo structure, and setting it as the read and write handle for the +object. C<$ai> may be either a C<HASH> or C<ARRAY> reference of the same form as given to L<IO::Async::OS>'s C<extract_addrinfo> method. -This method returns nothing if it succeeds, or throws an exception if it -fails. +The returned future returns the handle object itself for convenience. =cut sub bind { my $self = shift; - my ( $ai ) = @_; - $self->socket( $ai ); - my $addr = ( IO::Async::OS->extract_addrinfo( $ai ) )[3]; + if( @_ == 1 ) { + my ( $ai ) = @_; + + $self->socket( $ai ); + my $addr = ( IO::Async::OS->extract_addrinfo( $ai ) )[3]; + + $self->read_handle->bind( $addr ) or + return Future->fail( "Cannot bind - $!", bind => $self->read_handle, $addr, $! ); + + return Future->done( $self ); + } + + $self->loop->resolver->getaddrinfo( passive => 1, @_ )->then( sub { + my @addrs = @_; + + try_repeat { + my $ai = shift; - $self->read_handle->bind( $addr ) or croak "Cannot bind - $!"; + $self->bind( $ai ); + } foreach => \@addrs, + until => sub { shift->is_done }; + }); } =head2 $handle = $handle->connect( %args )->get diff --git a/lib/IO/Async/Internals/Connector.pm b/lib/IO/Async/Internals/Connector.pm index 43845cf..57029ef 100644 --- a/lib/IO/Async/Internals/Connector.pm +++ b/lib/IO/Async/Internals/Connector.pm @@ -9,7 +9,7 @@ package # hide from CPAN use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Scalar::Util qw( weaken ); @@ -95,13 +95,13 @@ sub _connect_addresses if( !$sock ) { $socketerr = $!; $on_fail->( "socket", $family, $socktype, $protocol, $! ) if $on_fail; - return Future->new->fail( 1 ); + return Future->fail( 1 ); } if( $localaddr and not $sock->bind( $localaddr ) ) { $binderr = $!; $on_fail->( "bind", $sock, $localaddr, $! ) if $on_fail; - return Future->new->fail( 1 ); + return Future->fail( 1 ); } $sock->blocking( 0 ); @@ -111,12 +111,12 @@ sub _connect_addresses if( $ret ) { # Succeeded already? Dubious, but OK. Can happen e.g. with connections to # localhost, or UNIX sockets, or something like that. - return Future->new->done( $sock ); + return Future->done( $sock ); } elsif( $! != EINPROGRESS and !CONNECT_EWOULDLBOCK || $! != POSIX::EWOULDBLOCK ) { $connecterr = $!; $on_fail->( "connect", $sock, $peeraddr, $! ) if $on_fail; - return Future->new->fail( 1 ); + return Future->fail( 1 ); } # Else diff --git a/lib/IO/Async/Listener.pm b/lib/IO/Async/Listener.pm index e73f4ce..277a9ab 100644 --- a/lib/IO/Async/Listener.pm +++ b/lib/IO/Async/Listener.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2008-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2008-2015 -- leonerd@leonerd.org.uk package IO::Async::Listener; @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Handle ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use IO::Async::Handle; use IO::Async::OS; @@ -43,7 +43,7 @@ C<IO::Async::Listener> - listen on network sockets for incoming connections return 0; }, ); - + $loop->add( $stream ); }, ); @@ -53,10 +53,7 @@ C<IO::Async::Listener> - listen on network sockets for incoming connections $listener->listen( service => "echo", socktype => 'stream', - - on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; }, - on_listen_error => sub { print STDERR "Cannot listen\n"; }, - ); + )->get; $loop->run; @@ -74,10 +71,7 @@ This object can also be used indirectly via an C<IO::Async::Loop>: on_stream => sub { ... }, - - on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; }, - on_listen_error => sub { print STDERR "Cannot listen\n"; }, - ); + )->get; $loop->run; @@ -128,7 +122,7 @@ constructor or class. Optional. Invoked if the C<accept> syscall indicates an error (other than C<EAGAIN> or C<EWOULDBLOCK>). If not provided, failures of C<accept> will -simply be ignored. +be passed to the main C<on_error> handler. =cut @@ -294,13 +288,17 @@ sub on_read_ready my ( $result ) = @_ or return; # false-alarm $on_done->( $self, $result ); })->on_fail( sub { - my ( $message, undef, $socket, $dollarbang ) = @_; - $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ); + my ( $message, $name, @args ) = @_; + if( $name eq "accept" ) { + my ( $socket, $dollarbang ) = @args; + $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ) or + $self->invoke_error( "accept() failed - $dollarbang", accept => $socket, $dollarbang ); + } }); # Caller is not going to keep hold of the Future, so we have to ensure it # stays alive somehow - $f->on_ready( sub { undef $f } ); # intentional cycle + $self->adopt_future( $f->else( sub { Future->done } ) ); } sub _accept @@ -314,17 +312,17 @@ sub _accept $accepted->blocking( 0 ); if( my $handle = $params{handle} ) { $handle->set_handle( $accepted ); - return Future->new->done( $handle ); + return Future->done( $handle ); } else { - return Future->new->done( $accepted ); + return Future->done( $accepted ); } } elsif( $! == EAGAIN or $! == EWOULDBLOCK ) { - return Future->new->done; + return Future->done; } else { - return Future->new->fail( "Cannot accept() - $!", accept => $listen_sock, $! ); + return Future->fail( "Cannot accept() - $!", accept => $listen_sock, $! ); } } @@ -462,7 +460,7 @@ sockets. return 0; }, ); - + $loop->add( $stream ); }, ); @@ -510,6 +508,38 @@ earlier example: ... ); +=head2 Using A Kernel-Assigned Port Number + +Rather than picking a specific port number, is it possible to ask the kernel +to assign one arbitrarily that is currently free. This can be done by +requesting port number 0 (which is actually the default if no port number is +otherwise specified). To determine which port number the kernel actually +picked, inspect the C<sockport> accessor on the actual socket filehandle. + +Either use the L<Future> returned by the C<listen> method: + + $listener->listen( + addr => { family => "inet" }, + )->on_done( sub { + my ( $listener ) = @_; + my $socket = $listener->read_handle; + + say "Now listening on port ", $socket->sockport; + }); + +Or pass an C<on_listen> continuation: + + $listener->listen( + addr => { family => "inet" }, + + on_listen => sub { + my ( $listener ) = @_; + my $socket = $listener->read_handle; + + say "Now listening on port ", $socket->sockport; + }, + ); + =head1 AUTHOR Paul Evans <leonerd@leonerd.org.uk> diff --git a/lib/IO/Async/Loop.pm b/lib/IO/Async/Loop.pm index a59a2e6..e510c64 100644 --- a/lib/IO/Async/Loop.pm +++ b/lib/IO/Async/Loop.pm @@ -1,14 +1,15 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2007-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk package IO::Async::Loop; use strict; use warnings; +use 5.010; -our $VERSION = '0.64'; +our $VERSION = '0.67'; # When editing this value don't forget to update the docs below use constant NEED_API_VERSION => '0.33'; @@ -78,6 +79,8 @@ $SIG{ALRM} = sub { } } if WATCHDOG_ENABLE; +$SIG{PIPE} = "IGNORE" if ( $SIG{PIPE} // "" ) eq "DEFAULT"; + =head1 NAME C<IO::Async::Loop> - core loop of the C<IO::Async> framework @@ -133,6 +136,15 @@ See also the two bundled Loop subclasses: Or other subclasses that may appear on CPAN which are not part of the core C<IO::Async> distribution. +=head2 Ignoring SIGPIPE + +Since version I<0.66> loading this module automatically ignores C<SIGPIPE>, as +it is highly unlikely that the default-terminate action is the best course of +action for an C<IO::Async>-based program to take. If at load time the handler +disposition is still set as C<DEFAULT>, it is set to ignore. If already +another handler has been placed there by the program code, it will be left +undisturbed. + =cut # Internal constructor used by subclasses @@ -235,12 +247,21 @@ In cases where the module subclass is a hard requirement, such as GTK programs using C<Glib>, it would be better to use the module specifically and invoke its constructor directly. +=item * IO::Async::OS->LOOP_PREFER_CLASSES + +The L<IO::Async::OS> hints module for the given OS is then consulted to see if +it suggests any other module classes specific to the given operating system. + =item * $^O The module called C<IO::Async::Loop::$^O> is tried next. This allows specific OSes, such as the ever-tricky C<MSWin32>, to provide an implementation that might be more efficient than the generic ones, or even work at all. +This option is now discouraged in favour of the C<IO::Async::OS> hint instead. +At some future point it may be removed entirely, given as currently only +C<linux> uses it. + =item * Poll and Select Finally, if no other choice has been made by now, the built-in C<Poll> module @@ -309,7 +330,14 @@ sub really_new warn "Unable to use $class - $topline\n"; } - $self = __try_new( "IO::Async::Loop::$^O" ) and return $self unless $LOOP_NO_OS; + unless( $LOOP_NO_OS ) { + foreach my $class ( IO::Async::OS->LOOP_PREFER_CLASSES, "IO::Async::Loop::$^O" ) { + $class =~ m/::/ or $class = "IO::Async::Loop::$class"; + $self = __try_new( $class ) and return $self; + + # Don't complain about these ones + } + } return IO::Async::Loop->new_builtin; } @@ -530,6 +558,25 @@ sub loop_stop $self->stop; } +=head2 $loop->post_fork + +The base implementation of this method does nothing. It is provided in case +some Loop subclasses should take special measures after a C<fork()> system +call if the main body of the program should survive in both running processes. + +This may be required, for example, in a long-running server daemon that forks +multiple copies on startup after opening initial listening sockets. A loop +implementation that uses some in-kernel resource that becomes shared after +forking (for example, a Linux C<epoll> or a BSD C<kqueue> filehandle) would +need recreating in the new child process before the program can continue. + +=cut + +sub post_fork +{ + # empty +} + ########### # Futures # ########### @@ -1062,6 +1109,8 @@ sub set_resolver for qw( resolve getaddrinfo getnameinfo ); $self->{resolver} = $resolver; + + $self->add( $resolver ); } =head2 @result = $loop->resolve( %params )->get @@ -1357,7 +1406,7 @@ sub connect $future = $future->then( sub { $handle->set_handle( shift ); - return Future->new->done( $handle ) + return Future->done( $handle ) }) if $handle; $future->on_done( $on_done ) if $on_done; @@ -2011,6 +2060,10 @@ existing one of that type. It is not required that both are provided. Applications should use a C<IO::Async::Handle> or C<IO::Async::Stream> instead of using this method. +If the filehandle does not yet have the C<O_NONBLOCK> flag set, it will be +enabled by this method. This will ensure that any subsequent C<sysread>, +C<syswrite>, or similar will not block on the filehandle. + =cut # This class specifically does NOT implement this method, so that subclasses @@ -2021,6 +2074,10 @@ sub __watch_io my %params = @_; my $handle = delete $params{handle} or croak "Expected 'handle'"; + defined eval { $handle->fileno } or croak "Expected that 'handle' has defined ->fileno"; + + # Silent "upgrade" to O_NONBLOCK + $handle->blocking and $handle->blocking(0); my $watch = ( $self->{iowatches}->{$handle->fileno} ||= [] ); diff --git a/lib/IO/Async/Loop/Poll.pm b/lib/IO/Async/Loop/Poll.pm index a1154d3..fb7bbf1 100644 --- a/lib/IO/Async/Loop/Poll.pm +++ b/lib/IO/Async/Loop/Poll.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2007-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk package IO::Async::Loop::Poll; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use constant API_VERSION => '0.49'; use base qw( IO::Async::Loop ); @@ -62,16 +62,34 @@ program already using an C<IO::Poll> object. =head1 DESCRIPTION -This subclass of C<IO::Async::Loop> uses an C<IO::Poll> object to perform +This subclass of C<IO::Async::Loop> uses the C<poll(2)> system call to perform read-ready and write-ready tests. -To integrate with existing code that uses an C<IO::Poll>, a C<post_poll> can -be called immediately after the C<poll> method on the contained C<IO::Poll> -object. The appropriate mask bits are maintained on the C<IO::Poll> object -when notifiers are added or removed from the set, or when they change their -C<want_writeready> status. The C<post_poll> method inspects the result bits -and invokes the C<on_read_ready> or C<on_write_ready> methods on the -notifiers. +By default, this loop will use the underlying C<poll()> system call directly, +bypassing the usual L<IO::Poll> object wrapper around it because of a number +of bugs and design flaws in that class; namely + +=over 2 + +=item * + +L<https://rt.cpan.org/Ticket/Display.html?id=93107> - IO::Poll relies on +stable stringification of IO handles + +=item * + +L<https://rt.cpan.org/Ticket/Display.html?id=25049> - IO::Poll->poll() with no +handles always returns immediately + +=back + +However, to integrate with existing code that uses an C<IO::Poll> object, a +C<post_poll> can be called immediately after the C<poll> method that +C<IO::Poll> object. The appropriate mask bits are maintained on the +C<IO::Poll> object when notifiers are added or removed from the loop, or when +they change their C<want_*> status. The C<post_poll> method inspects the +result bits and invokes the C<on_read_ready> or C<on_write_ready> methods on +the notifiers. =cut @@ -89,7 +107,8 @@ takes the following named arguments: =item C<poll> The C<IO::Poll> object to use for notification. Optional; if a value is not -given, a new C<IO::Poll> object will be constructed. +given, the underlying C<IO::Poll::_poll()> function is invoked directly, +outside of the object wrapping. =back @@ -102,11 +121,10 @@ sub new my $poll = delete $args{poll}; - $poll ||= IO::Poll->new; - my $self = $class->__new( %args ); $self->{poll} = $poll; + $self->{pollmask} = {}; return $self; } @@ -115,7 +133,7 @@ sub new =cut -=head2 $count = $loop->post_poll( $poll ) +=head2 $count = $loop->post_poll This method checks the returned event list from a C<IO::Poll::poll> call, and calls any of the notification methods or callbacks that are appropriate. @@ -123,14 +141,6 @@ It returns the total number of callbacks that were invoked; that is, the total number of C<on_read_ready> and C<on_write_ready> callbacks for C<watch_io>, and C<watch_time> event callbacks. -=over 8 - -=item $poll - -Reference to the C<IO::Poll> object - -=back - =cut sub post_poll @@ -147,7 +157,8 @@ sub post_poll foreach my $fd ( keys %$iowatches ) { my $watch = $iowatches->{$fd} or next; - my $events = $poll->events( $watch->[0] ); + my $events = $poll ? $poll->events( $watch->[0] ) + : $self->{pollevents}{$fd}; if( FAKE_ISREG_READY and $self->{fake_isreg}{$fd} ) { $events |= $self->{fake_isreg}{$fd} & ( POLLIN|POLLOUT ); } @@ -202,38 +213,56 @@ sub loop_once $timeout += ( 1 - $fraction ) / 1000 if $fraction; } - my $poll = $self->{poll}; + if( my $poll = $self->{poll} ) { + my $pollret; + + # There is a bug in IO::Poll at least version 0.07, where poll with no + # registered masks returns immediately, rather than waiting for a timeout + # This has been reported: + # http://rt.cpan.org/Ticket/Display.html?id=25049 + if( $poll->handles ) { + $pollret = $poll->poll( $timeout ); + + if( ( $pollret == -1 and $! == EINTR ) or $pollret == 0 + and defined $self->{sigproxy} ) { + # A signal occured and we have a sigproxy. Allow one more poll call + # with zero timeout. If it finds something, keep that result. If it + # finds nothing, keep -1 - my $pollret; + # Preserve $! whatever happens + local $!; - # There is a bug in IO::Poll at least version 0.07, where poll with no - # registered masks returns immediately, rather than waiting for a timeout - # This has been reported: - # http://rt.cpan.org/Ticket/Display.html?id=25049 - if( $poll->handles ) { - $pollret = $poll->poll( $timeout ); + my $secondattempt = $poll->poll( 0 ); + $pollret = $secondattempt if $secondattempt > 0; + } + } + else { + # Workaround - we'll use select to fake a millisecond-accurate sleep + $pollret = select( undef, undef, undef, $timeout ); + } - if( ( $pollret == -1 and $! == EINTR ) or $pollret == 0 - and defined $self->{sigproxy} ) { - # A signal occured and we have a sigproxy. Allow one more poll call - # with zero timeout. If it finds something, keep that result. If it - # finds nothing, keep -1 + return undef unless defined $pollret; + return $self->post_poll; + } + else { + my $msec = defined $timeout ? $timeout * 1000 : -1; + my @pollmasks = %{ $self->{pollmask} }; - # Preserve $! whatever happens + my $pollret = IO::Poll::_poll( $msec, @pollmasks ); + if( $pollret == -1 and $! == EINTR or + $pollret == 0 and $self->{sigproxy} ) { local $!; - my $secondattempt = $poll->poll( 0 ); + @pollmasks = %{ $self->{pollmask} }; + my $secondattempt = IO::Poll::_poll( $msec, @pollmasks ); $pollret = $secondattempt if $secondattempt > 0; } - } - else { - # Workaround - we'll use select to fake a millisecond-accurate sleep - $pollret = select( undef, undef, undef, $timeout ); - } - return undef unless defined $pollret; + return undef unless defined $pollret; - return $self->post_poll; + $self->{pollevents} = { @pollmasks }; + return $self->post_poll; + } } sub watch_io @@ -246,8 +275,11 @@ sub watch_io my $poll = $self->{poll}; my $handle = $params{handle}; + my $fileno = $handle->fileno; - my $curmask = $poll->mask( $handle ) || 0; + my $curmask = $poll ? $poll->mask( $handle ) + : $self->{pollmask}{$fileno}; + $curmask ||= 0; my $mask = $curmask; $params{on_read_ready} and $mask |= POLLIN; @@ -255,10 +287,17 @@ sub watch_io $params{on_hangup} and $mask |= POLLHUP; if( FAKE_ISREG_READY and S_ISREG +(stat $handle)[2] ) { - $self->{fake_isreg}{$handle->fileno} = $mask; + $self->{fake_isreg}{$fileno} = $mask; } - $poll->mask( $handle, $mask ) if $mask != $curmask; + return if $mask == $curmask; + + if( $poll ) { + $poll->mask( $handle, $mask ); + } + else { + $self->{pollmask}{$fileno} = $mask; + } } sub unwatch_io @@ -268,12 +307,14 @@ sub unwatch_io $self->__unwatch_io( %params ); - # Guard for global destruction - my $poll = $self->{poll} or return; + my $poll = $self->{poll}; my $handle = $params{handle}; + my $fileno = $handle->fileno; - my $curmask = $poll->mask( $handle ) || 0; + my $curmask = $poll ? $poll->mask( $handle ) + : $self->{pollmask}{$fileno}; + $curmask ||= 0; my $mask = $curmask; $params{on_read_ready} and $mask &= ~POLLIN; @@ -289,7 +330,15 @@ sub unwatch_io } } - $poll->mask( $handle, $mask ) if $mask != $curmask; + return if $mask == $curmask; + + if( $poll ) { + $poll->mask( $handle, $mask ); + } + else { + $mask ? ( $self->{pollmask}{$fileno} = $mask ) + : ( delete $self->{pollmask}{$fileno} ); + } } =head1 AUTHOR diff --git a/lib/IO/Async/Loop/Select.pm b/lib/IO/Async/Loop/Select.pm index 37ff60d..0c3bd9c 100644 --- a/lib/IO/Async/Loop/Select.pm +++ b/lib/IO/Async/Loop/Select.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2007-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk package IO::Async::Loop::Select; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use constant API_VERSION => '0.49'; use base qw( IO::Async::Loop ); @@ -177,7 +177,7 @@ sub post_select alarm( IO::Async::Loop->WATCHDOG_INTERVAL ) if WATCHDOG_ENABLE; foreach my $fd ( keys %$iowatches ) { - my $watch = $iowatches->{$fd}; + my $watch = $iowatches->{$fd} or next; my $fileno = $watch->[0]->fileno; diff --git a/lib/IO/Async/LoopTests.pm b/lib/IO/Async/LoopTests.pm index 315e6e4..63f1257 100644 --- a/lib/IO/Async/LoopTests.pm +++ b/lib/IO/Async/LoopTests.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2009-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2009-2015 -- leonerd@leonerd.org.uk package IO::Async::LoopTests; @@ -27,7 +27,7 @@ use POSIX qw( SIGTERM ); use Socket qw( sockaddr_family AF_UNIX ); use Time::HiRes qw( time ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; # Abstract Units of Time use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1; @@ -158,7 +158,7 @@ Tests the Loop's ability to watch filehandles for IO readiness =cut -use constant count_tests_io => 17; +use constant count_tests_io => 18; sub run_tests_io { { @@ -336,6 +336,28 @@ sub run_tests_io is( $callcount, 1, 'read/write_ready can cancel each other' ); } + # Check that cross-connected handlers can cancel each other + { + my ( $SA1, $SA2 ) = IO::Async::OS->socketpair or die "Cannot socketpair - $!"; + my ( $SB1, $SB2 ) = IO::Async::OS->socketpair or die "Cannot socketpair - $!"; + $_->blocking( 0 ) for $SA1, $SA2, $SB1, $SB2; + + my @handles = ( $SA1, $SB1 ); + + my $callcount = 0; + $loop->watch_io( + handle => $_, + on_write_ready => sub { + $callcount++; + $loop->unwatch_io( handle => $_, on_write_ready => 1 ) for @handles; + }, + ) for @handles; + + $loop->loop_once( 0.1 ); + + is( $callcount, 1, 'write_ready on crosslinked handles can cancel each other' ); + } + # Check that error conditions that aren't true read/write-ability are still # invoked { diff --git a/lib/IO/Async/MergePoint.pm b/lib/IO/Async/MergePoint.pm deleted file mode 100644 index f00e88b..0000000 --- a/lib/IO/Async/MergePoint.pm +++ /dev/null @@ -1,81 +0,0 @@ -# You may distribute under the terms of either the GNU General Public License -# or the Artistic License (the same terms as Perl itself) -# -# (C) Paul Evans, 2007,2009 -- leonerd@leonerd.org.uk - -package IO::Async::MergePoint; - -use strict; -use warnings; - -our $VERSION = '0.64'; - -use Carp; - -use base qw( Async::MergePoint ); - -carp "This module is deprecated; use Async::MergePoint instead"; - -=head1 NAME - -C<IO::Async::MergePoint> - resynchronise diverged control flow - -=head1 SYNOPSIS - -This module as now been moved to its own dist of L<Async::MergePoint>. - -It is kept here as a trivial subclass for backward compatibility. Eventually -this subclass may be removed. Any code using C<IO::Async::MergePoint> should -instead use L<Async::MergePoint>. - - use Async::MergePoint; - - my $merge = Async::MergePoint->new( - needs => [ "leaves", "water" ], - - on_finished => sub { - my %items = @_; - # Make tea using $items{leaves} and $items{water} - } - ); - - Kettle->boil( - on_boiled => sub { $merge->done( "water", $_[0] ) } - ); - - Cupboard->get_tea_leaves( - on_fetched => sub { $merge->done( "leaves", $_[0] ) } - ); - -=head1 DESCRIPTION - -Often in program logic, multiple different steps need to be taken that are -independent of each other, but their total result is needed before the next -step can be taken. In synchonous code, the usual approach is to do them -sequentially. - -An C<IO::Async>-based program could do this, but if each step involves some IO -idle time, better overall performance can often be gained by running the steps -in parallel. A L<Async::MergePoint> object can then be used to wait for all of -the steps to complete, before passing the combined result of each step on to -the next stage. - -A merge point maintains a set of outstanding operations it is waiting on; -these are arbitrary string values provided at the object's construction. Each -time the C<done> method is called, the named item is marked as being -complete. When all of the required items are so marked, the C<on_finished> -continuation is invoked. - -When an item is marked as complete, a value can also be provided, which would -contain the results of that step. The C<on_finished> callback is passed a hash -(in list form, rather than by reference) of the collected item values. - -=cut - -=head1 AUTHOR - -Paul Evans <leonerd@leonerd.org.uk> - -=cut - -0x55AA; diff --git a/lib/IO/Async/Notifier.pm b/lib/IO/Async/Notifier.pm index 44d18a6..f21c346 100644 --- a/lib/IO/Async/Notifier.pm +++ b/lib/IO/Async/Notifier.pm @@ -1,28 +1,25 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2006-2011 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk package IO::Async::Notifier; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; use Scalar::Util qw( weaken ); -use Future 0.22; # ->else_done +use Future 0.26; # ->is_failed + +use IO::Async::Debug; # Perl 5.8.4 cannot do trampolines by modiying @_ then goto &$code use constant HAS_BROKEN_TRAMPOLINES => ( $] == "5.008004" ); -our $DEBUG = $ENV{IO_ASYNC_DEBUG} || 0; -our $DEBUG_FD = $ENV{IO_ASYNC_DEBUG_FD}; -our $DEBUG_FILE = $ENV{IO_ASYNC_DEBUG_FILE}; -our $DEBUG_FH; - =head1 NAME C<IO::Async::Notifier> - base class for C<IO::Async> event objects @@ -346,7 +343,7 @@ sub adopt_future delete $self->{IO_Async_Notifier__futures}{$fkey}; - $self->invoke_error( $f->failure ) if $f->failure; + $self->invoke_error( $f->failure ) if $f->is_failed; })); return $f; @@ -725,7 +722,7 @@ sub make_event_cb my $caller = caller; return $self->_capture_weakself( - !$DEBUG ? $code : sub { + !$IO::Async::Debug::DEBUG ? $code : sub { my $self = $_[0]; $self->_debug_printf_event( $caller, $event_name ); goto &$code; @@ -751,7 +748,7 @@ sub maybe_make_event_cb my $caller = caller; return $self->_capture_weakself( - !$DEBUG ? $code : sub { + !$IO::Async::Debug::DEBUG ? $code : sub { my $self = $_[0]; $self->_debug_printf_event( $caller, $event_name ); goto &$code; @@ -776,7 +773,7 @@ sub invoke_event my $code = $self->can_event( $event_name ) or croak "$self cannot handle $event_name event"; - $self->_debug_printf_event( scalar caller, $event_name ) if $DEBUG; + $self->_debug_printf_event( scalar caller, $event_name ) if $IO::Async::Debug::DEBUG; return $code->( $self, @args ); } @@ -798,37 +795,12 @@ sub maybe_invoke_event my $code = $self->can_event( $event_name ) or return undef; - $self->_debug_printf_event( scalar caller, $event_name ) if $DEBUG; + $self->_debug_printf_event( scalar caller, $event_name ) if $IO::Async::Debug::DEBUG; return [ $code->( $self, @args ) ]; } =head1 DEBUGGING SUPPORT -The following methods and behaviours are still experimental and may change or -even be removed in future. - -Debugging support is enabled by an environment variable called -C<IO_ASYNC_DEBUG> having a true value. - -When debugging is enabled, the C<make_event_cb> and C<invoke_event> methods -(and their C<maybe_> variants) are altered such that when the event is fired, -a debugging line is printed, using the C<debug_printf> method. This identifes -the name of the event. - -By default, the line is only printed if the caller of one of these methods is -the same package as the object is blessed into, allowing it to print the -events of the most-derived class, without the extra verbosity of the -lower-level events of its parent class used to create it. All calls regardless -of caller can be printed by setting a number greater than 1 as the value of -C<IO_ASYNC_DEBUG>. - -By default the debugging log goes to C<STDERR>, but two other environment -variables can redirect it. If C<IO_ASYNC_DEBUG_FILE> is set, it names a file -which will be opened for writing, and logging written into it. Otherwise, if -C<IO_ASYNC_DEBUG_FD> is set, it gives a file descriptor number that logging -should be written to. If opening the named file or file descriptor fails then -the log will be written to C<STDERR> as normal. - =cut =head2 $notifier->debug_printf( $format, @args ) @@ -862,7 +834,7 @@ will produce a line of output: sub debug_printf { - $DEBUG or return; + $IO::Async::Debug::DEBUG or return; my $self = shift; my ( $format, @args ) = @_; @@ -881,22 +853,7 @@ sub debug_printf s/^IO::Async::/Ia:/, s/^Net::Async::/Na:/ for @id; - $DEBUG_FH ||= do { - my $fh; - if( $DEBUG_FILE ) { - open $fh, ">", $DEBUG_FILE or undef $fh; - } - elsif( $DEBUG_FD ) { - $fh = IO::Handle->new; - $fh->fdopen( $DEBUG_FD, "w" ) or undef $fh; - } - $fh ||= \*STDERR; - $fh->autoflush; - $fh; - }; - - printf $DEBUG_FH "[%s] $format\n", - join("<-", @id), @args; + IO::Async::Debug::logf "[%s] $format\n", join("<-", @id), @args; } sub _debug_printf_event @@ -906,7 +863,7 @@ sub _debug_printf_event my $class = ref $self; - if( $DEBUG > 1 or $class eq $caller ) { + if( $IO::Async::Debug::DEBUG > 1 or $class eq $caller ) { s/^IO::Async::Protocol::/IaP:/, s/^IO::Async::/Ia:/, s/^Net::Async::/Na:/ for my $str_caller = $caller; diff --git a/lib/IO/Async/OS.pm b/lib/IO/Async/OS.pm index e344291..db16138 100644 --- a/lib/IO/Async/OS.pm +++ b/lib/IO/Async/OS.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2012-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2012-2015 -- leonerd@leonerd.org.uk package IO::Async::OS; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; our @ISA = qw( IO::Async::OS::_Base ); @@ -71,6 +71,9 @@ use constant HAVE_THREADS => !$ENV{IO_ASYNC_NO_THREADS} && # Preferred trial order for built-in Loop classes use constant LOOP_BUILTIN_CLASSES => qw( Poll Select ); +# Should there be any other Loop classes we try before the builtin ones? +use constant LOOP_PREFER_CLASSES => (); + =head1 NAME C<IO::Async::OS> - operating system abstractions for C<IO::Async> @@ -409,20 +412,23 @@ sub extract_addrinfo @ai = @$ai; } elsif( ref $ai eq "HASH" ) { - @ai = @{$ai}{qw( family socktype protocol addr )}; + $ai = { %$ai }; # copy so we can delete from it + @ai = delete @{$ai}{qw( family socktype protocol addr )}; + + if( defined $ai[ADDRINFO_FAMILY] and !defined $ai[ADDRINFO_ADDR] ) { + my $family = $ai[ADDRINFO_FAMILY]; + my $method = "_extract_addrinfo_$family"; + my $code = $self->can( $method ) or croak "Cannot determine addr for extract_addrinfo on family='$family'"; + + $ai[ADDRINFO_ADDR] = $code->( $self, $ai ); + + keys %$ai and croak "Unrecognised '$family' addrinfo keys: " . join( ", ", keys %$ai ); + } } else { croak "Expected '$argname' to be an ARRAY or HASH reference"; } - if( defined $ai[ADDRINFO_FAMILY] and !defined $ai[ADDRINFO_ADDR] and ref $ai eq "HASH" ) { - my $family = $ai[ADDRINFO_FAMILY]; - my $method = "_extract_addrinfo_$family"; - my $code = $self->can( $method ) or croak "Cannot determine addr for extract_addrinfo on family='$family'"; - - $ai[ADDRINFO_ADDR] = $code->( $self, $ai ); - } - $ai[ADDRINFO_FAMILY] = $self->getfamilybyname( $ai[ADDRINFO_FAMILY] ); $ai[ADDRINFO_SOCKTYPE] = $self->getsocktypebyname( $ai[ADDRINFO_SOCKTYPE] ); @@ -446,8 +452,8 @@ sub _extract_addrinfo_inet my $self = shift; my ( $ai ) = @_; - my $port = $ai->{port} || 0; - my $ip = $ai->{ip} || "0.0.0.0"; + my $port = delete $ai->{port} || 0; + my $ip = delete $ai->{ip} || "0.0.0.0"; return pack_sockaddr_in( $port, inet_aton( $ip ) ); } @@ -469,10 +475,10 @@ sub _extract_addrinfo_inet6 my $self = shift; my ( $ai ) = @_; - my $port = $ai->{port} || 0; - my $ip = $ai->{ip} || "::"; - my $scopeid = $ai->{scopeid} || 0; - my $flowinfo = $ai->{flowinfo} || 0; + my $port = delete $ai->{port} || 0; + my $ip = delete $ai->{ip} || "::"; + my $scopeid = delete $ai->{scopeid} || 0; + my $flowinfo = delete $ai->{flowinfo} || 0; if( HAVE_SOCKADDR_IN6 ) { return pack_sockaddr_in6( $port, inet_pton( AF_INET6, $ip ), $scopeid, $flowinfo ); @@ -493,7 +499,7 @@ sub _extract_addrinfo_unix my $self = shift; my ( $ai ) = @_; - defined( my $path = $ai->{path} ) or croak "Expected 'path' for extract_addrinfo on family='unix'"; + defined( my $path = delete $ai->{path} ) or croak "Expected 'path' for extract_addrinfo on family='unix'"; return pack_sockaddr_un( $path ); } diff --git a/lib/IO/Async/OS/MSWin32.pm b/lib/IO/Async/OS/MSWin32.pm index 3c49ddd..bfed7f7 100644 --- a/lib/IO/Async/OS/MSWin32.pm +++ b/lib/IO/Async/OS/MSWin32.pm @@ -8,7 +8,7 @@ package IO::Async::OS::MSWin32; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; our @ISA = qw( IO::Async::OS::_Base ); diff --git a/lib/IO/Async/OS/cygwin.pm b/lib/IO/Async/OS/cygwin.pm index a477805..630bf9a 100644 --- a/lib/IO/Async/OS/cygwin.pm +++ b/lib/IO/Async/OS/cygwin.pm @@ -8,7 +8,7 @@ package IO::Async::OS::cygwin; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; our @ISA = qw( IO::Async::OS::_Base ); diff --git a/lib/IO/Async/OS/linux.pm b/lib/IO/Async/OS/linux.pm index 26f365d..c12949b 100644 --- a/lib/IO/Async/OS/linux.pm +++ b/lib/IO/Async/OS/linux.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2014-2015 -- leonerd@leonerd.org.uk package IO::Async::OS::linux; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; our @ISA = qw( IO::Async::OS::_Base ); @@ -24,6 +24,9 @@ See instead L<IO::Async::OS>. =cut +# Suggest either Epoll or Ppoll loops first if they are installed +use constant LOOP_PREFER_CLASSES => qw( Epoll Ppoll ); + # Try to use /proc/pid/fd to get the list of actually-open file descriptors # for our process. Saves a bit of time when running with high ulimit -n / # fileno counts. diff --git a/lib/IO/Async/PID.pm b/lib/IO/Async/PID.pm index 167d9eb..fc59f9c 100644 --- a/lib/IO/Async/PID.pm +++ b/lib/IO/Async/PID.pm @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; diff --git a/lib/IO/Async/Process.pm b/lib/IO/Async/Process.pm index a86c8e7..31c70d5 100644 --- a/lib/IO/Async/Process.pm +++ b/lib/IO/Async/Process.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2011-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk package IO::Async::Process; @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; @@ -41,11 +41,11 @@ C<IO::Async::Process> - start and manage a child process while( $$buffref =~ s/^(.*)\n// ) { print "Rot13 of 'hello world' is '$1'\n"; } - + return 0; }, }, - + on_finish => sub { $loop->stop; }, @@ -78,7 +78,8 @@ all its file descriptors. Invoked when the process exits by an exception from C<code>, or by failing to C<exec(2)> the given command. C<$errno> will be a dualvar, containing both -number and string values. +number and string values. After a successful C<exec()> call, this condition +can no longer happen. Note that this has a different name and a different argument order from C<< Loop->open_child >>'s C<on_error>. @@ -486,10 +487,12 @@ sub _add_to_loop setup => \@setup, - on_exit => sub { - ( undef, $exitcode, $dollarbang, $dollarat ) = @_; + on_exit => $self->_capture_weakself( sub { + ( my $self, undef, $exitcode, $dollarbang, $dollarat ) = @_; + + $self->debug_printf( "EXIT status=0x%04x", $exitcode ) if $self; $exit_future->done unless $exit_future->is_cancelled; - }, + } ), ); $self->{running} = 1; diff --git a/lib/IO/Async/Protocol.pm b/lib/IO/Async/Protocol.pm index ba360d0..c155963 100644 --- a/lib/IO/Async/Protocol.pm +++ b/lib/IO/Async/Protocol.pm @@ -8,7 +8,7 @@ package IO::Async::Protocol; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Notifier ); diff --git a/lib/IO/Async/Protocol/LineStream.pm b/lib/IO/Async/Protocol/LineStream.pm index 08db62a..f6148e9 100644 --- a/lib/IO/Async/Protocol/LineStream.pm +++ b/lib/IO/Async/Protocol/LineStream.pm @@ -8,7 +8,7 @@ package IO::Async::Protocol::LineStream; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Protocol::Stream ); diff --git a/lib/IO/Async/Protocol/Stream.pm b/lib/IO/Async/Protocol/Stream.pm index ac3fec9..11d1144 100644 --- a/lib/IO/Async/Protocol/Stream.pm +++ b/lib/IO/Async/Protocol/Stream.pm @@ -8,7 +8,7 @@ package IO::Async::Protocol::Stream; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Protocol ); diff --git a/lib/IO/Async/Resolver.pm b/lib/IO/Async/Resolver.pm index f8e9a20..10c0a15 100644 --- a/lib/IO/Async/Resolver.pm +++ b/lib/IO/Async/Resolver.pm @@ -1,15 +1,16 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2007-2013 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk package IO::Async::Resolver; use strict; use warnings; +use 5.010; use base qw( IO::Async::Function ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; # Socket 2.006 fails to getaddrinfo() AI_NUMERICHOST properly on MSWin32 use Socket 2.007 qw( @@ -211,7 +212,7 @@ sub resolve # Caller is not going to keep hold of the Future, so we have to ensure it # stays alive somehow - $future->on_ready( sub { undef $future } ); # intentional cycle + $self->adopt_future( $future->else( sub { Future->done } ) ); } =head2 @addrs = $resolver->getaddrinfo( %args )->get @@ -301,7 +302,7 @@ sub getaddrinfo croak "Expected 'on_error' or to return a Future"; my $host = $args{host} || ""; - my $service = $args{service} || ""; + my $service = $args{service} // ""; my $flags = $args{flags} || 0; $flags |= AI_PASSIVE if $args{passive}; @@ -352,7 +353,7 @@ sub getaddrinfo timeout => $args{timeout}, )->else( sub { my $message = shift; - Future->new->fail( $message, resolve => getaddrinfo => @_ ); + Future->fail( $message, resolve => getaddrinfo => @_ ); }); $future->on_done( $args{on_resolved} ) if $args{on_resolved}; @@ -362,7 +363,7 @@ sub getaddrinfo # Caller is not going to keep hold of the Future, so we have to ensure it # stays alive somehow - $future->on_ready( sub { undef $future } ); # intentional cycle + $self->adopt_future( $future->else( sub { Future->done } ) ); } =head2 ( $host, $service ) = $resolver->getnameinfo( %args )->get @@ -469,7 +470,7 @@ sub getnameinfo done => sub { @{ $_[0] } }, # unpack the ARRAY ref )->else( sub { my $message = shift; - Future->new->fail( $message, resolve => getnameinfo => @_ ); + Future->fail( $message, resolve => getnameinfo => @_ ); }); $future->on_done( $args{on_resolved} ) if $args{on_resolved}; @@ -479,7 +480,7 @@ sub getnameinfo # Caller is not going to keep hold of the Future, so we have to ensure it # stays alive somehow - $future->on_ready( sub { undef $future } ); # intentional cycle + $self->adopt_future( $future->else( sub { Future->done } ) ); } =head1 FUNCTIONS diff --git a/lib/IO/Async/Routine.pm b/lib/IO/Async/Routine.pm index ae8bef6..f9a5a3b 100644 --- a/lib/IO/Async/Routine.pm +++ b/lib/IO/Async/Routine.pm @@ -8,7 +8,7 @@ package IO::Async::Routine; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Notifier ); diff --git a/lib/IO/Async/Signal.pm b/lib/IO/Async/Signal.pm index bfdc758..4ef68f5 100644 --- a/lib/IO/Async/Signal.pm +++ b/lib/IO/Async/Signal.pm @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; diff --git a/lib/IO/Async/Socket.pm b/lib/IO/Async/Socket.pm index 5430ebb..23a4973 100644 --- a/lib/IO/Async/Socket.pm +++ b/lib/IO/Async/Socket.pm @@ -1,14 +1,14 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2011 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk package IO::Async::Socket; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Handle ); @@ -28,36 +28,27 @@ filehandle use IO::Async::Loop; my $loop = IO::Async::Loop->new; - $loop->connect( + my $socket = IO::Async::Socket->new( + on_recv => sub { + my ( $self, $dgram, $addr ) = @_; + + print "Received reply: $dgram\n", + $loop->stop; + }, + on_recv_error => sub { + my ( $self, $errno ) = @_; + die "Cannot recv - $errno\n"; + }, + ); + $loop->add( $socket ); + + $socket->connect( host => "some.host.here", service => "echo", socktype => 'dgram', + )->get; - on_connected => sub { - my ( $sock ) = @_; - - my $socket = IO::Async::Socket->new( - handle => $sock, - on_recv => sub { - my ( $self, $dgram, $addr ) = @_; - - print "Received reply: $dgram\n", - $loop->stop; - }, - on_recv_error => sub { - my ( $self, $errno ) = @_; - die "Cannot recv - $errno\n"; - }, - ); - - $loop->add( $socket ); - - $socket->send( "A TEST DATAGRAM" ); - }, - - on_resolve_error => sub { die "Cannot resolve - $_[0]\n"; }, - on_connect_error => sub { die "Cannot connect\n"; }, - ); + $socket->send( "A TEST DATAGRAM" ); $loop->run; @@ -326,18 +317,28 @@ sub on_write_ready =head1 EXAMPLES -=head2 Using a UDP Socket +=head2 Send-first on a UDP Socket C<UDP> is carried by the C<SOCK_DGRAM> socket type, for which the string C<'dgram'> is a convenient shortcut: - $loop->connect( + $socket->connect( host => $hostname, service => $service, socktype => 'dgram', ... ) +=head2 Receive-first on a UDP Socket + +A typical server pattern with C<UDP> involves binding a well-known port +number instead of connecting to one, and waiting on incoming packets. + + $socket->bind( + service => 12345, + socktype => 'dgram', + )->get; + =head1 SEE ALSO =over 4 diff --git a/lib/IO/Async/Stream.pm b/lib/IO/Async/Stream.pm index f0c0e55..487eb38 100644 --- a/lib/IO/Async/Stream.pm +++ b/lib/IO/Async/Stream.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2006-2014 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk package IO::Async::Stream; @@ -9,7 +9,7 @@ use strict; use warnings; use 5.010; # // -our $VERSION = '0.64'; +our $VERSION = '0.67'; use base qw( IO::Async::Handle ); @@ -20,6 +20,8 @@ use Carp; use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL ); use Scalar::Util qw( blessed ); +use IO::Async::Debug; + # Tuneable from outside # Not yet documented our $READLEN = 8192; @@ -765,6 +767,12 @@ sub _flush_one_write die "TODO: head data does not contain a plain string" if ref $head->data; + if( $IO::Async::Debug::DEBUG > 1 ) { + my $data = substr $head->data, 0, $head->writelen; + $self->debug_printf( "WRITE len=%d", length $data ); + IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw}; + } + my $writer = $self->{writer}; my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen ); @@ -827,7 +835,10 @@ sub write if( defined wantarray ) { my $orig_on_flush = $on_flush; my $orig_on_error = $on_error; - $f = $self->loop->new_future; + + my $loop = $self->loop or + croak "Cannot ->write data returning a Future to a Stream not in a Loop"; + $f = $loop->new_future; $on_flush = sub { $f->done; $orig_on_flush->( @_ ) if $orig_on_flush; @@ -897,6 +908,8 @@ sub _flush_one_read my $self = shift; my ( $eof ) = @_; + local $self->{flushing_read} = 1; + my $readqueue = $self->{readqueue}; my $ret; @@ -969,6 +982,11 @@ sub _do_read return; } + if( $IO::Async::Debug::DEBUG > 1 ) { + $self->debug_printf( "READ len=%d", $len ); + IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr}; + } + my $eof = $self->{read_eof} = ( $len == 0 ); if( my $encoding = $self->{encoding} ) { @@ -1040,6 +1058,7 @@ sub push_on_read push @{ $self->{readqueue} }, Reader( $on_read, $args{future} ); # TODO: Should this always defer? + return if $self->{flushing_read}; 1 while length $self->{readbuff} and $self->_flush_one_read( 0 ); } @@ -1232,6 +1251,24 @@ sub connect return $self->SUPER::connect( socktype => "stream", @_ ); } +=head1 DEBUGGING FLAGS + +The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging: + +=over 4 + +=item C<Sr> + +Log byte buffers as data is read from a Stream + +=item C<Sw> + +Log byte buffers as data is written to a Stream + +=back + +=cut + =head1 EXAMPLES =head2 A line-based C<on_read> method @@ -1295,7 +1332,7 @@ C<on_read> method extracts messages in such a protocol. return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes - my ( $len, $x, $y ) = unpack $$buffref, "N n n"; + my ( $len, $x, $y ) = unpack "N n n", $$buffref; return 0 unless length $$buffref >= 8 + $len; diff --git a/lib/IO/Async/Test.pm b/lib/IO/Async/Test.pm index 1e8ff24..933330f 100644 --- a/lib/IO/Async/Test.pm +++ b/lib/IO/Async/Test.pm @@ -8,7 +8,7 @@ package IO::Async::Test; use strict; use warnings; -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Exporter 'import'; our @EXPORT = qw( diff --git a/lib/IO/Async/Timer.pm b/lib/IO/Async/Timer.pm index ccfe597..8e5961b 100644 --- a/lib/IO/Async/Timer.pm +++ b/lib/IO/Async/Timer.pm @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Notifier ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; diff --git a/lib/IO/Async/Timer/Absolute.pm b/lib/IO/Async/Timer/Absolute.pm index 0672da0..a925415 100644 --- a/lib/IO/Async/Timer/Absolute.pm +++ b/lib/IO/Async/Timer/Absolute.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2010-2011 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2010-2015 -- leonerd@leonerd.org.uk package IO::Async::Timer::Absolute; @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Timer ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; @@ -29,7 +29,7 @@ C<IO::Async::Timer::Absolute> - event callback at a fixed future time my @time = gmtime; my $timer = IO::Async::Timer::Absolute->new( - time => mktime( 0, 0, 0, $time[4]+1, $time[5], $time[6] ), + time => mktime( 0, 0, 0, $time[3]+1, $time[4], $time[5] ), on_expire => sub { print "It's midnight\n"; diff --git a/lib/IO/Async/Timer/Countdown.pm b/lib/IO/Async/Timer/Countdown.pm index 6ee88f5..201ba42 100644 --- a/lib/IO/Async/Timer/Countdown.pm +++ b/lib/IO/Async/Timer/Countdown.pm @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Timer ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; diff --git a/lib/IO/Async/Timer/Periodic.pm b/lib/IO/Async/Timer/Periodic.pm index 84da8d9..f99a43c 100644 --- a/lib/IO/Async/Timer/Periodic.pm +++ b/lib/IO/Async/Timer/Periodic.pm @@ -1,7 +1,7 @@ # You may distribute under the terms of either the GNU General Public License # or the Artistic License (the same terms as Perl itself) # -# (C) Paul Evans, 2009-2012 -- leonerd@leonerd.org.uk +# (C) Paul Evans, 2009-2015 -- leonerd@leonerd.org.uk package IO::Async::Timer::Periodic; @@ -9,7 +9,7 @@ use strict; use warnings; use base qw( IO::Async::Timer ); -our $VERSION = '0.64'; +our $VERSION = '0.67'; use Carp; @@ -223,10 +223,13 @@ sub _make_cb undef $self->{id}; - $self->invoke_event( on_tick => ); + my $ok = eval { $self->invoke_event( on_tick => ); 1 } or + my $e = $@; # detect ->stop $self->start if defined $self->{next_time}; + + die $e if !$ok; } ); } |