summaryrefslogtreecommitdiff
path: root/lib/IO/Async/Routine.pm
blob: 18671e169eb167a8da63a3efd07368cc6ec86d1b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2012-2019 -- leonerd@leonerd.org.uk

package IO::Async::Routine;

use strict;
use warnings;

our $VERSION = '0.78';

use base qw( IO::Async::Notifier );

use Carp;

use IO::Async::OS;
use IO::Async::Process;

=head1 NAME

C<IO::Async::Routine> - execute code in an independent sub-process or thread

=head1 SYNOPSIS

 use IO::Async::Routine;
 use IO::Async::Channel;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 my $nums_ch = IO::Async::Channel->new;
 my $ret_ch  = IO::Async::Channel->new;

 my $routine = IO::Async::Routine->new(
    channels_in  => [ $nums_ch ],
    channels_out => [ $ret_ch ],

    code => sub {
       my @nums = @{ $nums_ch->recv };
       my $ret = 0; $ret += $_ for @nums;

       # Can only send references
       $ret_ch->send( \$ret );
    },

    on_finish => sub {
       say "The routine aborted early - $_[-1]";
       $loop->stop;
    },
 );

 $loop->add( $routine );

 $nums_ch->send( [ 10, 20, 30 ] );
 $ret_ch->recv(
    on_recv => sub {
       my ( $ch, $totalref ) = @_;
       say "The total of 10, 20, 30 is: $$totalref";
       $loop->stop;
    }
 );

 $loop->run;

=head1 DESCRIPTION

This L<IO::Async::Notifier> contains a body of code and executes it in a
sub-process or thread, allowing it to act independently of the main program.
Once set up, all communication with the code happens by values passed into or
out of the Routine via L<IO::Async::Channel> objects.

A choice of detachment model is available, with options being a C<fork()>ed
child process, or a thread. In both cases the code contained within the
Routine is free to make blocking calls without stalling the rest of the
program. This makes it useful for using existing code which has no option not
to block within an L<IO::Async>-based program.

Code running inside a C<fork()>-based Routine runs within its own process; it
is isolated from the rest of the program in terms of memory, CPU time, and
other resources. Code running in a thread-based Routine however, shares memory
and other resources such as open filehandles with the main thread.

To create asynchronous wrappers of functions that return a value based only on
their arguments, and do not generally maintain state within the process it may
be more convenient to use an L<IO::Async::Function> instead, which uses an
C<IO::Async::Routine> to contain the body of the function and manages the
Channels itself.

=cut

=head1 EVENTS

=head2 on_finish $exitcode

For C<fork()>-based Routines, this is invoked after the process has exited and
is passed the raw exitcode status.

=head2 on_finish $type, @result

For thread-based Routines, this is invoked after the thread has returned from
its code block and is passed the C<on_joined> result.

As the behaviour of these events differs per model, it may be more convenient
to use C<on_return> and C<on_die> instead.

=head2 on_return $result

Invoked if the code block returns normally. Note that C<fork()>-based Routines
can only transport an integer result between 0 and 255, as this is the actual
C<exit()> value.

=head2 on_die $exception

Invoked if the code block fails with an exception.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 model => "fork" | "thread"

Optional. Defines how the routine will detach itself from the main process.
C<fork> uses a child process detached using an L<IO::Async::Process>.
C<thread> uses a thread, and is only available on threaded Perls.

If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined,
C<fork> is preferred if it is available, otherwise C<thread>.

=head2 channels_in => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
in to the Routine.

=head2 channels_out => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.

=head2 code => CODE

CODE reference to the body of the Routine, to execute once the channels are
set up.

=head2 setup => ARRAY

Optional. For C<fork()>-based Routines, gives a reference to an array to pass
to the underlying C<Loop> C<fork_child> method. Ignored for thread-based
Routines.

=cut

use constant PREFERRED_MODEL =>
   IO::Async::OS->HAVE_POSIX_FORK ? "fork" :
   IO::Async::OS->HAVE_THREADS    ? "thread" :
      die "No viable Routine models";

sub _init
{
   my $self = shift;
   my ( $params ) = @_;

   $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;

   $self->SUPER::_init( @_ );
}

