diff options
Diffstat (limited to 'lib/MCE/Channel.pm')
-rw-r--r-- | lib/MCE/Channel.pm | 658 |
1 files changed, 658 insertions, 0 deletions
diff --git a/lib/MCE/Channel.pm b/lib/MCE/Channel.pm new file mode 100644 index 0000000..aa90d81 --- /dev/null +++ b/lib/MCE/Channel.pm @@ -0,0 +1,658 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like and two-way communication capability. +## +############################################################################### + +package MCE::Channel; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +## no critic (BuiltinFunctions::ProhibitStringyEval) +## no critic (TestingAndDebugging::ProhibitNoStrict) + +use if $^O eq 'MSWin32', 'threads'; +use if $^O eq 'MSWin32', 'threads::shared'; + +use Carp (); + +$Carp::Internal{ (__PACKAGE__) }++; + +my ( $freeze, $thaw ); + +BEGIN { + if ( ! defined $INC{'PDL.pm'} ) { + local $@; eval ' + use Sereal::Encoder 3.015 qw( encode_sereal ); + use Sereal::Decoder 3.015 qw( decode_sereal ); + '; + if ( ! $@ ) { + my $encoder_ver = int( Sereal::Encoder->VERSION() ); + my $decoder_ver = int( Sereal::Decoder->VERSION() ); + if ( $encoder_ver - $decoder_ver == 0 ) { + $freeze = \&encode_sereal; + $thaw = \&decode_sereal; + } + } + } + + if ( ! defined $freeze ) { + require Storable; + $freeze = \&Storable::freeze; + $thaw = \&Storable::thaw; + } +} + +use MCE::Util (); +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $has_threads = $INC{'threads.pm'} ? 1 : 0; +my $tid = $has_threads ? threads->tid() : 0; + +sub new { + my ( $class, %argv ) = @_; + my $impl = defined( $argv{impl} ) ? ucfirst( lc $argv{impl} ) : 'Mutex'; + + $impl = 'Threads' if ( $^O eq 'MSWin32' && $impl eq 'Mutex' ); + + eval "require MCE::Channel::$impl; 1" || + Carp::croak("Could not load Channel implementation '$impl': $@"); + + my $pkg = 'MCE::Channel::'.$impl; + no strict 'refs'; + + $pkg->new(%argv); +} + +sub CLONE { + $tid = threads->tid if $has_threads; +} + +sub DESTROY { + my ( $pid, $self ) = ( $has_threads ? $$ .'.'. $tid : $$, @_ ); + + if ( $self->{'init_pid'} && $self->{'init_pid'} eq $pid ) { + MCE::Util::_destroy_socks( $self, qw(c_sock p_sock) ); + delete $self->{c_mutex}; + delete $self->{p_mutex}; + } + + return; +} + +sub impl { + $_[0]->{'impl'} || 'Not defined'; +} + +sub _get_freeze { $freeze; } +sub _get_thaw { $thaw; } + +sub _ended { + warn "WARNING: ($_[0]) called on a channel that has been 'end'ed\n"; + + return; +} + +sub _read { + my $bytes = MCE::Util::_sysread( $_[0], $_[1], my $len = $_[2] ); + my $read = $bytes; + + while ( $bytes && $read != $len ) { + $bytes = MCE::Util::_sysread( $_[0], $_[1], $len - $read, length($_[1]) ); + $read += $bytes if $bytes; + } + + return; +} + +sub _pid { + $has_threads ? $$ .'.'. $tid : $$; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel - Queue-like and two-way communication capability + +=head1 VERSION + +This document describes MCE::Channel version 1.843 + +=head1 SYNOPSIS + + use MCE::Channel; + + ######################## + # Construction + ######################## + + # A single producer and many consumers supporting processes and threads + + my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation + my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking + + # Set the mp flag if two or more workers (many producers) will be calling + # enqueue/send or recv2/recv2_nb on the left end of the channel + + my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 ); + + # Tuned for one producer and one consumer, no locking + + my $c5 = MCE::Channel->new( impl => 'Simple' ); + + ######################## + # Queue-like behavior + ######################## + + # Send data to consumers + $c1->enqueue('item'); + $c1->enqueue(qw/item1 item2 item3 itemN/); + + # Receive data + my $item = $c1->dequeue(); # item + my @items = $c1->dequeue(2); # (item1, item2) + + # Receive, non-blocking + my $item = $c1->dequeue_nb(); # item + my @items = $c1->dequeue_nb(2); # (item1, item2) + + # Signal that there is no more work to be sent + $c1->end(); + + ######################## + # Two-way communication + ######################## + + # Producer(s) sending data + $c3->send('message'); + $c3->send(qw/arg1 arg2 arg3/); + + # Consumer(s) receiving data + my $mesg = $c3->recv(); # message + my @args = $c3->recv(); # (arg1, arg2, arg3) + + # Alternatively, non-blocking + my $mesg = $c3->recv_nb(); # message + my @args = $c3->recv_nb(); # (arg1, arg2, arg3) + + # A producer signaling no more work to be sent + $c3->end(); + + # Consumers(s) sending data + $c3->send2('message'); + $c3->send2(qw/arg1 arg2 arg3/); + + # Producer(s) receiving data + my $mesg = $c3->recv2(); # message + my @args = $c3->recv2(); # (arg1, arg2, arg3) + + # Alternatively, non-blocking + my $mesg = $c3->recv2_nb(); # message + my @args = $c3->recv2_nb(); # (arg1, arg2, arg3) + +=head1 DESCRIPTION + +A MCE::Channel object is a container for sending and receiving data using +socketpair handles. Serialization is provided by L<Sereal> if available. +Defaults to L<Storable> otherwise. Excluding the C<Simple> implementation, +both ends of the C<channel> support many workers concurrently (with mp => 1). + +=head2 new ( impl => STRING, mp => BOOLEAN ) + +This creates a new channel. Three implementations are provided C<Mutex> (default), +C<Threads>, and C<Simple> indicating the locking mechanism to use C<MCE::Mutex>, +C<threads::shared>, and no locking respectively. + + $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0 + +The C<Mutex> channel implementation supports processes and threads whereas the +C<Threads> channel implementation is suited for threads only. + + $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking + $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking + +Set the C<mp> (m)any (p)roducers option to a true value if there will be two +or more workers calling C<enqueue>, <send>, C<recv2>, or C<recv2_nb> on the +left end of the channel. This is important to not incur a race condition. + + $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 ); + +The C<Simple> implementation is optimized for one producer and one consumer max. +It omits locking for maximum performance. This implementation is preferred for +parent to child communication not shared by another worker. + + $chnl = MCE::Channel->new( impl => 'Simple' ); + +=head1 QUEUE-LIKE BEHAVIOR + +=head2 enqueue ( ITEM1 [, ITEM2, ... ] ) + +Appends a list of items onto the left end of the channel. This will block once +the internal socket buffer becomes full (i.e. awaiting workers to dequeue on the +other end). This prevents producer(s) from running faster than consumer(s). + +Object (de)serialization is handled automatically using L<Sereal> if available +or defaults to L<Storable> otherwise. + + $chnl->enqueue('item1'); + $chnl->enqueue(qw/item2 item3 .../); + + $chnl->enqueue([ array_ref1 ]); + $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...); + + $chnl->enqueue({ hash_ref1 }); + $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...); + +=head2 dequeue + +=head2 dequeue ( COUNT ) + +Removes the requested number of items (default 1) from the right end of the +channel. If the channel contains fewer than the requested number of items, +the method will block (i.e. until other producer(s) enqueue more items). + + $item = $chnl->dequeue(); # item1 + @items = $chnl->dequeue(2); # ( item2, item3 ) + +=head2 dequeue_nb + +=head2 dequeue_nb ( COUNT ) + +Removes the requested number of items (default 1) from the right end of the +channel. If the channel contains fewer than the requested number of items, +the method will return what it was able to retrieve and return immediately. +If the channel is empty, then returns C<an empty list> in list context or +C<undef> in scalar context. + + $item = $chnl->dequeue_nb(); # array_ref1 + @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 ) + +=head2 end + +This is called by a producer to signal that there is no more work to be sent. +Once ended, no more items may be sent by the producer. Calling C<end> by +multiple producers is not supported. + + $chnl->end; + +=head1 TWO-WAY IPC - PRODUCER TO CONSUMER + +=head2 send ( ARG1 [, ARG2, ... ] ) + +Append data onto the left end of the channel. Unlike C<enqueue>, the values +are kept together for the receiving consumer, similarly to calling a method. +Object (de)serialization is handled automatically. + + $chnl->send('item'); + $chnl->send([ list_ref ]); + $chnl->send([ hash_ref ]); + + $chnl->send(qw/item1 item2 .../); + $chnl->send($id, [ list_ref ]); + $chnl->send($id, { hash_ref }); + +=head2 recv + +=head2 recv_nb + +Blocking and non-blocking fetch methods from the right end of the channel. +For the latter and when the channel is empty, returns C<an empty list> in +list context or C<undef> in scalar context. + + $item = $chnl->recv(); + $array_ref = $chnl->recv(); + $hash_ref = $chnl->recv(); + + ($item1, $item2) = $chnl->recv_nb(); + ($id, $array_ref) = $chnl->recv_nb(); + ($id, $hash_ref) = $chnl->recv_nb(); + +=head1 TWO-WAY IPC - CONSUMER TO PRODUCER + +=head2 send2 ( ARG1 [, ARG2, ... ] ) + +Append data onto the right end of the channel. Unlike C<enqueue>, the values +are kept together for the receiving producer, similarly to calling a method. +Object (de)serialization is handled automatically. + + $chnl->send2('item'); + $chnl->send2([ list_ref ]); + $chnl->send2([ hash_ref ]); + + $chnl->send2(qw/item1 item2 .../); + $chnl->send2($id, [ list_ref ]); + $chnl->send2($id, { hash_ref }); + +=head2 recv2 + +=head2 recv2_nb + +Blocking and non-blocking fetch methods from the left end of the channel. +For the latter and when the channel is empty, returns C<an empty list> in +list context or C<undef> in scalar context. + + $item = $chnl->recv2(); + $array_ref = $chnl->recv2(); + $hash_ref = $chnl->recv2(); + + ($item1, $item2) = $chnl->recv2_nb(); + ($id, $array_ref) = $chnl->recv2_nb(); + ($id, $hash_ref) = $chnl->recv2_nb(); + +=head1 DEMONSTRATIONS + +=head2 Example 1 - threads + +C<MCE::Channel> was made to work efficiently with L<threads>. The reason is from +using L<threads::shared> for locking versus L<MCE::Mutex>. + + use strict; + use warnings; + + use threads; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Threads' ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ($item1, $item2) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + $queue->send2( threads->tid => $count ); + } + + threads->create('consumer') for 1 .. $num_consumers; + + ## producer + + $queue->enqueue($_, $_ * 2) for 1 .. 40000; + $queue->end; + + my %results; + my $total = 0; + + for ( 1 .. $num_consumers ) { + my ($id, $count) = $queue->recv2; + $results{$id} = $count; + $total += $count; + } + + $_->join for threads->list; + + print $results{$_}, "\n" for keys %results; + print "$total total\n\n"; + + __END__ + + # output + + 8034 + 8008 + 8036 + 8058 + 7990 + 7948 + 8068 + 7966 + 7960 + 7932 + 80000 total + +=head2 Example 2 - MCE::Child + +The following is similarly threads-like for Perl lacking threads support. +It spawns processes instead, thus requires the C<Mutex> channel implementation +which is the default if omitted. + + use strict; + use warnings; + + use MCE::Child; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Mutex' ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ($item1, $item2) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + $queue->send2( MCE::Child->pid => $count ); + } + + MCE::Child->create('consumer') for 1 .. $num_consumers; + + ## producer + + $queue->enqueue($_, $_ * 2) for 1 .. 40000; + $queue->end; + + my %results; + my $total = 0; + + for ( 1 .. $num_consumers ) { + my ($id, $count) = $queue->recv2; + $results{$id} = $count; + $total += $count; + } + + $_->join for MCE::Child->list; + + print $results{$_}, "\n" for keys %results; + print "$total total\n\n"; + +=head2 Example 3 - Many producers + +Running with 2 or more producers requires setting the C<mp> option. Internally, +this enables locking support for the left end of the channel. The C<mp> option +applies to C<Mutex> and C<Threads> channel implementations only. + +Here, using the MCE facility for gathering the final count. + + use strict; + use warnings; + + use MCE::Flow; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + MCE->gather( MCE->wid => $count ); + } + + sub producer { + $queue->enqueue($_, $_ * 2) for 1 .. 20000; + } + + ## run 2 producers and many consumers + + MCE::Flow::init( + max_workers => [ 2, $num_consumers ], + task_name => [ 'producer', 'consumer' ], + task_end => sub { + my ($mce, $task_id, $task_name) = @_; + if ( $task_name eq 'producer' ) { + $queue->end; + } + } + ); + + # consumers call gather above (i.e. send a key-value pair), + # have MCE append to a hash + + my %results = mce_flow \&producer, \&consumer; + + MCE::Flow::finish; + + my $total = 0; + + for ( keys %results ) { + $total += $results{$_}; + print $results{$_}, "\n"; + } + + print "$total total\n\n"; + +=head2 Example 4 - Request input + +This demonstration configures a channel per consumer. Plus, a common channel +for consumers to request the next input item. The C<Simple> implementation is +specified for the individual channels whereas locking may be necessary for the +C<$ready> channel. However, consumers do not incur reading and what is written +is very small (i.e. atomic write is guaranteed by the OS). Thus, am safely +choosing the C<Simple> implementation versus C<Mutex>. + + use strict; + use warnings; + + use MCE::Flow; + use MCE::Channel; + + my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g; + my $input_size = shift || 3000; + + unless ($input_size =~ /\A\d+\z/) { + print {*STDERR} "usage: $prog_name [ size ]\n"; + exit 1; + } + + my $consumers = 4; + + my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers; + + my $ready = MCE::Channel->new( impl => 'Simple' ); + + sub producer { + my $id = 0; + + # send the next input item upon request + for ( 0 .. $input_size - 1 ) { + my $chnl_num = $ready->recv2; + $chnls[ $chnl_num ]->send( ++$id, $_ ); + } + + # signal no more work + $_->send( 0, undef ) for @chnls; + } + + sub consumer { + my $chnl_num = MCE->task_wid - 1; + + while () { + # notify the producer ready for input + $ready->send2( $chnl_num ); + + # retrieve input data + my ( $id, $item ) = $chnls[ $chnl_num ]->recv; + + # leave loop if no more work + last unless $id; + + # compute and send the result to the manager process + # ordered output requires an id (must be 1st argument) + MCE->gather( $id, [ $item, sqrt($item) ] ); + } + } + + # A custom 'ordered' output iterator for MCE's gather facility. + # It returns a closure block, expecting an ID for 1st argument. + + sub output_iterator { + my %tmp; my $order_id = 1; + + return sub { + my ( $id, $result ) = @_; + $tmp{ $id } = $result; + + while () { + last unless exists $tmp{ $order_id }; + $result = delete $tmp{ $order_id }; + printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1]; + $order_id++; + } + }; + } + + # Run one producer and many consumers. + # Output to be sent orderly to STDOUT. + + MCE::Flow->init( + gather => output_iterator(), + max_workers => [ 1, $consumers ], + ); + + MCE::Flow->run( \&producer, \&consumer ); + MCE::Flow->finish; + + __END__ + + # Output + + n: 0 sqrt(n): 0.000000 + n: 1 sqrt(n): 1.000000 + n: 2 sqrt(n): 1.414214 + n: 3 sqrt(n): 1.732051 + n: 4 sqrt(n): 2.000000 + n: 5 sqrt(n): 2.236068 + n: 6 sqrt(n): 2.449490 + n: 7 sqrt(n): 2.645751 + n: 8 sqrt(n): 2.828427 + n: 9 sqrt(n): 3.000000 + ... + +=head1 SEE ALSO + +=over 3 + +=item * L<https://github.com/marioroy/mce-examples/tree/master/chameneos> + +=item * L<threads::lite> + +=back + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=head1 COPYRIGHT AND LICENSE + +Copyright (C) 2019 by Mario E. Roy + +MCE::Shared is released under the same license as Perl. + +See L<http://dev.perl.org/licenses/> for more information. + +=cut + |