diff options
author | intrigeri <intrigeri@boum.org> | 2019-07-25 02:23:34 +0000 |
---|---|---|
committer | intrigeri <intrigeri@boum.org> | 2019-07-25 02:23:34 +0000 |
commit | a0567bc8c39a9f006202a716ca0da1e5b12771a5 (patch) | |
tree | 086bfbe53d72fe8722339cd762a8670ed4897419 /lib | |
parent | ced942ca55cb90ddce354f56c6696bb34ba8c2ff (diff) |
New upstream version 1.843
Diffstat (limited to 'lib')
33 files changed, 4369 insertions, 327 deletions
@@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -90,9 +90,9 @@ BEGIN { ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status ## _init_pid _init_total_workers ## - ## _bsb_r_sock _bsb_w_sock _bse_r_sock _bse_w_sock _com_r_sock _com_w_sock - ## _dat_r_sock _dat_w_sock _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock - ## _data_channels _lock_chn _mutex_n + ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock + ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels + ## _lock_chn _mutex_n %_valid_fields_new = map { $_ => 1 } qw( max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw @@ -212,7 +212,7 @@ sub import { use constant { # Max data channels. This cannot be greater than 8 on MSWin32. - DATA_CHANNELS => ($^O eq 'MSWin32') ? 8 : 12, + DATA_CHANNELS => ($^O eq 'MSWin32') ? 8 : 10, # Max GC size. Undef variable when exceeding size. MAX_GC_SIZE => 1024 * 1024 * 64, @@ -477,11 +477,14 @@ sub new { $self{_last_sref} = (ref $self{input_data} eq 'SCALAR') ? refaddr($self{input_data}) : 0; - my $_data_channels = ("$$.$_tid" eq $_oid) ? DATA_CHANNELS : 2; + my $_data_channels = ("$$.$_tid" eq $_oid) + ? ( $INC{'MCE/Channel.pm'} ? 6 : DATA_CHANNELS ) + : 2; + my $_total_workers = 0; if (defined $self{user_tasks}) { - $_total_workers += $_->{max_workers} for (@{ $self{user_tasks} }); + $_total_workers += $_->{max_workers} for @{ $self{user_tasks} }; } else { $_total_workers = $self{max_workers}; } @@ -492,7 +495,7 @@ sub new { ? $_total_workers : $_data_channels; $self{_lock_chn} = ($_total_workers > $_data_channels) ? 1 : 0; - $self{_lock_chn} = 1 if $INC{'MCE/Hobo.pm'}; + $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'}; $MCE = \%self if ($MCE->{_wid} == 0); @@ -527,7 +530,7 @@ sub spawn { local $@; eval 'require Net::HTTP; require Net::HTTPS'; } - ## Start the shared-manager process if present. + ## Start the shared-manager process if not running. MCE::Shared->start() if $INC{'MCE/Shared.pm'}; ## Load input module. @@ -599,7 +602,6 @@ sub spawn { ## Create sockets for IPC. MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock)); # sync - MCE::Util::_sock_pair($self, qw(_bse_r_sock _bse_w_sock)); # sync MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock)); # core MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_) # core for (0 .. $_data_channels); @@ -1123,9 +1125,7 @@ sub run { ## Insert the first message into the queue if defined. if (defined $_first_msg) { - MCE::Util::_syswrite( - $self->{_que_w_sock}, pack($_que_template, 0, $_first_msg) - ); + syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg)); } ## Submit params data to workers. @@ -1162,11 +1162,11 @@ sub run { ## Notify workers to commence processing. if ($_is_MSWin32) { my $_buf = _sprintf("%${_total_workers}s", ""); - MCE::Util::_syswrite($self->{_bse_w_sock}, $_buf); + syswrite($self->{_bsb_r_sock}, $_buf); } else { - my $_BSE_W_SOCK = $self->{_bse_w_sock}; + my $_BSB_R_SOCK = $self->{_bsb_r_sock}; for my $_i (1 .. $_total_workers) { - MCE::Util::_syswrite($_BSE_W_SOCK, $LF); + syswrite($_BSB_R_SOCK, $LF); } } } @@ -1345,9 +1345,8 @@ sub shutdown { $_COM_R_SOCK = undef; MCE::Util::_destroy_socks($self, qw( - _bsb_w_sock _bsb_r_sock _bse_w_sock _bse_r_sock - _com_w_sock _com_r_sock _dat_w_sock _dat_r_sock - _rla_w_sock _rla_r_sock + _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock + _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock )); ($_is_MSWin32) @@ -1393,7 +1392,7 @@ sub sync { my $_chn = $self->{_chn}; my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; my $_BSB_R_SOCK = $self->{_bsb_r_sock}; - my $_BSE_R_SOCK = $self->{_bse_r_sock}; + my $_BSB_W_SOCK = $self->{_bsb_w_sock}; my $_buf; local $\ = undef if (defined $\); @@ -1409,7 +1408,7 @@ sub sync { print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF; ## Wait until all workers from (task_id 0) have un-synced. - MCE::Util::_sysread($_BSE_R_SOCK, $_buf, 1); + MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1); return; } @@ -1460,7 +1459,7 @@ sub abort { if ($_abort_msg > 0) { MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size); - MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg)); + syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg)); } if ($self->{_wid} > 0) { @@ -1503,7 +1502,9 @@ sub exit { unless ($self->{_exiting}) { $self->{_exiting} = 1; - ## Check nested Hobo workers not yet joined. + ## Check for nested workers not yet joined. + MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'}; + MCE::Hobo->finish('MCE') if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') ); @@ -1608,17 +1609,20 @@ sub do { my $self = shift; $self = $MCE unless ref($self); my $_pkg = caller() eq 'MCE' ? caller(1) : caller(); - _croak('MCE::do: method is not allowed by the manager process') - unless ($self->{_wid}); _croak('MCE::do: (code ref) is not supported') if (ref $_[0] eq 'CODE'); - _croak('MCE::do: (callback) is not specified') unless (defined ( my $_func = shift )); $_func = $_pkg.'::'.$_func if (index($_func, ':') < 0); - return _do_callback($self, $_func, [ @_ ]); + if ($self->{_wid}) { + return _do_callback($self, $_func, [ @_ ]); + } + else { + no strict 'refs'; + return $_func->(@_); + } } ## Gather method. @@ -1855,7 +1859,6 @@ sub _make_sessdir { sub _sprintf { my ($_fmt, $_arg) = @_; - # remove tainted'ness ($_fmt) = $_fmt =~ /(.*)/; @@ -1923,7 +1926,9 @@ sub _dispatch { ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x @_ = (); - $ENV{'PERL_MCE_IPC'} = 'win32' if ($_is_MSWin32 && $INC{'MCE/Hobo.pm'}); + $ENV{'PERL_MCE_IPC'} = 'win32' if ( + $_is_MSWin32 && ( $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'} ) + ); if (!$_is_thread && UNIVERSAL::can('Prima', 'cleanup')) { no warnings 'redefine'; local $@; eval '*Prima::cleanup = sub {}'; @@ -1944,6 +1949,8 @@ sub _dispatch { MCE::Hobo->_clear() if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') ); + + MCE::Child->_clear() if $INC{'MCE/Child.pm'}; } if (!$self->{use_threads} && $INC{'Math/Random.pm'}) { diff --git a/lib/MCE.pod b/lib/MCE.pod index b606382..d434111 100644 --- a/lib/MCE.pod +++ b/lib/MCE.pod @@ -5,7 +5,7 @@ MCE - Many-Core Engine for Perl providing parallel processing capabilities =head1 VERSION -This document describes MCE version 1.838 +This document describes MCE version 1.843 Many-Core Engine (MCE) for Perl helps enable a new level of performance by maximizing all available cores. @@ -135,28 +135,35 @@ The next demonstration loops through a sequence of numbers with MCE::Flow. =head1 CORE MODULES -Three modules make up the core engine for MCE. +Four modules make up the core engine for MCE. =over 3 =item L<MCE::Core> -Provides the Core API for Many-Core Engine. The various MCE options are -described here. It includes several demonstrations at the end of the page. +This is the POD documentation describing the core Many-Core Engine (MCE) API. +Go here for help with the various MCE options. See also, L<MCE::Examples> +for additional demonstrations. + +=item L<MCE::Mutex> + +Provides a simple semaphore implementation supporting threads and processes. +Two implementations are provided; one via pipes or socket depending on the +platform and the other using Fcntl. =item L<MCE::Signal> -Temporary directory creation, cleanup, and signal handling. +Provides signal handling, temporary directory creation, and cleanup for MCE. =item L<MCE::Util> -Utility functions for Many-Core Engine. +Provides utility functions for MCE. =back =head1 MCE EXTRAS -There are 4 add-on modules for use with MCE. +There are 5 add-on modules for use with MCE. =over 3 @@ -165,11 +172,18 @@ There are 4 add-on modules for use with MCE. Provides a collection of sugar methods and output iterators for preserving output order. -=item L<MCE::Mutex> +=item L<MCE::Channel> -Provides a simple semaphore implementation supporting threads and processes. -Two implementations are provided. One via pipes or socket depending on the -platform. The other via Fcntl. +Introduced in MCE 1.839, provides queue-like and two-way communication +capability. Three implementations C<Simple>, C<Mutex>, and C<Threads> are +provided. C<Simple> does not involve locking whereas C<Mutex> and C<Threads> +do locking transparently using C<MCE::Mutex> and C<threads> respectively. + +=item L<MCE::Child> + +Also introduced in MCE 1.839, provides a threads-like parallelization module +that is compatible with Perl 5.8. It is a fork of L<MCE::Hobo>. The difference +is using a common C<MCE::Channel> object when yielding and joining. =item L<MCE::Queue> @@ -180,51 +194,53 @@ threads. =item L<MCE::Relay> -Enables workers to receive and pass on information orderly with zero -involvement by the manager process while running. +Provides workers the ability to receive and pass information orderly with zero +involvement by the manager process. This module is loaded automatically by +MCE when specifying the C<init_relay> MCE option. =back =head1 MCE MODELS -The models take Many-Core Engine to a new level for ease of use. Two options -(chunk_size and max_workers) are configured automatically as well as spawning -and shutdown. +The MCE models are sugar syntax on top of the L<MCE::Core> API. Two MCE options +(chunk_size and max_workers) are configured automatically. Moreover, spawning +workers and later shutdown occur transparently behind the scene. + +Choosing a MCE Model largely depends on the application. It all boils down +to how much automation you need MCE to handle transparently. Or if you prefer, +constructing the MCE object and running using the core MCE API is fine too. =over 3 -=item L<MCE::Loop> +=item L<MCE::Grep> -Provides a MCE model for building parallel loops. +Provides a parallel grep implementation similar to the native grep function. -=item L<MCE::Flow> +=item L<MCE::Map> -A parallel flow model for building creative applications. This makes use of -user_tasks in MCE. The author has full control when utilizing this model. -MCE::Flow is similar to MCE::Loop, but allows for multiple code blocks to -run in parallel with a slight change to syntax. +Provides a parallel map implementation similar to the native map function. -=item L<MCE::Grep> +=item L<MCE::Loop> -Provides a parallel grep implementation similar to the native grep function. +Provides a parallel for loop implementation. -=item L<MCE::Map> +=item L<MCE::Flow> -Provides a parallel map model similar to the native map function. +Like C<MCE::Loop>, but with support for multiple pools of workers. The pool +of workers are configured transparently via the MCE C<user_tasks> option. =item L<MCE::Step> -Provides a parallel step implementation utilizing MCE::Queue between user -tasks. MCE::Step is a spin off from MCE::Flow with a touch of MCE::Stream. -This model, introduced in 1.506, allows one to pass data from one sub-task -into the next transparently. +Like C<MCE::Flow>, but adds a C<MCE::Queue> object between each pool of +workers. This model, introduced in 1.506, allows one to pass data forward +(left to right) from one sub-task into another with little effort. =item L<MCE::Stream> -Provides an efficient parallel implementation for chaining multiple maps -and greps together through user_tasks and MCE::Queue. Like with MCE::Flow, -MCE::Stream can run multiple code blocks in parallel with a slight change -to syntax from MCE::Map and MCE::Grep. +This provides an efficient parallel implementation for chaining multiple maps +and greps transparently. Like C<MCE::Flow> and C<MCE::Step>, it too supports +multiple pools of workers. The distinction is that C<MCE::Stream> passes +data from right to left and done for you transparently. =back @@ -266,7 +282,8 @@ The source, cookbook, and examples are hosted at GitHub. =head1 SEE ALSO C<MCE::Shared> provides data sharing capabilities for C<MCE>. It includes -C<MCE::Hobo> for running code asynchronously. +C<MCE::Hobo> for running code asynchronously with the IPC handled by the +shared-manager process. =over 3 diff --git a/lib/MCE/Candy.pm b/lib/MCE/Candy.pm index 5facf29..6fa20eb 100644 --- a/lib/MCE/Candy.pm +++ b/lib/MCE/Candy.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; our @CARP_NOT = qw( MCE ); @@ -219,7 +219,7 @@ MCE::Candy - Sugar methods and output iterators =head1 VERSION -This document describes MCE::Candy version 1.838 +This document describes MCE::Candy version 1.843 =head1 DESCRIPTION @@ -458,7 +458,7 @@ processing input data. my @results; mce_flow { - max_workers => 'auto', ## Note that 'auto' is never higher than 8 + max_workers => 'auto', ## Note that 'auto' is never greater than 8 gather => MCE::Candy::out_iter_array(\@results) }, sub { diff --git a/lib/MCE/Channel.pm b/lib/MCE/Channel.pm new file mode 100644 index 0000000..aa90d81 --- /dev/null +++ b/lib/MCE/Channel.pm @@ -0,0 +1,658 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like and two-way communication capability. +## +############################################################################### + +package MCE::Channel; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +## no critic (BuiltinFunctions::ProhibitStringyEval) +## no critic (TestingAndDebugging::ProhibitNoStrict) + +use if $^O eq 'MSWin32', 'threads'; +use if $^O eq 'MSWin32', 'threads::shared'; + +use Carp (); + +$Carp::Internal{ (__PACKAGE__) }++; + +my ( $freeze, $thaw ); + +BEGIN { + if ( ! defined $INC{'PDL.pm'} ) { + local $@; eval ' + use Sereal::Encoder 3.015 qw( encode_sereal ); + use Sereal::Decoder 3.015 qw( decode_sereal ); + '; + if ( ! $@ ) { + my $encoder_ver = int( Sereal::Encoder->VERSION() ); + my $decoder_ver = int( Sereal::Decoder->VERSION() ); + if ( $encoder_ver - $decoder_ver == 0 ) { + $freeze = \&encode_sereal; + $thaw = \&decode_sereal; + } + } + } + + if ( ! defined $freeze ) { + require Storable; + $freeze = \&Storable::freeze; + $thaw = \&Storable::thaw; + } +} + +use MCE::Util (); +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $has_threads = $INC{'threads.pm'} ? 1 : 0; +my $tid = $has_threads ? threads->tid() : 0; + +sub new { + my ( $class, %argv ) = @_; + my $impl = defined( $argv{impl} ) ? ucfirst( lc $argv{impl} ) : 'Mutex'; + + $impl = 'Threads' if ( $^O eq 'MSWin32' && $impl eq 'Mutex' ); + + eval "require MCE::Channel::$impl; 1" || + Carp::croak("Could not load Channel implementation '$impl': $@"); + + my $pkg = 'MCE::Channel::'.$impl; + no strict 'refs'; + + $pkg->new(%argv); +} + +sub CLONE { + $tid = threads->tid if $has_threads; +} + +sub DESTROY { + my ( $pid, $self ) = ( $has_threads ? $$ .'.'. $tid : $$, @_ ); + + if ( $self->{'init_pid'} && $self->{'init_pid'} eq $pid ) { + MCE::Util::_destroy_socks( $self, qw(c_sock p_sock) ); + delete $self->{c_mutex}; + delete $self->{p_mutex}; + } + + return; +} + +sub impl { + $_[0]->{'impl'} || 'Not defined'; +} + +sub _get_freeze { $freeze; } +sub _get_thaw { $thaw; } + +sub _ended { + warn "WARNING: ($_[0]) called on a channel that has been 'end'ed\n"; + + return; +} + +sub _read { + my $bytes = MCE::Util::_sysread( $_[0], $_[1], my $len = $_[2] ); + my $read = $bytes; + + while ( $bytes && $read != $len ) { + $bytes = MCE::Util::_sysread( $_[0], $_[1], $len - $read, length($_[1]) ); + $read += $bytes if $bytes; + } + + return; +} + +sub _pid { + $has_threads ? $$ .'.'. $tid : $$; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel - Queue-like and two-way communication capability + +=head1 VERSION + +This document describes MCE::Channel version 1.843 + +=head1 SYNOPSIS + + use MCE::Channel; + + ######################## + # Construction + ######################## + + # A single producer and many consumers supporting processes and threads + + my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation + my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking + + # Set the mp flag if two or more workers (many producers) will be calling + # enqueue/send or recv2/recv2_nb on the left end of the channel + + my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 ); + + # Tuned for one producer and one consumer, no locking + + my $c5 = MCE::Channel->new( impl => 'Simple' ); + + ######################## + # Queue-like behavior + ######################## + + # Send data to consumers + $c1->enqueue('item'); + $c1->enqueue(qw/item1 item2 item3 itemN/); + + # Receive data + my $item = $c1->dequeue(); # item + my @items = $c1->dequeue(2); # (item1, item2) + + # Receive, non-blocking + my $item = $c1->dequeue_nb(); # item + my @items = $c1->dequeue_nb(2); # (item1, item2) + + # Signal that there is no more work to be sent + $c1->end(); + + ######################## + # Two-way communication + ######################## + + # Producer(s) sending data + $c3->send('message'); + $c3->send(qw/arg1 arg2 arg3/); + + # Consumer(s) receiving data + my $mesg = $c3->recv(); # message + my @args = $c3->recv(); # (arg1, arg2, arg3) + + # Alternatively, non-blocking + my $mesg = $c3->recv_nb(); # message + my @args = $c3->recv_nb(); # (arg1, arg2, arg3) + + # A producer signaling no more work to be sent + $c3->end(); + + # Consumers(s) sending data + $c3->send2('message'); + $c3->send2(qw/arg1 arg2 arg3/); + + # Producer(s) receiving data + my $mesg = $c3->recv2(); # message + my @args = $c3->recv2(); # (arg1, arg2, arg3) + + # Alternatively, non-blocking + my $mesg = $c3->recv2_nb(); # message + my @args = $c3->recv2_nb(); # (arg1, arg2, arg3) + +=head1 DESCRIPTION + +A MCE::Channel object is a container for sending and receiving data using +socketpair handles. Serialization is provided by L<Sereal> if available. +Defaults to L<Storable> otherwise. Excluding the C<Simple> implementation, +both ends of the C<channel> support many workers concurrently (with mp => 1). + +=head2 new ( impl => STRING, mp => BOOLEAN ) + +This creates a new channel. Three implementations are provided C<Mutex> (default), +C<Threads>, and C<Simple> indicating the locking mechanism to use C<MCE::Mutex>, +C<threads::shared>, and no locking respectively. + + $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0 + +The C<Mutex> channel implementation supports processes and threads whereas the +C<Threads> channel implementation is suited for threads only. + + $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking + $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking + +Set the C<mp> (m)any (p)roducers option to a true value if there will be two +or more workers calling C<enqueue>, <send>, C<recv2>, or C<recv2_nb> on the +left end of the channel. This is important to not incur a race condition. + + $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 ); + +The C<Simple> implementation is optimized for one producer and one consumer max. +It omits locking for maximum performance. This implementation is preferred for +parent to child communication not shared by another worker. + + $chnl = MCE::Channel->new( impl => 'Simple' ); + +=head1 QUEUE-LIKE BEHAVIOR + +=head2 enqueue ( ITEM1 [, ITEM2, ... ] ) + +Appends a list of items onto the left end of the channel. This will block once +the internal socket buffer becomes full (i.e. awaiting workers to dequeue on the +other end). This prevents producer(s) from running faster than consumer(s). + +Object (de)serialization is handled automatically using L<Sereal> if available +or defaults to L<Storable> otherwise. + + $chnl->enqueue('item1'); + $chnl->enqueue(qw/item2 item3 .../); + + $chnl->enqueue([ array_ref1 ]); + $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...); + + $chnl->enqueue({ hash_ref1 }); + $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...); + +=head2 dequeue + +=head2 dequeue ( COUNT ) + +Removes the requested number of items (default 1) from the right end of the +channel. If the channel contains fewer than the requested number of items, +the method will block (i.e. until other producer(s) enqueue more items). + + $item = $chnl->dequeue(); # item1 + @items = $chnl->dequeue(2); # ( item2, item3 ) + +=head2 dequeue_nb + +=head2 dequeue_nb ( COUNT ) + +Removes the requested number of items (default 1) from the right end of the +channel. If the channel contains fewer than the requested number of items, +the method will return what it was able to retrieve and return immediately. +If the channel is empty, then returns C<an empty list> in list context or +C<undef> in scalar context. + + $item = $chnl->dequeue_nb(); # array_ref1 + @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 ) + +=head2 end + +This is called by a producer to signal that there is no more work to be sent. +Once ended, no more items may be sent by the producer. Calling C<end> by +multiple producers is not supported. + + $chnl->end; + +=head1 TWO-WAY IPC - PRODUCER TO CONSUMER + +=head2 send ( ARG1 [, ARG2, ... ] ) + +Append data onto the left end of the channel. Unlike C<enqueue>, the values +are kept together for the receiving consumer, similarly to calling a method. +Object (de)serialization is handled automatically. + + $chnl->send('item'); + $chnl->send([ list_ref ]); + $chnl->send([ hash_ref ]); + + $chnl->send(qw/item1 item2 .../); + $chnl->send($id, [ list_ref ]); + $chnl->send($id, { hash_ref }); + +=head2 recv + +=head2 recv_nb + +Blocking and non-blocking fetch methods from the right end of the channel. +For the latter and when the channel is empty, returns C<an empty list> in +list context or C<undef> in scalar context. + + $item = $chnl->recv(); + $array_ref = $chnl->recv(); + $hash_ref = $chnl->recv(); + + ($item1, $item2) = $chnl->recv_nb(); + ($id, $array_ref) = $chnl->recv_nb(); + ($id, $hash_ref) = $chnl->recv_nb(); + +=head1 TWO-WAY IPC - CONSUMER TO PRODUCER + +=head2 send2 ( ARG1 [, ARG2, ... ] ) + +Append data onto the right end of the channel. Unlike C<enqueue>, the values +are kept together for the receiving producer, similarly to calling a method. +Object (de)serialization is handled automatically. + + $chnl->send2('item'); + $chnl->send2([ list_ref ]); + $chnl->send2([ hash_ref ]); + + $chnl->send2(qw/item1 item2 .../); + $chnl->send2($id, [ list_ref ]); + $chnl->send2($id, { hash_ref }); + +=head2 recv2 + +=head2 recv2_nb + +Blocking and non-blocking fetch methods from the left end of the channel. +For the latter and when the channel is empty, returns C<an empty list> in +list context or C<undef> in scalar context. + + $item = $chnl->recv2(); + $array_ref = $chnl->recv2(); + $hash_ref = $chnl->recv2(); + + ($item1, $item2) = $chnl->recv2_nb(); + ($id, $array_ref) = $chnl->recv2_nb(); + ($id, $hash_ref) = $chnl->recv2_nb(); + +=head1 DEMONSTRATIONS + +=head2 Example 1 - threads + +C<MCE::Channel> was made to work efficiently with L<threads>. The reason is from +using L<threads::shared> for locking versus L<MCE::Mutex>. + + use strict; + use warnings; + + use threads; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Threads' ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ($item1, $item2) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + $queue->send2( threads->tid => $count ); + } + + threads->create('consumer') for 1 .. $num_consumers; + + ## producer + + $queue->enqueue($_, $_ * 2) for 1 .. 40000; + $queue->end; + + my %results; + my $total = 0; + + for ( 1 .. $num_consumers ) { + my ($id, $count) = $queue->recv2; + $results{$id} = $count; + $total += $count; + } + + $_->join for threads->list; + + print $results{$_}, "\n" for keys %results; + print "$total total\n\n"; + + __END__ + + # output + + 8034 + 8008 + 8036 + 8058 + 7990 + 7948 + 8068 + 7966 + 7960 + 7932 + 80000 total + +=head2 Example 2 - MCE::Child + +The following is similarly threads-like for Perl lacking threads support. +It spawns processes instead, thus requires the C<Mutex> channel implementation +which is the default if omitted. + + use strict; + use warnings; + + use MCE::Child; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Mutex' ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ($item1, $item2) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + $queue->send2( MCE::Child->pid => $count ); + } + + MCE::Child->create('consumer') for 1 .. $num_consumers; + + ## producer + + $queue->enqueue($_, $_ * 2) for 1 .. 40000; + $queue->end; + + my %results; + my $total = 0; + + for ( 1 .. $num_consumers ) { + my ($id, $count) = $queue->recv2; + $results{$id} = $count; + $total += $count; + } + + $_->join for MCE::Child->list; + + print $results{$_}, "\n" for keys %results; + print "$total total\n\n"; + +=head2 Example 3 - Many producers + +Running with 2 or more producers requires setting the C<mp> option. Internally, +this enables locking support for the left end of the channel. The C<mp> option +applies to C<Mutex> and C<Threads> channel implementations only. + +Here, using the MCE facility for gathering the final count. + + use strict; + use warnings; + + use MCE::Flow; + use MCE::Channel; + + my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + my $num_consumers = 10; + + sub consumer { + # receive items + my $count = 0; + while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) { + $count += 2; + } + # send result + MCE->gather( MCE->wid => $count ); + } + + sub producer { + $queue->enqueue($_, $_ * 2) for 1 .. 20000; + } + + ## run 2 producers and many consumers + + MCE::Flow::init( + max_workers => [ 2, $num_consumers ], + task_name => [ 'producer', 'consumer' ], + task_end => sub { + my ($mce, $task_id, $task_name) = @_; + if ( $task_name eq 'producer' ) { + $queue->end; + } + } + ); + + # consumers call gather above (i.e. send a key-value pair), + # have MCE append to a hash + + my %results = mce_flow \&producer, \&consumer; + + MCE::Flow::finish; + + my $total = 0; + + for ( keys %results ) { + $total += $results{$_}; + print $results{$_}, "\n"; + } + + print "$total total\n\n"; + +=head2 Example 4 - Request input + +This demonstration configures a channel per consumer. Plus, a common channel +for consumers to request the next input item. The C<Simple> implementation is +specified for the individual channels whereas locking may be necessary for the +C<$ready> channel. However, consumers do not incur reading and what is written +is very small (i.e. atomic write is guaranteed by the OS). Thus, am safely +choosing the C<Simple> implementation versus C<Mutex>. + + use strict; + use warnings; + + use MCE::Flow; + use MCE::Channel; + + my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g; + my $input_size = shift || 3000; + + unless ($input_size =~ /\A\d+\z/) { + print {*STDERR} "usage: $prog_name [ size ]\n"; + exit 1; + } + + my $consumers = 4; + + my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers; + + my $ready = MCE::Channel->new( impl => 'Simple' ); + + sub producer { + my $id = 0; + + # send the next input item upon request + for ( 0 .. $input_size - 1 ) { + my $chnl_num = $ready->recv2; + $chnls[ $chnl_num ]->send( ++$id, $_ ); + } + + # signal no more work + $_->send( 0, undef ) for @chnls; + } + + sub consumer { + my $chnl_num = MCE->task_wid - 1; + + while () { + # notify the producer ready for input + $ready->send2( $chnl_num ); + + # retrieve input data + my ( $id, $item ) = $chnls[ $chnl_num ]->recv; + + # leave loop if no more work + last unless $id; + + # compute and send the result to the manager process + # ordered output requires an id (must be 1st argument) + MCE->gather( $id, [ $item, sqrt($item) ] ); + } + } + + # A custom 'ordered' output iterator for MCE's gather facility. + # It returns a closure block, expecting an ID for 1st argument. + + sub output_iterator { + my %tmp; my $order_id = 1; + + return sub { + my ( $id, $result ) = @_; + $tmp{ $id } = $result; + + while () { + last unless exists $tmp{ $order_id }; + $result = delete $tmp{ $order_id }; + printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1]; + $order_id++; + } + }; + } + + # Run one producer and many consumers. + # Output to be sent orderly to STDOUT. + + MCE::Flow->init( + gather => output_iterator(), + max_workers => [ 1, $consumers ], + ); + + MCE::Flow->run( \&producer, \&consumer ); + MCE::Flow->finish; + + __END__ + + # Output + + n: 0 sqrt(n): 0.000000 + n: 1 sqrt(n): 1.000000 + n: 2 sqrt(n): 1.414214 + n: 3 sqrt(n): 1.732051 + n: 4 sqrt(n): 2.000000 + n: 5 sqrt(n): 2.236068 + n: 6 sqrt(n): 2.449490 + n: 7 sqrt(n): 2.645751 + n: 8 sqrt(n): 2.828427 + n: 9 sqrt(n): 3.000000 + ... + +=head1 SEE ALSO + +=over 3 + +=item * L<https://github.com/marioroy/mce-examples/tree/master/chameneos> + +=item * L<threads::lite> + +=back + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=head1 COPYRIGHT AND LICENSE + +Copyright (C) 2019 by Mario E. Roy + +MCE::Shared is released under the same license as Perl. + +See L<http://dev.perl.org/licenses/> for more information. + +=cut + diff --git a/lib/MCE/Channel/Mutex.pm b/lib/MCE/Channel/Mutex.pm new file mode 100644 index 0000000..27696ef --- /dev/null +++ b/lib/MCE/Channel/Mutex.pm @@ -0,0 +1,356 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel for producer(s) and many consumers supporting processes and threads. +## +############################################################################### + +package MCE::Channel::Mutex; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use base 'MCE::Channel'; +use MCE::Mutex (); +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Mutex' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + # locking for the consumer side of the channel + $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' ); + + # optionally, support many-producers writing and reading + $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp}; + + bless \%obj, $class; + + if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) { + MCE::Mutex::Channel::_save_for_global_destruction($obj{c_mutex}); + MCE::Mutex::Channel::_save_for_global_destruction($obj{p_mutex}) + if $obj{mp}; + } + + return \%obj; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock2 if $p_mutex; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data), $data; + } + + $p_mutex->unlock2 if $p_mutex; + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end, $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + $c_mutex->unlock; + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + ( my $c_mutex = $self->{c_mutex} )->lock; + + while ( $count-- ) { + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + $c_mutex->unlock; + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer to consumer. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock2 if $p_mutex; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data), $data; + + $p_mutex->unlock2 if $p_mutex; + + return 1; +} + +sub recv { + my ( $self ) = @_; + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end, $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumer to producer. +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + ( my $c_mutex = $self->{c_mutex} )->lock2; + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data), $data; + $c_mutex->unlock2; + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock if $p_mutex; + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + $p_mutex->unlock if $p_mutex; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock if $p_mutex; + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len ) { + $p_mutex->unlock if $p_mutex; + return wantarray ? () : undef; + } + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + $p_mutex->unlock if $p_mutex; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Mutex - Channel for producer(s) and many consumers + +=head1 VERSION + +This document describes MCE::Channel::Mutex version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for processes and threads. Locking is handled using MCE::Mutex. + + use MCE::Channel; + + # The default is tuned for one producer and many consumers. + my $chnl_a = MCE::Channel->new( impl => 'Mutex' ); + + # Specify the 'mp' option for safe use by two or more producers + # sending or recieving on the left side of the channel. + # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb> + + my $chnl_b = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + +The API is described in L<MCE::Channel>. + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Channel/Simple.pm b/lib/MCE/Channel/Simple.pm new file mode 100644 index 0000000..7593a36 --- /dev/null +++ b/lib/MCE/Channel/Simple.pm @@ -0,0 +1,332 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel tuned for one producer and one consumer involving no locking. +## +############################################################################### + +package MCE::Channel::Simple; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use base 'MCE::Channel'; +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Simple' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + return bless \%obj, $class; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data) . $data; + } + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + my $data; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + + while ( $count-- ) { + my $data; + + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer(s) to consumers. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data) . $data; + + return 1; +} + +sub recv { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumers to producer(s). +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data) . $data; + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + $is_MSWin32 + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + return wantarray ? () : undef unless $len; + + $is_MSWin32 + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Simple - Channel tuned for one producer and one consumer + +=head1 VERSION + +This document describes MCE::Channel::Simple version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for one process or thread on either end; no locking needed. + + use MCE::Channel; + + my $chnl = MCE::Channel->new( impl => 'Simple' ); + +The API is described in L<MCE::Channel>. + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Channel/Threads.pm b/lib/MCE/Channel/Threads.pm new file mode 100644 index 0000000..e41b606 --- /dev/null +++ b/lib/MCE/Channel/Threads.pm @@ -0,0 +1,361 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel for producer(s) and many consumers supporting threads only. +## +############################################################################### + +package MCE::Channel::Threads; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use threads; +use threads::shared; + +use base 'MCE::Channel'; +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Threads' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + # locking for the consumer side of the channel + $obj{cr_mutex} = threads::shared::share( my $cr_mutex ); + $obj{cw_mutex} = threads::shared::share( my $cw_mutex ); + + # optionally, support many-producers writing and reading + $obj{pr_mutex} = threads::shared::share( my $pr_mutex ) if $obj{mp}; + $obj{pw_mutex} = threads::shared::share( my $pw_mutex ) if $obj{mp}; + + return bless \%obj, $class; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + { + CORE::lock $self->{pw_mutex} if $self->{pw_mutex}; + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data), $data; + } + } + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + } + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + + { + CORE::lock $self->{cr_mutex}; + + while ( $count-- ) { + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + } + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer(s) to consumers. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + { + CORE::lock $self->{pw_mutex} if $self->{pw_mutex}; + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data), $data; + } + + return 1; +} + +sub recv { + my ( $self ) = @_; + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumers to producer(s). +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + { + CORE::lock $self->{cw_mutex}; + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data), $data; + } + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + { + my $pr_mutex = $self->{pr_mutex}; + CORE::lock $pr_mutex if $pr_mutex; + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + { + my $pr_mutex = $self->{pr_mutex}; + CORE::lock $pr_mutex if $pr_mutex; + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + + return wantarray ? () : undef unless $len; + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Threads - Channel for producer(s) and many consumers + +=head1 VERSION + +This document describes MCE::Channel::Threads version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for threads only. Locking is handled using threads::shared. + + use MCE::Channel; + + # The default is tuned for one producer and many consumers. + my $chnl_a = MCE::Channel->new( impl => 'Threads' ); + + # Specify the 'mp' option for safe use by two or more producers + # sending or recieving on the left side of the channel. + # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb> + + my $chnl_b = MCE::Channel->new( impl => 'Threads', mp => 1 ); + +The API is described in L<MCE::Channel>. + +=head1 LIMITATIONS + +The t/04_channel_threads tests are disabled on Unix platforms for Perl +less than 5.10.1. Basically, the MCE::Channel::Threads implementation +is not supported on older Perls unless the OS vendor applied upstream +patches (i.e. works on RedHat/CentOS 5.x running Perl 5.8.x). + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Child.pm b/lib/MCE/Child.pm new file mode 100644 index 0000000..38aebf2 --- /dev/null +++ b/lib/MCE/Child.pm @@ -0,0 +1,1940 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## A threads-like parallelization module compatible with Perl 5.8. +## +############################################################################### + +use strict; +use warnings; + +no warnings qw( threads recursion uninitialized once redefine ); + +package MCE::Child; + +our $VERSION = '1.843'; + +## no critic (BuiltinFunctions::ProhibitStringyEval) +## no critic (Subroutines::ProhibitExplicitReturnUndef) +## no critic (Subroutines::ProhibitSubroutinePrototypes) +## no critic (TestingAndDebugging::ProhibitNoStrict) + +use MCE::Signal (); +use MCE::Channel; +use Time::HiRes 'sleep'; +use bytes; + +use overload ( + q(==) => \&equal, + q(!=) => sub { !equal(@_) }, + fallback => 1 +); + +sub import { + no strict 'refs'; no warnings 'redefine'; + *{ caller().'::mce_child' } = \&child; + return; +} + +## The POSIX module has many symbols. Try not loading it simply +## to have WNOHANG. The following covers most platforms. + +use constant { + _WNOHANG => ( $INC{'POSIX.pm'} ) + ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1 +}; + +my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} ); + +my $_freeze = MCE::Channel::_get_freeze(); +my $_thaw = MCE::Channel::_get_thaw(); + +my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0; +my $_has_threads = $INC{'threads.pm'} ? 1 : 0; +my $_tid = $_has_threads ? threads->tid() : 0; + +sub CLONE { + $_tid = threads->tid(), &_clear() if $_has_threads; +} + +sub _clear { + %{ $_LIST } = (); +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Init routine. +## +############################################################################### + +bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__; + +sub init { + shift if ( defined $_[0] && $_[0] eq __PACKAGE__ ); + + # -- options ---------------------------------------------------------- + # max_workers child_timeout posix_exit on_start on_finish void_context + # --------------------------------------------------------------------- + + my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller ); + my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ }; + + @_ = (); + + $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg, + $mngd->{WRK_ID} = $$; + + &_force_reap($pkg), $_DATA->{$pkg}->clear() if exists $_LIST->{$pkg}; + + # Start the shared-manager process if not running. + MCE::Shared->start() if $INC{'MCE/Shared.pm'}; + + if ( !exists $_LIST->{$pkg} ) { + my $chnl = MCE::Channel->new( impl => 'Mutex' ); + $_LIST->{ $pkg } = MCE::Child::_ordhash->new(); + $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl ); + $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl ); + $_DATA->{"$pkg:seed"} = int(rand() * 1e9); + $_DATA->{"$pkg:id" } = 0; + } + + if ( !exists $mngd->{posix_exit} ) { + $mngd->{posix_exit} = 1 if ( + ( $_has_threads && $_tid ) || $INC{'Mojo/IOLoop.pm'} || + $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} || + $INC{'Prima.pm'} || $INC{'Tk.pm'} || $INC{'Wx.pm'} || + $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'} || + $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || + $INC{'Win32/GUI.pm'} || $INC{'stfl.pm'} + ); + } + + if ( $mngd->{max_workers} ) { + my $cpus = $mngd->{max_workers}; + $cpus = MCE::Util::get_ncpu() if $cpus eq 'auto'; + $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1; + $mngd->{max_workers} = int($cpus); + } + + if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) { + local $@; eval 'require Net::HTTP; require Net::HTTPS'; + } + + require POSIX + if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 ); + + return; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## 'new', 'child (mce_child)', and 'create' for threads-like similarity. +## +############################################################################### + +## 'new' and 'tid' are aliases for 'create' and 'pid' respectively. + +*new = \&create, *tid = \&pid; + +## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2) +## Tip found in threads::async. + +sub child (&;@) { + goto &create; +} + +sub create { + my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do { + # construct mngd internally on first use unless defined + init(); $_MNGD->{ "$$.$_tid.".caller() }; + }; + + shift if ( $_[0] eq __PACKAGE__ ); + + # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ + + my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__; + + $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG}; + $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' ); + + my $func = shift; $func = caller().'::'.$func + if ( !ref $func && length $func && index($func,':') < 0 ); + + if ( !defined $func ) { + local $\; print {*STDERR} "code function is not specified or valid\n"; + return undef; + } + + my ( $list, $max_workers, $pkg ) = ( + $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG} + ); + + $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) > 2e9 ); + + if ( $max_workers ) { + local $!; + + # Reap completed child processes. + $_DATA->{$pkg}->reapdata; + + for my $wrk_id ( keys %{ $list->[0] } ) { + waitpid($wrk_id, _WNOHANG) or next; + _reap_child($list->del($wrk_id), 0); + } + + # Wait for a slot if saturated. + if ( keys(%{ $list->[0] }) >= $max_workers ) { + my $count = keys(%{ $list->[0] }) - $max_workers + 1; + _wait_one($pkg) for 1 .. $count; + } + } + + # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ + + local $SIG{TTIN} unless $_is_MSWin32; + local $SIG{TTOU} unless $_is_MSWin32; + local $SIG{WINCH} unless $_is_MSWin32; + + my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages + my $pid = fork(); + + if ( !defined $pid ) { # error + local $\; print {*STDERR} "fork error: $!\n"; + + return undef; + } + elsif ( $pid ) { # parent + $self->{WRK_ID} = $pid, $list->set($pid, $self); + $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start}; + + return $self; + } + + %{ $_LIST } = (), $_SELF = $self; # child + + if ( UNIVERSAL::can('Prima', 'cleanup') ) { + no warnings 'redefine'; local $@; eval '*Prima::cleanup = sub {}'; + } + + MCE::Shared::init($id) if $INC{'MCE/Shared.pm'}; + + # Sets the seed of the base generator uniquely between workers. + # The new seed is computed using the current seed and ID value. + # One may set the seed at the application level for predictable + # results. Ditto for Math::Prime::Util, Math::Random, and + # Math::Random::MT::Auto. + + srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 ); + + if ( $INC{'Math/Prime/Util.pm'} ) { + Math::Prime::Util::srand( + abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 + ); + } + + if ( $INC{'Math/Random.pm'} ) { + my $cur_seed = Math::Random::random_get_seed(); + my $new_seed = ($cur_seed < 1073741781) + ? $cur_seed + ((abs($id) * 10000) % 1073741780) + : $cur_seed - ((abs($id) * 10000) % 1073741780); + + Math::Random::random_set_seed($new_seed, $new_seed); + } + + if ( $INC{'Math/Random/MT/Auto.pm'} ) { + my $cur_seed = Math::Random::MT::Auto::get_seed()->[0]; + my $new_seed = ($cur_seed < 1073741781) + ? $cur_seed + ((abs($id) * 10000) % 1073741780) + : $cur_seed - ((abs($id) * 10000) % 1073741780); + + Math::Random::MT::Auto::set_seed($new_seed); + } + + _dispatch($mngd, $func, \@args); +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Public methods. +## +############################################################################### + +sub equal { + return 0 unless ( ref $_[0] && ref $_[1] ); + $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0; +} + +sub error { + _croak('Usage: $child->error()') unless ref( my $self = $_[0] ); + $self->join() if ( !exists $self->{JOINED} ); + $self->{ERROR} || undef; +} + +sub exit { + shift if ( defined $_[0] && $_[0] eq __PACKAGE__ ); + + my ( $self ) = ( ref $_[0] ? shift : $_SELF ); + my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} ); + + if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) { + MCE::Child->finish('MCE'); CORE::exit(@_); + } + elsif ( $wrk_id == $$ ) { + alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0; + $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : ''); + die "Child exited ($?)\n"; + _exit($?); # not reached + } + + return $self if ( exists $self->{JOINED} ); + + if ( exists $_DATA->{$pkg} ) { + sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id); + } else { + sleep 0.030; + } + + if ($_is_MSWin32) { + CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id); + } else { + CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id); + } + + $self; +} + +sub finish { + _croak('Usage: MCE::Child->finish()') if ref($_[0]); + shift if ( defined $_[0] && $_[0] eq __PACKAGE__ ); + + my $pkg = defined($_[0]) ? $_[0] : caller(); + + if ( $pkg eq 'MCE' ) { + for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); } + } + elsif ( exists $_LIST->{$pkg} ) { + return if $MCE::Signal::KILLED; + + if ( exists $_DELY->{$pkg} ) { + &_force_reap($pkg); + delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}), + delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}), + delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg }); + } + } + + @_ = (); + + return; +} + +sub is_joinable { + _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] ); + my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} ); + + if ( $wrk_id == $$ ) { + ''; + } + elsif ( $self->{MGR_ID} eq "$$.$_tid" ) { + return undef if ( exists $self->{JOINED} ); + local $!; $_DATA->{$pkg}->reapdata; + ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do { + _reap_child($_LIST->{$pkg}->del($self->{WRK_ID}), 0); + 1; + }; + } + else { + return undef if ( exists $self->{JOINED} ); + $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : ''; + } +} + +sub is_running { + _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] ); + my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} ); + + if ( $wrk_id == $$ ) { + 1; + } + elsif ( $self->{MGR_ID} eq "$$.$_tid" ) { + return undef if ( exists $self->{JOINED} ); + local $!; $_DATA->{$pkg}->reapdata; + ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do { + _reap_child($_LIST->{$pkg}->del($self->{WRK_ID}), 0); + ''; + }; + } + else { + return undef if ( exists $self->{JOINED} ); + $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1; + } +} + +sub join { + _croak('Usage: $child->join()') unless ref( my $self = $_[0] ); + my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} ); + + if ( exists $self->{JOINED} ) { + _croak('Child already joined') unless exists( $self->{RESULT} ); + + return ( defined wantarray ) + ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1] + : (); + } + + if ( $wrk_id == $$ ) { + _croak('Cannot join self'); + } + elsif ( $self->{MGR_ID} eq "$$.$_tid" ) { + _reap_child($_LIST->{$pkg}->del($wrk_id), 1); + } + else { + sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) ); + _reap_child($self, 0); + } + + ( defined wantarray ) + ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1] + : (); +} + +sub kill { + _croak('Usage: $child->kill()') unless ref( my $self = $_[0] ); + my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] ); + + if ( $wrk_id == $$ ) { + CORE::kill($signal || 'INT', $$); + return $self; + } + if ( $self->{MGR_ID} eq "$$.$_tid" ) { + return $self if ( exists $self->{JOINED} ); + if ( exists $_DATA->{$pkg} ) { + sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id); + } else { + sleep 0.030; + } + } + + CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id); + + $self; +} + +sub list { + _croak('Usage: MCE::Child->list()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : (); +} + +sub list_joinable { + _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + return () unless ( my $list = $_LIST->{$pkg} ); + local ($!, $?, $_); + + $_DATA->{$pkg}->reapdata; + + map { + ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do { + _reap_child($list->del($_->{WRK_ID}), 0); + $_; + }; + } + $list->vals(); +} + +sub list_running { + _croak('Usage: MCE::Child->list_running()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + return () unless ( my $list = $_LIST->{$pkg} ); + local ($!, $?, $_); + + $_DATA->{$pkg}->reapdata; + + map { + ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do { + _reap_child($list->del($_->{WRK_ID}), 0); + (); + }; + } + $list->vals(); +} + +sub max_workers { + _croak('Usage: MCE::Child->max_workers()') if ref($_[0]); + my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do { + # construct mngd internally on first use unless defined + init(); $_MNGD->{ "$$.$_tid.".caller() }; + }; + shift if ( $_[0] eq __PACKAGE__ ); + + if ( @_ ) { + $mngd->{max_workers} = shift; + if ( $mngd->{max_workers} ) { + my $cpus = $mngd->{max_workers}; + $cpus = MCE::Util::get_ncpu() if $cpus eq 'auto'; + $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1; + $mngd->{max_workers} = int($cpus); + } + } + + $mngd->{max_workers}; +} + +sub pending { + _croak('Usage: MCE::Child->pending()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0; +} + +sub pid { + ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID}; +} + +sub result { + _croak('Usage: $child->result()') unless ref( my $self = $_[0] ); + return $self->join() if ( !exists $self->{JOINED} ); + + _croak('Child already joined') unless exists( $self->{RESULT} ); + wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]; +} + +sub self { + ref($_[0]) ? $_[0] : $_SELF; +} + +sub wait_all { + _croak('Usage: MCE::Child->wait_all()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + return wantarray ? () : 0 + if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() ); + + local $_; ( wantarray ) + ? map { $_->join(); $_ } $_LIST->{$pkg}->vals() + : map { $_->join(); () } $_LIST->{$pkg}->vals(); +} + +*waitall = \&wait_all; # compatibility + +sub wait_one { + _croak('Usage: MCE::Child->wait_one()') if ref($_[0]); + my $pkg = "$$.$_tid.".caller(); + + return undef + if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() ); + + _wait_one($pkg); +} + +*waitone = \&wait_one; # compatibility + +sub yield { + _croak('Usage: MCE::Child->yield()') if ref($_[0]); + shift if ( defined $_[0] && $_[0] eq __PACKAGE__ ); + my $pkg = $_SELF->{PKG}; + + return unless ( my $mngd = $_MNGD->{$pkg} ); + + ( $INC{'Coro/AnyEvent.pm'} ) + ? Coro::AnyEvent::sleep( $_DELY->{$pkg}->seconds(@_) ) + : sleep $_DELY->{$pkg}->seconds(@_); + + return; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Private methods. +## +############################################################################### + +sub _croak { + if ( $INC{'MCE.pm'} ) { + goto &MCE::_croak; + } + else { + $SIG{__DIE__} = \&MCE::Signal::_die_handler; + $SIG{__WARN__} = \&MCE::Signal::_warn_handler; + + $\ = undef; goto &Carp::croak; + } +} + +sub _dispatch { + my ( $mngd, $func, $args ) = @_; + $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$; + + $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32; + $SIG{TERM} = $SIG{INT} = $SIG{HUP} = \&_trap; + $SIG{QUIT} = \&_quit; + + { + local $!; + (*STDERR)->autoflush(1) if defined( fileno *STDERR ); + (*STDOUT)->autoflush(1) if defined( fileno *STDOUT ); + } + + # Run task. + $_DATA->{ $_SELF->{PKG} }->set('S'.$$, ''), $? = 0; + + my $child_timeout = ( exists $_SELF->{child_timeout} ) + ? $_SELF->{child_timeout} : $mngd->{child_timeout}; + + my $void_context = ( exists $_SELF->{void_context} ) + ? $_SELF->{void_context} : $mngd->{void_context}; + + my @res; local $SIG{'ALRM'} = sub { alarm 0; die "Child timed out\n" }; + + if ( $void_context ) { + no strict 'refs'; + eval { + alarm( $child_timeout || 0 ); + $func->( @{ $args } ); + }; + } + else { + no strict 'refs'; + @res = eval { + alarm( $child_timeout || 0 ); + $func->( @{ $args } ); + }; + } + + alarm 0; _exit($?) if ( $@ && $@ =~ /^Child exited \(\S+\)$/ ); + + if ( $@ ) { + my $err = $@; + $? = 1, $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err); + warn "Child $$ terminated abnormally: reason $err\n" if ( + $err ne "Child timed out" && !$mngd->{on_finish} + ); + } + + $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : ''); + + _exit($?); +} + +sub _exit { + my ( $exit_status ) = @_; + + # Check for nested workers not yet joined. + if ( !$_SELF->{SIGNALED} ) { + MCE::Child->finish('MCE') if ( keys %{ $_LIST } > 0 ); + MCE::Hobo->finish('MCE') if ( $INC{'MCE/Hobo.pm'} ); + } + + # Exit child process. + $SIG{__DIE__} = sub { } unless $_tid; + $SIG{__WARN__} = sub { }; + + threads->exit($exit_status) if ( $_has_threads && $_is_MSWin32 ); + + if ( ! $_tid ) { + $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { + $SIG{$_[0]} = $SIG{INT} = sub { }; + CORE::kill($_[0], getppid()) if ( $_[0] eq 'INT' && !$_is_MSWin32 ); + CORE::kill('KILL', $$); + }; + } + + my $posix_exit = ( exists $_SELF->{posix_exit} ) + ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit}; + + if ( $posix_exit && !$_is_MSWin32 ) { + eval { MCE::Mutex::Channel::_destroy() }; + POSIX::_exit($exit_status) if $INC{'POSIX.pm'}; + CORE::kill('KILL', $$); + } + + CORE::exit($exit_status); +} + +sub _force_reap { + my ( $count, $pkg ) = ( 0, @_ ); + return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() ); + + for my $child ( $_LIST->{$pkg}->vals() ) { + if ( $child->is_running() ) { + CORE::kill('KILL', $child->pid()) + if CORE::kill('ZERO', $child->pid()); + $count++; + } + } + + $_LIST->{$pkg}->clear(); + + warn "Finished with active child processes [$pkg] ($count)\n" + if ( $count && !$_is_MSWin32 ); + + return; +} + +sub _quit { + my ( $name ) = @_; + $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//; + + $SIG{$name} = sub {}, CORE::kill($name, -$$) + if ( exists $SIG{$name} ); + + _exit(0); +} + +sub _reap_child { + my ( $child, $wait_flag ) = @_; + local @_ = $_DATA->{ $child->{PKG} }->get( $child->{WRK_ID}, $wait_flag ); + + ( $child->{ERROR}, $child->{RESULT}, $child->{JOINED} ) = + ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 ); + + if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) { + my ( $exit, $err ) = ( $? || 0, $child->{ERROR} ); + my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f ); + + if ( ( $code > 100 || $sig == 9 ) && !$err ) { + $code = 2, $sig = 1, $err = 'received SIGHUP' if $code == 101; + $code = 2, $sig = 2, $err = 'received SIGINT' if $code == 102; + $code = 2, $sig = 15, $err = 'received SIGTERM' if $code == 115; + $code = 2, $sig = 9, $err = 'received SIGKILL' if $sig == 9; + } + + $on_finish->( + $child->{WRK_ID}, $code, $child->{ident}, $sig, $err, + @{ $child->{RESULT} } + ); + } + + return; +} + +sub _trap { + my ( $exit_status, $name ) = ( 2, @_ ); + $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//; + + $SIG{$name} = sub {}, CORE::kill($name, -$$) + if ( exists $SIG{$name} ); + + if ( $name eq 'HUP' ) { $exit_status = 101 } + elsif ( $name eq 'INT' ) { $exit_status = 102 } + elsif ( $name eq 'TERM' ) { $exit_status = 115 } + + _exit($exit_status); +} + +sub _wait_one { + my ( $pkg ) = @_; + my ( $list, $data ) = ( $_LIST->{$pkg}, $_DATA->{$pkg} ); + my ( $self, $wrk_id, $found ); local $!; + + while () { + for my $child ( $list->vals ) { + $wrk_id = $child->{WRK_ID}; + $found = $data->exists('R'.$wrk_id); + waitpid($wrk_id, 0), $self = $list->del($wrk_id), last if $found; + + $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG); + } + last if $self; + sleep 0.015; + } + + _reap_child($self, 0); + + $self; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Delay implementation suited for MCE::Child. +## +############################################################################### + +package # hide from rpm + MCE::Child::_delay; + +sub new { + my ( $class, $chnl, $delay ) = @_; + + if ( !defined $delay ) { + $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008; + } + + $chnl->send(undef); + + bless [ $delay, $chnl ], $class; +} + +sub seconds { + my ( $self, $how_long ) = @_; + my ( $delay, $time ) = ( $how_long || $self->[0], Time::HiRes::time() ); + my ( $lapse ) = $self->[1]->recv(); + + if ( !$lapse || $time >= $lapse ) { + $self->[1]->send($time + $delay); + return $delay; + } + + $self->[1]->send( $lapse += $delay ); + + return $lapse - $time; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Hash and ordhash implementations suited for MCE::Child. +## +############################################################################### + +package # hide from rpm + MCE::Child::_hash; + +use Time::HiRes 'sleep'; + +use constant { + _WNOHANG => ( $INC{'POSIX.pm'} ) + ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1 +}; + +sub new { + my ( $class, $chnl ) = @_; + + bless [ {}, $chnl ], shift; +} + +sub clear { + my ( $self ) = @_; + + 1 while ( $self->[1]->recv2_nb() ); + + %{ $self->[0] } = (); +} + +sub exists { + my ( $self, $key ) = @_; + + while ( my $data = $self->[1]->recv2_nb() ) { + $self->[0]{ $data->[0] } = $data->[1]; + } + + CORE::exists $self->[0]{ $key }; +} + +sub get { + my ( $self, $wrk_id, $wait_flag ) = @_; + + if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) { + while ( my $data = $self->[1]->recv2_nb() ) { + $self->[0]{ $data->[0] } = $data->[1]; + } + } + + if ( $wait_flag ) { + local $!; + ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do { + while () { + my $data = $self->[1]->recv2_nb(); + if ( !defined $data ) { + last if waitpid($wrk_id, _WNOHANG); + sleep(0.015), next; + } + $self->[0]{ $data->[0] } = $data->[1]; + waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id; + } + }; + } + + if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) { + sleep 0.015; # retry + while ( my $data = $self->[1]->recv2_nb() ) { + $self->[0]{ $data->[0] } = $data->[1]; + } + } + + my $result = delete $self->[0]{ 'R'.$wrk_id }; + my $error = delete $self->[0]{ 'S'.$wrk_id }; + + $result = '' unless defined $result; + $error = '' unless defined $error; + + return ( $result, $error ); +} + +sub reapdata { + my ( $self ) = @_; + + while ( my $data = $self->[1]->recv2_nb() ) { + $self->[0]{ $data->[0] } = $data->[1]; + } + + return; +} + +sub set { + $_[0]->[1]->send2([ $_[1], $_[2] ]); +} + +package # hide from rpm + MCE::Child::_ordhash; + +sub new { my $gcnt = 0; bless [ {}, [], {}, \$gcnt ], shift; } +sub exists { CORE::exists $_[0]->[0]{ $_[1] }; } +sub get { $_[0]->[0]{ $_[1] }; } +sub len { scalar keys %{ $_[0]->[0] }; } + +sub clear { + %{ $_[0]->[0] } = @{ $_[0]->[1] } = %{ $_[0]->[2] } = (); + ${ $_[0]->[3] } = 0; + + return; +} + +sub del { + my ( $data, $keys, $indx, $gcnt ) = @{ $_[0] }; + my $pos = delete $indx->{ $_[1] }; + return undef unless defined $pos; + + $keys->[ $pos ] = undef; + + if ( ++${ $gcnt } > @{ $keys } * 0.667 ) { + my $i; $i = ${ $gcnt } = 0; + for my $k ( @{ $keys } ) { + $keys->[ $i ] = $k, $indx->{ $k } = $i++ if ( defined $k ); + } + splice @{ $keys }, $i; + } + + delete $data->{ $_[1] }; +} + +sub set { + my ( $key, $data, $keys, $indx ) = ( $_[1], @{ $_[0] } ); + $data->{ $key } = $_[2], $indx->{ $key } = @{ $keys }; + push @{ $keys }, "$key"; + + return; +} + +sub vals { + my ( $self ) = @_; + + ${ $self->[3] } + ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } } + : @{ $self->[0] }{ @{ $self->[1] } }; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Child - A threads-like parallelization module compatible with Perl 5.8 + +=head1 VERSION + +This document describes MCE::Child version 1.843 + +=head1 SYNOPSIS + + use MCE::Child; + + MCE::Child->init( + max_workers => 'auto', # default undef, unlimited + child_timeout => 20, # default undef, no timeout + posix_exit => 1, # default undef, CORE::exit + void_context => 1, # default undef + on_start => sub { + my ( $pid, $ident ) = @_; + ... + }, + on_finish => sub { + my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_; + ... + } + ); + + MCE::Child->create( sub { print "Hello from child\n" } )->join(); + + sub parallel { + my ($arg1) = @_; + print "Hello again, $arg1\n" if defined($arg1); + print "Hello again, $_\n"; # same thing + } + + MCE::Child->create( \¶llel, $_ ) for 1 .. 3; + + my @procs = MCE::Child->list(); + my @running = MCE::Child->list_running(); + my @joinable = MCE::Child->list_joinable(); + my @count = MCE::Child->pending(); + + # Joining is orderly, e.g. child1 is joined first, child2, child3. + $_->join() for @procs; + + # Joining occurs immediately as child processes complete execution. + 1 while MCE::Child->wait_one(); + + my $child = mce_child { foreach (@files) { ... } }; + + $child->join(); + + if ( my $err = $child->error() ) { + warn "Child error: $err\n"; + } + + # Get a child's object + $child = MCE::Child->self(); + + # Get a child's ID + $pid = MCE::Child->pid(); # $$ + $pid = $child->pid(); + $pid = MCE::Child->tid(); # tid is an alias for pid + $pid = $child->tid(); + + # Test child objects + if ( $child1 == $child2 ) { + ... + } + + # Give other workers a chance to run + MCE::Child->yield(); + MCE::Child->yield(0.05); + + # Return context, wantarray aware + my ($value1, $value2) = $child->join(); + my $value = $child->join(); + + # Check child's state + if ( $child->is_running() ) { + sleep 1; + } + if ( $child->is_joinable() ) { + $child->join(); + } + + # Send a signal to a child + $child->kill('SIGUSR1'); + + # Exit a child + MCE::Child->exit(0); + MCE::Child->exit(0, @ret); + +=head1 DESCRIPTION + +L<MCE::Child> is a fork of L<MCE::Hobo> for compatibility with Perl 5.8. + +A child is a migratory worker inside the machine that carries the asynchronous +gene. Child processes are equipped with C<threads>-like capability for running +code asynchronously. Unlike threads, each child is a unique process to the +underlying OS. The IPC is handled via C<MCE::Channel>, which runs on all the +major platforms including Cygwin and Strawberry Perl. + +C<MCE::Child> may be used as a standalone or together with C<MCE> including +running alongside C<threads>. + + use MCE::Child; + use MCE::Shared; + + # synopsis: head -20 file.txt | perl script.pl + + my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared + my $ofh = MCE::Shared->handle( ">", \*STDOUT ); + my $ary = MCE::Shared->array(); + + sub parallel_task { + my ( $id ) = @_; + while ( <$ifh> ) { + printf {$ofh} "[ %4d ] %s", $., $_; + # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" ); # dereferencing + $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO + } + } + + my $child1 = MCE::Child->new( "parallel_task", 1 ); + my $child2 = MCE::Child->new( \¶llel_task, 2 ); + my $child3 = MCE::Child->new( sub { parallel_task(3) } ); + + $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all(); + + # search array (total one round-trip via IPC) + my @vals = $ary->vals( "val =~ / ID 2 /" ); + + print {*STDERR} join("", @vals); + +=head1 API DOCUMENTATION + +=over 3 + +=item $child = MCE::Child->create( FUNCTION, ARGS ) + +=item $child = MCE::Child->new( FUNCTION, ARGS ) + +This will create a new child process that will begin execution with function +as the entry point, and optionally ARGS for list of parameters. It will return +the corresponding MCE::Child object, or undef if child creation failed. + +I<FUNCTION> may either be the name of a function, an anonymous subroutine, or +a code ref. + + my $child = MCE::Child->create( "func_name", ... ); + # or + my $child = MCE::Child->create( sub { ... }, ... ); + # or + my $child = MCE::Child->create( \&func, ... ); + +=item $child = MCE::Child->create( { options }, FUNCTION, ARGS ) + +=item $child = MCE::Child->create( IDENT, FUNCTION, ARGS ) + +Options, excluding C<ident>, may be specified globally via the C<init> function. +Otherwise, C<ident>, C<child_timeout>, C<posix_exit>, and C<void_context> may +be set uniquely. + +The C<ident> option is used by callback functions C<on_start> and C<on_finish> +for identifying the started and finished child process respectively. + + my $child1 = MCE::Child->create( { posix_exit => 1 }, sub { + ... + } ); + + $child1->join; + + my $child2 = MCE::Child->create( { child_timeout => 3 }, sub { + sleep 1 for ( 1 .. 9 ); + } ); + + $child2->join; + + if ( $child2->error() eq "Child timed out\n" ) { + ... + } + +The C<new()> method is an alias for C<create()>. + +=item mce_child { BLOCK } ARGS; + +=item mce_child { BLOCK }; + +C<mce_child> runs the block asynchronously similarly to C<MCE::Child->create()>. +It returns the child object, or undef if child creation failed. + + my $child = mce_child { foreach (@files) { ... } }; + + $child->join(); + + if ( my $err = $child->error() ) { + warn("Child error: $err\n"); + } + +=item $child->join() + +This will wait for the corresponding child process to complete its execution. +In non-voided context, C<join()> will return the value(s) of the entry point +function. + +The context (void, scalar or list) for the return value(s) for C<join> is +determined at the time of joining and mostly C<wantarray> aware. + + my $child1 = MCE::Child->create( sub { + my @res = qw(foo bar baz); + return (@res); + }); + + my @res1 = $child1->join(); # ( foo, bar, baz ) + my $res1 = $child1->join(); # baz + + my $child2 = MCE::Child->create( sub { + return 'foo'; + }); + + my @res2 = $child2->join(); # ( foo ) + my $res2 = $child2->join(); # foo + +=item $child1->equal( $child2 ) + +Tests if two child objects are the same child or not. Child comparison is based +on process IDs. This is overloaded to the more natural forms. + + if ( $child1 == $child2 ) { + print("Child objects are the same\n"); + } + # or + if ( $child1 != $child2 ) { + print("Child objects differ\n"); + } + +=item $child->error() + +Child processes are executed in an C<eval> context. This method will return +C<undef> if the child terminates I<normally>. Otherwise, it returns the value +of C<$@> associated with the child's execution status in its C<eval> context. + +=item $child->exit() + +This sends C<'SIGQUIT'> to the child process, notifying the child to exit. +It returns the child object to allow for method chaining. It is important to +join later if not immediately to not leave a zombie or defunct process. + + $child->exit()->join(); + ... + + $child->join(); # later + +=item MCE::Child->exit( 0 ) + +=item MCE::Child->exit( 0, @ret ) + +A child can exit at any time by calling C<MCE::Child->exit()>. Otherwise, the +behavior is the same as C<exit(status)> when called from the main process. +The child process may optionally return data, to be sent via IPC. + +=item MCE::Child->finish() + +This class method is called automatically by C<END>, but may be called +explicitly. An error is emitted via croak if there are active child +processes not yet joined. + + MCE::Child->create( 'task1', $_ ) for 1 .. 4; + $_->join for MCE::Child->list(); + + MCE::Child->create( 'task2', $_ ) for 1 .. 4; + $_->join for MCE::Child->list(); + + MCE::Child->create( 'task3', $_ ) for 1 .. 4; + $_->join for MCE::Child->list(); + + MCE::Child->finish(); + +=item MCE::Child->init( options ) + +The init function accepts a list of MCE::Child options. + + MCE::Child->init( + max_workers => 'auto', # default undef, unlimited + child_timeout => 20, # default undef, no timeout + posix_exit => 1, # default undef, CORE::exit + void_context => 1, # default undef + on_start => sub { + my ( $pid, $ident ) = @_; + ... + }, + on_finish => sub { + my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_; + ... + } + ); + + # Identification given as an option or the 1st argument. + + for my $key ( 'aa' .. 'zz' ) { + MCE::Child->create( { ident => $key }, sub { ... } ); + MCE::Child->create( $key, sub { ... } ); + } + + MCE::Child->wait_all; + +Set C<max_workers> if you want to limit the number of workers by waiting +automatically for an available slot. Specify C<auto> to obtain the number +of logical cores via C<MCE::Util::get_ncpu()>. + +Set C<child_timeout>, in number of seconds, if you want the child process +to terminate after some time. The default is C<0> for no timeout. + +Set C<posix_exit> to avoid all END and destructor processing. Constructing +MCE::Child inside a thread implies 1 or if present CGI, FCGI, Coro, Curses, +Gearman::Util, Gearman::XS, LWP::UserAgent, Mojo::IOLoop, Prima, STFL, +Tk, Wx, or Win32::GUI. + +Set C<void_context> to create the child process in void context for the +return value. Otherwise, the return context is wantarray-aware for +C<join()> and C<result()> and determined when retrieving the data. + +The callback options C<on_start> and C<on_finish> are called in the parent +process after starting the worker and later when terminated. The arguments +for the subroutines were inspired by L<Parallel::ForkManager>. + +The parameters for C<on_start> are the following: + + - pid of the child process + - identification (ident option or 1st arg to create) + +The parameters for C<on_finish> are the following: + + - pid of the child process + - program exit code + - identification (ident option or 1st arg to create) + - exit signal id + - error message from eval inside MCE::Child + - returned data + +=item $child->is_running() + +Returns true if a child is still running. + +=item $child->is_joinable() + +Returns true if the child has finished running and not yet joined. + +=item $child->kill( 'SIG...' ) + +Sends the specified signal to the child. Returns the child object to allow for +method chaining. As with C<exit>, it is important to join eventually if not +immediately to not leave a zombie or defunct process. + + $child->kill('SIG...')->join(); + +The following is a parallel demonstration comparing C<MCE::Shared> against +C<Redis> and C<Redis::Fast> on a Fedora 23 VM. Joining begins after all +workers have been notified to quit. + + use Time::HiRes qw(time); + + use Redis; + use Redis::Fast; + + use MCE::Child; + use MCE::Shared; + + my $redis = Redis->new(); + my $rfast = Redis::Fast->new(); + my $array = MCE::Shared->array(); + + sub parallel_redis { + my ($_redis) = @_; + my ($count, $quit, $len) = (0, 0); + + # instead, use a flag to exit loop + $SIG{'QUIT'} = sub { $quit = 1 }; + + while () { + $len = $_redis->rpush('list', $count++); + last if $quit; + } + + $count; + } + + sub parallel_array { + my ($count, $quit, $len) = (0, 0); + + # do not exit from inside handler + $SIG{'QUIT'} = sub { $quit = 1 }; + + while () { + $len = $array->push($count++); + last if $quit; + } + + $count; + } + + sub benchmark_this { + my ($desc, $num_procs, $timeout, $code, @args) = @_; + my ($start, $total) = (time(), 0); + + MCE::Child->new($code, @args) for 1..$num_procs; + sleep $timeout; + + # joining is not immediate; ok + $_->kill('QUIT') for MCE::Child->list(); + + # joining later; ok + $total += $_->join() for MCE::Child->list(); + + printf "$desc <> duration: %0.03f secs, count: $total\n", + time() - $start; + + sleep 0.2; + } + + benchmark_this('Redis ', 8, 5.0, \¶llel_redis, $redis); + benchmark_this('Redis::Fast', 8, 5.0, \¶llel_redis, $rfast); + benchmark_this('MCE::Shared', 8, 5.0, \¶llel_array); + +=item MCE::Child->list() + +Returns a list of all child objects not yet joined. + + @procs = MCE::Child->list(); + +=item MCE::Child->list_running() + +Returns a list of all child objects that are still running. + + @procs = MCE::Child->list_running(); + +=item MCE::Child->list_joinable() + +Returns a list of all child objects that have completed running. +Thus, ready to be joined without blocking. + + @procs = MCE::Child->list_joinable(); + +=item MCE::Child->max_workers([ N ]) + +Getter and setter for max_workers. Specify a number or 'auto' to acquire the +total number of cores via MCE::Util::get_ncpu. Specify a false value to set +back to no limit. + +=item MCE::Child->pending() + +Returns a count of all child objects not yet joined. + + $count = MCE::Child->pending(); + +=item $child->result() + +Returns the result obtained by C<join>, C<wait_one>, or C<wait_all>. If the +process has not yet exited, waits for the corresponding child to complete its +execution. + + use MCE::Child; + use Time::HiRes qw(sleep); + + sub task { + my ($id) = @_; + sleep $id * 0.333; + return $id; + } + + MCE::Child->create('task', $_) for ( reverse 1 .. 3 ); + + # 1 while MCE::Child->wait_one(); + + while ( my $child = MCE::Child->wait_one() ) { + my $err = $child->error() || 'no error'; + my $res = $child->result(); + my $pid = $child->pid(); + + print "[$pid] $err : $res\n"; + } + +Like C<join> described above, the context (void, scalar or list) for the +return value(s) is determined at the time C<result> is called and mostly +C<wantarray> aware. + + my $child1 = MCE::Child->create( sub { + my @res = qw(foo bar baz); + return (@res); + }); + + my @res1 = $child1->result(); # ( foo, bar, baz ) + my $res1 = $child1->result(); # baz + + my $child2 = MCE::Child->create( sub { + return 'foo'; + }); + + my @res2 = $child2->result(); # ( foo ) + my $res2 = $child2->result(); # foo + +=item MCE::Child->self() + +Class method that allows a child to obtain it's own I<MCE::Child> object. + +=item $child->pid() + +=item $child->tid() + +Returns the ID of the child. + + pid: $$ process id + tid: $$ alias for pid + +=item MCE::Child->pid() + +=item MCE::Child->tid() + +Class methods that allows a child to obtain its own ID. + + pid: $$ process id + tid: $$ alias for pid + +=item MCE::Child->wait_one() + +=item MCE::Child->wait_all() + +Meaningful for the manager process only, waits for one or all child processes +to complete execution. Afterwards, returns the corresponding child objects. +If a child doesn't exist, returns the C<undef> value or an empty list for +C<wait_one> and C<wait_all> respectively. + +The C<waitone> and C<waitall> methods are aliases respectively. + + use MCE::Child; + use Time::HiRes qw(sleep); + + sub task { + my $id = shift; + sleep $id * 0.333; + return $id; + } + + MCE::Child->create('task', $_) for ( reverse 1 .. 3 ); + + # join, traditional use case + $_->join() for MCE::Child->list(); + + # wait_one, simplistic use case + 1 while MCE::Child->wait_one(); + + # wait_one + while ( my $child = MCE::Child->wait_one() ) { + my $err = $child->error() || 'no error'; + my $res = $child->result(); + my $pid = $child->pid(); + + print "[$pid] $err : $res\n"; + } + + # wait_all + my @procs = MCE::Child->wait_all(); + + for ( @procs ) { + my $err = $_->error() || 'no error'; + my $res = $_->result(); + my $pid = $_->pid(); + + print "[$pid] $err : $res\n"; + } + +=item MCE::Child->yield( [ floating_seconds ] ) + +Give other workers a chance to run, optionally for given time. Yield behaves +similarly to MCE's interval option. It throttles workers from running too fast. +A demonstration is provided in the next section for fetching URLs in parallel. + + # total run time: 1.00 second + + MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4; + MCE::Child->wait_all(); + +=back + +=head1 PARALLEL::FORKMANAGER-like DEMONSTRATION + +MCE::Child behaves similarly to threads for the most part. It also provides +L<Parallel::ForkManager>-like capabilities. The C<Parallel::ForkManager> +example is shown first followed by a version using C<MCE::Child>. + +=over 3 + +=item Parallel::ForkManager + + use strict; + use warnings; + + use Parallel::ForkManager; + use Time::HiRes 'time'; + + my $start = time; + + my $pm = Parallel::ForkManager->new(10); + $pm->set_waitpid_blocking_sleep(0); + + $pm->run_on_finish( sub { + my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_; + print "child $pid completed: $ident => ", $resp->[0], "\n"; + }); + + DATA_LOOP: + foreach my $data ( 1..2000 ) { + # forks and returns the pid for the child + my $pid = $pm->start($data) and next DATA_LOOP; + my $ret = [ $data * 2 ]; + + $pm->finish(0, $ret); + } + + $pm->wait_all_children; + + printf STDERR "duration: %0.03f seconds\n", time - $start; + +=item MCE::Child + + use strict; + use warnings; + + use MCE::Child 1.843; + use Time::HiRes 'time'; + + my $start = time; + + MCE::Child->init( + max_workers => 10, + on_finish => sub { + my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_; + print "child $pid completed: $ident => ", $resp->[0], "\n"; + } + ); + + foreach my $data ( 1..2000 ) { + MCE::Child->create( $data, sub { + [ $data * 2 ]; + }); + } + + MCE::Child->wait_all; + + printf STDERR "duration: %0.03f seconds\n", time - $start; + +=item Time to spin 2,000 workers and obtain results (in seconds). + +Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo Boost). +Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again with Moo loaded +at the top of the script. + + MCE::Hobo uses MCE::Shared to retrieve data during reaping. + MCE::Child uses MCE::Channel, no shared-manager. + + Version Cygwin Windows Linux macOS FreeBSD + + MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s + MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s + P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s + + MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded + MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded + P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used + +=item Set posix_exit to avoid all END and destructor processing. + +This is helpful in reducing overhead when workers exit. Ditto if using a Perl +module not parallel safe. The option is ignored on C<$^O eq 'MSWin32'>. + + MCE::Child->init( posix_exit => 1, ... ); + MCE::Hobo->init( posix_exit => 1, ... ); + + Version Cygwin Windows Linux macOS FreeBSD + + MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded + MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded + +=back + +=head1 PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT + +This demonstration constructs two queues, two handles, starts the +shared-manager process if needed, and spawns four workers. +For this demonstration, am chunking 64 URLs per job. In reality, +one may run with 200 workers and chunk 300 URLs on a 24-way box. + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # perl demo.pl -- all output + # perl demo.pl >/dev/null -- mngr/child output + # perl demo.pl 2>/dev/null -- show results only + # + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + use strict; + use warnings; + + use AnyEvent; + use AnyEvent::HTTP; + use Time::HiRes qw( time ); + + use MCE::Child; + use MCE::Shared; + + # Construct two queues, input and return. + + my $que = MCE::Shared->queue(); + my $ret = MCE::Shared->queue(); + + # Construct shared handles for serializing output from many workers + # writing simultaneously. This prevents garbled output. + + mce_open my $OUT, ">>", \*STDOUT or die "open error: $!"; + mce_open my $ERR, ">>", \*STDERR or die "open error: $!"; + + # Spawn workers early for minimum memory consumption. + + MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4; + + # Obtain or generate input data for workers to process. + + my ( $count, @urls ) = ( 0 ); + + push @urls, map { "http://127.0.0.$_/" } 1..254; + push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total + + while ( @urls ) { + my @chunk = splice(@urls, 0, 64); + $que->enqueue( { ID => ++$count, INPUT => \@chunk } ); + } + + # So that workers leave the loop after consuming the queue. + + $que->end(); + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # Loop for the manager process. The manager may do other work if + # need be and periodically check $ret->pending() not shown here. + # + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + my $start = time; + + printf {$ERR} "Mngr - entering loop\n"; + + while ( $count ) { + my ( $result, $failed ) = $ret->dequeue( 2 ); + + # Remove ID from result, so not treated as a URL item. + + printf {$ERR} "Mngr - received job %s\n", delete $result->{ID}; + + # Display the URL and the size captured. + + foreach my $url ( keys %{ $result } ) { + printf {$OUT} "%s: %d\n", $url, length($result->{$url}) + if $result->{$url}; # url has content + } + + # Display URLs could not reach. + + if ( @{ $failed } ) { + foreach my $url ( @{ $failed } ) { + print {$OUT} "Failed: $url\n"; + } + } + + # Decrement the count. + + $count--; + } + + MCE::Child->wait_all(); + + printf {$ERR} "Mngr - exiting loop\n\n"; + printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start; + + exit; + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # Child processes enqueue two items ( $result and $failed ) per each + # job for the manager process. Likewise, the manager process dequeues + # two items above. Optionally, child processes may include the ID in + # the result. + # + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + sub task { + my ( $id ) = @_; + printf {$ERR} "Child $id entering loop\n"; + + while ( my $job = $que->dequeue() ) { + my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] ); + + # Walk URLs, provide a hash and array refs for data. + + printf {$ERR} "Child $id running job $job->{ID}\n"; + walk( $job, $result, $failed ); + + # Send results to the manager process. + + $ret->enqueue( $result, $failed ); + } + + printf {$ERR} "Child $id exiting loop\n"; + } + + sub walk { + my ( $job, $result, $failed ) = @_; + + # Yielding is critical when running an event loop in parallel. + # Not doing so means that the app may reach contention points + # with the firewall and likely impose unnecessary hardship at + # the OS level. The idea here is not to have multiple workers + # initiate HTTP requests to a batch of URLs at the same time. + # Yielding behaves similarly like scatter to have the child + # process run solo for a fraction of time. + + MCE::Child->yield( 0.03 ); + + my $cv = AnyEvent->condvar(); + + # Populate the hash ref for the URLs it could reach. + # Do not mix AnyEvent timeout with child timeout. + # Therefore, choose event timeout when available. + + foreach my $url ( @{ $job->{INPUT} } ) { + $cv->begin(); + http_get $url, timeout => 2, sub { + my ( $data, $headers ) = @_; + $result->{$url} = $data; + $cv->end(); + }; + } + + $cv->recv(); + + # Populate the array ref for URLs it could not reach. + + foreach my $url ( @{ $job->{INPUT} } ) { + push @{ $failed }, $url unless (exists $result->{ $url }); + } + + return; + } + + __END__ + + $ perl demo.pl + + Child 1 entering loop + Child 2 entering loop + Child 3 entering loop + Mngr - entering loop + Child 2 running job 2 + Child 3 running job 3 + Child 1 running job 1 + Child 4 entering loop + Child 4 running job 4 + Child 2 running job 5 + Mngr - received job 2 + Child 3 running job 6 + Mngr - received job 3 + Child 1 running job 7 + Mngr - received job 1 + Child 4 running job 8 + Mngr - received job 4 + http://192.168.0.1/: 3729 + Child 2 exiting loop + Mngr - received job 5 + Child 3 exiting loop + Mngr - received job 6 + Child 1 exiting loop + Mngr - received job 7 + Child 4 exiting loop + Mngr - received job 8 + Mngr - exiting loop + + Duration: 4.131 seconds + +=head1 CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE + +Making an executable is possible with the L<PAR::Packer> module. +On the Windows platform, threads, threads::shared, and exiting via +threads are necessary for the binary to exit successfully. + + # https://metacpan.org/pod/PAR::Packer + # https://metacpan.org/pod/pp + # + # pp -o demo.exe demo.pl + # ./demo.exe + + use strict; + use warnings; + + use if $^O eq "MSWin32", "threads"; + use if $^O eq "MSWin32", "threads::shared"; + + # Include minimum dependencies for MCE::Child. + # Add other modules required by your application here. + + use Storable (); + use Time::HiRes (); + + # use IO::FDPass (); # optional: for condvar, handle, queue + # use Sereal (); # optional: for faster serialization + + use MCE::Child; + use MCE::Shared; + + # For PAR to work on the Windows platform, one must include manually + # any shared modules used by the application. + + # use MCE::Shared::Array; # if using MCE::Shared->array + # use MCE::Shared::Cache; # if using MCE::Shared->cache + # use MCE::Shared::Condvar; # if using MCE::Shared->condvar + # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open + # use MCE::Shared::Hash; # if using MCE::Shared->hash + # use MCE::Shared::Minidb; # if using MCE::Shared->minidb + # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash + # use MCE::Shared::Queue; # if using MCE::Shared->queue + # use MCE::Shared::Scalar; # if using MCE::Shared->scalar + + # Et cetera. Only load modules needed for your application. + + use MCE::Shared::Sequence; # if using MCE::Shared->sequence + + my $seq = MCE::Shared->sequence( 1, 9 ); + + sub task { + my ( $id ) = @_; + while ( defined ( my $num = $seq->next() ) ) { + print "$id: $num\n"; + sleep 1; + } + } + + sub main { + MCE::Child->new( \&task, $_ ) for 1 .. 3; + MCE::Child->wait_all(); + } + + # Main must run inside a thread on the Windows platform or workers + # will fail duing exiting, causing the exe to crash. The reason is + # that PAR or a dependency isn't multi-process safe. + + ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main(); + + threads->exit(0) if $INC{"threads.pm"}; + +=head1 CREDITS + +The inspiration for C<MCE::Child> comes from wanting C<threads>-like behavior +for processes compatible with Perl 5.8. Both can run side-by-side including +safe-use by MCE workers. Likewise, the documentation resembles C<threads>. + +The inspiration for C<wait_all> and C<wait_one> comes from the +C<Parallel::WorkUnit> module. + +=head1 SEE ALSO + +=over 3 + +=item * L<forks> + +=item * L<forks::BerkeleyDB> + +=item * L<MCE::Hobo> + +=item * L<Parallel::ForkManager> + +=item * L<Parallel::Loops> + +=item * L<Parallel::Prefork> + +=item * L<Parallel::WorkUnit> + +=item * L<Proc::Fork> + +=item * L<Thread::Tie> + +=item * L<threads> + +=back + +=head1 INDEX + +L<MCE|MCE>, L<MCE::Channel>, L<MCE::Shared> + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Core.pod b/lib/MCE/Core.pod index f85ddc4..c7c8f64 100644 --- a/lib/MCE/Core.pod +++ b/lib/MCE/Core.pod @@ -5,7 +5,7 @@ MCE::Core - Documentation describing the core MCE API =head1 VERSION -This document describes MCE::Core version 1.838 +This document describes MCE::Core version 1.843 =head1 SYNOPSIS @@ -219,7 +219,7 @@ Below, a new instance is configured with all available options. progress => sub { ... }, ## Default undef # A code block for receiving info on progress made. - # See section labeled "PROGRESS DEMONSTRATIONS" + # See section labeled "MCE PROGRESS DEMONSTRATIONS" # at the end of this document. user_args => { env => 'test' }, ## Default undef @@ -2137,7 +2137,7 @@ uses 2 workers to minimize the output size. Input is from the sequence option. The interval.pl example above is included with MCE. -=head1 PROGRESS DEMONSTRATIONS +=head1 MCE PROGRESS DEMONSTRATIONS The C<progress> option takes a code block for receiving info on the progress made while processing input data; e.g. C<input_data> or C<sequence>. To make @@ -2412,6 +2412,14 @@ can do is report the size completed thus far. printf "%0.1f kibibytes\n", $completed_size / 1024; } +=head1 SEE ALSO + +=over 3 + +=item * L<MCE::Examples> + +=back + =head1 INDEX L<MCE|MCE> diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm index 9c62cc9..71c2aa2 100644 --- a/lib/MCE/Core/Input/Generator.pm +++ b/lib/MCE/Core/Input/Generator.pm @@ -15,7 +15,7 @@ package MCE::Core::Input::Generator; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm index 15aa365..f570ab1 100644 --- a/lib/MCE/Core/Input/Handle.pm +++ b/lib/MCE/Core/Input/Handle.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Handle; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. @@ -38,7 +38,7 @@ sub _systell { # To minimize memory consumption, SEEK_CUR equals 1 on most platforms. # e.g. use Fcntl qw(SEEK_CUR); - MCE::Util::_sysseek($_[0], 0, 1); + sysseek($_[0], 0, 1); } sub _worker_read_handle { @@ -67,15 +67,15 @@ sub _worker_read_handle { $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; # inlined for performance - if ($self->{_data_channels} > 6) { - $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; + if ($self->{_data_channels} > 5) { + $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )}; } $_dat_ex = sub { MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 unless $_DAT_LOCK->{ $_pid }; }; $_dat_un = sub { - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } @@ -117,7 +117,7 @@ sub _worker_read_handle { ($_chunk_id, $_offset_pos) = unpack($_que_template, $_next); if ($_offset_pos >= $_data_size) { - MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos)); + syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos)); $_dat_un->() if $_lock_chn; close $_IN_FILE; undef $_IN_FILE; return; @@ -172,7 +172,7 @@ sub _worker_read_handle { } } - MCE::Util::_syswrite( + syswrite( $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE) ); $_dat_un->() if $_lock_chn; @@ -181,7 +181,7 @@ sub _worker_read_handle { local $/ = $_RS if ($/ ne $_RS); if ($_parallel_io && $_RS eq $LF) { - MCE::Util::_syswrite( + syswrite( $_QUE_W_SOCK, pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size) ); @@ -195,8 +195,8 @@ sub _worker_read_handle { } if ($_proc_type == READ_FILE) { - MCE::Util::_sysseek($_IN_FILE, tell( $_IN_FILE ), 0); - MCE::Util::_sysread($_IN_FILE, $_, $_tmp_cs, $_p); + sysseek($_IN_FILE, tell( $_IN_FILE ), 0); + sysread($_IN_FILE, $_, $_tmp_cs, $_p); seek $_IN_FILE, _systell($_IN_FILE), 0; } else { @@ -207,8 +207,8 @@ sub _worker_read_handle { } else { if ($_proc_type == READ_FILE) { - MCE::Util::_sysseek($_IN_FILE, $_offset_pos, 0); - MCE::Util::_sysread($_IN_FILE, $_, $_chunk_size, $_p); + sysseek($_IN_FILE, $_offset_pos, 0); + sysread($_IN_FILE, $_, $_chunk_size, $_p); seek $_IN_FILE, _systell($_IN_FILE), 0; } else { @@ -218,7 +218,7 @@ sub _worker_read_handle { $_ .= <$_IN_FILE>; - MCE::Util::_syswrite( + syswrite( $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE) ); $_dat_un->() if $_lock_chn; diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm index 463fc60..dd2ff4f 100644 --- a/lib/MCE/Core/Input/Iterator.pm +++ b/lib/MCE/Core/Input/Iterator.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. @@ -59,7 +59,7 @@ sub _worker_user_iterator { unless $_DAT_LOCK->{ $_pid }; }; $_dat_un = sub { - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm index 0867a79..e56a6c7 100644 --- a/lib/MCE/Core/Input/Request.pm +++ b/lib/MCE/Core/Input/Request.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Request; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. @@ -62,7 +62,7 @@ sub _worker_request_chunk { unless $_DAT_LOCK->{ $_pid }; }; $_dat_un = sub { - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm index d7c0053..d4d8862 100644 --- a/lib/MCE/Core/Input/Sequence.pm +++ b/lib/MCE/Core/Input/Sequence.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. @@ -56,15 +56,15 @@ sub _worker_sequence_queue { $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; # inlined for performance - if ($self->{_data_channels} > 6) { - $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; + if ($self->{_data_channels} > 5) { + $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )}; } $_dat_ex = sub { MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 unless $_DAT_LOCK->{ $_pid }; }; $_dat_un = sub { - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } @@ -102,12 +102,12 @@ sub _worker_sequence_queue { ($_chunk_id, $_offset) = unpack($_que_template, $_next); if ($_offset >= $_abort) { - MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset)); + syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset)); $_dat_un->() if $_lock_chn; return; } - MCE::Util::_syswrite( + syswrite( $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1) ); diff --git a/lib/MCE/Core/Manager.pm b/lib/MCE/Core/Manager.pm index 1ed32dc..6910360 100644 --- a/lib/MCE/Core/Manager.pm +++ b/lib/MCE/Core/Manager.pm @@ -14,7 +14,7 @@ package MCE::Core::Manager; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -88,7 +88,7 @@ sub _output_loop { $_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output, $_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag, $_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt, - $_BSB_W_SOCK, $_BSE_W_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR, + $_BSB_W_SOCK, $_BSB_R_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR, $_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT, @_delay_wid, $_size_completed, $_win32_ipc ); @@ -148,7 +148,7 @@ sub _output_loop { if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) { if ($_sync_cnt == $_total_running) { for my $_i (1 .. $_total_running) { - MCE::Util::_syswrite($_BSB_W_SOCK, $LF); + syswrite($_BSB_W_SOCK, $LF); } undef $_syn_flag; } @@ -180,7 +180,7 @@ sub _output_loop { if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) { if ($_sync_cnt == $_total_running) { for my $_i (1 .. $_total_running) { - MCE::Util::_syswrite($_BSB_W_SOCK, $LF); + syswrite($_BSB_W_SOCK, $LF); } undef $_syn_flag; } @@ -599,7 +599,7 @@ sub _output_loop { if (++$_sync_cnt == $_total_running) { for my $_i (1 .. $_total_running) { - MCE::Util::_syswrite($_BSB_W_SOCK, $LF); + syswrite($_BSB_W_SOCK, $LF); } undef $_syn_flag; } @@ -614,7 +614,7 @@ sub _output_loop { : $self->{_total_running}; for my $_i (1 .. $_total_running) { - MCE::Util::_syswrite($_BSE_W_SOCK, $LF); + syswrite($_BSB_R_SOCK, $LF); } } @@ -622,7 +622,7 @@ sub _output_loop { }, OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC - MCE::Util::_syswrite($_DAT_R_SOCK, $LF); + syswrite($_DAT_R_SOCK, $LF); $_win32_ipc = 1, goto _LOOP unless $_win32_ipc; @@ -801,12 +801,17 @@ sub _output_loop { ## Output event loop. - my $_func; my $_channels = $self->{_dat_r_sock}; + my $_channels = $self->{_dat_r_sock}; + my $_func; - $_win32_ipc = ( $ENV{'PERL_MCE_IPC'} eq 'win32' || $INC{'MCE/Hobo.pm'} ); + $_win32_ipc = ( + $ENV{'PERL_MCE_IPC'} eq 'win32' || + $INC{'MCE/Child.pm'} || + $INC{'MCE/Hobo.pm'} + ); $_BSB_W_SOCK = $self->{_bsb_w_sock}; - $_BSE_W_SOCK = $self->{_bse_w_sock}; + $_BSB_R_SOCK = $self->{_bsb_r_sock}; $_DAT_R_SOCK = $self->{_dat_r_sock}->[0]; $_RS = $self->{RS} || $/; @@ -869,7 +874,7 @@ sub _output_loop { MCE::Util::_nonblocking($_DAT_R_SOCK, 1) if $_win32_ipc; while ($self->{_total_running}) { - MCE::Util::_sysread($_DAT_R_SOCK, $_func, 8); + MCE::Util::_sysread2($_DAT_R_SOCK, $_func, 8); last() unless length($_func) == 8; $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ]; diff --git a/lib/MCE/Core/Validation.pm b/lib/MCE/Core/Validation.pm index 627b61b..b806be7 100644 --- a/lib/MCE/Core/Validation.pm +++ b/lib/MCE/Core/Validation.pm @@ -14,7 +14,7 @@ package MCE::Core::Validation; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## Items below are folded into MCE. @@ -256,7 +256,7 @@ sub _parse_chunk_size { # Iterators may optionally use chunk_size to determine how much # to return per iteration. The default is 1 for MCE Models, same # as for the Core API. The user_func receives an array_ref - # regardless if 1 or higher. + # regardless if 1 or greater. # # sub make_iter { # ... @@ -351,7 +351,7 @@ sub _parse_max_workers { $_ncpu_ul = 8 if ($_ncpu_ul > 8); if ($1 && $2) { - local $@; $_max_workers = eval "int($_ncpu_ul $1 $2 + 0.5)"; + local $@; $_max_workers = eval "int($_ncpu_ul $1 $2 + 0.5)"; ## no critic $_max_workers = 1 if (!$_max_workers || $_max_workers < 1); $_max_workers = $_ncpu if ($_max_workers > $_ncpu); } diff --git a/lib/MCE/Core/Worker.pm b/lib/MCE/Core/Worker.pm index f4a0a6d..b952937 100644 --- a/lib/MCE/Core/Worker.pm +++ b/lib/MCE/Core/Worker.pm @@ -14,7 +14,7 @@ package MCE::Core::Worker; use strict; use warnings; -our $VERSION = '1.838'; +our $VERSION = '1.843'; my $_has_threads = $INC{'threads.pm'} ? 1 : 0; my $_tid = $_has_threads ? threads->tid() : 0; @@ -288,7 +288,7 @@ use bytes; }; $_dat_un = sub { my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } @@ -488,8 +488,9 @@ sub _worker_do { delete $self->{_wuf}; - ## Check nested Hobo workers not yet joined. - MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'}; + ## Check for nested workers not yet joined. + MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'}; + MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'}; ## Notify the main process a worker has completed. local $\ = undef if (defined $\); @@ -585,7 +586,7 @@ sub _worker_loop { _worker_do($self, {}), next if ($_response eq "_data\n"); ## Wait here until MCE completes job submission to all workers. - MCE::Util::_sysread($self->{_bse_r_sock}, my($_b), 1); + MCE::Util::_sysread($self->{_bsb_w_sock}, my($_b), 1); ## Normal request. if (defined $_job_delay && $_job_delay > 0.0) { diff --git a/lib/MCE/Examples.pod b/lib/MCE/Examples.pod index 07d94ed..f6a5b8c 100644 --- a/lib/MCE/Examples.pod +++ b/lib/MCE/Examples.pod @@ -5,7 +5,7 @@ MCE::Examples - Various examples and demonstrations =head1 VERSION -This document describes MCE::Examples version 1.838 +This document describes MCE::Examples version 1.843 =head1 INCLUDED WITH THE DISTRIBUTION diff --git a/lib/MCE/Flow.pm b/lib/MCE/Flow.pm index 5ff8d36..d43f11f 100644 --- a/lib/MCE/Flow.pm +++ b/lib/MCE/Flow.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -480,7 +480,7 @@ MCE::Flow - Parallel flow model for building creative applications =head1 VERSION -This document describes MCE::Flow version 1.838 +This document describes MCE::Flow version 1.843 =head1 DESCRIPTION @@ -715,7 +715,11 @@ inside the first block. Hence, the block is called once per each item. ## Array or array_ref mce_flow sub { do_work($_) }, 1..10000; - mce_flow sub { do_work($_) }, [ 1..10000 ]; + mce_flow sub { do_work($_) }, \@list; + + ## Important; pass an array_ref for deeply input data + mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_flow sub { do_work($_) }, \@deeply_list; ## File_path, glob_ref, or scalar_ref mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; @@ -745,10 +749,15 @@ This means having to loop through the chunk from inside the first block. ## Looping inside the block is the same for mce_flow_f and ## mce_flow_s. + ## Array or array_ref mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000; + mce_flow sub { do_work($_) for (@{ $_ }) }, \@list; - ## Same as above, resembles code using the Core API. + ## Important; pass an array_ref for deeply input data + mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list; + ## Resembles code using the core MCE API mce_flow sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -791,6 +800,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Flow::init { options } +=back + The init function accepts a hash of MCE options. Unlike with MCE::Stream, both gather and bounds_only options may be specified when calling init (not shown below). @@ -833,8 +844,6 @@ both gather and bounds_only options may be specified when calling init 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 10000 -=back - Like with MCE::Flow::init above, MCE options may be specified using an anonymous hash for the first argument. Notice how task_name, max_workers, and use_threads can take an anonymous array for setting uniquely per @@ -891,6 +900,8 @@ equals 1 in order to demonstrate all the possibilities for providing input data. =item mce_flow sub { code }, list +=back + Input data may be defined using a list, an array ref, or a hash ref. Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a @@ -899,10 +910,15 @@ needed after the block. # $_ contains the item when chunk_size => 1 - mce_flow sub { $_ }, 1..1000; - mce_flow sub { $_ }, \@list; + mce_flow sub { do_work($_) }, 1..1000; + mce_flow sub { do_work($_) }, \@list; + + # Important; pass an array_ref for deeply input data - # chunking, any chunk_size => 1 or higher + mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_flow sub { do_work($_) }, \@deeply_list; + + # Chunking; any chunk_size => 1 or greater my %res = mce_flow sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -914,7 +930,7 @@ needed after the block. }, \@list; - # input hash, current API available since 1.828 + # Input hash; current API available since 1.828 my %res = mce_flow sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -926,7 +942,7 @@ needed after the block. }, \%hash; - # unlike MCE::Loop, MCE::Flow doesn't need input to run + # Unlike MCE::Loop, MCE::Flow doesn't need input to run mce_flow { max_workers => 4 }, sub { MCE->say( MCE->wid ); @@ -947,7 +963,7 @@ needed after the block. MCE->say( "consumer: ", MCE->wid ); }; - # here, options are specified via init + # Here, options are specified via init MCE::Flow::init { max_workers => [ 1, 3 ], @@ -956,10 +972,14 @@ needed after the block. mce_flow \&producer, \&consumers; +=over 3 + =item MCE::Flow->run_file ( sub { code }, file ) =item mce_flow_f sub { code }, file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -969,7 +989,7 @@ position among themselves with zero interaction by the manager process. mce_flow_f sub { $_ }, $file_handle; mce_flow_f sub { $_ }, \$scalar; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_flow_f sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -981,10 +1001,14 @@ position among themselves with zero interaction by the manager process. }, "/path/to/file"; +=over 3 + =item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -1001,7 +1025,7 @@ optional. The format is passed to sprintf (% may be omitted below). step => $step, format => $fmt }; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_flow_s sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -1063,10 +1087,14 @@ Time was measured using 1 worker to emphasize the difference. 7500001 .. 8750000 8750001 .. 10000000 +=over 3 + =item MCE::Flow->run ( { input_data => iterator }, sub { code } ) =item mce_flow { input_data => iterator }, sub { code } +=back + An iterator reference may be specified for input_data. The only other way is to specify input_data via MCE::Flow::init. This prevents MCE::Flow from configuring the iterator reference as another user task which will not work. @@ -1079,8 +1107,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. mce_flow sub { $_ }; -=back - =head1 GATHERING DATA Unlike MCE::Map where gather and output order are done for you automatically, @@ -1270,6 +1296,8 @@ running. =item MCE::Flow::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -1284,8 +1312,6 @@ longer needed. MCE::Flow::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Grep.pm b/lib/MCE/Grep.pm index acefc81..2bbc8ac 100644 --- a/lib/MCE/Grep.pm +++ b/lib/MCE/Grep.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -435,7 +435,7 @@ MCE::Grep - Parallel grep model similar to the native grep function =head1 VERSION -This document describes MCE::Grep version 1.838 +This document describes MCE::Grep version 1.843 =head1 SYNOPSIS @@ -444,18 +444,22 @@ This document describes MCE::Grep version 1.838 ## Array or array_ref my @a = mce_grep { $_ % 5 == 0 } 1..10000; - my @b = mce_grep { $_ % 5 == 0 } [ 1..10000 ]; + my @b = mce_grep { $_ % 5 == 0 } \@list; + + ## Important; pass an array_ref for deeply input data + my @c = mce_grep { $_->[1] % 2 == 0 } [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_grep { $_->[1] % 2 == 0 } \@deeply_list; ## File_path, glob_ref, or scalar_ref - my @c = mce_grep_f { /pattern/ } "/path/to/file"; - my @d = mce_grep_f { /pattern/ } $file_handle; - my @e = mce_grep_f { /pattern/ } \$scalar; + my @e = mce_grep_f { /pattern/ } "/path/to/file"; + my @f = mce_grep_f { /pattern/ } $file_handle; + my @g = mce_grep_f { /pattern/ } \$scalar; ## Sequence of numbers (begin, end [, step, format]) - my @f = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5; - my @g = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ]; + my @h = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5; + my @i = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ]; - my @h = mce_grep_s { %_ * 3 == 0 } { + my @j = mce_grep_s { %_ * 3 == 0 } { begin => 1, end => 10000, step => 5, format => undef }; @@ -604,6 +608,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Grep::init { options } +=back + The init function accepts a hash of MCE options. The gather option, if specified, is ignored due to being used internally by the module. @@ -638,8 +644,6 @@ specified, is ignored due to being used internally by the module. 5 10 15 20 25 30 35 40 45 50 55 60 65 70 75 80 85 90 95 100 -=back - =head1 API DOCUMENTATION =over 3 @@ -648,18 +652,30 @@ specified, is ignored due to being used internally by the module. =item mce_grep { code } list +=back + Input data may be defined using a list or an array reference. Unlike MCE::Loop, Flow, and Step, specifying a hash reference as input data isn't allowed. + ## Array or array_ref my @a = mce_grep { /[2357]/ } 1..1000; my @b = mce_grep { /[2357]/ } \@list; - my @z = mce_grep { /[2357]/ } \%hash; # not supported + ## Important; pass an array_ref for deeply input data + my @c = mce_grep { $_->[1] =~ /[2357]/ } [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_grep { $_->[1] =~ /[2357]/ } \@deeply_list; + + ## Not supported + my @z = mce_grep { ... } \%hash; + +=over 3 =item MCE::Grep->run_file ( sub { code }, file ) =item mce_grep_f { code } file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -667,10 +683,14 @@ position among themselves with zero interaction by the manager process. my @d = mce_grep_f { /pattern/ } $file_handle; my @e = mce_grep_f { /pattern/ } \$scalar; +=over 3 + =item MCE::Grep->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_grep_s { code } $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -685,17 +705,19 @@ optional. The format is passed to sprintf (% may be omitted below). step => $step, format => $fmt }; +=over 3 + =item MCE::Grep->run ( sub { code }, iterator ) =item mce_grep { code } iterator +=back + An iterator reference may be specified for input_data. Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. my @a = mce_grep { $_ % 3 == 0 } make_iterator(10, 30, 2); -=back - =head1 MANUAL SHUTDOWN =over 3 @@ -704,6 +726,8 @@ under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. =item MCE::Grep::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -718,8 +742,6 @@ longer needed. MCE::Grep::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Loop.pm b/lib/MCE/Loop.pm index 22256bf..5b8e7b3 100644 --- a/lib/MCE/Loop.pm +++ b/lib/MCE/Loop.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -350,7 +350,7 @@ MCE::Loop - MCE model for building parallel loops =head1 VERSION -This document describes MCE::Loop version 1.838 +This document describes MCE::Loop version 1.843 =head1 DESCRIPTION @@ -443,7 +443,11 @@ inside the block. Hence, the block is called once per each item. ## Array or array_ref mce_loop { do_work($_) } 1..10000; - mce_loop { do_work($_) } [ 1..10000 ]; + mce_loop { do_work($_) } \@list; + + ## Important; pass an array_ref for deeply input data + mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_loop { do_work($_) } \@deeply_list; ## File_path, glob_ref, or scalar_ref mce_loop_f { chomp; do_work($_) } "/path/to/file"; @@ -473,10 +477,15 @@ This means having to loop through the chunk from inside the block. ## Looping inside the block is the same for mce_loop_f and ## mce_loop_s. + ## Array or array_ref mce_loop { do_work($_) for (@{ $_ }) } 1..10000; + mce_loop { do_work($_) for (@{ $_ }) } \@list; - ## Same as above, resembles code using the Core API. + ## Important; pass an array_ref for deeply input data + mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list; + ## Resembles code using the core MCE API mce_loop { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -519,6 +528,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Loop::init { options } +=back + The init function accepts a hash of MCE options. use MCE::Loop; @@ -559,8 +570,6 @@ The init function accepts a hash of MCE options. 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 10000 -=back - =head1 API DOCUMENTATION The following assumes chunk_size equals 1 in order to demonstrate all the @@ -572,14 +581,21 @@ possibilities for providing input data. =item mce_loop { code } list +=back + Input data may be defined using a list, an array ref, or a hash ref. # $_ contains the item when chunk_size => 1 - mce_loop { $_ } 1..1000; - mce_loop { $_ } \@list; + mce_loop { do_work($_) } 1..1000; + mce_loop { do_work($_) } \@list; + + # Important; pass an array_ref for deeply input data - # chunking, any chunk_size => 1 or higher + mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_loop { do_work($_) } \@deeply_list; + + # Chunking; any chunk_size => 1 or greater my %res = mce_loop { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -591,7 +607,7 @@ Input data may be defined using a list, an array ref, or a hash ref. } \@list; - # input hash, current API available since 1.828 + # Input hash; current API available since 1.828 my %res = mce_loop { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -603,10 +619,14 @@ Input data may be defined using a list, an array ref, or a hash ref. } \%hash; +=over 3 + =item MCE::Loop->run_file ( sub { code }, file ) =item mce_loop_f { code } file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -616,7 +636,7 @@ position among themselves with zero interaction by the manager process. mce_loop_f { $_ } $file_handle; mce_loop_f { $_ } \$scalar; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_loop_f { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -628,10 +648,14 @@ position among themselves with zero interaction by the manager process. } "/path/to/file"; +=over 3 + =item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_loop_s { code } $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -648,7 +672,7 @@ optional. The format is passed to sprintf (% may be omitted below). step => $step, format => $fmt }; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_loop_s { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -710,17 +734,19 @@ Time was measured using 1 worker to emphasize the difference. 7500001 .. 8750000 8750001 .. 10000000 +=over 3 + =item MCE::Loop->run ( sub { code }, iterator ) =item mce_loop { code } iterator +=back + An iterator reference may be specified for input_data. Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. mce_loop { $_ } make_iterator(10, 30, 2); -=back - =head1 GATHERING DATA Unlike MCE::Map where gather and output order are done for you automatically, @@ -898,6 +924,8 @@ The following does the same thing using the Core API. =item MCE::Loop::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -912,8 +940,6 @@ longer needed. MCE::Loop::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Map.pm b/lib/MCE/Map.pm index 8acab1b..ad90580 100644 --- a/lib/MCE/Map.pm +++ b/lib/MCE/Map.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -435,7 +435,7 @@ MCE::Map - Parallel map model similar to the native map function =head1 VERSION -This document describes MCE::Map version 1.838 +This document describes MCE::Map version 1.843 =head1 SYNOPSIS @@ -444,18 +444,22 @@ This document describes MCE::Map version 1.838 ## Array or array_ref my @a = mce_map { $_ * $_ } 1..10000; - my @b = mce_map { $_ * $_ } [ 1..10000 ]; + my @b = mce_map { $_ * $_ } \@list; + + ## Important; pass an array_ref for deeply input data + my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list; ## File_path, glob_ref, or scalar_ref - my @c = mce_map_f { chomp; $_ } "/path/to/file"; - my @d = mce_map_f { chomp; $_ } $file_handle; - my @e = mce_map_f { chomp; $_ } \$scalar; + my @e = mce_map_f { chomp; $_ } "/path/to/file"; + my @f = mce_map_f { chomp; $_ } $file_handle; + my @g = mce_map_f { chomp; $_ } \$scalar; ## Sequence of numbers (begin, end [, step, format]) - my @f = mce_map_s { $_ * $_ } 1, 10000, 5; - my @g = mce_map_s { $_ * $_ } [ 1, 10000, 5 ]; + my @h = mce_map_s { $_ * $_ } 1, 10000, 5; + my @i = mce_map_s { $_ * $_ } [ 1, 10000, 5 ]; - my @h = mce_map_s { $_ * $_ } { + my @j = mce_map_s { $_ * $_ } { begin => 1, end => 10000, step => 5, format => undef }; @@ -539,6 +543,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Map::init { options } +=back + The init function accepts a hash of MCE options. The gather option, if specified, is ignored due to being used internally by the module. @@ -580,8 +586,6 @@ specified, is ignored due to being used internally by the module. 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 10000 -=back - =head1 API DOCUMENTATION =over 3 @@ -590,18 +594,30 @@ specified, is ignored due to being used internally by the module. =item mce_map { code } list +=back + Input data may be defined using a list or an array reference. Unlike MCE::Loop, Flow, and Step, specifying a hash reference as input data isn't allowed. + ## Array or array_ref my @a = mce_map { $_ * 2 } 1..1000; my @b = mce_map { $_ * 2 } \@list; - my @z = mce_map { $_ * 2 } \%hash; # not supported + ## Important; pass an array_ref for deeply input data + my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list; + + ## Not supported + my @z = mce_map { ... } \%hash; + +=over 3 =item MCE::Map->run_file ( sub { code }, file ) =item mce_map_f { code } file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -609,10 +625,14 @@ position among themselves with zero interaction by the manager process. my @d = mce_map_f { chomp; $_ . "\r\n" } $file_handle; my @e = mce_map_f { chomp; $_ . "\r\n" } \$scalar; +=over 3 + =item MCE::Map->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_map_s { code } $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -627,17 +647,19 @@ optional. The format is passed to sprintf (% may be omitted below). step => $step, format => $fmt }; +=over 3 + =item MCE::Map->run ( sub { code }, iterator ) =item mce_map { code } iterator +=back + An iterator reference may be specified for input_data. Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. my @a = mce_map { $_ * 2 } make_iterator(10, 30, 2); -=back - =head1 MANUAL SHUTDOWN =over 3 @@ -646,6 +668,8 @@ under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. =item MCE::Map::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -660,8 +684,6 @@ longer needed. MCE::Map::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Mutex.pm b/lib/MCE/Mutex.pm index 8ec75cf..19f5db7 100644 --- a/lib/MCE/Mutex.pm +++ b/lib/MCE/Mutex.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -20,18 +20,18 @@ use Carp (); sub new { my ($class, %argv) = @_; + my $impl = defined($argv{'impl'}) + ? $argv{'impl'} : defined($argv{'path'}) ? 'Flock' : 'Channel'; - my $pkg = (defined $argv{'impl'}) - ? $argv{'impl'} : (defined $argv{'path'}) ? 'Flock' : 'Channel'; + $impl = ucfirst( lc $impl ); - $pkg = ucfirst( lc $pkg ); + eval "require MCE::Mutex::$impl; 1" || + Carp::croak("Could not load Mutex implementation '$impl': $@"); - if (eval "require MCE::Mutex::$pkg; 1") { - no strict 'refs'; $pkg = 'MCE::Mutex::'.$pkg; - return $pkg->new(%argv); - } + my $pkg = 'MCE::Mutex::'.$impl; + no strict 'refs'; - Carp::croak("Could not load Mutex implementation $pkg: $@"); + return $pkg->new( %argv ); } ## base class methods @@ -68,7 +68,7 @@ MCE::Mutex - Locking for Many-Core Engine =head1 VERSION -This document describes MCE::Mutex version 1.838 +This document describes MCE::Mutex version 1.843 =head1 SYNOPSIS diff --git a/lib/MCE/Mutex/Channel.pm b/lib/MCE/Mutex/Channel.pm index f74d798..f92080a 100644 --- a/lib/MCE/Mutex/Channel.pm +++ b/lib/MCE/Mutex/Channel.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized once ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; use base 'MCE::Mutex'; use Scalar::Util qw(refaddr weaken); @@ -29,27 +29,38 @@ sub CLONE { sub DESTROY { my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); - MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 - if $obj->{ $pid }; + syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid }; + syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'}; - if ($obj->{'_init_pid'} eq $pid) { + if ( $obj->{_init_pid} eq $pid ) { my $addr = refaddr $obj; - ($^O eq 'MSWin32') + ($^O eq 'MSWin32' && $obj->{impl} eq 'Channel') ? MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock)) : MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock)); - @MUTEX = map { refaddr($_) == $addr ? () : $_ } @MUTEX; + if ( ! $has_threads ) { + @MUTEX = map { refaddr($_) == $addr ? () : $_ } @MUTEX; + } } return; } sub _destroy { - # Called by MCE::_exit && MCE::Hobo::_exit. Must iterate a copy. + # Called by { MCE, MCE::Child, and MCE::Hobo }::_exit. + # This must iterate a copy. + if ( @MUTEX ) { local $_; &DESTROY($_) for @{[ @MUTEX ]}; } } +sub _save_for_global_destruction { + if ( ! $has_threads ) { + push @MUTEX, $_[0]; + weaken $MUTEX[-1]; + } +} + ############################################################################### ## ---------------------------------------------------------------------------- ## Public methods. @@ -64,13 +75,15 @@ sub new { ? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock)) : MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock)); - MCE::Util::_syswrite($obj{_w_sock}, '0'); + syswrite $obj{_w_sock}, '0'; + + bless \%obj, $class; - if (caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/) { - push(@MUTEX, \%obj); weaken($MUTEX[-1]); + if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) { + MCE::Mutex::Channel::_save_for_global_destruction(\%obj); } - return bless(\%obj, $class); + return \%obj; } sub lock { @@ -88,7 +101,7 @@ sub lock { sub unlock { my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); - MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 + syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 if $obj->{ $pid }; return; @@ -108,7 +121,7 @@ sub synchronize { ? @ret = wantarray ? $code->(@_) : scalar $code->(@_) : $code->(@_); - MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0; + syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0; return wantarray ? @ret : $ret[-1]; } @@ -131,7 +144,7 @@ MCE::Mutex::Channel - Mutex locking via a pipe or socket =head1 VERSION -This document describes MCE::Mutex::Channel version 1.838 +This document describes MCE::Mutex::Channel version 1.843 =head1 DESCRIPTION diff --git a/lib/MCE/Mutex/Channel2.pm b/lib/MCE/Mutex/Channel2.pm new file mode 100644 index 0000000..eecc8ac --- /dev/null +++ b/lib/MCE/Mutex/Channel2.pm @@ -0,0 +1,162 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## MCE::Mutex::Channel2 - Provides two mutexes using a single channel. +## +############################################################################### + +package MCE::Mutex::Channel2; + +use strict; +use warnings; + +no warnings qw( threads recursion uninitialized once ); + +our $VERSION = '1.843'; + +use base 'MCE::Mutex::Channel'; +use MCE::Util (); + +my $has_threads = $INC{'threads.pm'} ? 1 : 0; +my $tid = $has_threads ? threads->tid() : 0; + +sub CLONE { + $tid = threads->tid() if $has_threads; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Public methods. +## +############################################################################### + +sub new { + my ($class, %obj) = (@_, impl => 'Channel2'); + $obj{'_init_pid'} = $has_threads ? $$ .'.'. $tid : $$; + + MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock)); + + syswrite $obj{_r_sock}, '0'; + syswrite $obj{_w_sock}, '0'; + + bless \%obj, $class; + + if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) { + MCE::Mutex::Channel::_save_for_global_destruction(\%obj); + } + + return \%obj; +} + +sub lock2 { + my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); + + MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1 + unless $obj->{ $pid.'b' }; + + return; +} + +*lock_exclusive2 = \&lock2; +*lock_shared2 = \&lock2; + +sub unlock2 { + my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); + + syswrite($obj->{_r_sock}, '0'), $obj->{ $pid.'b' } = 0 + if $obj->{ $pid.'b' }; + + return; +} + +sub synchronize2 { + my ($pid, $obj, $code, @ret) = ( + $has_threads ? $$ .'.'. $tid : $$, shift, shift + ); + return unless ref($code) eq 'CODE'; + + # lock, run, unlock - inlined for performance + MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1 + unless $obj->{ $pid.'b' }; + + (defined wantarray) + ? @ret = wantarray ? $code->(@_) : scalar $code->(@_) + : $code->(@_); + + syswrite($obj->{_r_sock}, '0'), $obj->{ $pid.'b' } = 0; + + return wantarray ? @ret : $ret[-1]; +} + +*enter2 = \&synchronize2; + +sub timedwait2 { + my ($obj, $timeout) = @_; + + local $@; local $SIG{'ALRM'} = sub { alarm 0; die "timed out\n" }; + + eval { alarm $timeout || 1; $obj->lock_exclusive2 }; + + alarm 0; + + ( $@ && $@ eq "timed out\n" ) ? '' : 1; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Mutex::Channel2 - Provides two mutexes using a single channel + +=head1 VERSION + +This document describes MCE::Mutex::Channel2 version 1.843 + +=head1 DESCRIPTION + +A socket implementation based on L<MCE::Mutex>. The secondary lock is accessed +by calling methods, described in L<MCE::Mutex>, with the C<2> suffix. + + my $mutex = MCE::Mutex->new( impl => 'Channel2' ); + +=head2 primary lock + +=over 3 + +=item * $mutex->lock + +=item * $mutex->unlock + +=item * $mutex->synchronize + +=item * $mutex->timedwait + +=back + +=head2 secondary lock + +=over 3 + +=item * $mutex->lock2 + +=item * $mutex->unlock2 + +=item * $mutex->synchronize2 + +=item * $mutex->timedwait2 + +=back + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Mutex/Flock.pm b/lib/MCE/Mutex/Flock.pm index 5c56c47..bbdaa98 100644 --- a/lib/MCE/Mutex/Flock.pm +++ b/lib/MCE/Mutex/Flock.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized once ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; use base 'MCE::Mutex'; use Fcntl ':flock'; @@ -36,7 +36,6 @@ sub DESTROY { sub _open { my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); - return if exists $obj->{ $pid }; open $obj->{_fh}, '+>>:raw:stdio', $obj->{path} @@ -150,8 +149,7 @@ sub synchronize { my ($pid, $obj, $code, @ret) = ( $has_threads ? $$ .'.'. $tid : $$, shift, shift ); - - return if ref($code) ne 'CODE'; + return unless ref($code) eq 'CODE'; $obj->_open() unless exists $obj->{ $pid }; @@ -186,7 +184,7 @@ MCE::Mutex::Flock - Mutex locking via Fcntl =head1 VERSION -This document describes MCE::Mutex::Flock version 1.838 +This document describes MCE::Mutex::Flock version 1.843 =head1 DESCRIPTION diff --git a/lib/MCE/Queue.pm b/lib/MCE/Queue.pm index 88a99a1..f15b343 100644 --- a/lib/MCE/Queue.pm +++ b/lib/MCE/Queue.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (Subroutines::ProhibitExplicitReturnUndef) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -86,7 +86,7 @@ sub import { use constant { MAX_DQ_DEPTH => 192, # Maximum dequeue notifications - MUTEX_LOCKS => 6, # Number of mutex locks for 1st level defense + MUTEX_LOCKS => 5, # Number of mutex locks for 1st level defense # against many workers waiting to dequeue OUTPUT_W_QUE => 'W~QUE', # Await from the queue @@ -221,7 +221,7 @@ sub new { MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock)) if $_Q->{_await}; if (exists $_argv{queue} && scalar @{ $_argv{queue} }) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } return $_Q; @@ -407,7 +407,7 @@ sub _heap_insert_high { $_Q->{_tsem} = $_t; if ($_Q->pending() <= $_t) { - MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); + syswrite($_Q->{_aw_sock}, $LF); } else { $_Q->{_asem} += 1; } @@ -460,7 +460,7 @@ sub _heap_insert_high { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } push @{ $_Q->{_datq} }, @{ $_MCE->{thaw}($_buf) }; } @@ -504,7 +504,7 @@ sub _heap_insert_high { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } push @{ $_Q->{_datq} }, $_; } @@ -564,7 +564,7 @@ sub _heap_insert_high { if ($_pending) { $_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH); for my $_i (1 .. $_pending) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } } $_Q->{_dsem} = $_pending; @@ -575,11 +575,11 @@ sub _heap_insert_high { } else { ## Otherwise, never to exceed one byte in the channel - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); + syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); } if (exists $_Q->{_ended} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } if ($_cnt) { @@ -587,7 +587,7 @@ sub _heap_insert_high { print {$_DAU_R_SOCK} length($_buf).'1'.$LF, $_buf; } elsif (defined $_buf) { - if (!looks_like_number $_buf && !ref $_buf) { + if (!ref $_buf) { print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf; } else { $_buf = $_MCE->{freeze}([ $_buf ]); @@ -600,7 +600,7 @@ sub _heap_insert_high { if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) { for my $_i (1 .. $_Q->{_asem}) { - MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); + syswrite($_Q->{_aw_sock}, $LF); } $_Q->{_asem} = 0; } @@ -626,7 +626,7 @@ sub _heap_insert_high { my $_buf = $_Q->_dequeue(); if (defined $_buf) { - if (!looks_like_number $_buf && !ref $_buf) { + if (!ref $_buf) { print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf; } else { $_buf = $_MCE->{freeze}([ $_buf ]); @@ -660,7 +660,7 @@ sub _heap_insert_high { if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) { for my $_i (1 .. $_Q->{_asem}) { - MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); + syswrite($_Q->{_aw_sock}, $LF); } $_Q->{_asem} = 0; } @@ -732,7 +732,7 @@ sub _heap_insert_high { print {$_DAU_R_SOCK} '-1'.$LF; } else { - if (!looks_like_number $_buf && !ref $_buf) { + if (!ref $_buf) { print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf; } else { $_buf = $_MCE->{freeze}([ $_buf ]); @@ -757,7 +757,7 @@ sub _heap_insert_high { print {$_DAU_R_SOCK} '-1'.$LF; } else { - if (!looks_like_number $_buf && !ref $_buf) { + if (!ref $_buf) { print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf; } else { $_buf = $_MCE->{freeze}([ $_buf ]); @@ -858,7 +858,7 @@ sub _mce_m_end { my ($_Q) = @_; if (!exists $_Q->{_ended}) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) unless $_Q->{_nb_flag}; + syswrite($_Q->{_qw_sock}, $LF) unless $_Q->{_nb_flag}; $_Q->{_ended} = undef; } @@ -877,7 +877,7 @@ sub _mce_m_enqueue { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } ## Append item(s) into the queue. @@ -901,7 +901,7 @@ sub _mce_m_enqueuep { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } $_Q->_enqueuep($_p, @_); @@ -945,7 +945,7 @@ sub _mce_m_dequeue { if ($_pending) { $_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH); for my $_i (1 .. $_pending) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } } $_Q->{_dsem} = $_pending; @@ -956,11 +956,11 @@ sub _mce_m_dequeue { } else { ## Otherwise, never to exceed one byte in the channel - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); + syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); } if (exists $_Q->{_ended} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } $_Q->{_nb_flag} = 0; @@ -1040,7 +1040,7 @@ sub _mce_m_insert { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } if (abs($_i) > scalar @{ $_Q->{_datq} }) { @@ -1088,7 +1088,7 @@ sub _mce_m_insertp { return; } if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { - MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); + syswrite($_Q->{_qw_sock}, $LF); } if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) { @@ -1275,7 +1275,7 @@ sub _mce_m_heap { }; $_dat_un = sub { my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; - MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } @@ -1355,7 +1355,7 @@ sub _mce_m_heap { my $_buf = $_Q->{_id}.$LF . length($_tmp).$LF; $_req1->(OUTPUT_A_QUE, $_buf, $_tmp); } - elsif (!looks_like_number $_[0] && defined $_[0]) { + elsif (defined $_[0]) { my $_buf = $_Q->{_id}.$LF . length($_[0]).$LF; $_req1->(OUTPUT_S_QUE, $_buf, $_[0]); } @@ -1381,7 +1381,7 @@ sub _mce_m_heap { my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_tmp).$LF; $_req1->(OUTPUT_A_QUP, $_buf, $_tmp); } - elsif (!looks_like_number $_[0] && defined $_[0]) { + elsif (defined $_[0]) { my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_[0]).$LF; $_req1->(OUTPUT_S_QUP, $_buf, $_[0]); } @@ -1497,7 +1497,7 @@ sub _mce_m_heap { my ($_buf, $_tmp); - if (scalar @_ > 1 || looks_like_number $_[0] || ref $_[0] || !defined $_[0]) { + if (scalar @_ > 1 || ref $_[0] || !defined $_[0]) { $_tmp = $_MCE->{freeze}([ @_ ]); $_buf = $_Q->{_id}.$LF . $_i.$LF . (length($_tmp) + 1).$LF . $_tmp.'1'; } else { @@ -1523,7 +1523,7 @@ sub _mce_m_heap { my ($_buf, $_tmp); - if (scalar @_ > 1 || looks_like_number $_[0] || ref $_[0] || !defined $_[0]) { + if (scalar @_ > 1 || ref $_[0] || !defined $_[0]) { $_tmp = $_MCE->{freeze}([ @_ ]); $_buf = $_Q->{_id}.$LF . $_p.$LF . $_i.$LF . (length($_tmp) + 1).$LF . $_tmp.'1'; @@ -1602,7 +1602,7 @@ MCE::Queue - Hybrid (normal and priority) queues =head1 VERSION -This document describes MCE::Queue version 1.838 +This document describes MCE::Queue version 1.843 =head1 SYNOPSIS @@ -2022,21 +2022,21 @@ numbers, not the data. =over 3 -=item L<List::BinarySearch> +=item * L<List::BinarySearch> The bsearch_num_pos method was helpful for accommodating the highest and lowest order in MCE::Queue. -=item L<POE::Queue::Array> +=item * L<POE::Queue::Array> For extra optimization, two if statements were adopted for checking if the item belongs at the end or head of the queue. -=item L<List::Priority> +=item * L<List::Priority> MCE::Queue supports both normal and priority queues. -=item L<Thread::Queue> +=item * L<Thread::Queue> Thread::Queue is used as a template for identifying and documenting the methods. @@ -2051,7 +2051,7 @@ simultaneously; e.g. $q->pending(); # counts both normal/priority queues -=item L<Parallel::DataPipe> +=item * L<Parallel::DataPipe> The recursion example, in the synopsis above, was largely adopted from this module. diff --git a/lib/MCE/Relay.pm b/lib/MCE/Relay.pm index 5df036c..ca3c8c9 100644 --- a/lib/MCE/Relay.pm +++ b/lib/MCE/Relay.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -343,7 +343,7 @@ MCE::Relay - Extends Many-Core Engine with relay capabilities =head1 VERSION -This document describes MCE::Relay version 1.838 +This document describes MCE::Relay version 1.843 =head1 SYNOPSIS @@ -412,6 +412,8 @@ across all platforms. =item MCE::relay { code } +=back + Relay is enabled by specifying the init_relay option which takes a hash or array reference, or a scalar value. Relaying is orderly and driven by chunk_id when processing data, otherwise task_wid. Omitting the code block (e.g. MCE::relay) @@ -519,8 +521,12 @@ Or simply a scalar value. 4: 1003 4012 size +=over 3 + =item MCE->relay_final ( void ) +=back + Call this method to obtain the final relay value(s) after running. See included example findnull.pl for another use case. @@ -550,8 +556,12 @@ example findnull.pl for another use case. 40 : 180 +=over 3 + =item MCE->relay_recv ( void ) +=back + Call this method to obtain the next relay value before relaying. This allows serial-code to be processed orderly between workers. The following is a parallel demonstration for the fasta-benchmark on the web. @@ -718,10 +728,14 @@ demonstration for the fasta-benchmark on the web. say ">THREE Homo sapiens frequency"; make_random_fasta( $homosapiens, $n * 5 ); +=over 3 + =item MCE->relay_lock ( void ) =item MCE->relay_unlock ( void ) +=back + The C<relay_lock> and C<relay_unlock> methods, added to MCE 1.807, are aliases for C<relay_recv> and C<relay> respectively. They allow one to perform an exclusive action prior to actual relaying of data. @@ -920,8 +934,6 @@ Here, workers write exclusively and orderly to C<STDOUT>. say ">THREE Homo sapiens frequency"; make_random_fasta( $homosapiens, $n * 5 ); -=back - =head1 GATHER AND RELAY DEMONSTRATIONS I received a request from John Martel to process a large flat file and expand @@ -959,6 +971,8 @@ while preserving output order. =item Example One +=back + This example configures a custom function for preserving output order. Unfortunately, the sprintf function alone involves extra CPU time causing the manager process to fall behind. Thus, workers may idle while waiting @@ -1019,8 +1033,12 @@ for the manager process to respond to the gather request. MCE::Loop::finish(); close $fh_out; +=over 3 + =item Example Two +=back + In this example, workers obtain the current ID value and increment/relay for the next worker, ordered by chunk ID behind the scene. Workers call sprintf in parallel, allowing the manager process (out_iter_fh) to accommodate up to @@ -1075,8 +1093,6 @@ not call relay more than once per chunk. Doing so will cause IPC to stall. MCE::Loop::finish(); close $fh_out; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Signal.pm b/lib/MCE/Signal.pm index a682e59..64702ae 100644 --- a/lib/MCE/Signal.pm +++ b/lib/MCE/Signal.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) @@ -428,7 +428,7 @@ MCE::Signal - Temporary directory creation/cleanup and signal handling =head1 VERSION -This document describes MCE::Signal version 1.838 +This document describes MCE::Signal version 1.843 =head1 SYNOPSIS diff --git a/lib/MCE/Step.pm b/lib/MCE/Step.pm index 6c9c8cd..d867da9 100644 --- a/lib/MCE/Step.pm +++ b/lib/MCE/Step.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -716,7 +716,7 @@ MCE::Step - Parallel step model for building creative steps =head1 VERSION -This document describes MCE::Step version 1.838 +This document describes MCE::Step version 1.843 =head1 DESCRIPTION @@ -903,7 +903,11 @@ inside the first block. Hence, the block is called once per each item. ## Array or array_ref mce_step sub { do_work($_) }, 1..10000; - mce_step sub { do_work($_) }, [ 1..10000 ]; + mce_step sub { do_work($_) }, \@list; + + ## Important; pass an array_ref for deeply input data + mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_step sub { do_work($_) }, \@deeply_list; ## File_path, glob_ref, or scalar_ref mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; @@ -933,10 +937,15 @@ This means having to loop through the chunk from inside the first block. ## Looping inside the block is the same for mce_step_f and ## mce_step_s. + ## Array or array_ref mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000; + mce_step sub { do_work($_) for (@{ $_ }) }, \@list; - ## Same as above, resembles code using the Core API. + ## Important; pass an array_ref for deeply input data + mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list; + ## Resembles code using the core MCE API mce_step sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -980,6 +989,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Step::init { options } +=back + The init function accepts a hash of MCE options. Unlike with MCE::Stream, both gather and bounds_only options may be specified when calling init (not shown below). @@ -1022,8 +1033,6 @@ both gather and bounds_only options may be specified when calling init 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 10000 -=back - Like with MCE::Step::init above, MCE options may be specified using an anonymous hash for the first argument. Notice how task_name, max_workers, and use_threads can take an anonymous array for setting uniquely per @@ -1088,6 +1097,8 @@ equals 1 in order to demonstrate all the possibilities for providing input data. =item mce_step sub { code }, list +=back + Input data may be defined using a list, an array ref, or a hash ref. Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Step takes a @@ -1096,10 +1107,15 @@ needed after the block. # $_ contains the item when chunk_size => 1 - mce_step sub { $_ }, 1..1000; - mce_step sub { $_ }, \@list; + mce_step sub { do_work($_) }, 1..1000; + mce_step sub { do_work($_) }, \@list; + + # Important; pass an array_ref for deeply input data + + mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + mce_step sub { do_work($_) }, \@deeply_list; - # chunking, any chunk_size => 1 or higher + # Chunking; any chunk_size => 1 or greater my %res = mce_step sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -1111,7 +1127,7 @@ needed after the block. }, \@list; - # input hash, current API available since 1.828 + # Input hash; current API available since 1.828 my %res = mce_step sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -1123,7 +1139,7 @@ needed after the block. }, \%hash; - # unlike MCE::Loop, MCE::Step doesn't need input to run + # Unlike MCE::Loop, MCE::Step doesn't need input to run mce_step { max_workers => 4 }, sub { MCE->say( MCE->wid ); @@ -1144,7 +1160,7 @@ needed after the block. MCE->say( "consumer: ", MCE->wid ); }; - # here, options are specified via init + # Here, options are specified via init MCE::Step::init { max_workers => [ 1, 3 ], @@ -1153,10 +1169,14 @@ needed after the block. mce_step \&producer, \&consumers; +=over 3 + =item MCE::Step->run_file ( sub { code }, file ) =item mce_step_f sub { code }, file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -1166,7 +1186,7 @@ position among themselves with zero interaction by the manager process. mce_step_f sub { $_ }, $file_handle; mce_step_f sub { $_ }, \$scalar; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_step_f sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -1178,10 +1198,14 @@ position among themselves with zero interaction by the manager process. }, "/path/to/file"; +=over 3 + =item MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_step_s sub { code }, $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -1198,7 +1222,7 @@ optional. The format is passed to sprintf (% may be omitted below). step => $step, format => $fmt }; - # chunking, any chunk_size => 1 or higher + # chunking, any chunk_size => 1 or greater my %res = mce_step_s sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -1260,10 +1284,14 @@ Time was measured using 1 worker to emphasize the difference. 7500001 .. 8750000 8750001 .. 10000000 +=over 3 + =item MCE::Step->run ( { input_data => iterator }, sub { code } ) =item mce_step { input_data => iterator }, sub { code } +=back + An iterator reference may be specified for input_data. The only other way is to specify input_data via MCE::Step::init. This prevents MCE::Step from configuring the iterator reference as another user task which will not work. @@ -1276,8 +1304,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. mce_step sub { $_ }; -=back - =head1 QUEUE-LIKE FEATURES =over 3 @@ -1286,6 +1312,8 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. =item MCE->step ( arg1, arg2, argN ) +=back + The ->step method is the simplest form for passing elements into the next sub-task. @@ -1320,6 +1348,8 @@ sub-task. 2: 18, 0.985448 3: 19, 0.146548 +=over 3 + =item MCE->enq ( task_name, item ) =item MCE->enq ( task_name, [ arg1, arg2, argN ] ) @@ -1332,6 +1362,8 @@ sub-task. =item MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] ) +=back + The MCE 1.7 release enables finer control. Unlike ->step, which take multiple arguments, the ->enq and ->enqp methods push items at the end of the array internally. Passing multiple arguments is possible by enclosing the arguments @@ -1390,8 +1422,12 @@ will cause an error. D6: 28, 0.220465 D8: 29, 0.630111 +=over 3 + =item MCE->await ( task_name, pending_threshold ) +=back + Providers may sometime run faster than consumers. Thus, increasing memory consumption. MCE 1.7 adds the ->await method for pausing momentarily until the receiving sub-task reaches the minimum threshold for the number of @@ -1445,8 +1481,6 @@ items pending in its queue. 2: 28, 0.918173 3: 29, 0.358266 -=back - =head1 GATHERING DATA Unlike MCE::Map where gather and output order are done for you automatically, @@ -1636,6 +1670,8 @@ running. =item MCE::Step::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -1650,8 +1686,6 @@ longer needed. MCE::Step::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Stream.pm b/lib/MCE/Stream.pm index 8318e43..4ee21de 100644 --- a/lib/MCE/Stream.pm +++ b/lib/MCE/Stream.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -672,7 +672,7 @@ MCE::Stream - Parallel stream model for chaining multiple maps and greps =head1 VERSION -This document describes MCE::Stream version 1.838 +This document describes MCE::Stream version 1.843 =head1 SYNOPSIS @@ -695,18 +695,22 @@ This document describes MCE::Stream version 1.838 ## Array or array_ref my @a = mce_stream sub { $_ * $_ }, 1..10000; - my @b = mce_stream sub { $_ * $_ }, [ 1..10000 ]; + my @b = mce_stream sub { $_ * $_ }, \@list; + + ## Important; pass an array_ref for deeply input data + my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list; ## File_path, glob_ref, or scalar_ref - my @c = mce_stream_f sub { chomp; $_ }, "/path/to/file"; - my @d = mce_stream_f sub { chomp; $_ }, $file_handle; - my @e = mce_stream_f sub { chomp; $_ }, \$scalar; + my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; + my @f = mce_stream_f sub { chomp; $_ }, $file_handle; + my @g = mce_stream_f sub { chomp; $_ }, \$scalar; ## Sequence of numbers (begin, end [, step, format]) - my @f = mce_stream_s sub { $_ * $_ }, 1, 10000, 5; - my @g = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ]; + my @h = mce_stream_s sub { $_ * $_ }, 1, 10000, 5; + my @i = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ]; - my @h = mce_stream_s sub { $_ * $_ }, { + my @j = mce_stream_s sub { $_ * $_ }, { begin => 1, end => 10000, step => 5, format => undef }; @@ -791,6 +795,8 @@ Specify C<Sereal => 0> to use Storable instead. =item MCE::Stream::init { options } +=back + The init function accepts a hash of MCE options. The gather and bounds_only options, if specified, are ignored due to being used internally by the module (not shown below). @@ -833,8 +839,6 @@ module (not shown below). 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 10000 -=back - Like with MCE::Stream::init above, MCE options may be specified using an anonymous hash for the first argument. Notice how both max_workers and task_name can take an anonymous array for setting values uniquely @@ -916,18 +920,30 @@ possibilities for providing input data. =item mce_stream sub { code }, list +=back + Input data may be defined using a list or an array reference. Unlike MCE::Loop, Flow, and Step, specifying a hash reference as input data isn't allowed. + ## Array or array_ref my @a = mce_stream sub { $_ * 2 }, 1..1000; my @b = mce_stream sub { $_ * 2 }, \@list; - my @z = mce_stream sub { $_ * 2 }, \%hash; # not supported + ## Important; pass an array_ref for deeply input data + my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ]; + my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list; + + ## Not supported + my @z = mce_stream sub { ... }, \%hash; + +=over 3 =item MCE::Stream->run_file ( sub { code }, file ) =item mce_stream_f sub { code }, file +=back + The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. @@ -935,10 +951,14 @@ position among themselves with zero interaction by the manager process. my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle; my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar; +=over 3 + =item MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] ) =item mce_stream_s sub { code }, $beg, $end [, $step, $fmt ] +=back + Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). @@ -952,10 +972,14 @@ optional. The format is passed to sprintf (% may be omitted below). begin => $beg, end => $end, step => $step, format => $fmt }; +=over 3 + =item MCE::Stream->run ( { input_data => iterator }, sub { code } ) =item mce_stream { input_data => iterator }, sub { code } +=back + An iterator reference may be specified for input_data. The only other way is to specify input_data via MCE::Stream::init. This prevents MCE::Stream from configuring the iterator reference as another user task which will @@ -969,8 +993,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 }; -=back - =head1 MANUAL SHUTDOWN =over 3 @@ -979,6 +1001,8 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>. =item MCE::Stream::finish +=back + Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. @@ -993,8 +1017,6 @@ longer needed. MCE::Stream::finish; -=back - =head1 INDEX L<MCE|MCE>, L<MCE::Core> diff --git a/lib/MCE/Subs.pm b/lib/MCE/Subs.pm index 91bcf86..2a82b39 100644 --- a/lib/MCE/Subs.pm +++ b/lib/MCE/Subs.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (Subroutines::ProhibitSubroutinePrototypes) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -75,7 +75,6 @@ sub mce_status ( ) { return $MCE::MCE->status(); } ## Callable by the worker process only. -sub mce_do (@) { return $MCE::MCE->do(@_); } sub mce_exit (@) { return $MCE::MCE->exit(@_); } sub mce_gather (@) { return $MCE::MCE->gather(@_); } sub mce_last ( ) { return $MCE::MCE->last(); } @@ -89,6 +88,7 @@ sub mce_yield ( ) { return $MCE::MCE->yield(); } ## Callable by both the manager and worker processes. sub mce_abort ( ) { return $MCE::MCE->abort(); } +sub mce_do (@) { return $MCE::MCE->do(@_); } sub mce_freeze (@) { return $MCE::MCE->{freeze}(@_); } sub mce_print (;*@) { return $MCE::MCE->print(@_); } sub mce_printf (;*@) { return $MCE::MCE->printf(@_); } @@ -147,7 +147,6 @@ sub _export_subs { ## Callable by the worker process only. if ($_w_flg) { - *{ $_package . '::mce_do' } = \&mce_do; *{ $_package . '::mce_exit' } = \&mce_exit; *{ $_package . '::mce_gather' } = \&mce_gather; *{ $_package . '::mce_last' } = \&mce_last; @@ -163,6 +162,7 @@ sub _export_subs { if ($_m_flg || $_w_flg) { *{ $_package . '::mce_abort' } = \&mce_abort; + *{ $_package . '::mce_do' } = \&mce_do; *{ $_package . '::mce_freeze' } = \&mce_freeze; *{ $_package . '::mce_print' } = \&mce_print; *{ $_package . '::mce_printf' } = \&mce_printf; @@ -204,7 +204,7 @@ MCE::Subs - Exports functions mapped directly to MCE methods =head1 VERSION -This document describes MCE::Subs version 1.838 +This document describes MCE::Subs version 1.843 =head1 SYNOPSIS @@ -279,6 +279,8 @@ MCE methods are described in L<MCE::Core>. =item * mce_abort +=item * mce_do + =item * mce_forchunk =item * mce_foreach diff --git a/lib/MCE/Util.pm b/lib/MCE/Util.pm index b884885..1d35108 100644 --- a/lib/MCE/Util.pm +++ b/lib/MCE/Util.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized numeric ); -our $VERSION = '1.838'; +our $VERSION = '1.843'; ## no critic (BuiltinFunctions::ProhibitStringyEval) @@ -183,7 +183,7 @@ sub _destroy_socks { for my $_i (0 .. @{ $_obj->{$_p} } - 1) { next unless (defined $_obj->{$_p}[$_i]); if (fileno $_obj->{$_p}[$_i]) { - MCE::Util::_syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv; + syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv; eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) }; close $_obj->{$_p}[$_i]; } @@ -192,7 +192,7 @@ sub _destroy_socks { } else { if (fileno $_obj->{$_p}) { - MCE::Util::_syswrite($_obj->{$_p}, '0') if $_is_winenv; + syswrite($_obj->{$_p}, '0') if $_is_winenv; eval q{ CORE::shutdown($_obj->{$_p}, 2) }; close $_obj->{$_p}; } @@ -262,15 +262,16 @@ sub _sock_pair { sub _sock_ready { my ($_socket, $_timeout) = @_; - return '' if !defined $_timeout && exists $_sock_ready{"$_socket"}; + return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1; my $_val_bytes = "\x00\x00\x00\x00"; my $_ptr_bytes = unpack('I', pack('P', $_val_bytes)); my ($_delay, $_start) = (0, time); if (!defined $_timeout) { - $_sock_ready{"$_socket"} = undef; - } else { + $_sock_ready{"$_socket"}++; + } + else { $_timeout = undef if $_timeout < 0; $_timeout += $_start if $_timeout; } @@ -288,14 +289,46 @@ sub _sock_ready { } } +sub _sock_ready_w { + my ($_socket) = @_; + return if $_sock_ready{"${_socket}_w"} > 1; + + my $_vec = ''; + $_sock_ready{"${_socket}_w"}++; + + while (1) { + vec($_vec, fileno($_socket), 1) = 1; + return if select(undef, $_vec, undef, 0) > 0; + sleep 0.045; + } + + return; +} + sub _sysread { - my ($_delay, $_start); + my $_bytes; - SYSREAD: ( @_ == 3 - ? sysread($_[0], $_[1], $_[2]) - : sysread($_[0], $_[1], $_[2], $_[3]) + SYSREAD: $_bytes = ( @_ == 3 + ? CORE::sysread($_[0], $_[1], $_[2]) + : CORE::sysread($_[0], $_[1], $_[2], $_[3]) ) or do { - goto SYSREAD if $! == Errno::EINTR(); + return $_bytes if (defined $MCE::Signal::KILLED); + goto SYSREAD if ($! == Errno::EINTR()); + }; + + return $_bytes; +} + +sub _sysread2 { + my ($_bytes, $_delay, $_start); + # called by MCE/Core/Manager.pm + + SYSREAD: $_bytes = ( @_ == 3 + ? CORE::sysread($_[0], $_[1], $_[2]) + : CORE::sysread($_[0], $_[1], $_[2], $_[3]) + ) or do { + return $_bytes if (defined $MCE::Signal::KILLED); + goto SYSREAD if ($! == Errno::EINTR()); # non-blocking operation could not be completed if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) { @@ -309,37 +342,18 @@ sub _sysread { } }; - return; -} - -sub _sysseek { - my $_pos; - - SYSSEEK: $_pos = sysseek($_[0], $_[1], $_[2]) or do { - goto SYSSEEK if $! == Errno::EINTR(); - }; - - return $_pos; -} - -sub _syswrite { - syswrite($_[0], $_[1]) or do { - goto \&_syswrite if $! == Errno::EINTR(); - }; - return; + return $_bytes; } sub _nonblocking { if ($^O eq 'MSWin32') { - my $nonblocking = ( $_[1] ) ? "\x00\x00\x00\x01" : "\x00\x00\x00\x00"; - # MSWin32 FIONBIO - from winsock2.h macro + my $nonblocking = $_[1] ? "\x00\x00\x00\x01" : "\x00\x00\x00\x00"; + ioctl($_[0], 0x8004667e, unpack("I", pack('P', $nonblocking))); } else { - my $nonblocking = ( $_[1] ) ? 0 : 1; - - $_[0]->blocking($nonblocking); + $_[0]->blocking( $_[1] ? 0 : 1 ); } return; @@ -361,7 +375,7 @@ MCE::Util - Utility functions =head1 VERSION -This document describes MCE::Util version 1.838 +This document describes MCE::Util version 1.843 =head1 SYNOPSIS |