#  Source: lib/IO/Async/Routine.pm
#  (blob 0420bc5886295a36b92d93c3e2f2b890e193c6a1)
#
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2012-2021 -- leonerd@leonerd.org.uk

package IO::Async::Routine;

use strict;
use warnings;

our $VERSION = '0.802';

use base qw( IO::Async::Notifier );

use Carp;

use IO::Async::OS;
use IO::Async::Process;

use Struct::Dumb qw( readonly_struct );

=head1 NAME

C<IO::Async::Routine> - execute code in an independent sub-process or thread

=head1 SYNOPSIS

   use IO::Async::Routine;
   use IO::Async::Channel;

   use IO::Async::Loop;
   my $loop = IO::Async::Loop->new;

   my $nums_ch = IO::Async::Channel->new;
   my $ret_ch  = IO::Async::Channel->new;

   my $routine = IO::Async::Routine->new(
      channels_in  => [ $nums_ch ],
      channels_out => [ $ret_ch ],

      code => sub {
         my @nums = @{ $nums_ch->recv };
         my $ret = 0; $ret += $_ for @nums;

         # Can only send references
         $ret_ch->send( \$ret );
      },

      on_finish => sub {
         say "The routine aborted early - $_[-1]";
         $loop->stop;
      },
   );

   $loop->add( $routine );

   $nums_ch->send( [ 10, 20, 30 ] );
   $ret_ch->recv(
      on_recv => sub {
         my ( $ch, $totalref ) = @_;
         say "The total of 10, 20, 30 is: $$totalref";
         $loop->stop;
      }
   );

   $loop->run;

=head1 DESCRIPTION

This L<IO::Async::Notifier> contains a body of code and executes it in a
sub-process or thread, allowing it to act independently of the main program.
Once set up, all communication with the code happens by values passed into or
out of the Routine via L<IO::Async::Channel> objects.

The code contained within the Routine is free to make blocking calls without
stalling the rest of the program. This makes it useful for using existing code
which has no option not to block within an L<IO::Async>-based program.

To create asynchronous wrappers of functions that return a value based only on
their arguments and do not generally maintain state within the process, it may
be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.

=head2 Models

A choice of detachment model is available. Each has various advantages and
disadvantages. Not all of them may be available on a particular system.

=head3 The C<fork> model

The code in this model runs within its own process, created by calling
C<fork()> from the main process. It is isolated from the rest of the program
in terms of memory, CPU time, and other resources. Because it is started
using C<fork()>, the initial process state is a clone of the main process.

This model performs well on UNIX-like operating systems which possess a true
native C<fork()> system call, but is not available on C<MSWin32> for example,
because the operating system does not provide full fork-like semantics.

=head3 The C<thread> model

The code in this model runs inside a separate thread within the main process.
It therefore shares memory and other resources such as open filehandles with
the main thread. As with the C<fork> model, the initial thread state is cloned
from the main controlling thread.

This model is only available on perls built to support threading.

=head3 The C<spawn> model

I<Since version 0.79.>

The code in this model runs within its own freshly-created process running
another copy of the perl interpreter. Similar to the C<fork> model it
therefore has its own memory, CPU time, and other resources. However, since it
is started freshly rather than by cloning the main process, it starts up in a
clean state, without any shared resources from its parent.

Since this model creates a new fresh process rather than sharing existing
state, it cannot use the C<code> argument to specify the routine body; it must
instead use only the C<module> and C<func> arguments.

In the current implementation this model requires exactly one input channel
and exactly one output channel; both must be present, and there cannot be more
than one of either.

=cut

=head1 EVENTS

=head2 on_finish $exitcode

For C<fork()>-based and C<spawn>-based Routines, this is invoked after the
process has exited and is passed the raw exitcode status.

=head2 on_finish $type, @result

For thread-based Routines, this is invoked after the thread has returned from
its code block and is passed the C<on_joined> result.

As the behaviour of these events differs per model, it may be more convenient
to use C<on_return> and C<on_die> instead.

=head2 on_return $result

