summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Schlichting <fsfs@debian.org>2021-09-01 22:40:40 +0800
committerFlorian Schlichting <fsfs@debian.org>2021-09-01 22:40:40 +0800
commitc5506a417a3c9914ef6dc36d0e7b6f84308309cd (patch)
tree7d1ea3d838590003af4f1a1f1ee9ad55c4759445
parent16d047efe56f844ae203582d398b7200010e5f3b (diff)
parent1af6c7552de129a387862c97c51ed6754a76fdc6 (diff)
Update upstream source from tag 'upstream/0.79'
Update to upstream version '0.79' with Debian dir a8174026e4a03d7ace20b510cb7897952ced6b57
-rw-r--r--Build.PL3
-rw-r--r--Changes10
-rw-r--r--MANIFEST2
-rw-r--r--META.json74
-rw-r--r--META.yml73
-rw-r--r--README56
-rw-r--r--lib/IO/Async.pm58
-rw-r--r--lib/IO/Async/Channel.pm88
-rw-r--r--lib/IO/Async/Debug.pm2
-rw-r--r--lib/IO/Async/File.pm28
-rw-r--r--lib/IO/Async/FileStream.pm60
-rw-r--r--lib/IO/Async/Function.pm156
-rw-r--r--lib/IO/Async/Future.pm12
-rw-r--r--lib/IO/Async/Handle.pm30
-rw-r--r--lib/IO/Async/Internals/ChildManager.pm2
-rw-r--r--lib/IO/Async/Internals/Connector.pm2
-rw-r--r--lib/IO/Async/Internals/FunctionWorker.pm62
-rw-r--r--lib/IO/Async/Listener.pm202
-rw-r--r--lib/IO/Async/Loop.pm152
-rw-r--r--lib/IO/Async/Loop/Poll.pm22
-rw-r--r--lib/IO/Async/Loop/Select.pm26
-rw-r--r--lib/IO/Async/LoopTests.pm8
-rw-r--r--lib/IO/Async/Notifier.pm130
-rw-r--r--lib/IO/Async/OS.pm30
-rw-r--r--lib/IO/Async/OS/MSWin32.pm2
-rw-r--r--lib/IO/Async/OS/cygwin.pm2
-rw-r--r--lib/IO/Async/OS/linux.pm2
-rw-r--r--lib/IO/Async/PID.pm48
-rw-r--r--lib/IO/Async/Process.pm290
-rw-r--r--lib/IO/Async/Protocol.pm6
-rw-r--r--lib/IO/Async/Protocol/LineStream.pm40
-rw-r--r--lib/IO/Async/Protocol/Stream.pm46
-rw-r--r--lib/IO/Async/Resolver.pm139
-rw-r--r--lib/IO/Async/Routine.pm389
-rw-r--r--lib/IO/Async/Signal.pm24
-rw-r--r--lib/IO/Async/Socket.pm66
-rw-r--r--lib/IO/Async/Stream.pm202
-rw-r--r--lib/IO/Async/Test.pm42
-rw-r--r--lib/IO/Async/Timer.pm4
-rw-r--r--lib/IO/Async/Timer/Absolute.pm30
-rw-r--r--lib/IO/Async/Timer/Countdown.pm102
-rw-r--r--lib/IO/Async/Timer/Periodic.pm26
-rw-r--r--t/25socket.t2
-rw-r--r--t/41routine.t36
-rw-r--r--t/42function.t23
-rw-r--r--t/RoutineTester.pm16
46 files changed, 1594 insertions, 1231 deletions
diff --git a/Build.PL b/Build.PL
index 513bbfb..8a8a0b9 100644
--- a/Build.PL
+++ b/Build.PL
@@ -12,7 +12,8 @@ my $build = Module::Build->new(
'File::stat' => 0,
'IO::Poll' => 0,
'List::Util' => 0,
- 'Socket' => '2.007',
+ # Require Socket 2.029 on MSWin32 because of AF_UNIX (RT133018)
+ ( 'Socket' => ( $^O eq "MSWin32" ? '2.029' : '2.007' ) ),
'Storable' => 0,
'Struct::Dumb' => 0,
'Time::HiRes' => 0,
diff --git a/Changes b/Changes
index f62197e..b7261c3 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,15 @@
Revision history for IO-Async
+0.79 2021-08-06
+ [CHANGES]
+ * Permit IO::Async::Routine or Function by module+func names instead
+ of CODE reference
+ * Added new Routine/Function model of `spawn`
+ * Implement IO::Async::Resolver by module+func rather than code
+
+ [BUGFIXES]
+ * Require Socket 2.029 on MSWin32 because of AF_UNIX fix (RT133018)
+
0.78 2021-01-21
[CHANGES]
* Warn on attempts to ->connect to INADDR(6)_LOOPBACK as some OSes
diff --git a/MANIFEST b/MANIFEST
index d9007f0..d169168 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -18,6 +18,7 @@ lib/IO/Async/Future.pm
lib/IO/Async/Handle.pm
lib/IO/Async/Internals/ChildManager.pm
lib/IO/Async/Internals/Connector.pm
+lib/IO/Async/Internals/FunctionWorker.pm
lib/IO/Async/Internals/TimeQueue.pm
lib/IO/Async/Listener.pm
lib/IO/Async/Loop.pm
@@ -114,5 +115,6 @@ t/63handle-connect.t
t/64handle-bind.t
t/70future-io.t
t/99pod.t
+t/RoutineTester.pm
t/StupidLoop.pm
t/TimeAbout.pm
diff --git a/META.json b/META.json
index b5df7a1..9ff8c22 100644
--- a/META.json
+++ b/META.json
@@ -54,142 +54,146 @@
},
"IO::Async" : {
"file" : "lib/IO/Async.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Channel" : {
"file" : "lib/IO/Async/Channel.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Debug" : {
"file" : "lib/IO/Async/Debug.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::File" : {
"file" : "lib/IO/Async/File.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::FileStream" : {
"file" : "lib/IO/Async/FileStream.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Function" : {
"file" : "lib/IO/Async/Function.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Future" : {
"file" : "lib/IO/Async/Future.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Handle" : {
"file" : "lib/IO/Async/Handle.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Internals::ChildManager" : {
"file" : "lib/IO/Async/Internals/ChildManager.pm",
- "version" : "0.78"
+ "version" : "0.79"
+ },
+ "IO::Async::Internals::FunctionWorker" : {
+ "file" : "lib/IO/Async/Internals/FunctionWorker.pm",
+ "version" : "0.79"
},
"IO::Async::Listener" : {
"file" : "lib/IO/Async/Listener.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Loop" : {
"file" : "lib/IO/Async/Loop.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Loop::Poll" : {
"file" : "lib/IO/Async/Loop/Poll.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Loop::Select" : {
"file" : "lib/IO/Async/Loop/Select.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::LoopTests" : {
"file" : "lib/IO/Async/LoopTests.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Metrics" : {
"file" : "lib/IO/Async/Metrics.pm"
},
"IO::Async::Notifier" : {
"file" : "lib/IO/Async/Notifier.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::OS" : {
"file" : "lib/IO/Async/OS.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::OS::MSWin32" : {
"file" : "lib/IO/Async/OS/MSWin32.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::OS::cygwin" : {
"file" : "lib/IO/Async/OS/cygwin.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::OS::linux" : {
"file" : "lib/IO/Async/OS/linux.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::PID" : {
"file" : "lib/IO/Async/PID.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Process" : {
"file" : "lib/IO/Async/Process.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Protocol" : {
"file" : "lib/IO/Async/Protocol.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Protocol::LineStream" : {
"file" : "lib/IO/Async/Protocol/LineStream.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Protocol::Stream" : {
"file" : "lib/IO/Async/Protocol/Stream.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Resolver" : {
"file" : "lib/IO/Async/Resolver.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Routine" : {
"file" : "lib/IO/Async/Routine.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Signal" : {
"file" : "lib/IO/Async/Signal.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Socket" : {
"file" : "lib/IO/Async/Socket.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Stream" : {
"file" : "lib/IO/Async/Stream.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Test" : {
"file" : "lib/IO/Async/Test.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Timer" : {
"file" : "lib/IO/Async/Timer.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Timer::Absolute" : {
"file" : "lib/IO/Async/Timer/Absolute.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Timer::Countdown" : {
"file" : "lib/IO/Async/Timer/Countdown.pm",
- "version" : "0.78"
+ "version" : "0.79"
},
"IO::Async::Timer::Periodic" : {
"file" : "lib/IO/Async/Timer/Periodic.pm",
- "version" : "0.78"
+ "version" : "0.79"
}
},
"release_status" : "stable",
@@ -199,6 +203,6 @@
],
"x_IRC" : "irc://irc.perl.org/#io-async"
},
- "version" : "0.78",
+ "version" : "0.79",
"x_serialization_backend" : "JSON::PP version 4.05"
}
diff --git a/META.yml b/META.yml
index bc3ad32..250d49e 100644
--- a/META.yml
+++ b/META.yml
@@ -23,108 +23,111 @@ provides:
file: lib/Future/IO/Impl/IOAsync.pm
IO::Async:
file: lib/IO/Async.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Channel:
file: lib/IO/Async/Channel.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Debug:
file: lib/IO/Async/Debug.pm
- version: '0.78'
+ version: '0.79'
IO::Async::File:
file: lib/IO/Async/File.pm
- version: '0.78'
+ version: '0.79'
IO::Async::FileStream:
file: lib/IO/Async/FileStream.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Function:
file: lib/IO/Async/Function.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Future:
file: lib/IO/Async/Future.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Handle:
file: lib/IO/Async/Handle.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Internals::ChildManager:
file: lib/IO/Async/Internals/ChildManager.pm
- version: '0.78'
+ version: '0.79'
+ IO::Async::Internals::FunctionWorker:
+ file: lib/IO/Async/Internals/FunctionWorker.pm
+ version: '0.79'
IO::Async::Listener:
file: lib/IO/Async/Listener.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Loop:
file: lib/IO/Async/Loop.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Loop::Poll:
file: lib/IO/Async/Loop/Poll.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Loop::Select:
file: lib/IO/Async/Loop/Select.pm
- version: '0.78'
+ version: '0.79'
IO::Async::LoopTests:
file: lib/IO/Async/LoopTests.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Metrics:
file: lib/IO/Async/Metrics.pm
IO::Async::Notifier:
file: lib/IO/Async/Notifier.pm
- version: '0.78'
+ version: '0.79'
IO::Async::OS:
file: lib/IO/Async/OS.pm
- version: '0.78'
+ version: '0.79'
IO::Async::OS::MSWin32:
file: lib/IO/Async/OS/MSWin32.pm
- version: '0.78'
+ version: '0.79'
IO::Async::OS::cygwin:
file: lib/IO/Async/OS/cygwin.pm
- version: '0.78'
+ version: '0.79'
IO::Async::OS::linux:
file: lib/IO/Async/OS/linux.pm
- version: '0.78'
+ version: '0.79'
IO::Async::PID:
file: lib/IO/Async/PID.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Process:
file: lib/IO/Async/Process.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Protocol:
file: lib/IO/Async/Protocol.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Protocol::LineStream:
file: lib/IO/Async/Protocol/LineStream.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Protocol::Stream:
file: lib/IO/Async/Protocol/Stream.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Resolver:
file: lib/IO/Async/Resolver.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Routine:
file: lib/IO/Async/Routine.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Signal:
file: lib/IO/Async/Signal.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Socket:
file: lib/IO/Async/Socket.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Stream:
file: lib/IO/Async/Stream.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Test:
file: lib/IO/Async/Test.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Timer:
file: lib/IO/Async/Timer.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Timer::Absolute:
file: lib/IO/Async/Timer/Absolute.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Timer::Countdown:
file: lib/IO/Async/Timer/Countdown.pm
- version: '0.78'
+ version: '0.79'
IO::Async::Timer::Periodic:
file: lib/IO/Async/Timer/Periodic.pm
- version: '0.78'
+ version: '0.79'
recommends:
IO::Socket::IP: '0'
requires:
@@ -142,5 +145,5 @@ requires:
resources:
IRC: irc://irc.perl.org/#io-async
license: http://dev.perl.org/licenses/
-version: '0.78'
+version: '0.79'
x_serialization_backend: 'CPAN::Meta::YAML version 0.018'
diff --git a/README b/README
index f32ebb6..1171a06 100644
--- a/README
+++ b/README
@@ -4,41 +4,41 @@ NAME
SYNOPSIS
- use IO::Async::Stream;
- use IO::Async::Loop;
+ use IO::Async::Stream;
+ use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ my $loop = IO::Async::Loop->new;
- $loop->connect(
- host => "some.other.host",
- service => 12345,
- socktype => 'stream',
+ $loop->connect(
+ host => "some.other.host",
+ service => 12345,
+ socktype => 'stream',
- on_stream => sub {
- my ( $stream ) = @_;
+ on_stream => sub {
+ my ( $stream ) = @_;
- $stream->configure(
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
+ $stream->configure(
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*\n)// ) {
- print "Received a line $1";
- }
+ while( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line $1";
+ }
- return 0;
- }
- );
+ return 0;
+ }
+ );
- $stream->write( "An initial line here\n" );
+ $stream->write( "An initial line here\n" );
- $loop->add( $stream );
- },
+ $loop->add( $stream );
+ },
- on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
- on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
- );
+ on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
+ on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
+ );
- $loop->run;
+ $loop->run;
DESCRIPTION
@@ -202,9 +202,9 @@ DESCRIPTION
conventions for the Future's fail arguments to relate it to the legacy
on_error-style callbacks.
- $on_NAME_error->( $message, @argmuents )
+ $on_NAME_error->( $message, @argmuents )
- $f->fail( $message, NAME, @arguments )
+ $f->fail( $message, NAME, @arguments )
where $message is a message intended for humans to read (so that this
is the message displayed by $f->get if the failure is not otherwise
@@ -255,7 +255,7 @@ SUPPORT
Bugs may be reported via RT at
- https://rt.cpan.org/Public/Dist/Display.html?Name=IO-Async
+ https://rt.cpan.org/Public/Dist/Display.html?Name=IO-Async
Support by IRC may also be found on irc.perl.org in the #io-async
channel.
diff --git a/lib/IO/Async.pm b/lib/IO/Async.pm
index eab3019..f142456 100644
--- a/lib/IO/Async.pm
+++ b/lib/IO/Async.pm
@@ -12,7 +12,7 @@ use warnings;
# It is provided simply to keep CPAN happy:
# cpan -i IO::Async
-our $VERSION = '0.78';
+our $VERSION = '0.79';
=head1 NAME
@@ -20,41 +20,41 @@ C<IO::Async> - Asynchronous event-driven programming
=head1 SYNOPSIS
- use IO::Async::Stream;
- use IO::Async::Loop;
+ use IO::Async::Stream;
+ use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ my $loop = IO::Async::Loop->new;
- $loop->connect(
- host => "some.other.host",
- service => 12345,
- socktype => 'stream',
+ $loop->connect(
+ host => "some.other.host",
+ service => 12345,
+ socktype => 'stream',
- on_stream => sub {
- my ( $stream ) = @_;
+ on_stream => sub {
+ my ( $stream ) = @_;
- $stream->configure(
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
+ $stream->configure(
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*\n)// ) {
- print "Received a line $1";
- }
+ while( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line $1";
+ }
- return 0;
- }
- );
+ return 0;
+ }
+ );
- $stream->write( "An initial line here\n" );
+ $stream->write( "An initial line here\n" );
- $loop->add( $stream );
- },
+ $loop->add( $stream );
+ },
- on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
- on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
- );
+ on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
+ on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
+ );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -220,9 +220,9 @@ failures that are reported will, in general, use the same conventions for the
Future's C<fail> arguments to relate it to the legacy C<on_error>-style
callbacks.
- $on_NAME_error->( $message, @argmuents )
+ $on_NAME_error->( $message, @argmuents )
- $f->fail( $message, NAME, @arguments )
+ $f->fail( $message, NAME, @arguments )
where C<$message> is a message intended for humans to read (so that this is
the message displayed by C<< $f->get >> if the failure is not otherwise
@@ -284,7 +284,7 @@ things like L<IO::Async::Process> exits, or L<IO::Async::Handle> close.
Bugs may be reported via RT at
- https://rt.cpan.org/Public/Dist/Display.html?Name=IO-Async
+ https://rt.cpan.org/Public/Dist/Display.html?Name=IO-Async
Support by IRC may also be found on F<irc.perl.org> in the F<#io-async>
channel.
diff --git a/lib/IO/Async/Channel.pm b/lib/IO/Async/Channel.pm
index 512fdc0..06bfec4 100644
--- a/lib/IO/Async/Channel.pm
+++ b/lib/IO/Async/Channel.pm
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2011-2019 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
package IO::Async::Channel;
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -82,6 +82,26 @@ its containing L<IO::Async::Routine> object.
=cut
+# Undocumented convenience constructors for running IaRoutine in 'spawn' mode
+sub new_sync
+{
+ my $class = shift;
+ my ( $fd ) = @_;
+
+ my $self = $class->new;
+ $self->setup_sync_mode( $fd );
+ return $self;
+}
+
+sub new_stdin { shift->new_sync( \*STDIN ); }
+sub new_stdout { shift->new_sync( \*STDOUT ); }
+
+sub DESTROY
+{
+ my $self = shift;
+ eval { $self->close }; # ignore any error
+}
+
=head1 METHODS
The following methods documented with a trailing call to C<< ->get >> return
@@ -103,14 +123,14 @@ used to change details of the Channel's operation.
May only be set on an async mode channel. If present, will be invoked whenever
a new value is received, rather than using the C<recv> method.
- $on_recv->( $channel, $data )
+ $on_recv->( $channel, $data )
=item on_eof => CODE
May only be set on an async mode channel. If present, will be invoked when the
channel gets closed by the peer.
- $on_eof->( $channel )
+ $on_eof->( $channel )
=back
@@ -196,14 +216,25 @@ method will not block.
=cut
+my %SENDMETHODS;
sub send
{
my $self = shift;
my ( $data ) = @_;
- $self->send_encoded( $self->{encode}->( $data ) );
+ defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
+
+ my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
+ or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
+
+ $self->$code( $data );
}
+*_send_sync = *_send_async = sub {
+ my ( $self, $data ) = @_;
+ $self->send_encoded( $self->{encode}->( $data ) );
+};
+
=head2 send_encoded
$channel->send_encoded( $record )
@@ -222,8 +253,8 @@ sub send_encoded
defined $self->{mode} or die "Cannot ->send without being set up";
- return $self->_send_sync( $bytes ) if $self->{mode} eq "sync";
- return $self->_send_async( $bytes ) if $self->{mode} eq "async";
+ return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync";
+ return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
}
=head2 encode
@@ -233,8 +264,8 @@ sub send_encoded
Takes a Perl reference and returns a serialised string that can be passed to
C<send_encoded>. The following two forms are equivalent
- $channel->send( $data )
- $channel->send_encoded( $channel->encode( $data ) )
+ $channel->send( $data )
+ $channel->send_encoded( $channel->encode( $data ) )
This is provided for the use-case where data needs to be serialised into a
fixed string to "snapshot it" but not sent yet; the returned string can be
@@ -260,19 +291,6 @@ sub encode
)->( $data );
}
-=head2 send_frozen
-
- $channel->send_frozen( $record )
-
-Legacy name for C<send_encoded>. This is no longer preferred as it expects
-the data to be encoded using C<Storable>, which prevents (or at least makes
-more awkward) the use of other codecs on a channel by default. This method
-should not be used in new code and may be removed in a later version.
-
-=cut
-
-*send_frozen = \&send_encoded;
-
=head2 recv
$data = $channel->recv
@@ -302,27 +320,30 @@ When not returning a future, takes the following named arguments:
Called when a new Perl reference value is available. Will be passed the
Channel object and the reference data.
- $on_recv->( $channel, $data )
+ $on_recv->( $channel, $data )
=item on_eof => CODE
Called if the Channel was closed before a new value was ready. Will be passed
the Channel object.
- $on_eof->( $channel )
+ $on_eof->( $channel )
=back
=cut
+my %RECVMETHODS;
sub recv
{
my $self = shift;
- defined $self->{mode} or die "Cannot ->recv without being set up";
+ defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
- return $self->_recv_sync( @_ ) if $self->{mode} eq "sync";
- return $self->_recv_async( @_ ) if $self->{mode} eq "async";
+ my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
+ or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
+
+ return $self->$code( @_ );
}
=head2 close
@@ -334,12 +355,17 @@ or the queued C<on_eof> callbacks to be invoked.
=cut
+my %CLOSEMETHODS;
sub close
{
my $self = shift;
- return $self->_close_sync if $self->{mode} eq "sync";
- return $self->_close_async if $self->{mode} eq "async";
+ defined( my $mode = $self->{mode} ) or return;
+
+ my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
+ or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
+
+ return $self->$code;
}
# Leave this undocumented for now
@@ -388,7 +414,7 @@ sub _recv_sync
return $self->{decode}->( $record );
}
-sub _send_sync
+sub _sendbytes_sync
{
my $self = shift;
my ( $bytes ) = @_;
@@ -434,7 +460,7 @@ sub _build_stream
};
}
-sub _send_async
+sub _sendbytes_async
{
my $self = shift;
my ( $bytes ) = @_;
diff --git a/lib/IO/Async/Debug.pm b/lib/IO/Async/Debug.pm
index 3548b2d..8fd4fdc 100644
--- a/lib/IO/Async/Debug.pm
+++ b/lib/IO/Async/Debug.pm
@@ -8,7 +8,7 @@ package IO::Async::Debug;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
our $DEBUG = $ENV{IO_ASYNC_DEBUG} || 0;
our $DEBUG_FD = $ENV{IO_ASYNC_DEBUG_FD};
diff --git a/lib/IO/Async/File.pm b/lib/IO/Async/File.pm
index d6ef72d..6d7eeaa 100644
--- a/lib/IO/Async/File.pm
+++ b/lib/IO/Async/File.pm
@@ -8,7 +8,7 @@ package IO::Async::File;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Timer::Periodic );
@@ -24,23 +24,23 @@ C<IO::Async::File> - watch a file for changes
=head1 SYNOPSIS
- use IO::Async::File;
+ use IO::Async::File;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $file = IO::Async::File->new(
- filename => "config.ini",
- on_mtime_changed => sub {
- my ( $self ) = @_;
- print STDERR "Config file has changed\n";
- reload_config( $self->handle );
- }
- );
+ my $file = IO::Async::File->new(
+ filename => "config.ini",
+ on_mtime_changed => sub {
+ my ( $self ) = @_;
+ print STDERR "Config file has changed\n";
+ reload_config( $self->handle );
+ }
+ );
- $loop->add( $file );
+ $loop->add( $file );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
diff --git a/lib/IO/Async/FileStream.pm b/lib/IO/Async/FileStream.pm
index bcc000b..432ca79 100644
--- a/lib/IO/Async/FileStream.pm
+++ b/lib/IO/Async/FileStream.pm
@@ -8,7 +8,7 @@ package IO::Async::FileStream;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Stream );
@@ -23,36 +23,36 @@ C<IO::Async::FileStream> - read the tail of a file
=head1 SYNOPSIS
- use IO::Async::FileStream;
+ use IO::Async::FileStream;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- open my $logh, "<", "var/logs/daemon.log" or
- die "Cannot open logfile - $!";
+ open my $logh, "<", "var/logs/daemon.log" or
+ die "Cannot open logfile - $!";
- my $filestream = IO::Async::FileStream->new(
- read_handle => $logh,
+ my $filestream = IO::Async::FileStream->new(
+ read_handle => $logh,
- on_initial => sub {
- my ( $self ) = @_;
- $self->seek_to_last( "\n" );
- },
+ on_initial => sub {
+ my ( $self ) = @_;
+ $self->seek_to_last( "\n" );
+ },
- on_read => sub {
- my ( $self, $buffref ) = @_;
+ on_read => sub {
+ my ( $self, $buffref ) = @_;
- while( $$buffref =~ s/^(.*\n)// ) {
- print "Received a line $1";
- }
+ while( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line $1";
+ }
- return 0;
- },
- );
+ return 0;
+ },
+ );
- $loop->add( $filestream );
+ $loop->add( $filestream );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -281,10 +281,10 @@ C<SEEK_SET> if not provided.
Normally this would be used to seek to the end of the file, for example
- on_initial => sub {
- my ( $self, $filesize ) = @_;
- $self->seek( $filesize );
- }
+ on_initial => sub {
+ my ( $self, $filesize ) = @_;
+ $self->seek( $filesize );
+ }
=cut
@@ -342,10 +342,10 @@ This is most likely useful for seeking after the last complete line in a
line-based log file, to commence reading from the end, while still managing to
capture any partial content that isn't yet a complete line.
- on_initial => sub {
- my $self = shift;
- $self->seek_to_last( "\n" );
- }
+ on_initial => sub {
+ my $self = shift;
+ $self->seek_to_last( "\n" );
+ }
=cut
diff --git a/lib/IO/Async/Function.pm b/lib/IO/Async/Function.pm
index 38695a6..3c83a78 100644
--- a/lib/IO/Async/Function.pm
+++ b/lib/IO/Async/Function.pm
@@ -1,14 +1,14 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2011-2019 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
package IO::Async::Function;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Notifier );
use IO::Async::Timer::Countdown;
@@ -27,28 +27,28 @@ C<IO::Async::Function> - call a function asynchronously
=head1 SYNOPSIS
- use IO::Async::Function;
+ use IO::Async::Function;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $function = IO::Async::Function->new(
- code => sub {
- my ( $number ) = @_;
- return is_prime( $number );
- },
- );
+ my $function = IO::Async::Function->new(
+ code => sub {
+ my ( $number ) = @_;
+ return is_prime( $number );
+ },
+ );
- $loop->add( $function );
+ $loop->add( $function );
- $function->call(
- args => [ 123454321 ],
- )->on_done( sub {
- my $isprime = shift;
- print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
- })->on_fail( sub {
- print STDERR "Cannot determine if it's prime - $_[0]\n";
- })->get;
+ $function->call(
+ args => [ 123454321 ],
+ )->on_done( sub {
+ my $isprime = shift;
+ print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n";
+ })->on_fail( sub {
+ print STDERR "Cannot determine if it's prime - $_[0]\n";
+ })->get;
=head1 DESCRIPTION
@@ -111,7 +111,7 @@ The following named parameters may be passed to C<new> or C<configure>:
The body of the function to execute.
- @result = $code->( @args )
+ @result = $code->( @args )
=head2 init_code => CODE
@@ -119,9 +119,33 @@ Optional. If defined, this is invoked exactly once in every child process or
thread, after it is created, but before the first invocation of the function
body itself.
- $init_code->()
+ $init_code->()
+
+=head2 module => STRING
+
+=head2 func => STRING
+
+An alternative to the C<code> argument, which names a module to load and a
+function to call within it. C<module> should give a perl module name (i.e.
+C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
+the basename of a function within that module (i.e. without the module name
+prefixed). It will be invoked, without extra arguments, as the main code
+body of the object.
+
+The task of loading this module and resolving the resulting function from it
+is only performed on the remote worker side, so the controlling process will
+not need to actually load the module.
-=head2 model => "fork" | "thread"
+=head2 init_func => STRING or ARRAY [ STRING, ... ]
+
+Optional addition to the C<module> and C<func> alternatives. Names a function
+within the module to call each time a new worker is created.
+
+If this value is an array reference, its first element must be a string giving
+the name of the function; the remaining values are passed to that function as
+arguments.
+
+=head2 model => "fork" | "thread" | "spawn"
Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
leaves the default choice up to Routine.
@@ -229,10 +253,15 @@ sub configure
my $need_restart;
- foreach (qw( init_code code setup )) {
+ foreach (qw( init_code code module init_func func setup )) {
$need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
}
+ defined $self->{code} and defined $self->{func} and
+ croak "Cannot ->configure both 'code' and 'func'";
+ defined $self->{func} and !defined $self->{module} and
+ croak "'func' parameter requires a 'module' as well";
+
$self->SUPER::configure( %params );
if( $need_restart and $self->loop ) {
@@ -386,12 +415,12 @@ arguments give continuations to handle successful results or failure.
A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:
- $on_result->( 'return', @values )
+ $on_result->( 'return', @values )
If the code threw an exception, or some other error occurred such as a closed
connection or the process died, it is called as:
- $on_result->( 'error', $exception_name )
+ $on_result->( 'error', $exception_name )
=item on_return => CODE and on_error => CODE
@@ -562,7 +591,7 @@ sub _new_worker
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
- ( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
+ ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
on_finish => $self->_capture_weakself( sub {
@@ -637,8 +666,12 @@ package # hide from indexer
use base qw( IO::Async::Routine );
+use Carp;
+
use IO::Async::Channel;
+use IO::Async::Internals::FunctionWorker;
+
sub new
{
my $class = shift;
@@ -647,28 +680,31 @@ sub new
my $arg_channel = IO::Async::Channel->new;
my $ret_channel = IO::Async::Channel->new;
- my $init = delete $params{init_code};
- my $code = delete $params{code};
- $params{code} = sub {
- $init->() if defined $init;
+ my $send_initial;
- while( my $args = $arg_channel->recv ) {
- my @ret;
- my $ok = eval { @ret = $code->( @$args ); 1 };
+ if( defined( my $code = $params{code} ) ) {
+ my $init_code = $params{init_code};
- if( $ok ) {
- $ret_channel->send( [ r => @ret ] );
- }
- elsif( ref $@ ) {
- # Presume that $@ is an ARRAYref of error results
- $ret_channel->send( [ e => @{ $@ } ] );
- }
- else {
- chomp( my $e = "$@" );
- $ret_channel->send( [ e => $e, error => ] );
- }
- }
- };
+ $params{code} = sub {
+ $init_code->() if defined $init_code;
+
+ IO::Async::Internals::FunctionWorker::runloop( $code, $arg_channel, $ret_channel );
+ };
+ }
+ elsif( defined( my $func = $params{func} ) ) {
+ my $module = $params{module};
+ my $init_func = $params{init_func};
+ my @init_args;
+
+ $params{module} = "IO::Async::Internals::FunctionWorker";
+ $params{func} = "run_worker";
+
+ ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";
+
+ $send_initial = [ $module, $func, $init_func, @init_args ];
+ }
+
+ delete @params{qw( init_code init_func )};
my $worker = $class->SUPER::new(
%params,
@@ -679,9 +715,19 @@ sub new
$worker->{arg_channel} = $arg_channel;
$worker->{ret_channel} = $ret_channel;
+ $worker->{send_initial} = $send_initial if $send_initial;
+
return $worker;
}
+sub _add_to_loop
+{
+ my $self = shift;
+ $self->SUPER::_add_to_loop( @_ );
+
+ $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
+}
+
sub configure
{
my $self = shift;
@@ -769,15 +815,15 @@ sub call
The array-unpacking form of exception indiciation allows the function body to
more precicely control the resulting failure from the C<call> future.
- my $divider = IO::Async::Function->new(
- code => sub {
- my ( $numerator, $divisor ) = @_;
- $divisor == 0 and
- die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
+ my $divider = IO::Async::Function->new(
+ code => sub {
+ my ( $numerator, $divisor ) = @_;
+ $divisor == 0 and
+ die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
- return $numerator / $divisor;
- }
- );
+ return $numerator / $divisor;
+ }
+ );
=head1 NOTES
diff --git a/lib/IO/Async/Future.pm b/lib/IO/Async/Future.pm
index 7e8de42..4cdc3e1 100644
--- a/lib/IO/Async/Future.pm
+++ b/lib/IO/Async/Future.pm
@@ -8,7 +8,7 @@ package IO::Async::Future;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( Future );
Future->VERSION( '0.05' ); # to respect subclassing
@@ -21,15 +21,15 @@ C<IO::Async::Future> - use L<Future> with L<IO::Async>
=head1 SYNOPSIS
- use IO::Async::Loop;
+ use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ my $loop = IO::Async::Loop->new;
- my $future = $loop->new_future;
+ my $future = $loop->new_future;
- $loop->watch_time( after => 3, code => sub { $future->done( "Done" ) } );
+ $loop->watch_time( after => 3, code => sub { $future->done( "Done" ) } );
- print $future->get, "\n";
+ print $future->get, "\n";
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Handle.pm b/lib/IO/Async/Handle.pm
index 864a177..96eeea0 100644
--- a/lib/IO/Async/Handle.pm
+++ b/lib/IO/Async/Handle.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -31,24 +31,24 @@ to handle more specific cases. Here is an example of how it would be used to
watch a listening socket for new connections. In real code, it is likely that
the C<< Loop->listen >> method would be used instead.
- use IO::Socket::INET;
- use IO::Async::Handle;
+ use IO::Socket::INET;
+ use IO::Async::Handle;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $socket = IO::Socket::INET->new( LocalPort => 1234, Listen => 1 );
+ my $socket = IO::Socket::INET->new( LocalPort => 1234, Listen => 1 );
- my $handle = IO::Async::Handle->new(
- handle => $socket,
+ my $handle = IO::Async::Handle->new(
+ handle => $socket,
- on_read_ready => sub {
- my $new_client = $socket->accept;
- ...
- },
- );
+ on_read_ready => sub {
+ my $new_client = $socket->accept;
+ ...
+ },
+ );
- $loop->add( $handle );
+ $loop->add( $handle );
For most other uses with sockets, pipes or other filehandles that carry a byte
stream, the L<IO::Async::Stream> class is likely to be more suitable. For
@@ -385,7 +385,7 @@ sub set_handles
Shortcut for
- $handle->configure( handle => $fh )
+ $handle->configure( handle => $fh )
=cut
diff --git a/lib/IO/Async/Internals/ChildManager.pm b/lib/IO/Async/Internals/ChildManager.pm
index bca19b7..3a383e1 100644
--- a/lib/IO/Async/Internals/ChildManager.pm
+++ b/lib/IO/Async/Internals/ChildManager.pm
@@ -8,7 +8,7 @@ package IO::Async::Internals::ChildManager;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
# Not a notifier
diff --git a/lib/IO/Async/Internals/Connector.pm b/lib/IO/Async/Internals/Connector.pm
index 79111b8..7b54b09 100644
--- a/lib/IO/Async/Internals/Connector.pm
+++ b/lib/IO/Async/Internals/Connector.pm
@@ -9,7 +9,7 @@ package # hide from CPAN
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Scalar::Util qw( weaken blessed );
diff --git a/lib/IO/Async/Internals/FunctionWorker.pm b/lib/IO/Async/Internals/FunctionWorker.pm
new file mode 100644
index 0000000..4c4665d
--- /dev/null
+++ b/lib/IO/Async/Internals/FunctionWorker.pm
@@ -0,0 +1,62 @@
+# You may distribute under the terms of either the GNU General Public License
+# or the Artistic License (the same terms as Perl itself)
+#
+# (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
+
+package IO::Async::Internals::FunctionWorker;
+
+use strict;
+use warnings;
+
+our $VERSION = '0.79';
+
+# Called directly by IO::Async::Function::Worker when used in "code" mode,
+# or by run_worker() below.
+sub runloop
+{
+ my ( $code, $arg_channel, $ret_channel ) = @_;
+
+ while( my $args = $arg_channel->recv ) {
+ my @ret;
+ my $ok = eval { @ret = $code->( @$args ); 1 };
+
+ if( $ok ) {
+ $ret_channel->send( [ r => @ret ] );
+ }
+ elsif( ref $@ ) {
+ # Presume that $@ is an ARRAYref of error results
+ $ret_channel->send( [ e => @{ $@ } ] );
+ }
+ else {
+ chomp( my $e = "$@" );
+ $ret_channel->send( [ e => $e, error => ] );
+ }
+ }
+}
+
+# Called by IO::Async::Function::Worker via the module+func arguments to its
+# IO::Async::Routine superclass when used in "module+func" mode
+sub run_worker
+{
+ my ( $arg_channel, $ret_channel ) = @_;
+
+ # Setup args
+ my ( $module, $func, $init_func, @init_args ) = @{ $arg_channel->recv };
+
+ ( my $file = "$module.pm" ) =~ s{::}{/}g;
+ require $file;
+
+ my $code = $module->can( $func ) or
+ die "Module $module does not provide a function called $func\n";
+
+ if( defined $init_func ) {
+ my $init = $module->can( $init_func ) or
+ die "Module $module does not provide a function called $init_func\n";
+
+ $init->( @init_args );
+ }
+
+ runloop( $code, $arg_channel, $ret_channel );
+}
+
+0x55AA;
diff --git a/lib/IO/Async/Listener.pm b/lib/IO/Async/Listener.pm
index dc1bf23..5fefe49 100644
--- a/lib/IO/Async/Listener.pm
+++ b/lib/IO/Async/Listener.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Handle );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use IO::Async::Handle;
use IO::Async::OS;
@@ -28,54 +28,54 @@ C<IO::Async::Listener> - listen on network sockets for incoming connections
=head1 SYNOPSIS
- use IO::Async::Listener;
+ use IO::Async::Listener;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $listener = IO::Async::Listener->new(
- on_stream => sub {
- my ( undef, $stream ) = @_;
+ my $listener = IO::Async::Listener->new(
+ on_stream => sub {
+ my ( undef, $stream ) = @_;
- $stream->configure(
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
- $self->write( $$buffref );
- $$buffref = "";
- return 0;
- },
- );
+ $stream->configure(
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
+ $self->write( $$buffref );
+ $$buffref = "";
+ return 0;
+ },
+ );
- $loop->add( $stream );
- },
- );
+ $loop->add( $stream );
+ },
+ );
- $loop->add( $listener );
+ $loop->add( $listener );
- $listener->listen(
- service => "echo",
- socktype => 'stream',
- )->get;
+ $listener->listen(
+ service => "echo",
+ socktype => 'stream',
+ )->get;
- $loop->run;
+ $loop->run;
This object can also be used indirectly via an L<IO::Async::Loop>:
- use IO::Async::Stream;
+ use IO::Async::Stream;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- $loop->listen(
- service => "echo",
- socktype => 'stream',
+ $loop->listen(
+ service => "echo",
+ socktype => 'stream',
- on_stream => sub {
- ...
- },
- )->get;
+ on_stream => sub {
+ ...
+ },
+ )->get;
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -154,11 +154,11 @@ client socket is accepted from the listening socket. It is passed the listener
object itself, and is expected to return a new instance of
L<IO::Async::Handle> or a subclass, used to wrap the new client socket.
- $handle = $handle_constructor->( $listener )
+ $handle = $handle_constructor->( $listener )
This can also be given as a subclass method
- $handle = $listener->handle_constructor()
+ $handle = $listener->handle_constructor()
=head2 handle_class => STRING
@@ -166,20 +166,20 @@ Optional. If defined and C<handle_constructor> isn't, then new wrapper handles
are constructed by invoking the C<new> method on the given class name, passing
in no additional parameters.
- $handle = $handle_class->new()
+ $handle = $handle_class->new()
This can also be given as a subclass method
- $handle = $listener->handle_class->new
+ $handle = $listener->handle_class->new
=head2 acceptor => STRING|CODE
Optional. If defined, gives the name of a method or a CODE reference to use to
implement the actual accept behaviour. This will be invoked as:
- ( $accepted ) = $listener->acceptor( $socket )->get
+ ( $accepted ) = $listener->acceptor( $socket )->get
- ( $handle ) = $listener->acceptor( $socket, handle => $handle )->get
+ ( $handle ) = $listener->acceptor( $socket, handle => $handle )->get
It is invoked with the listening socket as its its argument, and optionally
an L<IO::Async::Handle> instance as a named parameter, and is expected to
@@ -429,7 +429,7 @@ Optional. A callback that is invoked when the listening socket is ready.
Similar to that on the underlying loop method, except it is passed the
listener object itself.
- $on_listen->( $listener )
+ $on_listen->( $listener )
=back
@@ -458,41 +458,41 @@ The C<handle> argument can be passed an existing socket already in listening
mode, making it possible to listen on other types of socket such as UNIX
sockets.
- use IO::Async::Listener;
- use IO::Socket::UNIX;
+ use IO::Async::Listener;
+ use IO::Socket::UNIX;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $listener = IO::Async::Listener->new(
- on_stream => sub {
- my ( undef, $stream ) = @_;
+ my $listener = IO::Async::Listener->new(
+ on_stream => sub {
+ my ( undef, $stream ) = @_;
- $stream->configure(
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
- $self->write( $$buffref );
- $$buffref = "";
- return 0;
- },
- );
+ $stream->configure(
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
+ $self->write( $$buffref );
+ $$buffref = "";
+ return 0;
+ },
+ );
- $loop->add( $stream );
- },
- );
+ $loop->add( $stream );
+ },
+ );
- $loop->add( $listener );
+ $loop->add( $listener );
- my $socket = IO::Socket::UNIX->new(
- Local => "echo.sock",
- Listen => 1,
- ) or die "Cannot make UNIX socket - $!\n";
+ my $socket = IO::Socket::UNIX->new(
+ Local => "echo.sock",
+ Listen => 1,
+ ) or die "Cannot make UNIX socket - $!\n";
- $listener->listen(
- handle => $socket,
- );
+ $listener->listen(
+ handle => $socket,
+ );
- $loop->run;
+ $loop->run;
=head2 Passing Plain Socket Addresses
@@ -502,27 +502,27 @@ method can use.
This example shows how to listen on TCP port 8001 on address 10.0.0.1:
- $listener->listen(
- addr => {
- family => "inet",
- socktype => "stream",
- port => 8001,
- ip => "10.0.0.1",
- },
- ...
- );
+ $listener->listen(
+ addr => {
+ family => "inet",
+ socktype => "stream",
+ port => 8001,
+ ip => "10.0.0.1",
+ },
+ ...
+ );
This example shows another way to listen on a UNIX socket, similar to the
earlier example:
- $listener->listen(
- addr => {
- family => "unix",
- socktype => "stream",
- path => "echo.sock",
- },
- ...
- );
+ $listener->listen(
+ addr => {
+ family => "unix",
+ socktype => "stream",
+ path => "echo.sock",
+ },
+ ...
+ );
=head2 Using A Kernel-Assigned Port Number
@@ -534,27 +534,27 @@ picked, inspect the C<sockport> accessor on the actual socket filehandle.
Either use the L<Future> returned by the C<listen> method:
- $listener->listen(
- addr => { family => "inet" },
- )->on_done( sub {
- my ( $listener ) = @_;
- my $socket = $listener->read_handle;
+ $listener->listen(
+ addr => { family => "inet" },
+ )->on_done( sub {
+ my ( $listener ) = @_;
+ my $socket = $listener->read_handle;
- say "Now listening on port ", $socket->sockport;
- });
+ say "Now listening on port ", $socket->sockport;
+ });
Or pass an C<on_listen> continuation:
- $listener->listen(
- addr => { family => "inet" },
+ $listener->listen(
+ addr => { family => "inet" },
- on_listen => sub {
- my ( $listener ) = @_;
- my $socket = $listener->read_handle;
+ on_listen => sub {
+ my ( $listener ) = @_;
+ my $socket = $listener->read_handle;
- say "Now listening on port ", $socket->sockport;
- },
- );
+ say "Now listening on port ", $socket->sockport;
+ },
+ );
=head1 AUTHOR
diff --git a/lib/IO/Async/Loop.pm b/lib/IO/Async/Loop.pm
index f671b0a..e81baa6 100644
--- a/lib/IO/Async/Loop.pm
+++ b/lib/IO/Async/Loop.pm
@@ -8,7 +8,7 @@ package IO::Async::Loop;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
# When editing this value don't forget to update the docs below
use constant NEED_API_VERSION => '0.33';
@@ -90,31 +90,31 @@ C<IO::Async::Loop> - core loop of the C<IO::Async> framework
=head1 SYNOPSIS
- use IO::Async::Stream;
- use IO::Async::Timer::Countdown;
+ use IO::Async::Stream;
+ use IO::Async::Timer::Countdown;
- use IO::Async::Loop;
+ use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ my $loop = IO::Async::Loop->new;
- $loop->add( IO::Async::Timer::Countdown->new(
- delay => 10,
- on_expire => sub { print "10 seconds have passed\n" },
- )->start );
+ $loop->add( IO::Async::Timer::Countdown->new(
+ delay => 10,
+ on_expire => sub { print "10 seconds have passed\n" },
+ )->start );
- $loop->add( IO::Async::Stream->new_for_stdin(
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
+ $loop->add( IO::Async::Stream->new_for_stdin(
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "You typed a line $1\n";
- }
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "You typed a line $1\n";
+ }
- return 0;
- },
- ) );
+ return 0;
+ },
+ ) );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -227,12 +227,12 @@ module for another event system.
For example, the following two C<$loop> variables will refer to the same
object:
- use IO::Async::Loop;
- use IO::Async::Loop::Poll;
+ use IO::Async::Loop;
+ use IO::Async::Loop::Poll;
- my $loop_poll = IO::Async::Loop::Poll->new;
+ my $loop_poll = IO::Async::Loop::Poll->new;
- my $loop = IO::Async::Loop->new;
+ my $loop = IO::Async::Loop->new;
While it is not advised to do so under normal circumstances, if the program
really wishes to construct more than one Loop object, it can call the
@@ -645,7 +645,7 @@ sub new_future
Blocks until the given future is ready, as indicated by its C<is_ready> method.
As a convenience it returns the future, to simplify code:
- my @result = $loop->await( $future )->get;
+ my @result = $loop->await( $future )->get;
=cut
@@ -946,7 +946,7 @@ process before running the code or command. See below.
A continuation to be called when the child processes exits. It will be invoked
in the following way:
- $on_exit->( $pid, $exitcode, $dollarbang, $dollarat )
+ $on_exit->( $pid, $exitcode, $dollarbang, $dollarat )
The second argument is passed the plain perl C<$?> value.
@@ -1019,7 +1019,7 @@ Shortcut for passing C<fdI<n>>, where I<n> is the fileno of the IO
reference. In this case, the key must be a reference that implements the
C<fileno> method. This is mostly useful for
- $handle => 'keep'
+ $handle => 'keep'
=item fdI<n> => IO
@@ -1042,10 +1042,10 @@ existing one. If you want to simply add new keys or change the values of some
keys without removing the other existing ones, you can simply copy C<%ENV>
into the hash before setting new keys:
- env => {
- %ENV,
- ANOTHER => "key here",
- }
+ env => {
+ %ENV,
+ ANOTHER => "key here",
+ }
=item nice => INT
@@ -1372,7 +1372,7 @@ C<run_process>:
A continuation to be called when the child process exits and closed its STDOUT
and STDERR streams. It will be invoked in the following way:
- $on_finish->( $pid, $exitcode, $stdout, $stderr )
+ $on_finish->( $pid, $exitcode, $stdout, $stderr )
The second argument is passed the plain perl C<$?> value.
@@ -1518,26 +1518,26 @@ format recognised by L<IO::Async::OS>'s C<extract_addrinfo> method.
This example shows how to use the C<Socket> functions to construct one for TCP
port 8001 on address 10.0.0.1:
- $loop->connect(
- addr => {
- family => "inet",
- socktype => "stream",
- port => 8001,
- ip => "10.0.0.1",
- },
- ...
- );
+ $loop->connect(
+ addr => {
+ family => "inet",
+ socktype => "stream",
+ port => 8001,
+ ip => "10.0.0.1",
+ },
+ ...
+ );
This example shows another way to connect to a UNIX socket at F<echo.sock>.
- $loop->connect(
- addr => {
- family => "unix",
- socktype => "stream",
- path => "echo.sock",
- },
- ...
- );
+ $loop->connect(
+ addr => {
+ family => "unix",
+ socktype => "stream",
+ path => "echo.sock",
+ },
+ ...
+ );
=item peer => IO
@@ -1619,11 +1619,11 @@ this callback is invoked to inform of the error. It is passed the name of the
syscall that failed, the arguments that were passed to it, and the error it
generated. I.e.
- $on_fail->( "socket", $family, $socktype, $protocol, $! );
+ $on_fail->( "socket", $family, $socktype, $protocol, $! );
- $on_fail->( "bind", $sock, $address, $! );
+ $on_fail->( "bind", $sock, $address, $! );
- $on_fail->( "connect", $sock, $address, $! );
+ $on_fail->( "connect", $sock, $address, $! );
Because of the "try all" nature when given a list of multiple addresses, this
callback may be invoked multiple times, even before an eventual success.
@@ -1648,7 +1648,7 @@ A continuation that is invoked on a successful C<connect(2)> call to a valid
socket. It will be passed the connected socket handle, as an C<IO::Socket>
object.
- $on_connected->( $handle )
+ $on_connected->( $handle )
=item on_stream => CODE
@@ -1657,14 +1657,14 @@ of L<IO::Async::Stream> when the socket is connected. This is provided as a
convenience for the common case that a Stream object is required as the
transport for a Protocol object.
- $on_stream->( $stream )
+ $on_stream->( $stream )
=item on_socket => CODE
Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>.
This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets.
- $on_socket->( $socket )
+ $on_socket->( $socket )
=item on_connect_error => CODE
@@ -1674,7 +1674,7 @@ occurred, and the name of the operation it occurred in. Errors from the
C<connect(2)> syscall are considered most significant, then C<bind(2)>, then
finally C<socket(2)>.
- $on_connect_error->( $syscall, $! )
+ $on_connect_error->( $syscall, $! )
=item on_resolve_error => CODE
@@ -1865,13 +1865,13 @@ Optional. A callback that is invoked if a syscall fails while attempting to
create a listening sockets. It is passed the name of the syscall that failed,
the arguments that were passed to it, and the error generated. I.e.
- $on_fail->( "socket", $family, $socktype, $protocol, $! );
+ $on_fail->( "socket", $family, $socktype, $protocol, $! );
- $on_fail->( "sockopt", $sock, $optname, $optval, $! );
+ $on_fail->( "sockopt", $sock, $optname, $optval, $! );
- $on_fail->( "bind", $sock, $address, $! );
+ $on_fail->( "bind", $sock, $address, $! );
- $on_fail->( "listen", $sock, $queuesize, $! );
+ $on_fail->( "listen", $sock, $queuesize, $! );
=item queuesize => INT
@@ -1912,7 +1912,7 @@ continuations to invoke on success or failure.
Optional. A callback that is invoked when the Listener object is ready to
receive connections. The callback is passed the Listener object itself.
- $on_notifier->( $listener )
+ $on_notifier->( $listener )
If this callback is required, it may instead be better to construct the
Listener object directly.
@@ -1923,7 +1923,7 @@ Optional. A callback that is invoked when the listening socket is ready.
Typically this would be used in the name resolver case, in order to inspect
the socket's sockname address, or otherwise inspect the filehandle.
- $on_listen->( $socket )
+ $on_listen->( $socket )
=item on_listen_error => CODE
@@ -2202,7 +2202,7 @@ thows an exception).
A optional continuation to be called when the child processes exits. It will
be invoked in the following way:
- $on_exit->( $pid, $exitcode )
+ $on_exit->( $pid, $exitcode )
The second argument is passed the plain perl C<$?> value.
@@ -2285,11 +2285,11 @@ C<scalar> if not supplied.
Callback to invoke when the thread function returns or throws an exception.
If it returned, this callback will be invoked with its result
- $on_joined->( return => @result )
+ $on_joined->( return => @result )
If it threw an exception the callback is invoked with the value of C<$@>
- $on_joined->( died => $! )
+ $on_joined->( died => $! )
=back
@@ -2397,7 +2397,7 @@ The current API version is C<0.49>.
This method may be implemented using C<constant>; e.g
- use constant API_VERSION => '0.49';
+ use constant API_VERSION => '0.49';
=cut
@@ -2948,7 +2948,7 @@ The PID to watch. Will report on all child processes if this is 0.
A CODE reference to the exit handler. It will be invoked as
- $code->( $pid, $? )
+ $code->( $pid, $? )
The second argument is passed the plain perl C<$?> value.
@@ -3141,17 +3141,17 @@ remaining ones in another C<extensions> parameter.
For example,
- $loop->connect(
- extensions => [qw( FOO BAR )],
- %args
- )
+ $loop->connect(
+ extensions => [qw( FOO BAR )],
+ %args
+ )
will become
- $loop->FOO_connect(
- extensions => [qw( BAR )],
- %args
- )
+ $loop->FOO_connect(
+ extensions => [qw( BAR )],
+ %args
+ )
This is provided so that extension modules, such as L<IO::Async::SSL> can
easily be invoked indirectly, by passing extra arguments to C<connect> methods
@@ -3161,8 +3161,8 @@ also use it.
The following methods take an C<extensions> parameter:
- $loop->connect
- $loop->listen
+ $loop->connect
+ $loop->listen
If an extension C<listen> method is invoked, it will be passed a C<listener>
parameter even if one was not provided to the original C<< $loop->listen >>
diff --git a/lib/IO/Async/Loop/Poll.pm b/lib/IO/Async/Loop/Poll.pm
index 8c1dd97..c5048aa 100644
--- a/lib/IO/Async/Loop/Poll.pm
+++ b/lib/IO/Async/Loop/Poll.pm
@@ -8,7 +8,7 @@ package IO::Async::Loop::Poll;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use constant API_VERSION => '0.49';
use base qw( IO::Async::Loop );
@@ -46,19 +46,19 @@ Normally an instance of this class would not be directly constructed by a
program. It may however, be useful for runinng L<IO::Async> with an existing
program already using an C<IO::Poll> object.
- use IO::Poll;
- use IO::Async::Loop::Poll;
+ use IO::Poll;
+ use IO::Async::Loop::Poll;
- my $poll = IO::Poll->new;
- my $loop = IO::Async::Loop::Poll->new( poll => $poll );
+ my $poll = IO::Poll->new;
+ my $loop = IO::Async::Loop::Poll->new( poll => $poll );
- $loop->add( ... );
+ $loop->add( ... );
- while(1) {
- my $timeout = ...
- my $ret = $poll->poll( $timeout );
- $loop->post_poll;
- }
+ while(1) {
+ my $timeout = ...
+ my $ret = $poll->poll( $timeout );
+ $loop->post_poll;
+ }
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Loop/Select.pm b/lib/IO/Async/Loop/Select.pm
index 9cb91f5..2c52fc6 100644
--- a/lib/IO/Async/Loop/Select.pm
+++ b/lib/IO/Async/Loop/Select.pm
@@ -8,7 +8,7 @@ package IO::Async::Loop::Select;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use constant API_VERSION => '0.49';
use base qw( IO::Async::Loop );
@@ -37,22 +37,22 @@ Normally an instance of this class would not be directly constructed by a
program. It may however, be useful for runinng L<IO::Async> with an existing
program already using a C<select> call.
- use IO::Async::Loop::Select;
+ use IO::Async::Loop::Select;
- my $loop = IO::Async::Loop::Select->new;
+ my $loop = IO::Async::Loop::Select->new;
- $loop->add( ... );
+ $loop->add( ... );
- while(1) {
- my ( $rvec, $wvec, $evec ) = ('') x 3;
- my $timeout;
+ while(1) {
+ my ( $rvec, $wvec, $evec ) = ('') x 3;
+ my $timeout;
- $loop->pre_select( \$rvec, \$wvec, \$evec, \$timeout );
- ...
- my $ret = select( $rvec, $wvec, $evec, $timeout );
- ...
- $loop->post_select( $rvec, $evec, $wvec );
- }
+ $loop->pre_select( \$rvec, \$wvec, \$evec, \$timeout );
+ ...
+ my $ret = select( $rvec, $wvec, $evec, $timeout );
+ ...
+ $loop->post_select( $rvec, $evec, $wvec );
+ }
=head1 DESCRIPTION
diff --git a/lib/IO/Async/LoopTests.pm b/lib/IO/Async/LoopTests.pm
index 91d6e80..70f1a48 100644
--- a/lib/IO/Async/LoopTests.pm
+++ b/lib/IO/Async/LoopTests.pm
@@ -28,7 +28,7 @@ use POSIX qw( SIGTERM );
use Socket qw( sockaddr_family AF_UNIX );
use Time::HiRes qw( time );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
# Abstract Units of Time
use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;
@@ -44,8 +44,8 @@ C<IO::Async::LoopTests> - acceptance testing for L<IO::Async::Loop> subclasses
=head1 SYNOPSIS
- use IO::Async::LoopTests;
- run_tests( 'IO::Async::Loop::Shiney', 'io' );
+ use IO::Async::LoopTests;
+ run_tests( 'IO::Async::Loop::Shiney', 'io' );
=head1 DESCRIPTION
@@ -65,7 +65,7 @@ timers are preferred on automated smoke-testing machines, to help guard
against false negatives reported simply because of scheduling delays or high
system load while testing.
- TEST_QUICK_TIMERS=1 ./Build test
+ $ TEST_QUICK_TIMERS=1 ./Build test
=cut
diff --git a/lib/IO/Async/Notifier.pm b/lib/IO/Async/Notifier.pm
index 3335205..46f929e 100644
--- a/lib/IO/Async/Notifier.pm
+++ b/lib/IO/Async/Notifier.pm
@@ -8,7 +8,7 @@ package IO::Async::Notifier;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
use Scalar::Util qw( weaken );
@@ -28,44 +28,44 @@ C<IO::Async::Notifier> - base class for L<IO::Async> event objects
Usually not directly used by a program, but one valid use case may be:
- use IO::Async::Notifier;
+ use IO::Async::Notifier;
- use IO::Async::Stream;
- use IO::Async::Signal;
+ use IO::Async::Stream;
+ use IO::Async::Signal;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $notifier = IO::Async::Notifier->new;
+ my $notifier = IO::Async::Notifier->new;
- $notifier->add_child(
- IO::Async::Stream->new_for_stdin(
- on_read => sub {
- my $self = shift;
- my ( $buffref, $eof ) = @_;
+ $notifier->add_child(
+ IO::Async::Stream->new_for_stdin(
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "You said $1\n";
- }
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "You said $1\n";
+ }
- return 0;
- },
- )
- );
+ return 0;
+ },
+ )
+ );
- $notifier->add_child(
- IO::Async::Signal->new(
- name => 'INT',
- on_receipt => sub {
- print "Goodbye!\n";
- $loop->stop;
- },
- )
- );
+ $notifier->add_child(
+ IO::Async::Signal->new(
+ name => 'INT',
+ on_receipt => sub {
+ print "Goodbye!\n";
+ $loop->stop;
+ },
+ )
+ );
- $loop->add( $notifier );
+ $loop->add( $notifier );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -138,24 +138,24 @@ C<"IO_Async_Notifier__"> for namespace purposes.
This is intended mainly for defining a subclass of some other object that is
also an C<IO::Async::Notifier>, suitable to be added to an L<IO::Async::Loop>.
- package SomeEventSource::Async;
- use base qw( SomeEventSource IO::Async::Notifier );
+ package SomeEventSource::Async;
+ use base qw( SomeEventSource IO::Async::Notifier );
- sub _add_to_loop
- {
- my $self = shift;
- my ( $loop ) = @_;
+ sub _add_to_loop
+ {
+ my $self = shift;
+ my ( $loop ) = @_;
- # Code here to set up event handling on $loop that may be required
- }
+ # Code here to set up event handling on $loop that may be required
+ }
- sub _remove_from_loop
- {
- my $self = shift;
- my ( $loop ) = @_;
+ sub _remove_from_loop
+ {
+ my $self = shift;
+ my ( $loop ) = @_;
- # Code here to undo the event handling set up above
- }
+ # Code here to undo the event handling set up above
+ }
Since all the methods documented here will be available, the implementation
may wish to use the C<configure> and C<make_event_cb> or C<invoke_event>
@@ -332,7 +332,7 @@ C<< $f->fail >>. To avoid this being fatal if the failure is handled
elsewhere, use the C<else_done> method on the future to obtain a sequence one
that never fails.
- $notifier->adopt_future( $f->else_done() )
+ $notifier->adopt_future( $f->else_done() )
The future itself is returned.
@@ -609,12 +609,12 @@ stored in the Notifier itself without creating a cycle.
For example,
- my $mref = $notifier->_capture_weakself( sub {
- my ( $notifier, $arg ) = @_;
- print "Notifier $notifier got argument $arg\n";
- } );
+ my $mref = $notifier->_capture_weakself( sub {
+ my ( $notifier, $arg ) = @_;
+ print "Notifier $notifier got argument $arg\n";
+ } );
- $mref->( 123 );
+ $mref->( 123 );
This is provided as a utility for Notifier subclasses to use to build a
callback CODEref to pass to a Loop method, but which may also want to store
@@ -633,11 +633,11 @@ is possible that it has been destroyed by the time the code runs, and so the
reference will be passed as C<undef>. This should be protected against by the
code body.
- $other_object->{on_event} = $notifier->_capture_weakself( sub {
- my $notifier = shift or return;
- my ( @event_args ) = @_;
- ...
- } );
+ $other_object->{on_event} = $notifier->_capture_weakself( sub {
+ my $notifier = shift or return;
+ my ( @event_args ) = @_;
+ ...
+ } );
For stand-alone generic implementation of this behaviour, see also L<curry>
and C<curry::weak>.
@@ -684,12 +684,12 @@ stored in the Notifier itself without creating a cycle.
For example,
- my $mref = $notifier->_replace_weakself( sub {
- my ( $notifier, $arg ) = @_;
- print "Notifier $notifier got argument $arg\n";
- } );
+ my $mref = $notifier->_replace_weakself( sub {
+ my ( $notifier, $arg ) = @_;
+ print "Notifier $notifier got argument $arg\n";
+ } );
- $mref->( $object, 123 );
+ $mref->( $object, 123 );
This is provided as a utility for Notifier subclasses to use for event
callbacks on other objects, where the delegated object is passed in the
@@ -877,22 +877,22 @@ be the class name of the notifier, and any parent notifiers it is contained
by, joined by an arrow C<< <- >>. To ensure this string does not grow too
long, certain prefixes are abbreviated:
- IO::Async::Protocol:: => IaP:
- IO::Async:: => Ia:
- Net::Async:: => Na:
+ IO::Async::Protocol:: => IaP:
+ IO::Async:: => Ia:
+ Net::Async:: => Na:
Finally, each notifier that has a name defined using the C<notifier_name>
parameter has that name appended in braces.
For example, invoking
- $stream->debug_printf( "EVENT on_read" )
+ $stream->debug_printf( "EVENT on_read" )
On an L<IO::Async::Stream> instance reading and writing a file descriptor
whose C<fileno> is 4, which is a child of an L<IO::Async::Protocol::Stream>,
will produce a line of output:
- [Ia:Stream{rw=4}<-IaP:Stream] EVENT on_read
+ [Ia:Stream{rw=4}<-IaP:Stream] EVENT on_read
=cut
diff --git a/lib/IO/Async/OS.pm b/lib/IO/Async/OS.pm
index d5ede3d..7867248 100644
--- a/lib/IO/Async/OS.pm
+++ b/lib/IO/Async/OS.pm
@@ -8,7 +8,7 @@ package IO::Async::OS;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
our @ISA = qw( IO::Async::OS::_Base );
@@ -303,20 +303,20 @@ parent process.
When creating a L<IO::Async::Stream> or subclass of it, the C<read_handle>
and C<write_handle> parameters should always be used.
- my ( $childRd, $myWr, $myRd, $childWr ) = IO::Async::OS->pipequad;
+ my ( $childRd, $myWr, $myRd, $childWr ) = IO::Async::OS->pipequad;
- $loop->open_process(
- stdin => $childRd,
- stdout => $childWr,
- ...
- );
+ $loop->open_process(
+ stdin => $childRd,
+ stdout => $childWr,
+ ...
+ );
- my $str = IO::Async::Stream->new(
- read_handle => $myRd,
- write_handle => $myWr,
- ...
- );
- $loop->add( $str );
+ my $str = IO::Async::Stream->new(
+ read_handle => $myRd,
+ write_handle => $myWr,
+ ...
+ );
+ $loop->add( $str );
=cut
@@ -385,11 +385,11 @@ address suitable for C<connect> or C<bind>.
If given an ARRAY it should be in the following form:
- [ $family, $socktype, $protocol, $addr ]
+ [ $family, $socktype, $protocol, $addr ]
If given a HASH it should contain the following keys:
- family socktype protocol addr
+ family socktype protocol addr
Each field in the result will be initialised to 0 (or empty string for the
address) if not defined in the C<$ai> value.
diff --git a/lib/IO/Async/OS/MSWin32.pm b/lib/IO/Async/OS/MSWin32.pm
index 9edef9f..e7a1671 100644
--- a/lib/IO/Async/OS/MSWin32.pm
+++ b/lib/IO/Async/OS/MSWin32.pm
@@ -8,7 +8,7 @@ package IO::Async::OS::MSWin32;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
our @ISA = qw( IO::Async::OS::_Base );
diff --git a/lib/IO/Async/OS/cygwin.pm b/lib/IO/Async/OS/cygwin.pm
index 4258fdb..fef6a4e 100644
--- a/lib/IO/Async/OS/cygwin.pm
+++ b/lib/IO/Async/OS/cygwin.pm
@@ -8,7 +8,7 @@ package IO::Async::OS::cygwin;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
our @ISA = qw( IO::Async::OS::_Base );
diff --git a/lib/IO/Async/OS/linux.pm b/lib/IO/Async/OS/linux.pm
index 79482ea..74558e2 100644
--- a/lib/IO/Async/OS/linux.pm
+++ b/lib/IO/Async/OS/linux.pm
@@ -8,7 +8,7 @@ package IO::Async::OS::linux;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
our @ISA = qw( IO::Async::OS::_Base );
diff --git a/lib/IO/Async/PID.pm b/lib/IO/Async/PID.pm
index 4f9876c..c3416a6 100644
--- a/lib/IO/Async/PID.pm
+++ b/lib/IO/Async/PID.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -19,36 +19,36 @@ C<IO::Async::PID> - event callback on exit of a child process
=head1 SYNOPSIS
- use IO::Async::PID;
- use POSIX qw( WEXITSTATUS );
+ use IO::Async::PID;
+ use POSIX qw( WEXITSTATUS );
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $kid = $loop->fork(
- code => sub {
- print "Child sleeping..\n";
- sleep 10;
- print "Child exiting\n";
- return 20;
- },
- );
+ my $kid = $loop->fork(
+ code => sub {
+ print "Child sleeping..\n";
+ sleep 10;
+ print "Child exiting\n";
+ return 20;
+ },
+ );
- print "Child process $kid started\n";
+ print "Child process $kid started\n";
- my $pid = IO::Async::PID->new(
- pid => $kid,
+ my $pid = IO::Async::PID->new(
+ pid => $kid,
- on_exit => sub {
- my ( $self, $exitcode ) = @_;
- printf "Child process %d exited with status %d\n",
- $self->pid, WEXITSTATUS($exitcode);
- },
- );
+ on_exit => sub {
+ my ( $self, $exitcode ) = @_;
+ printf "Child process %d exited with status %d\n",
+ $self->pid, WEXITSTATUS($exitcode);
+ },
+ );
- $loop->add( $pid );
+ $loop->add( $pid );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Process.pm b/lib/IO/Async/Process.pm
index c8f0493..750c15d 100644
--- a/lib/IO/Async/Process.pm
+++ b/lib/IO/Async/Process.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -25,58 +25,58 @@ C<IO::Async::Process> - start and manage a child process
=head1 SYNOPSIS
- use IO::Async::Process;
-
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
-
- my $process = IO::Async::Process->new(
- command => [ "tr", "a-z", "n-za-m" ],
- stdin => {
- from => "hello world\n",
- },
- stdout => {
- on_read => sub {
- my ( $stream, $buffref ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "Rot13 of 'hello world' is '$1'\n";
- }
-
- return 0;
- },
- },
-
- on_finish => sub {
- $loop->stop;
- },
- );
+ use IO::Async::Process;
+
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
+
+ my $process = IO::Async::Process->new(
+ command => [ "tr", "a-z", "n-za-m" ],
+ stdin => {
+ from => "hello world\n",
+ },
+ stdout => {
+ on_read => sub {
+ my ( $stream, $buffref ) = @_;
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "Rot13 of 'hello world' is '$1'\n";
+ }
+
+ return 0;
+ },
+ },
+
+ on_finish => sub {
+ $loop->stop;
+ },
+ );
- $loop->add( $process );
+ $loop->add( $process );
- $loop->run;
+ $loop->run;
Also accessible via the L<IO::Async::Loop/open_process> method:
- $loop->open_process(
- command => [ "/bin/ping", "-c4", "some.host" ],
-
- stdout => {
- on_read => sub {
- my ( $stream, $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "PING wrote: $1\n";
- }
- return 0;
- },
- },
-
- on_finish => sub {
- my $process = shift;
- my ( $exitcode ) = @_;
- my $status = ( $exitcode >> 8 );
- ...
- },
- );
+ $loop->open_process(
+ command => [ "/bin/ping", "-c4", "some.host" ],
+
+ stdout => {
+ on_read => sub {
+ my ( $stream, $buffref, $eof ) = @_;
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "PING wrote: $1\n";
+ }
+ return 0;
+ },
+ },
+
+ on_finish => sub {
+ my $process = shift;
+ my ( $exitcode ) = @_;
+ my $status = ( $exitcode >> 8 );
+ ...
+ },
+ );
=head1 DESCRIPTION
@@ -115,21 +115,21 @@ C<on_exit> handler in a different order it is possible that the C<$exception>
field will be an empty string. It will however always be defined. This can be
used to distinguish the two cases:
- on_exception => sub {
- my $self = shift;
- my ( $exception, $errno, $exitcode ) = @_;
-
- if( length $exception ) {
- print STDERR "The process died with the exception $exception " .
- "(errno was $errno)\n";
- }
- elsif( ( my $status = W_EXITSTATUS($exitcode) ) == 255 ) {
- print STDERR "The process failed to exec() - $errno\n";
- }
- else {
- print STDERR "The process exited with exit status $status\n";
- }
- }
+ on_exception => sub {
+ my $self = shift;
+ my ( $exception, $errno, $exitcode ) = @_;
+
+ if( length $exception ) {
+ print STDERR "The process died with the exception $exception " .
+ "(errno was $errno)\n";
+ }
+ elsif( ( my $status = W_EXITSTATUS($exitcode) ) == 255 ) {
+ print STDERR "The process failed to exec() - $errno\n";
+ }
+ else {
+ print STDERR "The process exited with exit status $status\n";
+ }
+ }
=cut
@@ -265,7 +265,7 @@ use C<setsockopt(3)>) from the controlling parent, before the child code runs.
The arguments passed in are the L<IO::Socket> objects for the parent and child
ends of the socket.
- $prefork->( $localfd, $childfd )
+ $prefork->( $localfd, $childfd )
=back
@@ -830,19 +830,19 @@ sub stdio { shift->fd( 'io' ) }
By configuring the C<stdout> filehandle of the process using the C<into> key,
data written by the process can be captured.
- my $stdout;
- my $process = IO::Async::Process->new(
- command => [ "writing-program", "arguments" ],
- stdout => { into => \$stdout },
- on_finish => sub {
- my $process = shift;
- my ( $exitcode ) = @_;
- print "Process has exited with code $exitcode, and wrote:\n";
- print $stdout;
- }
- );
+ my $stdout;
+ my $process = IO::Async::Process->new(
+ command => [ "writing-program", "arguments" ],
+ stdout => { into => \$stdout },
+ on_finish => sub {
+ my $process = shift;
+ my ( $exitcode ) = @_;
+ print "Process has exited with code $exitcode, and wrote:\n";
+ print $stdout;
+ }
+ );
- $loop->add( $process );
+ $loop->add( $process );
Note that until C<on_finish> is invoked, no guarantees are made about how much
of the data actually written by the process is yet in the C<$stdout> scalar.
@@ -853,24 +853,24 @@ To handle data more interactively as it arrives, the C<on_read> key can
instead be used, to provide a callback function to invoke whenever more data
is available from the process.
- my $process = IO::Async::Process->new(
- command => [ "writing-program", "arguments" ],
- stdout => {
- on_read => sub {
- my ( $stream, $buffref ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "The process wrote a line: $1\n";
- }
-
- return 0;
- },
- },
- on_finish => sub {
- print "The process has finished\n";
- }
- );
-
- $loop->add( $process );
+ my $process = IO::Async::Process->new(
+ command => [ "writing-program", "arguments" ],
+ stdout => {
+ on_read => sub {
+ my ( $stream, $buffref ) = @_;
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "The process wrote a line: $1\n";
+ }
+
+ return 0;
+ },
+ },
+ on_finish => sub {
+ print "The process has finished\n";
+ }
+ );
+
+ $loop->add( $process );
If the code to handle data read from the process isn't available yet when
the object is constructed, it can be supplied later by using the C<configure>
@@ -878,41 +878,41 @@ method on the C<stdout> filestream at some point before it gets added to the
Loop. In this case, C<stdin> should be configured using C<pipe_read> in the
C<via> key.
- my $process = IO::Async::Process->new(
- command => [ "writing-program", "arguments" ],
- stdout => { via => "pipe_read" },
- on_finish => sub {
- print "The process has finished\n";
- }
- );
+ my $process = IO::Async::Process->new(
+ command => [ "writing-program", "arguments" ],
+ stdout => { via => "pipe_read" },
+ on_finish => sub {
+ print "The process has finished\n";
+ }
+ );
- $process->stdout->configure(
- on_read => sub {
- my ( $stream, $buffref ) = @_;
- while( $$buffref =~ s/^(.*)\n// ) {
- print "The process wrote a line: $1\n";
- }
+ $process->stdout->configure(
+ on_read => sub {
+ my ( $stream, $buffref ) = @_;
+ while( $$buffref =~ s/^(.*)\n// ) {
+ print "The process wrote a line: $1\n";
+ }
- return 0;
- },
- );
+ return 0;
+ },
+ );
- $loop->add( $process );
+ $loop->add( $process );
=head2 Sending data to STDIN of a process
By configuring the C<stdin> filehandle of the process using the C<from> key,
data can be written into the C<STDIN> stream of the process.
- my $process = IO::Async::Process->new(
- command => [ "reading-program", "arguments" ],
- stdin => { from => "Here is the data to send\n" },
- on_finish => sub {
- print "The process has finished\n";
- }
- );
+ my $process = IO::Async::Process->new(
+ command => [ "reading-program", "arguments" ],
+ stdin => { from => "Here is the data to send\n" },
+ on_finish => sub {
+ print "The process has finished\n";
+ }
+ );
- $loop->add( $process );
+ $loop->add( $process );
The data in this scalar will be written until it is all consumed, then the
handle will be closed. This may be useful if the program waits for EOF on
@@ -922,17 +922,17 @@ To have the ability to write more data into the process once it has started.
the C<write> method on the C<stdin> stream can be used, when it is configured
using the C<pipe_write> value for C<via>:
- my $process = IO::Async::Process->new(
- command => [ "reading-program", "arguments" ],
- stdin => { via => "pipe_write" },
- on_finish => sub {
- print "The process has finished\n";
- }
- );
+ my $process = IO::Async::Process->new(
+ command => [ "reading-program", "arguments" ],
+ stdin => { via => "pipe_write" },
+ on_finish => sub {
+ print "The process has finished\n";
+ }
+ );
- $loop->add( $process );
+ $loop->add( $process );
- $process->stdin->write( "Here is some more data\n" );
+ $process->stdin->write( "Here is some more data\n" );
=head2 Setting socket options
@@ -941,24 +941,24 @@ size at both ends of the socket before the child is forked (at which point it
would be too late for the parent to be able to change the child end of the
socket).
- use Socket qw( SOL_SOCKET SO_RCVBUF );
-
- my $process = IO::Async::Process->new(
- command => [ "command-to-read-from-and-write-to", "arguments" ],
- stdio => {
- via => "socketpair",
- prefork => sub {
- my ( $parentfd, $childfd ) = @_;
-
- # Set parent end of socket receive buffer to 3 MB
- $parentfd->setsockopt(SOL_SOCKET, SO_RCVBUF, 3 * 1024 * 1024);
- # Set child end of socket receive buffer to 3 MB
- $childfd ->setsockopt(SOL_SOCKET, SO_RCVBUF, 3 * 1024 * 1024);
- },
- },
- );
+ use Socket qw( SOL_SOCKET SO_RCVBUF );
+
+ my $process = IO::Async::Process->new(
+ command => [ "command-to-read-from-and-write-to", "arguments" ],
+ stdio => {
+ via => "socketpair",
+ prefork => sub {
+ my ( $parentfd, $childfd ) = @_;
+
+ # Set parent end of socket receive buffer to 3 MB
+ $parentfd->setsockopt(SOL_SOCKET, SO_RCVBUF, 3 * 1024 * 1024);
+ # Set child end of socket receive buffer to 3 MB
+ $childfd ->setsockopt(SOL_SOCKET, SO_RCVBUF, 3 * 1024 * 1024);
+ },
+ },
+ );
- $loop->add( $process );
+ $loop->add( $process );
=cut
diff --git a/lib/IO/Async/Protocol.pm b/lib/IO/Async/Protocol.pm
index 772fe1c..2751011 100644
--- a/lib/IO/Async/Protocol.pm
+++ b/lib/IO/Async/Protocol.pm
@@ -8,7 +8,7 @@ package IO::Async::Protocol;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Notifier );
@@ -138,7 +138,7 @@ continuation will be used; otherwise C<on_socket> will be used.
Optional. If supplied, will be invoked once the connection has been
established.
- $on_connected->( $protocol )
+ $on_connected->( $protocol )
=item transport => IO::Async::Handle
@@ -195,7 +195,7 @@ sub connect
The following methods are delegated to the transport object
- close
+ close
=cut
diff --git a/lib/IO/Async/Protocol/LineStream.pm b/lib/IO/Async/Protocol/LineStream.pm
index 4ea49a7..7c4bd0c 100644
--- a/lib/IO/Async/Protocol/LineStream.pm
+++ b/lib/IO/Async/Protocol/LineStream.pm
@@ -8,7 +8,7 @@ package IO::Async::Protocol::LineStream;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Protocol::Stream );
@@ -24,31 +24,31 @@ text
Most likely this class will be subclassed to implement a particular network
protocol.
- package Net::Async::HelloWorld;
+ package Net::Async::HelloWorld;
- use strict;
- use warnings;
- use base qw( IO::Async::Protocol::LineStream );
+ use strict;
+ use warnings;
+ use base qw( IO::Async::Protocol::LineStream );
- sub on_read_line
- {
- my $self = shift;
- my ( $line ) = @_;
+ sub on_read_line
+ {
+ my $self = shift;
+ my ( $line ) = @_;
- if( $line =~ m/^HELLO (.*)/ ) {
- my $name = $1;
+ if( $line =~ m/^HELLO (.*)/ ) {
+ my $name = $1;
- $self->invoke_event( on_hello => $name );
- }
- }
+ $self->invoke_event( on_hello => $name );
+ }
+ }
- sub send_hello
- {
- my $self = shift;
- my ( $name ) = @_;
+ sub send_hello
+ {
+ my $self = shift;
+ my ( $name ) = @_;
- $self->write_line( "HELLO $name" );
- }
+ $self->write_line( "HELLO $name" );
+ }
This small example elides such details as error handling, which a real
protocol implementation would be likely to contain.
diff --git a/lib/IO/Async/Protocol/Stream.pm b/lib/IO/Async/Protocol/Stream.pm
index f6f5206..4a28f22 100644
--- a/lib/IO/Async/Protocol/Stream.pm
+++ b/lib/IO/Async/Protocol/Stream.pm
@@ -8,7 +8,7 @@ package IO::Async::Protocol::Stream;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Protocol );
@@ -23,36 +23,36 @@ C<IO::Async::Protocol::Stream> - base class for stream-based protocols
Most likely this class will be subclassed to implement a particular network
protocol.
- package Net::Async::HelloWorld;
+ package Net::Async::HelloWorld;
- use strict;
- use warnings;
- use base qw( IO::Async::Protocol::Stream );
+ use strict;
+ use warnings;
+ use base qw( IO::Async::Protocol::Stream );
- sub on_read
- {
- my $self = shift;
- my ( $buffref, $eof ) = @_;
+ sub on_read
+ {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
- return 0 unless $$buffref =~ s/^(.*)\n//;
- my $line = $1;
+ return 0 unless $$buffref =~ s/^(.*)\n//;
+ my $line = $1;
- if( $line =~ m/^HELLO (.*)/ ) {
- my $name = $1;
+ if( $line =~ m/^HELLO (.*)/ ) {
+ my $name = $1;
- $self->invoke_event( on_hello => $name );
- }
+ $self->invoke_event( on_hello => $name );
+ }
- return 1;
- }
+ return 1;
+ }
- sub send_hello
- {
- my $self = shift;
- my ( $name ) = @_;
+ sub send_hello
+ {
+ my $self = shift;
+ my ( $name ) = @_;
- $self->write( "HELLO $name\n" );
- }
+ $self->write( "HELLO $name\n" );
+ }
This small example elides such details as error handling, which a real
protocol implementation would be likely to contain.
diff --git a/lib/IO/Async/Resolver.pm b/lib/IO/Async/Resolver.pm
index c6de4d3..5c04080 100644
--- a/lib/IO/Async/Resolver.pm
+++ b/lib/IO/Async/Resolver.pm
@@ -1,7 +1,7 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2007-2018 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2007-2021 -- leonerd@leonerd.org.uk
package IO::Async::Resolver;
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Function );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
# Socket 2.006 fails to getaddrinfo() AI_NUMERICHOST properly on MSWin32
use Socket 2.007 qw(
@@ -41,26 +41,23 @@ C<IO::Async::Resolver> - performing name resolutions asynchronously
This object is used indirectly via an L<IO::Async::Loop>:
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- $loop->resolver->getaddrinfo(
- host => "www.example.com",
- service => "http",
- )->on_done( sub {
- foreach my $addr ( @_ ) {
- printf "http://www.example.com can be reached at " .
- "socket(%d,%d,%d) + connect('%v02x')\n",
- @{$addr}{qw( family socktype protocol addr )};
- }
- });
+ my @results = $loop->resolver->getaddrinfo(
+ host => "www.example.com",
+ service => "http",
+ )->get;
- $loop->resolve( type => 'getpwuid', data => [ $< ] )
- ->on_done( sub {
- print "My passwd ent: " . join( "|", @_ ) . "\n";
- });
+ foreach my $addr ( @results ) {
+ printf "http://www.example.com can be reached at " .
+ "socket(%d,%d,%d) + connect('%v02x')\n",
+ @{$addr}{qw( family socktype protocol addr )};
+ }
+
+ my @pwent = $loop->resolve( type => 'getpwuid', data => [ $< ] )->get;
- $loop->run;
+ print "My passwd ent: " . join( "|", @pwent ) . "\n";
=head1 DESCRIPTION
@@ -90,23 +87,8 @@ sub _init
my ( $params ) = @_;
$self->SUPER::_init( @_ );
- $params->{code} = sub {
- my ( $type, $timeout, @data ) = @_;
-
- if( my $code = $METHODS{$type} ) {
- local $SIG{ALRM} = sub { die "Timed out\n" };
-
- alarm( $timeout );
- my @ret = eval { $code->( @data ) };
- alarm( 0 );
-
- die $@ if $@;
- return @ret;
- }
- else {
- die "Unrecognised resolver request '$type'";
- }
- };
+ $params->{module} = __PACKAGE__;
+ $params->{func} = "_resolve";
$params->{idle_timeout} = 30;
$params->{min_workers} = 0;
@@ -114,6 +96,25 @@ sub _init
$started = 1;
}
+sub _resolve
+{
+ my ( $type, $timeout, @data ) = @_;
+
+ if( my $code = $METHODS{$type} ) {
+ local $SIG{ALRM} = sub { die "Timed out\n" };
+
+ alarm( $timeout );
+ my @ret = eval { $code->( @data ) };
+ alarm( 0 );
+
+ die $@ if $@;
+ return @ret;
+ }
+ else {
+ die "Unrecognised resolver request '$type'";
+ }
+}
+
sub debug_printf_call
{
my $self = shift;
@@ -180,7 +181,7 @@ On failure, the fail category name is C<resolve>; the details give the
individual resolver function name (e.g. C<getaddrinfo>), followed by other
error details specific to the resolver in question.
- ->fail( $message, resolve => $type => @details )
+ ->fail( $message, resolve => $type => @details )
=head2 resolve (void)
@@ -196,7 +197,7 @@ continuations to invoke on success or failure:
A continuation that is invoked when the resolver function returns a successful
result. It will be passed the array returned by the resolver function.
- $on_resolved->( @result )
+ $on_resolved->( @result )
=item on_error => CODE
@@ -307,7 +308,7 @@ C<canonname> field will also be present.
On failure, the detail field will give the error number, which should match
one of the C<Socket::EAI_*> constants.
- ->fail( $message, resolve => getaddrinfo => $eai_errno )
+ ->fail( $message, resolve => getaddrinfo => $eai_errno )
As a specific optimisation, this method will try to perform a lookup of
numeric values synchronously, rather than asynchronously, if it looks likely
@@ -331,13 +332,13 @@ continuations to invoke on success or failure:
Callback which is invoked after a successful lookup.
- $on_resolved->( @addrs )
+ $on_resolved->( @addrs )
=item on_error => CODE
Callback which is invoked after a failed lookup, including for a timeout.
- $on_error->( $exception )
+ $on_error->( $exception )
=back
@@ -455,7 +456,7 @@ Time in seconds after which to abort the lookup with a C<Timed out> exception
On failure, the detail field will give the error number, which should match
one of the C<Socket::EAI_*> constants.
- ->fail( $message, resolve => getnameinfo => $eai_errno )
+ ->fail( $message, resolve => getnameinfo => $eai_errno )
As a specific optimisation, this method will try to perform a lookup of
numeric values synchronously, rather than asynchronously, if both the
@@ -474,13 +475,13 @@ continuations to invoke on success or failure:
Callback which is invoked after a successful lookup.
- $on_resolved->( $host, $service )
+ $on_resolved->( $host, $service )
=item on_error => CODE
Callback which is invoked after a failed lookup, including for a timeout.
- $on_error->( $exception )
+ $on_error->( $exception )
=back
@@ -585,12 +586,12 @@ sub register_resolver
The following resolver names are implemented by the same-named perl function,
taking and returning a list of values exactly as the perl function does:
- getpwnam getpwuid
- getgrnam getgrgid
- getservbyname getservbyport
- gethostbyname gethostbyaddr
- getnetbyname getnetbyaddr
- getprotobyname getprotobynumber
+ getpwnam getpwuid
+ getgrnam getgrgid
+ getservbyname getservbyport
+ gethostbyname gethostbyaddr
+ getnetbyname getnetbyaddr
+ getprotobyname getprotobynumber
=cut
@@ -618,9 +619,9 @@ register_resolver getprotobynumber => sub { my @r = getprotobynumber( $_[0] ) or
The following three resolver names are implemented using the L<Socket> module.
- getaddrinfo
- getaddrinfo_array
- getnameinfo
+ getaddrinfo
+ getaddrinfo_array
+ getnameinfo
The C<getaddrinfo> resolver takes arguments in a hash of name/value pairs and
returns a list of hash structures, as the C<Socket::getaddrinfo> function
@@ -709,22 +710,22 @@ either type of key, where both functions return the same type of list. This is
purely a convention, and is in no way required or enforced by the
L<IO::Async::Resolver> itself.
- @numbers = qw( zero one two three four
- five six seven eight nine );
-
- register_resolver getnumberbyindex => sub {
- my ( $index ) = @_;
- die "Bad index $index" unless $index >= 0 and $index < @numbers;
- return ( $index, $numbers[$index] );
- };
-
- register_resolver getnumberbyname => sub {
- my ( $name ) = @_;
- foreach my $index ( 0 .. $#numbers ) {
- return ( $index, $name ) if $numbers[$index] eq $name;
- }
- die "Bad name $name";
- };
+ @numbers = qw( zero one two three four
+ five six seven eight nine );
+
+ register_resolver getnumberbyindex => sub {
+ my ( $index ) = @_;
+ die "Bad index $index" unless $index >= 0 and $index < @numbers;
+ return ( $index, $numbers[$index] );
+ };
+
+ register_resolver getnumberbyname => sub {
+ my ( $name ) = @_;
+ foreach my $index ( 0 .. $#numbers ) {
+ return ( $index, $name ) if $numbers[$index] eq $name;
+ }
+ die "Bad name $name";
+ };
=head1 AUTHOR
diff --git a/lib/IO/Async/Routine.pm b/lib/IO/Async/Routine.pm
index 18671e1..72b8e14 100644
--- a/lib/IO/Async/Routine.pm
+++ b/lib/IO/Async/Routine.pm
@@ -1,14 +1,14 @@
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
-# (C) Paul Evans, 2012-2019 -- leonerd@leonerd.org.uk
+# (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk
package IO::Async::Routine;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Notifier );
@@ -17,51 +17,53 @@ use Carp;
use IO::Async::OS;
use IO::Async::Process;
+use Struct::Dumb qw( readonly_struct );
+
=head1 NAME
C<IO::Async::Routine> - execute code in an independent sub-process or thread
=head1 SYNOPSIS
- use IO::Async::Routine;
- use IO::Async::Channel;
+ use IO::Async::Routine;
+ use IO::Async::Channel;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $nums_ch = IO::Async::Channel->new;
- my $ret_ch = IO::Async::Channel->new;
+ my $nums_ch = IO::Async::Channel->new;
+ my $ret_ch = IO::Async::Channel->new;
- my $routine = IO::Async::Routine->new(
- channels_in => [ $nums_ch ],
- channels_out => [ $ret_ch ],
+ my $routine = IO::Async::Routine->new(
+ channels_in => [ $nums_ch ],
+ channels_out => [ $ret_ch ],
- code => sub {
- my @nums = @{ $nums_ch->recv };
- my $ret = 0; $ret += $_ for @nums;
+ code => sub {
+ my @nums = @{ $nums_ch->recv };
+ my $ret = 0; $ret += $_ for @nums;
- # Can only send references
- $ret_ch->send( \$ret );
- },
+ # Can only send references
+ $ret_ch->send( \$ret );
+ },
- on_finish => sub {
- say "The routine aborted early - $_[-1]";
- $loop->stop;
- },
- );
+ on_finish => sub {
+ say "The routine aborted early - $_[-1]";
+ $loop->stop;
+ },
+ );
- $loop->add( $routine );
+ $loop->add( $routine );
- $nums_ch->send( [ 10, 20, 30 ] );
- $ret_ch->recv(
- on_recv => sub {
- my ( $ch, $totalref ) = @_;
- say "The total of 10, 20, 30 is: $$totalref";
- $loop->stop;
- }
- );
+ $nums_ch->send( [ 10, 20, 30 ] );
+ $ret_ch->recv(
+ on_recv => sub {
+ my ( $ch, $totalref ) = @_;
+ say "The total of 10, 20, 30 is: $$totalref";
+ $loop->stop;
+ }
+ );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -70,16 +72,9 @@ sub-process or thread, allowing it to act independently of the main program.
Once set up, all communication with the code happens by values passed into or
out of the Routine via L<IO::Async::Channel> objects.
-A choice of detachment model is available, with options being a C<fork()>ed
-child process, or a thread. In both cases the code contained within the
-Routine is free to make blocking calls without stalling the rest of the
-program. This makes it useful for using existing code which has no option not
-to block within an L<IO::Async>-based program.
-
-Code running inside a C<fork()>-based Routine runs within its own process; it
-is isolated from the rest of the program in terms of memory, CPU time, and
-other resources. Code running in a thread-based Routine however, shares memory
-and other resources such as open filehandles with the main thread.
+The code contained within the Routine is free to make blocking calls without
+stalling the rest of the program. This makes it useful for using existing code
+which has no option not to block within an L<IO::Async>-based program.
To create asynchronous wrappers of functions that return a value based only on
their arguments, and do not generally maintain state within the process it may
@@ -87,6 +82,50 @@ be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.
+=head2 Models
+
+A choice of detachment model is available. Each has various advantages and
+disadvantages. Not all of them may be available on a particular system.
+
+=head3 The C<fork> model
+
+The code in this model runs within its own process, created by calling
+C<fork()> from the main process. It is isolated from the rest of the program
+in terms of memory, CPU time, and other resources. Because it is started
+using C<fork()>, the initial process state is a clone of the main process.
+
+This model performs well on UNIX-like operating systems which possess a true
+native C<fork()> system call, but is not available on C<MSWin32> for example,
+because the operating system does not provide full fork-like semantics.
+
+=head3 The C<thread> model
+
+The code in this model runs inside a separate thread within the main process.
+It therefore shares memory and other resources such as open filehandles with
+the main thread. As with the C<fork> model, the initial thread state is cloned
+from the main controlling thread.
+
+This model is only available on perls built to support threading.
+
+=head3 The C<spawn> model
+
+The code in this model runs within its own freshly-created process running
+another copy of the perl interpreter. Similar to the C<fork> model it
+therefore has its own memory, CPU time, and other resources. However, since it
+is started freshly rather than by cloning the main process, it starts up in a
+clean state, without any shared resources from its parent.
+
+Since this model creates a new fresh process rather than sharing existing
+state, it cannot use the C<code> argument to specify the routine body; it must
+instead use only the C<module> and C<func> arguments.
+
+In the current implementation this model requires exactly one input channel
+and exactly one output channel; both must be present, and there cannot be more
+than one of either.
+
+This model performs well on both UNIX and Windows-like operating systems,
+because it does not need full fork semantics.
+
=cut
=head1 EVENTS
@@ -120,11 +159,10 @@ Invoked if the code block fails with an exception.
The following named parameters may be passed to C<new> or C<configure>:
-=head2 model => "fork" | "thread"
+=head2 model => "fork" | "thread" | "spawn"
Optional. Defines how the routine will detach itself from the main process.
-C<fork> uses a child process detached using an L<IO::Async::Process>.
-C<thread> uses a thread, and is only available on threaded Perls.
+See the L</Models> section above for more detail.
If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
@@ -145,6 +183,22 @@ out of the Routine.
CODE reference to the body of the Routine, to execute once the channels are
set up.
+When using the C<spawn> model, this is not permitted; you must use C<module>
+and C<func> instead.
+
+=head2 module => STRING
+
+=head2 func => STRING
+
+An alternative to the C<code> argument, which names a module to load and a
+function to call within it. C<module> should give a perl module name (i.e.
+C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
+the basename of a function within that module (i.e. without the module name
+prefixed). It will be invoked as the main code body of the object, and passed
+in a list of all the channels; first the input ones then the output ones.
+
+ module::func( @channels_in, @channels_out )
+
=head2 setup => ARRAY
Optional. For C<fork()>-based Routines, gives a reference to an array to pass
@@ -168,20 +222,28 @@ sub _init
$self->SUPER::_init( @_ );
}
+my %SETUP_CODE;
+
sub configure
{
my $self = shift;
my %params = @_;
# TODO: Can only reconfigure when not running
- foreach (qw( channels_in channels_out code setup on_finish on_return on_die )) {
+ foreach (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
$self->{$_} = delete $params{$_} if exists $params{$_};
}
+ defined $self->{code} and defined $self->{func} and
+ croak "Cannot ->configure both 'code' and 'func'";
+ defined $self->{func} and !defined $self->{module} and
+ croak "'func' parameter requires a 'module' as well";
+
if( defined( my $model = delete $params{model} ) ) {
- $model eq "fork" or $model eq "thread" or
- croak "Expected 'model' to be either 'fork' or 'thread'";
+ ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
+ or die "Unrecognised Routine model $model";
+ # TODO: optional plugin "configure" check here?
$model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
croak "Cannot use 'fork' model as fork() is not available";
$model eq "thread" and !IO::Async::OS->HAVE_THREADS and
@@ -199,39 +261,87 @@ sub _add_to_loop
my ( $loop ) = @_;
$self->SUPER::_add_to_loop( $loop );
- return $self->_setup_fork if $self->{model} eq "fork";
- return $self->_setup_thread if $self->{model} eq "thread";
+ my $model = $self->{model};
- die "TODO: unrecognised Routine model $self->{model}";
+ my $code = ( $SETUP_CODE{$model} ||= $self->can( "_setup_$model" ) )
+ or die "Unrecognised Routine model $model";
+
+ $self->$code();
}
-sub _setup_fork
+readonly_struct ChannelSetup => [qw( chan myfd otherfd )];
+
+sub _create_channels_in
{
my $self = shift;
- my @setup;
my @channels_in;
- my @channels_out;
foreach my $ch ( @{ $self->{channels_in} || [] } ) {
my ( $rd, $wr );
unless( $rd = $ch->_extract_read_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
- push @setup, $rd => "keep";
- push @channels_in, [ $ch, $wr, $rd ];
+ push @channels_in, ChannelSetup( $ch, $wr, $rd );
}
+ return @channels_in;
+}
+
+sub _create_channels_out
+{
+ my $self = shift;
+
+ my @channels_out;
+
foreach my $ch ( @{ $self->{channels_out} || [] } ) {
my ( $rd, $wr );
unless( $wr = $ch->_extract_write_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
- push @setup, $wr => "keep";
- push @channels_out, [ $ch, $rd, $wr ];
+ push @channels_out, ChannelSetup( $ch, $rd, $wr );
+ }
+
+ return @channels_out;
+}
+
+sub _adopt_channels_in
+{
+ my $self = shift;
+ my ( @channels_in ) = @_;
+
+ foreach ( @channels_in ) {
+ my $ch = $_->chan;
+ $ch->setup_async_mode( write_handle => $_->myfd );
+ $self->add_child( $ch ) unless $ch->parent;
+ }
+}
+
+sub _adopt_channels_out
+{
+ my $self = shift;
+ my ( @channels_out ) = @_;
+
+ foreach ( @channels_out ) {
+ my $ch = $_->chan;
+ $ch->setup_async_mode( read_handle => $_->myfd );
+ $self->add_child( $ch ) unless $ch->parent;
}
+}
- my $code = $self->{code};
+sub _setup_fork
+{
+ my $self = shift;
+
+ my @channels_in = $self->_create_channels_in;
+ my @channels_out = $self->_create_channels_out;
+
+ my $code = $self->{code};
+
+ my $module = $self->{module};
+ my $func = $self->{func};
+
+ my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;
my $setup = $self->{setup};
push @setup, @$setup if $setup;
@@ -239,20 +349,22 @@ sub _setup_fork
my $process = IO::Async::Process->new(
setup => \@setup,
code => sub {
- foreach ( @channels_in ) {
- my ( $ch, undef, $rd ) = @$_;
- $ch->setup_sync_mode( $rd );
+ foreach ( @channels_in, @channels_out ) {
+ $_->chan->setup_sync_mode( $_->otherfd );
}
- foreach ( @channels_out ) {
- my ( $ch, undef, $wr ) = @$_;
- $ch->setup_sync_mode( $wr );
+
+ if( defined $module ) {
+ ( my $file = "$module.pm" ) =~ s{::}{/}g;
+ require $file;
+
+ $code = $module->can( $func ) or
+ die "Module '$module' has no '$func'\n";
}
- my $ret = $code->();
+ my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
foreach ( @channels_in, @channels_out ) {
- my ( $ch ) = @$_;
- $ch->close;
+ $_->chan->close;
}
return $ret;
@@ -276,74 +388,46 @@ sub _setup_fork
}),
);
- foreach ( @channels_in ) {
- my ( $ch, $wr ) = @$_;
-
- $ch->setup_async_mode( write_handle => $wr );
-
- $self->add_child( $ch ) unless $ch->parent;
- }
-
- foreach ( @channels_out ) {
- my ( $ch, $rd ) = @$_;
-
- $ch->setup_async_mode( read_handle => $rd );
-
- $self->add_child( $ch ) unless $ch->parent;
- }
+ $self->_adopt_channels_in ( @channels_in );
+ $self->_adopt_channels_out( @channels_out );
$self->add_child( $self->{process} = $process );
$self->{id} = "P" . $process->pid;
- foreach ( @channels_in, @channels_out ) {
- my ( undef, undef, $other ) = @$_;
- $other->close;
- }
+ $_->otherfd->close for @channels_in, @channels_out;
}
sub _setup_thread
{
my $self = shift;
- my @channels_in;
- my @channels_out;
-
- foreach my $ch ( @{ $self->{channels_in} || [] } ) {
- my ( $rd, $wr );
- unless( $rd = $ch->_extract_read_handle ) {
- ( $rd, $wr ) = IO::Async::OS->pipepair;
- }
- push @channels_in, [ $ch, $wr, $rd ];
- }
-
- foreach my $ch ( @{ $self->{channels_out} || [] } ) {
- my ( $rd, $wr );
- unless( $wr = $ch->_extract_write_handle ) {
- ( $rd, $wr ) = IO::Async::OS->pipepair;
- }
- push @channels_out, [ $ch, $rd, $wr ];
- }
+ my @channels_in = $self->_create_channels_in;
+ my @channels_out = $self->_create_channels_out;
my $code = $self->{code};
+ my $module = $self->{module};
+ my $func = $self->{func};
+
my $tid = $self->loop->create_thread(
code => sub {
- foreach ( @channels_in ) {
- my ( $ch, $wr, $rd ) = @$_;
- $ch->setup_sync_mode( $rd );
- $wr->close if $wr;
+ foreach ( @channels_in, @channels_out ) {
+ $_->chan->setup_sync_mode( $_->otherfd );
+ $_->myfd->close;
}
- foreach ( @channels_out ) {
- my ( $ch, $rd, $wr ) = @$_;
- $ch->setup_sync_mode( $wr );
- $rd->close if $rd;
+
+ if( defined $func ) {
+ ( my $file = "$module.pm" ) =~ s{::}{/}g;
+ require $file;
+
+ $code = $module->can( $func ) or
+ die "Module '$module' has no '$func'\n";
}
- my $ret = $code->();
+ my $ret = $code->( map { $_->chan } @channels_in, @channels_out );
foreach ( @channels_in, @channels_out ) {
- my ( $ch ) = @$_;
- $ch->close;
+ $_->chan->close;
}
return $ret;
@@ -369,23 +453,72 @@ sub _setup_thread
$self->{tid} = $tid;
$self->{id} = "T" . $tid;
- foreach ( @channels_in ) {
- my ( $ch, $wr, $rd ) = @$_;
+ $self->_adopt_channels_in ( @channels_in );
+ $self->_adopt_channels_out( @channels_out );
- $ch->setup_async_mode( write_handle => $wr );
- $rd->close;
+ $_->otherfd->close for @channels_in, @channels_out;
+}
- $self->add_child( $ch ) unless $ch->parent;
- }
+# The injected program that goes into spawn mode
+use constant PERL_RUNNER => <<'EOF';
+( my ( $module, $func ), @INC ) = @ARGV;
+( my $file = "$module.pm" ) =~ s{::}{/}g;
+require $file;
+my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
+require IO::Async::Channel;
+exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
+EOF
+
+sub _setup_spawn
+{
+ my $self = shift;
- foreach ( @channels_out ) {
- my ( $ch, $rd, $wr ) = @$_;
+ $self->{code} and
+ die "Cannot run IO::Async::Routine in 'spawn' with code\n";
- $ch->setup_async_mode( read_handle => $rd );
- $wr->close;
+ @{ $self->{channels_in} } == 1 or
+ die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
+ @{ $self->{channels_out} } == 1 or
+ die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";
- $self->add_child( $ch ) unless $ch->parent;
- }
+ my @channels_in = $self->_create_channels_in;
+ my @channels_out = $self->_create_channels_out;
+
+ my $module = $self->{module};
+ my $func = $self->{func};
+
+ my $process = IO::Async::Process->new(
+ setup => [
+ stdin => $channels_in[0]->otherfd,
+ stdout => $channels_out[0]->otherfd,
+ ],
+ command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
+ on_finish => $self->_replace_weakself( sub {
+ my $self = shift or return;
+ my ( $exitcode ) = @_;
+ $self->maybe_invoke_event( on_finish => $exitcode );
+
+ unless( $exitcode & 0x7f ) {
+ $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
+ $self->result_future->done( $exitcode >> 8 );
+ }
+ }),
+ on_exception => $self->_replace_weakself( sub {
+ my $self = shift or return;
+ my ( $exception, $errno, $exitcode ) = @_;
+
+ $self->maybe_invoke_event( on_die => $exception );
+ $self->result_future->fail( $exception, routine => );
+ }),
+ );
+
+ $self->_adopt_channels_in ( @channels_in );
+ $self->_adopt_channels_out( @channels_out );
+
+ $self->add_child( $self->{process} = $process );
+ $self->{id} = "P" . $process->pid;
+
+ $_->otherfd->close for @channels_in, @channels_out;
}
=head1 METHODS
diff --git a/lib/IO/Async/Signal.pm b/lib/IO/Async/Signal.pm
index 4eac03d..089afec 100644
--- a/lib/IO/Async/Signal.pm
+++ b/lib/IO/Async/Signal.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -19,22 +19,22 @@ C<IO::Async::Signal> - event callback on receipt of a POSIX signal
=head1 SYNOPSIS
- use IO::Async::Signal;
+ use IO::Async::Signal;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $signal = IO::Async::Signal->new(
- name => "HUP",
+ my $signal = IO::Async::Signal->new(
+ name => "HUP",
- on_receipt => sub {
- print "I caught SIGHUP\n";
- },
- );
+ on_receipt => sub {
+ print "I caught SIGHUP\n";
+ },
+ );
- $loop->add( $signal );
+ $loop->add( $signal );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Socket.pm b/lib/IO/Async/Socket.pm
index 149d608..5597f34 100644
--- a/lib/IO/Async/Socket.pm
+++ b/lib/IO/Async/Socket.pm
@@ -8,7 +8,7 @@ package IO::Async::Socket;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Handle );
@@ -23,34 +23,34 @@ filehandle
=head1 SYNOPSIS
- use IO::Async::Socket;
+ use IO::Async::Socket;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $socket = IO::Async::Socket->new(
- on_recv => sub {
- my ( $self, $dgram, $addr ) = @_;
+ my $socket = IO::Async::Socket->new(
+ on_recv => sub {
+ my ( $self, $dgram, $addr ) = @_;
- print "Received reply: $dgram\n",
- $loop->stop;
- },
- on_recv_error => sub {
- my ( $self, $errno ) = @_;
- die "Cannot recv - $errno\n";
- },
- );
- $loop->add( $socket );
+ print "Received reply: $dgram\n",
+ $loop->stop;
+ },
+ on_recv_error => sub {
+ my ( $self, $errno ) = @_;
+ die "Cannot recv - $errno\n";
+ },
+ );
+ $loop->add( $socket );
- $socket->connect(
- host => "some.host.here",
- service => "echo",
- socktype => 'dgram',
- )->get;
+ $socket->connect(
+ host => "some.host.here",
+ service => "echo",
+ socktype => 'dgram',
+ )->get;
- $socket->send( "A TEST DATAGRAM" );
+ $socket->send( "A TEST DATAGRAM" );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -324,22 +324,22 @@ sub on_write_ready
C<UDP> is carried by the C<SOCK_DGRAM> socket type, for which the string
C<'dgram'> is a convenient shortcut:
- $socket->connect(
- host => $hostname,
- service => $service,
- socktype => 'dgram',
- ...
- )
+ $socket->connect(
+ host => $hostname,
+ service => $service,
+ socktype => 'dgram',
+ ...
+ )
=head2 Receive-first on a UDP Socket
A typical server pattern with C<UDP> involves binding a well-known port
number instead of connecting to one, and waiting on incoming packets.
- $socket->bind(
- service => 12345,
- socktype => 'dgram',
- )->get;
+ $socket->bind(
+ service => 12345,
+ socktype => 'dgram',
+ )->get;
=head1 SEE ALSO
diff --git a/lib/IO/Async/Stream.pm b/lib/IO/Async/Stream.pm
index 0d928c1..75c0900 100644
--- a/lib/IO/Async/Stream.pm
+++ b/lib/IO/Async/Stream.pm
@@ -8,7 +8,7 @@ package IO::Async::Stream;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use base qw( IO::Async::Handle );
@@ -50,33 +50,33 @@ filehandle
=head1 SYNOPSIS
- use IO::Async::Stream;
+ use IO::Async::Stream;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $stream = IO::Async::Stream->new(
- read_handle => \*STDIN,
- write_handle => \*STDOUT,
+ my $stream = IO::Async::Stream->new(
+ read_handle => \*STDIN,
+ write_handle => \*STDOUT,
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*\n)// ) {
- print "Received a line $1";
- }
+ while( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line $1";
+ }
- if( $eof ) {
- print "EOF; last partial line is $$buffref\n";
- }
+ if( $eof ) {
+ print "EOF; last partial line is $$buffref\n";
+ }
- return 0;
- }
- );
+ return 0;
+ }
+ );
- $loop->add( $stream );
+ $loop->add( $stream );
- $stream->write( "An initial line here\n" );
+ $stream->write( "An initial line here\n" );
=head1 DESCRIPTION
@@ -180,7 +180,7 @@ and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using
- $self->want_readready_for_read(...)
+ $self->want_readready_for_read(...)
=head2 on_outgoing_empty
@@ -325,8 +325,8 @@ Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
be invoked as
- $stream->reader( $read_handle, $buffer, $len )
- $stream->writer( $write_handle, $buffer, $len )
+ $stream->reader( $read_handle, $buffer, $len )
+ $stream->writer( $write_handle, $buffer, $len )
Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
@@ -571,8 +571,8 @@ handles, and removes the stream from its containing loop. If the write buffer
still contains data, then this is deferred until the buffer is empty. This is
intended for "write-then-close" one-shot streams.
- $stream->write( "Here is my final data\n" );
- $stream->close_when_empty;
+ $stream->write( "Here is my final data\n" );
+ $stream->close_when_empty;
Because of this deferred nature, it may not be suitable for error handling.
See instead the C<close_now> method.
@@ -668,14 +668,14 @@ C<Future>s, as well as plain strings.
For example, to stream the contents of an existing opened filehandle:
- open my $fileh, "<", $path or die "Cannot open $path - $!";
+ open my $fileh, "<", $path or die "Cannot open $path - $!";
- $stream->write( sub {
- my ( $stream ) = @_;
+ $stream->write( sub {
+ my ( $stream ) = @_;
- sysread $fileh, my $buffer, 8192 or return;
- return $buffer;
- } );
+ sysread $fileh, my $buffer, 8192 or return;
+ return $buffer;
+ } );
Takes the following optional named parameters in C<%params>:
@@ -693,7 +693,7 @@ that were written by this call, which may not be the entire length of the
buffer - if it takes more than one C<syscall> operation to empty the buffer
then this callback will be invoked multiple times.
- $on_write->( $stream, $len )
+ $on_write->( $stream, $len )
=item on_flush => CODE
@@ -701,14 +701,14 @@ A CODE reference which will be invoked once the data queued by this C<write>
call has been flushed. This will be invoked even if the buffer itself is not
yet empty; if more data has been queued since the call.
- $on_flush->( $stream )
+ $on_flush->( $stream )
=item on_error => CODE
A CODE reference which will be invoked if a C<syswrite> error happens while
performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.
- $on_error->( $stream, $errno )
+ $on_error->( $stream, $errno )
=back
@@ -1117,9 +1117,9 @@ enough data has been read by the Stream into its buffer. At this point, the
data is removed from the buffer and given to the C<Future> object to complete
it.
- my $f = $stream->read_...
+ my $f = $stream->read_...
- my ( $string ) = $f->get;
+ my ( $string ) = $f->get;
Unlike the C<on_read> event handlers, these methods don't allow for access to
"partial" results; they only provide the final result once it is ready.
@@ -1133,17 +1133,17 @@ C<Future>-returning methods instead of the C<on_read> event, it may be useful
to configure a trivial return-false event handler to keep it from consuming
any input, and to allow it to be added to a C<Loop> in the first place.
- my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
- $loop->add( $stream );
+ my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
+ $loop->add( $stream );
- my $f = $stream->read_...
+ my $f = $stream->read_...
If a read EOF or error condition happens while there are read C<Future>s
pending, they are all completed. In the case of a read EOF, they are done with
C<undef>; in the case of a read error they are failed using the C<$!> error
value as the failure.
- $f->fail( $message, sysread => $! )
+ $f->fail( $message, sysread => $! )
If a read EOF condition happens to the currently-processing read C<Future>, it
will return a partial result. The calling code can detect this by the fact
@@ -1152,7 +1152,7 @@ short in C<read_exactly>'s case, or lacking the ending pattern in
C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof>
value in its results.
- my ( $string, $eof ) = $f->get;
+ my ( $string, $eof ) = $f->get;
=cut
@@ -1340,17 +1340,17 @@ Log byte buffers as data is written to a Stream
The following C<on_read> method accepts incoming C<\n>-terminated lines and
prints them to the program's C<STDOUT> stream.
- sub on_read
- {
- my $self = shift;
- my ( $buffref, $eof ) = @_;
+ sub on_read
+ {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
- while( $$buffref =~ s/^(.*\n)// ) {
- print "Received a line: $1";
- }
+ while( $$buffref =~ s/^(.*\n)// ) {
+ print "Received a line: $1";
+ }
- return 0;
- }
+ return 0;
+ }
Because a reference to the buffer itself is passed, it is simple to use a
C<s///> regular expression on the scalar it points at, to both check if data
@@ -1363,23 +1363,23 @@ again when it has finished, so it can return a constant C<0>.
This C<on_read> method accepts incoming records in 16-byte chunks, printing
each one.
- sub on_read
- {
- my ( $self, $buffref, $eof ) = @_;
+ sub on_read
+ {
+ my ( $self, $buffref, $eof ) = @_;
- if( length $$buffref >= 16 ) {
- my $record = substr( $$buffref, 0, 16, "" );
- print "Received a 16-byte record: $record\n";
+ if( length $$buffref >= 16 ) {
+ my $record = substr( $$buffref, 0, 16, "" );
+ print "Received a 16-byte record: $record\n";
- return 1;
- }
+ return 1;
+ }
- if( $eof and length $$buffref ) {
- print "EOF: a partial record still exists\n";
- }
+ if( $eof and length $$buffref ) {
+ print "EOF: a partial record still exists\n";
+ }
- return 0;
- }
+ return 0;
+ }
This time, rather than a C<while()> loop we have decided to have the handler
just process one record, and use the C<return 1> mechanism to ask that the
@@ -1394,23 +1394,23 @@ A lot of protocols use a fixed-size header, followed by a variable-sized body
of data, whose size is given by one of the fields of the header. The following
C<on_read> method extracts messages in such a protocol.
- sub on_read
- {
- my ( $self, $buffref, $eof ) = @_;
+ sub on_read
+ {
+ my ( $self, $buffref, $eof ) = @_;
- return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
+ return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
- my ( $len, $x, $y ) = unpack "N n n", $$buffref;
+ my ( $len, $x, $y ) = unpack "N n n", $$buffref;
- return 0 unless length $$buffref >= 8 + $len;
+ return 0 unless length $$buffref >= 8 + $len;
- substr( $$buffref, 0, 8, "" );
- my $data = substr( $$buffref, 0, $len, "" );
+ substr( $$buffref, 0, 8, "" );
+ my $data = substr( $$buffref, 0, $len, "" );
- print "A record with values x=$x y=$y\n";
+ print "A record with values x=$x y=$y\n";
- return 1;
- }
+ return 1;
+ }
In this example, the header is C<unpack()>ed first, to extract the body
length, and then the body is extracted. If the buffer does not have enough
@@ -1425,41 +1425,41 @@ C<\n>-terminated lines that may have an optional data block attached. The
presence of such a data block, as well as its size, is indicated by the line
prefix.
- sub on_read
- {
- my $self = shift;
- my ( $buffref, $eof ) = @_;
+ sub on_read
+ {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
- if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
- my $length = $1;
- my $line = $2;
+ if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
+ my $length = $1;
+ my $line = $2;
- return sub {
- my $self = shift;
- my ( $buffref, $eof ) = @_;
+ return sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
- return 0 unless length $$buffref >= $length;
+ return 0 unless length $$buffref >= $length;
- # Take and remove the data from the buffer
- my $data = substr( $$buffref, 0, $length, "" );
+ # Take and remove the data from the buffer
+ my $data = substr( $$buffref, 0, $length, "" );
- print "Received a line $line with some data ($data)\n";
+ print "Received a line $line with some data ($data)\n";
- return undef; # Restore the original method
- }
- }
- elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
- my $line = $1;
+ return undef; # Restore the original method
+ }
+ }
+ elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
+ my $line = $1;
- print "Received a line $line with no data\n";
+ print "Received a line $line with no data\n";
- return 1;
- }
- else {
- print STDERR "Unrecognised input\n";
- # Handle it somehow
- }
- }
+ return 1;
+ }
+ else {
+ print STDERR "Unrecognised input\n";
+ # Handle it somehow
+ }
+ }
In the case where trailing data is supplied, a new temporary C<on_read>
callback is provided in a closure. This closure captures the C<$length>
diff --git a/lib/IO/Async/Test.pm b/lib/IO/Async/Test.pm
index 78a0512..3695ac9 100644
--- a/lib/IO/Async/Test.pm
+++ b/lib/IO/Async/Test.pm
@@ -8,7 +8,7 @@ package IO::Async::Test;
use strict;
use warnings;
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Exporter 'import';
our @EXPORT = qw(
@@ -24,37 +24,37 @@ C<IO::Async::Test> - utility functions for use in test scripts
=head1 SYNOPSIS
- use Test::More tests => 1;
- use IO::Async::Test;
+ use Test::More tests => 1;
+ use IO::Async::Test;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
- testing_loop( $loop );
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
+ testing_loop( $loop );
- my $result;
+ my $result;
- $loop->do_something(
- some => args,
+ $loop->do_something(
+ some => args,
- on_done => sub {
- $result = the_outcome;
- }
- );
+ on_done => sub {
+ $result = the_outcome;
+ }
+ );
- wait_for { defined $result };
+ wait_for { defined $result };
- is( $result, what_we_expected, 'The event happened' );
+ is( $result, what_we_expected, 'The event happened' );
- ...
+ ...
- my $buffer = "";
- my $handle = IO::Handle-> ...
+ my $buffer = "";
+ my $handle = IO::Handle-> ...
- wait_for_stream { length $buffer >= 10 } $handle => $buffer;
+ wait_for_stream { length $buffer >= 10 } $handle => $buffer;
- is( substr( $buffer, 0, 10, "" ), "0123456789", 'Buffer was correct' );
+ is( substr( $buffer, 0, 10, "" ), "0123456789", 'Buffer was correct' );
- my $result = wait_for_future( $stream->read_until( "\n" ) )->get;
+ my $result = wait_for_future( $stream->read_until( "\n" ) )->get;
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Timer.pm b/lib/IO/Async/Timer.pm
index f00fd8a..7ef08c3 100644
--- a/lib/IO/Async/Timer.pm
+++ b/lib/IO/Async/Timer.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Notifier );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -129,7 +129,7 @@ duration after the time it was added.
As a convenience, C<$timer> is returned. This may be useful for starting
timers at construction time:
- $loop->add( IO::Async::Timer->new( ... )->start );
+ $loop->add( IO::Async::Timer->new( ... )->start );
=cut
diff --git a/lib/IO/Async/Timer/Absolute.pm b/lib/IO/Async/Timer/Absolute.pm
index 6869430..d1b779a 100644
--- a/lib/IO/Async/Timer/Absolute.pm
+++ b/lib/IO/Async/Timer/Absolute.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Timer );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -19,27 +19,27 @@ C<IO::Async::Timer::Absolute> - event callback at a fixed future time
=head1 SYNOPSIS
- use IO::Async::Timer::Absolute;
+ use IO::Async::Timer::Absolute;
- use POSIX qw( mktime );
+ use POSIX qw( mktime );
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my @time = gmtime;
+ my @time = gmtime;
- my $timer = IO::Async::Timer::Absolute->new(
- time => mktime( 0, 0, 0, $time[3]+1, $time[4], $time[5] ),
+ my $timer = IO::Async::Timer::Absolute->new(
+ time => mktime( 0, 0, 0, $time[3]+1, $time[4], $time[5] ),
- on_expire => sub {
- print "It's midnight\n";
- $loop->stop;
- },
- );
+ on_expire => sub {
+ print "It's midnight\n";
+ $loop->stop;
+ },
+ );
- $loop->add( $timer );
+ $loop->add( $timer );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
diff --git a/lib/IO/Async/Timer/Countdown.pm b/lib/IO/Async/Timer/Countdown.pm
index 138ea3e..49b3a1d 100644
--- a/lib/IO/Async/Timer/Countdown.pm
+++ b/lib/IO/Async/Timer/Countdown.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Timer );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -19,25 +19,25 @@ C<IO::Async::Timer::Countdown> - event callback after a fixed delay
=head1 SYNOPSIS
- use IO::Async::Timer::Countdown;
+ use IO::Async::Timer::Countdown;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $timer = IO::Async::Timer::Countdown->new(
- delay => 10,
+ my $timer = IO::Async::Timer::Countdown->new(
+ delay => 10,
- on_expire => sub {
- print "Sorry, your time's up\n";
- $loop->stop;
- },
- );
+ on_expire => sub {
+ print "Sorry, your time's up\n";
+ $loop->stop;
+ },
+ );
- $timer->start;
+ $timer->start;
- $loop->add( $timer );
+ $loop->add( $timer );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
@@ -196,42 +196,42 @@ expire and run its callback.
For example, to expire an accepted connection after 30 seconds of inactivity:
- ...
+ ...
- on_accept => sub {
- my ( $newclient ) = @_;
+ on_accept => sub {
+ my ( $newclient ) = @_;
- my $watchdog = IO::Async::Timer::Countdown->new(
- delay => 30,
+ my $watchdog = IO::Async::Timer::Countdown->new(
+ delay => 30,
- on_expire => sub {
- my $self = shift;
+ on_expire => sub {
+ my $self = shift;
- my $stream = $self->parent;
- $stream->close;
- },
- );
+ my $stream = $self->parent;
+ $stream->close;
+ },
+ );
- my $stream = IO::Async::Stream->new(
- handle => $newclient,
+ my $stream = IO::Async::Stream->new(
+ handle => $newclient,
- on_read => sub {
- my ( $self, $buffref, $eof ) = @_;
- $watchdog->reset;
+ on_read => sub {
+ my ( $self, $buffref, $eof ) = @_;
+ $watchdog->reset;
- ...
- },
+ ...
+ },
- on_closed => sub {
- $watchdog->stop;
- },
- ) );
+ on_closed => sub {
+ $watchdog->stop;
+ },
+ ) );
- $stream->add_child( $watchdog );
- $watchdog->start;
+ $stream->add_child( $watchdog );
+ $watchdog->start;
- $loop->add( $watchdog );
- }
+ $loop->add( $watchdog );
+ }
Rather than setting up a lexical variable to store the Stream so that the
Timer's C<on_expire> closure can call C<close> on it, the parent/child
@@ -251,20 +251,20 @@ creates an arrangement similar to an L<IO::Async::Timer::Periodic>, except
that it will wait until the previous invocation has indicated it is finished,
before starting the countdown for the next call.
- my $timer = IO::Async::Timer::Countdown->new(
- delay => 60,
+ my $timer = IO::Async::Timer::Countdown->new(
+ delay => 60,
- on_expire => sub {
- my $self = shift;
+ on_expire => sub {
+ my $self = shift;
- start_some_operation(
- on_complete => sub { $self->start },
- );
- },
- );
+ start_some_operation(
+ on_complete => sub { $self->start },
+ );
+ },
+ );
- $timer->start;
- $loop->add( $timer );
+ $timer->start;
+ $loop->add( $timer );
This example invokes the C<start_some_operation> function 60 seconds after the
previous iteration has indicated it has finished.
diff --git a/lib/IO/Async/Timer/Periodic.pm b/lib/IO/Async/Timer/Periodic.pm
index 07e1b99..523303c 100644
--- a/lib/IO/Async/Timer/Periodic.pm
+++ b/lib/IO/Async/Timer/Periodic.pm
@@ -9,7 +9,7 @@ use strict;
use warnings;
use base qw( IO::Async::Timer );
-our $VERSION = '0.78';
+our $VERSION = '0.79';
use Carp;
@@ -19,24 +19,24 @@ C<IO::Async::Timer::Periodic> - event callback at regular intervals
=head1 SYNOPSIS
- use IO::Async::Timer::Periodic;
+ use IO::Async::Timer::Periodic;
- use IO::Async::Loop;
- my $loop = IO::Async::Loop->new;
+ use IO::Async::Loop;
+ my $loop = IO::Async::Loop->new;
- my $timer = IO::Async::Timer::Periodic->new(
- interval => 60,
+ my $timer = IO::Async::Timer::Periodic->new(
+ interval => 60,
- on_tick => sub {
- print "You've had a minute\n";
- },
- );
+ on_tick => sub {
+ print "You've had a minute\n";
+ },
+ );
- $timer->start;
+ $timer->start;
- $loop->add( $timer );
+ $loop->add( $timer );
- $loop->run;
+ $loop->run;
=head1 DESCRIPTION
diff --git a/t/25socket.t b/t/25socket.t
index 26847b0..92a5ac5 100644
--- a/t/25socket.t
+++ b/t/25socket.t
@@ -161,7 +161,7 @@ SKIP: {
'Allowed to construct a Socket without an on_recv handler' );
ok( exception { $loop->add( $no_on_recv_socket ) },
'Not allowed to add an on_recv-less Socket to a Loop' );
- }
+}
# Subclass
diff --git a/t/41routine.t b/t/41routine.t
index 8c84ae4..b0f803b 100644
--- a/t/41routine.t
+++ b/t/41routine.t
@@ -14,6 +14,8 @@ use IO::Async::Routine;
use IO::Async::Channel;
use IO::Async::Loop;
+use lib ".";
+
my $loop = IO::Async::Loop->new_builtin;
testing_loop( $loop );
@@ -158,6 +160,40 @@ foreach my $model (qw( fork thread )) {
}
}
+foreach my $model (qw( fork thread spawn )) {
+ SKIP: {
+ skip "This Perl does not support threads", 1
+ if $model eq "thread" and not IO::Async::OS->HAVE_THREADS;
+ skip "This Perl does not support fork()", 1
+ if $model eq "fork" and not IO::Async::OS->HAVE_POSIX_FORK;
+
+ my $in = IO::Async::Channel->new;
+ my $out = IO::Async::Channel->new;
+
+ my $routine = IO::Async::Routine->new(
+ model => $model,
+ module => "t::RoutineTester",
+ func => "test_routine",
+ channels_in => [ $in ],
+ channels_out => [ $out ],
+ on_finish => sub {
+ print STDERR "Process exited @_\n";
+ },
+ );
+
+ $loop->add( $routine );
+
+ $in->send( \"value" );
+
+ my $f = wait_for_future $out->recv;
+
+ my $result = eval { $f->get };
+ is( ${$result}, "VALUE", "Result for $model model via module+func" );
+
+ $loop->remove( $routine );
+ }
+}
+
# multiple channels in and out
{
my $in1 = IO::Async::Channel->new;
diff --git a/t/42function.t b/t/42function.t
index 0140b9c..69eb919 100644
--- a/t/42function.t
+++ b/t/42function.t
@@ -362,6 +362,29 @@ SKIP: {
$loop->remove( $function );
}
+# module + func in all models
+foreach my $model (qw( fork thread spawn )) {
+ SKIP: {
+ skip "This Perl does not support threads", 9
+ if $model eq "thread" and not IO::Async::OS->HAVE_THREADS;
+ skip "This Perl does not support fork()", 9
+ if $model eq "fork" and not IO::Async::OS->HAVE_POSIX_FORK;
+
+ my $function = IO::Async::Function->new(
+ model => $model,
+ # We're sure to have List::Util::sum available as that has been core since 5.8
+ module => "List::Util",
+ func => "sum",
+ );
+
+ $loop->add( $function );
+
+ my $f = wait_for_future $function->call( args => [ 10, 20, 30 ] );
+
+ is( scalar $f->get, 60, "result by module + func in '$model' model" );
+ }
+}
+
## Now test that parallel runs really are parallel
{
# touch $dir/$n in each worker, touch $dir/done to finish it
diff --git a/t/RoutineTester.pm b/t/RoutineTester.pm
new file mode 100644
index 0000000..9b043b5
--- /dev/null
+++ b/t/RoutineTester.pm
@@ -0,0 +1,16 @@
+package t::RoutineTester;
+
+use strict;
+use warnings;
+
+sub test_routine
+{
+ my ( $in, $out ) = @_;
+
+ while( my $ref = $in->recv ) {
+ my $value = $$ref;
+ $out->send( \ uc $value );
+ }
+}
+
+0x55AA;