sub configure
{
   my $self = shift;
   my %params = @_;

   # TODO: Can only reconfigure when not running
   foreach (qw( channels_in channels_out code setup on_finish on_return on_die )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   if( defined( my $model = delete $params{model} ) ) {
      $model eq "fork" or $model eq "thread" or
         croak "Expected 'model' to be either 'fork' or 'thread'";

      $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and
         croak "Cannot use 'fork' model as fork() is not available";
      $model eq "thread" and !IO::Async::OS->HAVE_THREADS and
         croak "Cannot use 'thread' model as threads are not available";

      $self->{model} = $model;
   }

   $self->SUPER::configure( %params );
}

sub _add_to_loop
{
   my $self = shift;
   my ( $loop ) = @_;
   $self->SUPER::_add_to_loop( $loop );

   return $self->_setup_fork   if $self->{model} eq "fork";
   return $self->_setup_thread if $self->{model} eq "thread";

   die "TODO: unrecognised Routine model $self->{model}";
}

sub _setup_fork
{
   my $self = shift;

   my @setup;
   my @channels_in;
   my @channels_out;

   foreach my $ch ( @{ $self->{channels_in} || [] } ) {
      my ( $rd, $wr );
      unless( $rd = $ch->_extract_read_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @setup, $rd => "keep";
      push @channels_in, [ $ch, $wr, $rd ];
   }

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @setup, $wr => "keep";
      push @channels_out, [ $ch, $rd, $wr ];
   }

   my $code  = $self->{code};

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in ) {
            my ( $ch, undef, $rd ) = @$_;
            $ch->setup_sync_mode( $rd );
         }
         foreach ( @channels_out ) {
            my ( $ch, undef, $wr ) = @$_;
            $ch->setup_sync_mode( $wr );
         }

         my $ret = $code->();

         foreach ( @channels_in, @channels_out ) {
            my ( $ch ) = @$_;
            $ch->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         unless( $exitcode & 0x7f ) {
            $self->maybe_invoke_event( on_return => ($exitcode >> 8) );
            $self->result_future->done( $exitcode >> 8 );
         }
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
         $self->result_future->fail( $exception, routine => );
      }),
   );

   foreach ( @channels_in ) {
      my ( $ch, $wr ) = @$_;

      $ch->setup_async_mode( write_handle => $wr );

      $self->add_child( $ch ) unless $ch->parent;
   }

   foreach ( @channels_out ) {
      my ( $ch, $rd ) = @$_;

      $ch->setup_async_mode( read_handle => $rd );

      $self->add_child( $ch ) unless $ch->parent;
   }

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   foreach ( @channels_in, @channels_out ) {
      my ( undef, undef, $other ) = @$_;
      $other->close;
   }
}

sub _setup_thread
{
   my $self = shift;

   my @channels_in;
   my @channels_out;

   foreach my $ch ( @{ $self->{channels_in} || [] } ) {
      my ( $rd, $wr );
      unless( $rd = $ch->_extract_read_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_in, [ $ch, $wr, $rd ];
   }

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @channels_out, [ $ch, $rd, $wr ];
   }

   my $code = $self->{code};

   my $tid = $self->loop->create_thread(
      code => sub {
         foreach ( @channels_in ) {
            my ( $ch, $wr, $rd ) = @$_;
            $ch->setup_sync_mode( $rd );
            $wr->close if $wr;
         }
         foreach ( @channels_out ) {
            my ( $ch, $rd, $wr ) = @$_;
            $ch->setup_sync_mode( $wr );
            $rd->close if $rd;
         }

         my $ret = $code->();

         foreach ( @channels_in, @channels_out ) {
            my ( $ch ) = @$_;
            $ch->close;
         }

         return $ret;
      },
      on_joined => $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $ev, @result ) = @_;
         $self->maybe_invoke_event( on_finish => @_ );

         if( $ev eq "return" ) {
            $self->maybe_invoke_event( on_return => @result );
            $self->result_future->done( @result );
         }
         if( $ev eq "died" ) {
            $self->maybe_invoke_event( on_die => $result[0] );
            $self->result_future->fail( $result[0], routine => );
         }

         delete $self->{tid};
      }),
   );

   $self->{tid} = $tid;
   $self->{id} = "T" . $tid;

   foreach ( @channels_in ) {
      my ( $ch, $wr, $rd ) = @$_;

      $ch->setup_async_mode( write_handle => $wr );
      $rd->close;

      $self->add_child( $ch ) unless $ch->parent;
   }

   foreach ( @channels_out ) {
      my ( $ch, $rd, $wr ) = @$_;

      $ch->setup_async_mode( read_handle => $rd );
      $wr->close;

      $self->add_child( $ch ) unless $ch->parent;
   }
}

=head1 METHODS

=cut

=head2 id

   $id = $routine->id

Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)

=cut

sub id
{
   my $self = shift;
   return $self->{id};
}

=head2 model

   $model = $routine->model

Returns the detachment model in use by the Routine.

=cut

sub model
{
   my $self = shift;
   return $self->{model};
}

=head2 kill

   $routine->kill( $signal )

Sends the specified signal to the routine code. This is either implemented by
C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
this has the usual limits of signal delivery to threads; namely, that it works
at the Perl interpreter level, and cannot actually interrupt blocking system
calls.

=cut

sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   $self->{process}->kill( $signal ) if $self->{model} eq "fork";
   threads->object( $self->{tid} )->kill( $signal ) if $self->{model} eq "thread";
}

=head2 result_future

   $f = $routine->result_future

I<Since version 0.75.>

Returns a new C<IO::Async::Future> which will complete with the eventual
return value or exception when the routine finishes.

If the routine finishes with a successful result then this will be the C<done>
result of the future. If the routine fails with an exception then this will be
the C<fail> result.

=cut

sub result_future
{
   my $self = shift;

   return $self->{result_future} //= do {
      my $f = $self->loop->new_future;
      # This future needs to strongly retain $self to ensure it definitely gets
      # notified
      $f->on_ready( sub { undef $self } );
      $f;
   };
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;