Invoked if the code block returns normally. Note that process-based Routines
(the C<fork> and C<spawn> models) can only transport an integer result between
0 and 255, as this is the actual C<exit()> value.

=head2 on_die $exception

Invoked if the code block fails with an exception.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 model => "fork" | "thread" | "spawn"

Optional. Defines how the routine will detach itself from the main process.
See the L</Models> section above for more detail.

If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
C<fork> is preferred if it is available, otherwise C<thread>.

=head2 channels_in => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
in to the Routine.

=head2 channels_out => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.

=head2 code => CODE

CODE reference to the body of the Routine, to execute once the channels are
set up.

When using the C<spawn> model, this is not permitted; you must use C<module>
and C<func> instead.

=head2 module => STRING

=head2 func => STRING

I<Since version 0.79.>

An alternative to the C<code> argument, which names a module to load and a
function to call within it. C<module> should give a perl module name (i.e.
C<Some::Name>, not a filename like F<Some/Name.pm>), and C<func> should give
the basename of a function within that module (i.e. without the module name
prefixed). It will be invoked as the main code body of the object, and passed
in a list of all the channels; first the input ones then the output ones.

   module::func( @channels_in, @channels_out )

=head2 setup => ARRAY

Optional. For C<fork()>-based Routines, gives a reference to an array to pass
to the underlying C<Loop> C<fork_child> method. Ignored for thread-based and
C<spawn>-based Routines.

=cut

# Default detachment model, evaluated once when this module is compiled:
# "fork" when IO::Async::OS reports HAVE_POSIX_FORK, otherwise "thread" when
# it reports HAVE_THREADS. If neither is available, loading the module dies.
use constant PREFERRED_MODEL =>
   IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
   IO::Async::OS->HAVE_THREADS    ? "thread" :
      die "No viable Routine models";

# Constructor hook: makes sure a 'model' is present in the parameter hash
# before the base class initialiser sees it. An explicitly supplied (true)
# model is kept; otherwise the IO_ASYNC_ROUTINE_MODEL environment variable is
# consulted, falling back to PREFERRED_MODEL.
sub _init
{
   my ( $self, @args ) = @_;
   my $params = $args[0];

   unless( $params->{model} ) {
      $params->{model} = $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;
   }

   $self->SUPER::_init( @args );
}

# Cache mapping a model name to the CODE reference of its "_setup_$model"
# method; filled in lazily by configure and _add_to_loop.
my %SETUP_CODE;

# Applies named parameters to the Routine.
#
# The Routine's own keys are moved out of the parameter list into $self, the
# code/func/module combination is sanity-checked, and a requested 'model' is
# validated (it must have a matching _setup_$model method and be supported on
# this platform) before being stored. Anything left over is handed on to the
# base class configure.
sub configure
{
   my ( $self, %params ) = @_;

   # TODO: Can only reconfigure when not running
   for my $key (qw( channels_in channels_out code module func setup on_finish on_return on_die )) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   if( defined $self->{code} && defined $self->{func} ) {
      croak "Cannot ->configure both 'code' and 'func'";
   }
   if( defined $self->{func} && !defined $self->{module} ) {
      croak "'func' parameter requires a 'module' as well";
   }

   my $model = delete $params{model};
   if( defined $model ) {
      # Look up (and remember) the setup method for this model
      $SETUP_CODE{$model} ||= $self->can( "_setup_$model" );
      $SETUP_CODE{$model}
         or die "Unrecognised Routine model $model";

      # TODO: optional plugin "configure" check here?
      if( $model eq "fork" && !IO::Async::OS->HAVE_POSIX_FORK ) {
         croak "Cannot use 'fork' model as fork() is not available";
      }
      if( $model eq "thread" && !IO::Async::OS->HAVE_THREADS ) {
         croak "Cannot use 'thread' model as threads are not available";
      }

      $self->{model} = $model;
   }

   $self->SUPER::configure( %params );
}

# Loop-attachment hook: after the base class has done its part, looks up the
# setup method for the configured model (caching it in %SETUP_CODE) and
# invokes it to actually start the routine.
sub _add_to_loop
{
   my ( $self, $loop ) = @_;

   $self->SUPER::_add_to_loop( $loop );

   my $model = $self->{model};

   $SETUP_CODE{$model} ||= $self->can( "_setup_$model" );
   my $setup_method = $SETUP_CODE{$model}
      or die "Unrecognised Routine model $model";

   $self->$setup_method();
}

# Per-channel setup record. 'chan' is the channel object itself, 'myfd' is the
# filehandle given to the channel on the controlling side (it may be undef when
# the channel supplied its own handle) and 'otherfd' is the filehandle used by
# the routine body's side.
readonly_struct ChannelSetup => [qw( chan myfd otherfd )];

# Builds one ChannelSetup record per input channel.
#
# If the channel can already supply a read handle, that handle is used for
# the routine's side and there is no controlling-side handle; otherwise a
# fresh pipe pair is created, the controlling side keeping the write end.
# Returns the list of records, in the order of 'channels_in'.
sub _create_channels_in
{
   my ( $self ) = @_;

   return map {
      my $chan = $_;

      my $wr;
      my $rd = $chan->_extract_read_handle;
      ( $rd, $wr ) = IO::Async::OS->pipepair unless $rd;

      ChannelSetup( $chan, $wr, $rd );
   } @{ $self->{channels_in} || [] };
}

# Builds one ChannelSetup record per output channel.
#
# If the channel can already supply a write handle, that handle is used for
# the routine's side and there is no controlling-side handle; otherwise a
# fresh pipe pair is created, the controlling side keeping the read end.
# Returns the list of records, in the order of 'channels_out'.
sub _create_channels_out
{
   my ( $self ) = @_;

   return map {
      my $chan = $_;

      my $rd;
      my $wr = $chan->_extract_write_handle;
      ( $rd, $wr ) = IO::Async::OS->pipepair unless $wr;

      ChannelSetup( $chan, $rd, $wr );
   } @{ $self->{channels_out} || [] };
}

# Puts each input channel into asynchronous mode, writing to the controlling
# side's handle, and makes it a child of this Routine unless it already has a
# parent.
sub _adopt_channels_in
{
   my ( $self, @setups ) = @_;

   for my $setup ( @setups ) {
      my $chan = $setup->chan;

      $chan->setup_async_mode( write_handle => $setup->myfd );

      next if $chan->parent;
      $self->add_child( $chan );
   }
}

# Puts each output channel into asynchronous mode, reading from the
# controlling side's handle, and makes it a child of this Routine unless it
# already has a parent.
sub _adopt_channels_out
{
   my ( $self, @setups ) = @_;

   for my $setup ( @setups ) {
      my $chan = $setup->chan;

      $chan->setup_async_mode( read_handle => $setup->myfd );

      next if $chan->parent;
      $self->add_child( $chan );
   }
}

# Starts the routine using the "fork" model.
#
# Channel setup records are created for every input and output channel, then
# an IO::Async::Process is constructed whose child-side code:
#   * puts every channel into synchronous mode on its 'otherfd';
#   * if 'func' was configured, loads 'module' and resolves the function;
#   * calls the routine body with all channels (inputs first, then outputs);
#   * closes the channels and returns the body's result.
# The process 'setup' list keeps each channel's 'otherfd' open in the child,
# followed by any user-supplied 'setup' entries.
#
# In the parent, on_finish always fires the 'on_finish' event with the raw
# exit status; 'on_return' and the result future are resolved only when the
# low 7 bits of that status are zero. on_exception fires 'on_die' and fails
# the result future.
# NOTE(review): a status with non-zero low bits (presumably termination by a
# signal) leaves result_future unresolved here - confirm this is intended.
sub _setup_fork
{
   my $self = shift;

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $code = $self->{code};

   my $module = $self->{module};
   my $func   = $self->{func};

   my @setup = map { $_->otherfd => "keep" } @channels_in, @channels_out;

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in, @channels_out ) {
            $_->chan->setup_sync_mode( $_->otherfd );
         }

         # Keyed on 'func', as the thread model does and as configure's
         # validation implies (func requires module). A 'module' given
         # without 'func' therefore no longer discards a supplied 'code'.
         if( defined $func ) {
            ( my $file = "$module.pm" ) =~ s{::}{/}g;
            require $file;

            $code = $module->can( $func ) or
               die "Module '$module' has no '$func'\n";
         }

         my $ret = $code->( map { $_->chan } @channels_in, @channels_out );

         foreach ( @channels_in, @channels_out ) {
            $_->chan->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Only a status with clear low 7 bits counts as a normal return; the
         # value reported is the high byte
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in  );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The routine's ends of the pipes are closed on this side once the process
   # has been added
   $_->otherfd->close for @channels_in, @channels_out;
}

# Starts the routine using the "thread" model.
#
# The thread body puts every channel into synchronous mode on its 'otherfd'
# and closes any controlling-side handle it holds, resolves module::func if
# 'func' was configured, runs the routine body with all channels (inputs
# first, then outputs), closes the channels and returns the body's result.
#
# When the thread is joined, 'on_finish' is fired with the join event and its
# results; a "return" event additionally fires 'on_return' and completes the
# result future, while a "died" event fires 'on_die' and fails it.
sub _setup_thread
{
   my ( $self ) = @_;

   my @in_setups  = $self->_create_channels_in;
   my @out_setups = $self->_create_channels_out;
   my @all_setups = ( @in_setups, @out_setups );

   my $body   = $self->{code};
   my $module = $self->{module};
   my $func   = $self->{func};

   my $thread_main = sub {
      for my $setup ( @all_setups ) {
         $setup->chan->setup_sync_mode( $setup->otherfd );

         # The controlling side's handle, if any, is not wanted in here
         my $parent_fd = $setup->myfd;
         $parent_fd->close if defined $parent_fd;
      }

      if( defined $func ) {
         ( my $file = "$module.pm" ) =~ s{::}{/}g;
         require $file;

         $body = $module->can( $func ) or
            die "Module '$module' has no '$func'\n";
      }

      my $ret = $body->( map { $_->chan } @all_setups );

      $_->chan->close for @all_setups;

      return $ret;
   };

   my $on_joined = $self->_capture_weakself( sub {
      my $self = shift or return;
      my ( $ev, @result ) = @_;

      $self->maybe_invoke_event( on_finish => $ev, @result );

      if( $ev eq "return" ) {
         $self->maybe_invoke_event( on_return => @result );
         $self->result_future->done( @result );
      }
      elsif( $ev eq "died" ) {
         $self->maybe_invoke_event( on_die => $result[0] );
         $self->result_future->fail( $result[0], routine => );
      }

      delete $self->{tid};
   } );

   my $tid = $self->loop->create_thread(
      code      => $thread_main,
      on_joined => $on_joined,
   );

   $self->{tid} = $tid;
   $self->{id}  = "T" . $tid;

   $self->_adopt_channels_in ( @in_setups  );
   $self->_adopt_channels_out( @out_setups );

   $_->otherfd->close for @all_setups;
}

# The injected program that goes into spawn mode. _setup_spawn passes it to a
# fresh perl via -E, followed by the module name, the function name and the
# parent's non-reference @INC entries (which replace the child's @INC). It
# loads the module, looks up the function, calls it with the channels built by
# IO::Async::Channel->new_stdin and ->new_stdout, and exits with its return
# value.
use constant PERL_RUNNER => <<'EOF';
( my ( $module, $func ), @INC ) = @ARGV;
( my $file = "$module.pm" ) =~ s{::}{/}g;
require $file;
my $code = $module->can( $func ) or die "Module '$module' has no '$func'\n";
require IO::Async::Channel;
exit $code->( IO::Async::Channel->new_stdin, IO::Async::Channel->new_stdout );
EOF

# Starts the routine using the "spawn" model.
#
# A fresh perl interpreter is launched running PERL_RUNNER, with the single
# input channel's routine-side handle as its STDIN and the single output
# channel's routine-side handle as its STDOUT. The body must be named by
# 'module' and 'func'; a 'code' reference cannot be used.
#
# Dies (before anything is started) if 'code' is set, if 'module' or 'func'
# is missing, or if there is not exactly one input and one output channel.
#
# In the parent, on_finish always fires the 'on_finish' event with the raw
# exit status; 'on_return' and the result future are resolved only when the
# low 7 bits of that status are zero. on_exception fires 'on_die' and fails
# the result future.
sub _setup_spawn
{
   my $self = shift;

   $self->{code} and
      die "Cannot run IO::Async::Routine in 'spawn' with code\n";

   # The runner can only load a named function; fail here rather than
   # launching a perl that cannot find its module
   defined $self->{module} and defined $self->{func} or
      die "IO::Async::Routine in 'spawn' mode requires 'module' and 'func'\n";

   # An unset channel list counts as empty, so the caller gets the intended
   # message rather than an error from dereferencing undef
   @{ $self->{channels_in} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one input channel\n";
   @{ $self->{channels_out} || [] } == 1 or
      die "IO::Async::Routine in 'spawn' mode requires exactly one output channel\n";

   my @channels_in  = $self->_create_channels_in;
   my @channels_out = $self->_create_channels_out;

   my $module = $self->{module};
   my $func   = $self->{func};

   my $process = IO::Async::Process->new(
      setup => [
         stdin  => $channels_in[0]->otherfd,
         stdout => $channels_out[0]->otherfd,
      ],
      # Only plain-string @INC entries can be passed on the command line
      command => [ $^X, "-E", PERL_RUNNER, $module, $func, grep { !ref } @INC ],
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         # Only a status with clear low 7 bits counts as a normal return; the
         # value reported is the high byte
         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   $self->_adopt_channels_in ( @channels_in  );
   $self->_adopt_channels_out( @channels_out );

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   # The routine's ends of the pipes are closed on this side once the process
   # has been added
   $_->otherfd->close for @channels_in, @channels_out;
}

=head1 METHODS

=cut

=head2 id

   $id = $routine->id

Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)

=cut

# Accessor for the ID string stored by the model's setup method.
sub id
{
   my ( $self ) = @_;

   return $self->{id};
}

=head2 model

   $model = $routine->model

Returns the detachment model in use by the Routine.

=cut

# Accessor for the configured detachment model name.
sub model
{
   my ( $self ) = @_;

   return $self->{model};
}

=head2 kill

   $routine->kill( $signal )

Sends the specified signal to the routine code. This is either implemented by
C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
this has the usual limits of signal delivery to threads; namely, that it works
at the Perl interpreter level, and cannot actually interrupt blocking system
calls.

=cut

# Delivers $signal to whatever is running the routine body.
#
# The "fork" and "spawn" models both keep an IO::Async::Process in {process},
# so the signal is forwarded to it in either case. For the "thread" model the
# signal goes to the thread object looked up from {tid}; as {tid} is removed
# once the thread has been joined, a missing tid or thread object is treated
# as "nothing left to signal" rather than an error.
sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   my $model = $self->{model};

   if( $model eq "fork" or $model eq "spawn" ) {
      $self->{process}->kill( $signal );
   }
   elsif( $model eq "thread" ) {
      my $tid = $self->{tid};

      # threads->object yields undef for a thread that is no longer running
      my $thread = defined $tid ? threads->object( $tid ) : undef;
      $thread->kill( $signal ) if $thread;
   }
}

=head2 result_future

   $f = $routine->result_future

I<Since version 0.75.>

Returns an C<IO::Async::Future> which will complete with the eventual
return value or exception when the routine finishes. The future is created on
first use; later calls return the same instance.

If the routine finishes with a successful result then this will be the C<done>
result of the future. If the routine fails with an exception then this will be
the C<fail> result.

=cut

# Returns the cached result future, creating it from the loop on first use.
sub result_future
{
   my $self = shift;

   return $self->{result_future} if defined $self->{result_future};

   my $future = $self->loop->new_future;

   # This future needs to strongly retain $self to ensure it definitely gets
   # notified; the reference is dropped again once the future is ready
   $future->on_ready( sub { undef $self } );

   return $self->{result_future} = $future;
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;