summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorintrigeri <intrigeri@boum.org>2019-07-25 02:23:34 +0000
committerintrigeri <intrigeri@boum.org>2019-07-25 02:23:34 +0000
commita0567bc8c39a9f006202a716ca0da1e5b12771a5 (patch)
tree086bfbe53d72fe8722339cd762a8670ed4897419 /lib
parentced942ca55cb90ddce354f56c6696bb34ba8c2ff (diff)
New upstream version 1.843
Diffstat (limited to 'lib')
-rw-r--r--lib/MCE.pm65
-rw-r--r--lib/MCE.pod89
-rw-r--r--lib/MCE/Candy.pm6
-rw-r--r--lib/MCE/Channel.pm658
-rw-r--r--lib/MCE/Channel/Mutex.pm356
-rw-r--r--lib/MCE/Channel/Simple.pm332
-rw-r--r--lib/MCE/Channel/Threads.pm361
-rw-r--r--lib/MCE/Child.pm1940
-rw-r--r--lib/MCE/Core.pod14
-rw-r--r--lib/MCE/Core/Input/Generator.pm2
-rw-r--r--lib/MCE/Core/Input/Handle.pm26
-rw-r--r--lib/MCE/Core/Input/Iterator.pm4
-rw-r--r--lib/MCE/Core/Input/Request.pm4
-rw-r--r--lib/MCE/Core/Input/Sequence.pm12
-rw-r--r--lib/MCE/Core/Manager.pm27
-rw-r--r--lib/MCE/Core/Validation.pm6
-rw-r--r--lib/MCE/Core/Worker.pm11
-rw-r--r--lib/MCE/Examples.pod2
-rw-r--r--lib/MCE/Flow.pm62
-rw-r--r--lib/MCE/Grep.pm54
-rw-r--r--lib/MCE/Loop.pm58
-rw-r--r--lib/MCE/Map.pm54
-rw-r--r--lib/MCE/Mutex.pm20
-rw-r--r--lib/MCE/Mutex/Channel.pm41
-rw-r--r--lib/MCE/Mutex/Channel2.pm162
-rw-r--r--lib/MCE/Mutex/Flock.pm8
-rw-r--r--lib/MCE/Queue.pm68
-rw-r--r--lib/MCE/Relay.pm28
-rw-r--r--lib/MCE/Signal.pm4
-rw-r--r--lib/MCE/Step.pm74
-rw-r--r--lib/MCE/Stream.pm54
-rw-r--r--lib/MCE/Subs.pm10
-rw-r--r--lib/MCE/Util.pm84
33 files changed, 4369 insertions, 327 deletions
diff --git a/lib/MCE.pm b/lib/MCE.pm
index 04b7f56..97667dd 100644
--- a/lib/MCE.pm
+++ b/lib/MCE.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -90,9 +90,9 @@ BEGIN {
## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
## _init_pid _init_total_workers
##
- ## _bsb_r_sock _bsb_w_sock _bse_r_sock _bse_w_sock _com_r_sock _com_w_sock
- ## _dat_r_sock _dat_w_sock _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock
- ## _data_channels _lock_chn _mutex_n
+ ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock
+ ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels
+ ## _lock_chn _mutex_n
%_valid_fields_new = map { $_ => 1 } qw(
max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
@@ -212,7 +212,7 @@ sub import {
use constant {
# Max data channels. This cannot be greater than 8 on MSWin32.
- DATA_CHANNELS => ($^O eq 'MSWin32') ? 8 : 12,
+ DATA_CHANNELS => ($^O eq 'MSWin32') ? 8 : 10,
# Max GC size. Undef variable when exceeding size.
MAX_GC_SIZE => 1024 * 1024 * 64,
@@ -477,11 +477,14 @@ sub new {
$self{_last_sref} = (ref $self{input_data} eq 'SCALAR')
? refaddr($self{input_data}) : 0;
- my $_data_channels = ("$$.$_tid" eq $_oid) ? DATA_CHANNELS : 2;
+ my $_data_channels = ("$$.$_tid" eq $_oid)
+ ? ( $INC{'MCE/Channel.pm'} ? 6 : DATA_CHANNELS )
+ : 2;
+
my $_total_workers = 0;
if (defined $self{user_tasks}) {
- $_total_workers += $_->{max_workers} for (@{ $self{user_tasks} });
+ $_total_workers += $_->{max_workers} for @{ $self{user_tasks} };
} else {
$_total_workers = $self{max_workers};
}
@@ -492,7 +495,7 @@ sub new {
? $_total_workers : $_data_channels;
$self{_lock_chn} = ($_total_workers > $_data_channels) ? 1 : 0;
- $self{_lock_chn} = 1 if $INC{'MCE/Hobo.pm'};
+ $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'};
$MCE = \%self if ($MCE->{_wid} == 0);
@@ -527,7 +530,7 @@ sub spawn {
local $@; eval 'require Net::HTTP; require Net::HTTPS';
}
- ## Start the shared-manager process if present.
+ ## Start the shared-manager process if not running.
MCE::Shared->start() if $INC{'MCE/Shared.pm'};
## Load input module.
@@ -599,7 +602,6 @@ sub spawn {
## Create sockets for IPC.
MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock)); # sync
- MCE::Util::_sock_pair($self, qw(_bse_r_sock _bse_w_sock)); # sync
MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock)); # core
MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_) # core
for (0 .. $_data_channels);
@@ -1123,9 +1125,7 @@ sub run {
## Insert the first message into the queue if defined.
if (defined $_first_msg) {
- MCE::Util::_syswrite(
- $self->{_que_w_sock}, pack($_que_template, 0, $_first_msg)
- );
+ syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg));
}
## Submit params data to workers.
@@ -1162,11 +1162,11 @@ sub run {
## Notify workers to commence processing.
if ($_is_MSWin32) {
my $_buf = _sprintf("%${_total_workers}s", "");
- MCE::Util::_syswrite($self->{_bse_w_sock}, $_buf);
+ syswrite($self->{_bsb_r_sock}, $_buf);
} else {
- my $_BSE_W_SOCK = $self->{_bse_w_sock};
+ my $_BSB_R_SOCK = $self->{_bsb_r_sock};
for my $_i (1 .. $_total_workers) {
- MCE::Util::_syswrite($_BSE_W_SOCK, $LF);
+ syswrite($_BSB_R_SOCK, $LF);
}
}
}
@@ -1345,9 +1345,8 @@ sub shutdown {
$_COM_R_SOCK = undef;
MCE::Util::_destroy_socks($self, qw(
- _bsb_w_sock _bsb_r_sock _bse_w_sock _bse_r_sock
- _com_w_sock _com_r_sock _dat_w_sock _dat_r_sock
- _rla_w_sock _rla_r_sock
+ _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock
+ _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock
));
($_is_MSWin32)
@@ -1393,7 +1392,7 @@ sub sync {
my $_chn = $self->{_chn};
my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
my $_BSB_R_SOCK = $self->{_bsb_r_sock};
- my $_BSE_R_SOCK = $self->{_bse_r_sock};
+ my $_BSB_W_SOCK = $self->{_bsb_w_sock};
my $_buf;
local $\ = undef if (defined $\);
@@ -1409,7 +1408,7 @@ sub sync {
print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF;
## Wait until all workers from (task_id 0) have un-synced.
- MCE::Util::_sysread($_BSE_R_SOCK, $_buf, 1);
+ MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1);
return;
}
@@ -1460,7 +1459,7 @@ sub abort {
if ($_abort_msg > 0) {
MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size);
- MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg));
+ syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg));
}
if ($self->{_wid} > 0) {
@@ -1503,7 +1502,9 @@ sub exit {
unless ($self->{_exiting}) {
$self->{_exiting} = 1;
- ## Check nested Hobo workers not yet joined.
+ ## Check for nested workers not yet joined.
+ MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
+
MCE::Hobo->finish('MCE')
if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
@@ -1608,17 +1609,20 @@ sub do {
my $self = shift; $self = $MCE unless ref($self);
my $_pkg = caller() eq 'MCE' ? caller(1) : caller();
- _croak('MCE::do: method is not allowed by the manager process')
- unless ($self->{_wid});
_croak('MCE::do: (code ref) is not supported')
if (ref $_[0] eq 'CODE');
-
_croak('MCE::do: (callback) is not specified')
unless (defined ( my $_func = shift ));
$_func = $_pkg.'::'.$_func if (index($_func, ':') < 0);
- return _do_callback($self, $_func, [ @_ ]);
+ if ($self->{_wid}) {
+ return _do_callback($self, $_func, [ @_ ]);
+ }
+ else {
+ no strict 'refs';
+ return $_func->(@_);
+ }
}
## Gather method.
@@ -1855,7 +1859,6 @@ sub _make_sessdir {
sub _sprintf {
my ($_fmt, $_arg) = @_;
-
# remove tainted'ness
($_fmt) = $_fmt =~ /(.*)/;
@@ -1923,7 +1926,9 @@ sub _dispatch {
## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x
@_ = ();
- $ENV{'PERL_MCE_IPC'} = 'win32' if ($_is_MSWin32 && $INC{'MCE/Hobo.pm'});
+ $ENV{'PERL_MCE_IPC'} = 'win32' if (
+ $_is_MSWin32 && ( $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'} )
+ );
if (!$_is_thread && UNIVERSAL::can('Prima', 'cleanup')) {
no warnings 'redefine'; local $@; eval '*Prima::cleanup = sub {}';
@@ -1944,6 +1949,8 @@ sub _dispatch {
MCE::Hobo->_clear()
if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
+
+ MCE::Child->_clear() if $INC{'MCE/Child.pm'};
}
if (!$self->{use_threads} && $INC{'Math/Random.pm'}) {
diff --git a/lib/MCE.pod b/lib/MCE.pod
index b606382..d434111 100644
--- a/lib/MCE.pod
+++ b/lib/MCE.pod
@@ -5,7 +5,7 @@ MCE - Many-Core Engine for Perl providing parallel processing capabilities
=head1 VERSION
-This document describes MCE version 1.838
+This document describes MCE version 1.843
Many-Core Engine (MCE) for Perl helps enable a new level of performance by
maximizing all available cores.
@@ -135,28 +135,35 @@ The next demonstration loops through a sequence of numbers with MCE::Flow.
=head1 CORE MODULES
-Three modules make up the core engine for MCE.
+Four modules make up the core engine for MCE.
=over 3
=item L<MCE::Core>
-Provides the Core API for Many-Core Engine. The various MCE options are
-described here. It includes several demonstrations at the end of the page.
+This is the POD documentation describing the core Many-Core Engine (MCE) API.
+Go here for help with the various MCE options. See also, L<MCE::Examples>
+for additional demonstrations.
+
+=item L<MCE::Mutex>
+
+Provides a simple semaphore implementation supporting threads and processes.
+Two implementations are provided; one via pipes or socket depending on the
+platform and the other using Fcntl.
=item L<MCE::Signal>
-Temporary directory creation, cleanup, and signal handling.
+Provides signal handling, temporary directory creation, and cleanup for MCE.
=item L<MCE::Util>
-Utility functions for Many-Core Engine.
+Provides utility functions for MCE.
=back
=head1 MCE EXTRAS
-There are 4 add-on modules for use with MCE.
+There are 5 add-on modules for use with MCE.
=over 3
@@ -165,11 +172,18 @@ There are 4 add-on modules for use with MCE.
Provides a collection of sugar methods and output iterators for preserving
output order.
-=item L<MCE::Mutex>
+=item L<MCE::Channel>
-Provides a simple semaphore implementation supporting threads and processes.
-Two implementations are provided. One via pipes or socket depending on the
-platform. The other via Fcntl.
+Introduced in MCE 1.839, provides queue-like and two-way communication
+capability. Three implementations C<Simple>, C<Mutex>, and C<Threads> are
+provided. C<Simple> does not involve locking whereas C<Mutex> and C<Threads>
+do locking transparently using C<MCE::Mutex> and C<threads> respectively.
+
+=item L<MCE::Child>
+
+Also introduced in MCE 1.839, provides a threads-like parallelization module
+that is compatible with Perl 5.8. It is a fork of L<MCE::Hobo>. The difference
+is using a common C<MCE::Channel> object when yielding and joining.
=item L<MCE::Queue>
@@ -180,51 +194,53 @@ threads.
=item L<MCE::Relay>
-Enables workers to receive and pass on information orderly with zero
-involvement by the manager process while running.
+Provides workers the ability to receive and pass information orderly with zero
+involvement by the manager process. This module is loaded automatically by
+MCE when specifying the C<init_relay> MCE option.
=back
=head1 MCE MODELS
-The models take Many-Core Engine to a new level for ease of use. Two options
-(chunk_size and max_workers) are configured automatically as well as spawning
-and shutdown.
+The MCE models are sugar syntax on top of the L<MCE::Core> API. Two MCE options
+(chunk_size and max_workers) are configured automatically. Moreover, spawning
+workers and later shutdown occur transparently behind the scene.
+
+Choosing a MCE Model largely depends on the application. It all boils down
+to how much automation you need MCE to handle transparently. Or if you prefer,
+constructing the MCE object and running using the core MCE API is fine too.
=over 3
-=item L<MCE::Loop>
+=item L<MCE::Grep>
-Provides a MCE model for building parallel loops.
+Provides a parallel grep implementation similar to the native grep function.
-=item L<MCE::Flow>
+=item L<MCE::Map>
-A parallel flow model for building creative applications. This makes use of
-user_tasks in MCE. The author has full control when utilizing this model.
-MCE::Flow is similar to MCE::Loop, but allows for multiple code blocks to
-run in parallel with a slight change to syntax.
+Provides a parallel map implementation similar to the native map function.
-=item L<MCE::Grep>
+=item L<MCE::Loop>
-Provides a parallel grep implementation similar to the native grep function.
+Provides a parallel for loop implementation.
-=item L<MCE::Map>
+=item L<MCE::Flow>
-Provides a parallel map model similar to the native map function.
+Like C<MCE::Loop>, but with support for multiple pools of workers. The pool
+of workers are configured transparently via the MCE C<user_tasks> option.
=item L<MCE::Step>
-Provides a parallel step implementation utilizing MCE::Queue between user
-tasks. MCE::Step is a spin off from MCE::Flow with a touch of MCE::Stream.
-This model, introduced in 1.506, allows one to pass data from one sub-task
-into the next transparently.
+Like C<MCE::Flow>, but adds a C<MCE::Queue> object between each pool of
+workers. This model, introduced in 1.506, allows one to pass data forward
+(left to right) from one sub-task into another with little effort.
=item L<MCE::Stream>
-Provides an efficient parallel implementation for chaining multiple maps
-and greps together through user_tasks and MCE::Queue. Like with MCE::Flow,
-MCE::Stream can run multiple code blocks in parallel with a slight change
-to syntax from MCE::Map and MCE::Grep.
+This provides an efficient parallel implementation for chaining multiple maps
+and greps transparently. Like C<MCE::Flow> and C<MCE::Step>, it too supports
+multiple pools of workers. The distinction is that C<MCE::Stream> passes
+data from right to left and done for you transparently.
=back
@@ -266,7 +282,8 @@ The source, cookbook, and examples are hosted at GitHub.
=head1 SEE ALSO
C<MCE::Shared> provides data sharing capabilities for C<MCE>. It includes
-C<MCE::Hobo> for running code asynchronously.
+C<MCE::Hobo> for running code asynchronously with the IPC handled by the
+shared-manager process.
=over 3
diff --git a/lib/MCE/Candy.pm b/lib/MCE/Candy.pm
index 5facf29..6fa20eb 100644
--- a/lib/MCE/Candy.pm
+++ b/lib/MCE/Candy.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
our @CARP_NOT = qw( MCE );
@@ -219,7 +219,7 @@ MCE::Candy - Sugar methods and output iterators
=head1 VERSION
-This document describes MCE::Candy version 1.838
+This document describes MCE::Candy version 1.843
=head1 DESCRIPTION
@@ -458,7 +458,7 @@ processing input data.
my @results;
mce_flow {
- max_workers => 'auto', ## Note that 'auto' is never higher than 8
+ max_workers => 'auto', ## Note that 'auto' is never greater than 8
gather => MCE::Candy::out_iter_array(\@results)
},
sub {
diff --git a/lib/MCE/Channel.pm b/lib/MCE/Channel.pm
new file mode 100644
index 0000000..aa90d81
--- /dev/null
+++ b/lib/MCE/Channel.pm
@@ -0,0 +1,658 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## Queue-like and two-way communication capability.
+##
+###############################################################################
+
+package MCE::Channel;
+
+use strict;
+use warnings;
+
+no warnings qw( uninitialized once );
+
+our $VERSION = '1.843';
+
+## no critic (BuiltinFunctions::ProhibitStringyEval)
+## no critic (TestingAndDebugging::ProhibitNoStrict)
+
+use if $^O eq 'MSWin32', 'threads';
+use if $^O eq 'MSWin32', 'threads::shared';
+
+use Carp ();
+
+$Carp::Internal{ (__PACKAGE__) }++;
+
+my ( $freeze, $thaw );
+
+BEGIN {
+ if ( ! defined $INC{'PDL.pm'} ) {
+ local $@; eval '
+ use Sereal::Encoder 3.015 qw( encode_sereal );
+ use Sereal::Decoder 3.015 qw( decode_sereal );
+ ';
+ if ( ! $@ ) {
+ my $encoder_ver = int( Sereal::Encoder->VERSION() );
+ my $decoder_ver = int( Sereal::Decoder->VERSION() );
+ if ( $encoder_ver - $decoder_ver == 0 ) {
+ $freeze = \&encode_sereal;
+ $thaw = \&decode_sereal;
+ }
+ }
+ }
+
+ if ( ! defined $freeze ) {
+ require Storable;
+ $freeze = \&Storable::freeze;
+ $thaw = \&Storable::thaw;
+ }
+}
+
+use MCE::Util ();
+use bytes;
+
+my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
+my $has_threads = $INC{'threads.pm'} ? 1 : 0;
+my $tid = $has_threads ? threads->tid() : 0;
+
+sub new {
+ my ( $class, %argv ) = @_;
+ my $impl = defined( $argv{impl} ) ? ucfirst( lc $argv{impl} ) : 'Mutex';
+
+ $impl = 'Threads' if ( $^O eq 'MSWin32' && $impl eq 'Mutex' );
+
+ eval "require MCE::Channel::$impl; 1" ||
+ Carp::croak("Could not load Channel implementation '$impl': $@");
+
+ my $pkg = 'MCE::Channel::'.$impl;
+ no strict 'refs';
+
+ $pkg->new(%argv);
+}
+
+sub CLONE {
+ $tid = threads->tid if $has_threads;
+}
+
+sub DESTROY {
+ my ( $pid, $self ) = ( $has_threads ? $$ .'.'. $tid : $$, @_ );
+
+ if ( $self->{'init_pid'} && $self->{'init_pid'} eq $pid ) {
+ MCE::Util::_destroy_socks( $self, qw(c_sock p_sock) );
+ delete $self->{c_mutex};
+ delete $self->{p_mutex};
+ }
+
+ return;
+}
+
+sub impl {
+ $_[0]->{'impl'} || 'Not defined';
+}
+
+sub _get_freeze { $freeze; }
+sub _get_thaw { $thaw; }
+
+sub _ended {
+ warn "WARNING: ($_[0]) called on a channel that has been 'end'ed\n";
+
+ return;
+}
+
+sub _read {
+ my $bytes = MCE::Util::_sysread( $_[0], $_[1], my $len = $_[2] );
+ my $read = $bytes;
+
+ while ( $bytes && $read != $len ) {
+ $bytes = MCE::Util::_sysread( $_[0], $_[1], $len - $read, length($_[1]) );
+ $read += $bytes if $bytes;
+ }
+
+ return;
+}
+
+sub _pid {
+ $has_threads ? $$ .'.'. $tid : $$;
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Channel - Queue-like and two-way communication capability
+
+=head1 VERSION
+
+This document describes MCE::Channel version 1.843
+
+=head1 SYNOPSIS
+
+ use MCE::Channel;
+
+ ########################
+ # Construction
+ ########################
+
+ # A single producer and many consumers supporting processes and threads
+
+ my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation
+ my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
+
+ # Set the mp flag if two or more workers (many producers) will be calling
+ # enqueue/send or recv2/recv2_nb on the left end of the channel
+
+ my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
+ my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
+
+ # Tuned for one producer and one consumer, no locking
+
+ my $c5 = MCE::Channel->new( impl => 'Simple' );
+
+ ########################
+ # Queue-like behavior
+ ########################
+
+ # Send data to consumers
+ $c1->enqueue('item');
+ $c1->enqueue(qw/item1 item2 item3 itemN/);
+
+ # Receive data
+ my $item = $c1->dequeue(); # item
+ my @items = $c1->dequeue(2); # (item1, item2)
+
+ # Receive, non-blocking
+ my $item = $c1->dequeue_nb(); # item
+ my @items = $c1->dequeue_nb(2); # (item1, item2)
+
+ # Signal that there is no more work to be sent
+ $c1->end();
+
+ ########################
+ # Two-way communication
+ ########################
+
+ # Producer(s) sending data
+ $c3->send('message');
+ $c3->send(qw/arg1 arg2 arg3/);
+
+ # Consumer(s) receiving data
+ my $mesg = $c3->recv(); # message
+ my @args = $c3->recv(); # (arg1, arg2, arg3)
+
+ # Alternatively, non-blocking
+ my $mesg = $c3->recv_nb(); # message
+ my @args = $c3->recv_nb(); # (arg1, arg2, arg3)
+
+ # A producer signaling no more work to be sent
+ $c3->end();
+
+ # Consumers(s) sending data
+ $c3->send2('message');
+ $c3->send2(qw/arg1 arg2 arg3/);
+
+ # Producer(s) receiving data
+ my $mesg = $c3->recv2(); # message
+ my @args = $c3->recv2(); # (arg1, arg2, arg3)
+
+ # Alternatively, non-blocking
+ my $mesg = $c3->recv2_nb(); # message
+ my @args = $c3->recv2_nb(); # (arg1, arg2, arg3)
+
+=head1 DESCRIPTION
+
+A MCE::Channel object is a container for sending and receiving data using
+socketpair handles. Serialization is provided by L<Sereal> if available.
+Defaults to L<Storable> otherwise. Excluding the C<Simple> implementation,
+both ends of the C<channel> support many workers concurrently (with mp => 1).
+
+=head2 new ( impl => STRING, mp => BOOLEAN )
+
+This creates a new channel. Three implementations are provided C<Mutex> (default),
+C<Threads>, and C<Simple> indicating the locking mechanism to use C<MCE::Mutex>,
+C<threads::shared>, and no locking respectively.
+
+ $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0
+
+The C<Mutex> channel implementation supports processes and threads whereas the
+C<Threads> channel implementation is suited for threads only.
+
+ $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking
+ $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
+
+Set the C<mp> (m)any (p)roducers option to a true value if there will be two
+or more workers calling C<enqueue>, <send>, C<recv2>, or C<recv2_nb> on the
+left end of the channel. This is important to not incur a race condition.
+
+ $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
+ $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
+
+The C<Simple> implementation is optimized for one producer and one consumer max.
+It omits locking for maximum performance. This implementation is preferred for
+parent to child communication not shared by another worker.
+
+ $chnl = MCE::Channel->new( impl => 'Simple' );
+
+=head1 QUEUE-LIKE BEHAVIOR
+
+=head2 enqueue ( ITEM1 [, ITEM2, ... ] )
+
+Appends a list of items onto the left end of the channel. This will block once
+the internal socket buffer becomes full (i.e. awaiting workers to dequeue on the
+other end). This prevents producer(s) from running faster than consumer(s).
+
+Object (de)serialization is handled automatically using L<Sereal> if available
+or defaults to L<Storable> otherwise.
+
+ $chnl->enqueue('item1');
+ $chnl->enqueue(qw/item2 item3 .../);
+
+ $chnl->enqueue([ array_ref1 ]);
+ $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
+
+ $chnl->enqueue({ hash_ref1 });
+ $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
+
+=head2 dequeue
+
+=head2 dequeue ( COUNT )
+
+Removes the requested number of items (default 1) from the right end of the
+channel. If the channel contains fewer than the requested number of items,
+the method will block (i.e. until other producer(s) enqueue more items).
+
+ $item = $chnl->dequeue(); # item1
+ @items = $chnl->dequeue(2); # ( item2, item3 )
+
+=head2 dequeue_nb
+
+=head2 dequeue_nb ( COUNT )
+
+Removes the requested number of items (default 1) from the right end of the
+channel. If the channel contains fewer than the requested number of items,
+the method will return what it was able to retrieve and return immediately.
+If the channel is empty, then returns C<an empty list> in list context or
+C<undef> in scalar context.
+
+ $item = $chnl->dequeue_nb(); # array_ref1
+ @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 )
+
+=head2 end
+
+This is called by a producer to signal that there is no more work to be sent.
+Once ended, no more items may be sent by the producer. Calling C<end> by
+multiple producers is not supported.
+
+ $chnl->end;
+
+=head1 TWO-WAY IPC - PRODUCER TO CONSUMER
+
+=head2 send ( ARG1 [, ARG2, ... ] )
+
+Append data onto the left end of the channel. Unlike C<enqueue>, the values
+are kept together for the receiving consumer, similarly to calling a method.
+Object (de)serialization is handled automatically.
+
+ $chnl->send('item');
+ $chnl->send([ list_ref ]);
+ $chnl->send([ hash_ref ]);
+
+ $chnl->send(qw/item1 item2 .../);
+ $chnl->send($id, [ list_ref ]);
+ $chnl->send($id, { hash_ref });
+
+=head2 recv
+
+=head2 recv_nb
+
+Blocking and non-blocking fetch methods from the right end of the channel.
+For the latter and when the channel is empty, returns C<an empty list> in
+list context or C<undef> in scalar context.
+
+ $item = $chnl->recv();
+ $array_ref = $chnl->recv();
+ $hash_ref = $chnl->recv();
+
+ ($item1, $item2) = $chnl->recv_nb();
+ ($id, $array_ref) = $chnl->recv_nb();
+ ($id, $hash_ref) = $chnl->recv_nb();
+
+=head1 TWO-WAY IPC - CONSUMER TO PRODUCER
+
+=head2 send2 ( ARG1 [, ARG2, ... ] )
+
+Append data onto the right end of the channel. Unlike C<enqueue>, the values
+are kept together for the receiving producer, similarly to calling a method.
+Object (de)serialization is handled automatically.
+
+ $chnl->send2('item');
+ $chnl->send2([ list_ref ]);
+ $chnl->send2([ hash_ref ]);
+
+ $chnl->send2(qw/item1 item2 .../);
+ $chnl->send2($id, [ list_ref ]);
+ $chnl->send2($id, { hash_ref });
+
+=head2 recv2
+
+=head2 recv2_nb
+
+Blocking and non-blocking fetch methods from the left end of the channel.
+For the latter and when the channel is empty, returns C<an empty list> in
+list context or C<undef> in scalar context.
+
+ $item = $chnl->recv2();
+ $array_ref = $chnl->recv2();
+ $hash_ref = $chnl->recv2();
+
+ ($item1, $item2) = $chnl->recv2_nb();
+ ($id, $array_ref) = $chnl->recv2_nb();
+ ($id, $hash_ref) = $chnl->recv2_nb();
+
+=head1 DEMONSTRATIONS
+
+=head2 Example 1 - threads
+
+C<MCE::Channel> was made to work efficiently with L<threads>. The reason is from
+using L<threads::shared> for locking versus L<MCE::Mutex>.
+
+ use strict;
+ use warnings;
+
+ use threads;
+ use MCE::Channel;
+
+ my $queue = MCE::Channel->new( impl => 'Threads' );
+ my $num_consumers = 10;
+
+ sub consumer {
+ # receive items
+ my $count = 0;
+ while ( my ($item1, $item2) = $queue->dequeue(2) ) {
+ $count += 2;
+ }
+ # send result
+ $queue->send2( threads->tid => $count );
+ }
+
+ threads->create('consumer') for 1 .. $num_consumers;
+
+ ## producer
+
+ $queue->enqueue($_, $_ * 2) for 1 .. 40000;
+ $queue->end;
+
+ my %results;
+ my $total = 0;
+
+ for ( 1 .. $num_consumers ) {
+ my ($id, $count) = $queue->recv2;
+ $results{$id} = $count;
+ $total += $count;
+ }
+
+ $_->join for threads->list;
+
+ print $results{$_}, "\n" for keys %results;
+ print "$total total\n\n";
+
+ __END__
+
+ # output
+
+ 8034
+ 8008
+ 8036
+ 8058
+ 7990
+ 7948
+ 8068
+ 7966
+ 7960
+ 7932
+ 80000 total
+
+=head2 Example 2 - MCE::Child
+
+The following is similarly threads-like for Perl lacking threads support.
+It spawns processes instead, thus requires the C<Mutex> channel implementation
+which is the default if omitted.
+
+ use strict;
+ use warnings;
+
+ use MCE::Child;
+ use MCE::Channel;
+
+ my $queue = MCE::Channel->new( impl => 'Mutex' );
+ my $num_consumers = 10;
+
+ sub consumer {
+ # receive items
+ my $count = 0;
+ while ( my ($item1, $item2) = $queue->dequeue(2) ) {
+ $count += 2;
+ }
+ # send result
+ $queue->send2( MCE::Child->pid => $count );
+ }
+
+ MCE::Child->create('consumer') for 1 .. $num_consumers;
+
+ ## producer
+
+ $queue->enqueue($_, $_ * 2) for 1 .. 40000;
+ $queue->end;
+
+ my %results;
+ my $total = 0;
+
+ for ( 1 .. $num_consumers ) {
+ my ($id, $count) = $queue->recv2;
+ $results{$id} = $count;
+ $total += $count;
+ }
+
+ $_->join for MCE::Child->list;
+
+ print $results{$_}, "\n" for keys %results;
+ print "$total total\n\n";
+
+=head2 Example 3 - Many producers
+
+Running with 2 or more producers requires setting the C<mp> option. Internally,
+this enables locking support for the left end of the channel. The C<mp> option
+applies to C<Mutex> and C<Threads> channel implementations only.
+
+Here, using the MCE facility for gathering the final count.
+
+ use strict;
+ use warnings;
+
+ use MCE::Flow;
+ use MCE::Channel;
+
+ my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
+ my $num_consumers = 10;
+
+ sub consumer {
+ # receive items
+ my $count = 0;
+ while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
+ $count += 2;
+ }
+ # send result
+ MCE->gather( MCE->wid => $count );
+ }
+
+ sub producer {
+ $queue->enqueue($_, $_ * 2) for 1 .. 20000;
+ }
+
+ ## run 2 producers and many consumers
+
+ MCE::Flow::init(
+ max_workers => [ 2, $num_consumers ],
+ task_name => [ 'producer', 'consumer' ],
+ task_end => sub {
+ my ($mce, $task_id, $task_name) = @_;
+ if ( $task_name eq 'producer' ) {
+ $queue->end;
+ }
+ }
+ );
+
+ # consumers call gather above (i.e. send a key-value pair),
+ # have MCE append to a hash
+
+ my %results = mce_flow \&producer, \&consumer;
+
+ MCE::Flow::finish;
+
+ my $total = 0;
+
+ for ( keys %results ) {
+ $total += $results{$_};
+ print $results{$_}, "\n";
+ }
+
+ print "$total total\n\n";
+
+=head2 Example 4 - Request input
+
+This demonstration configures a channel per consumer. Plus, a common channel
+for consumers to request the next input item. The C<Simple> implementation is
+specified for the individual channels whereas locking may be necessary for the
+C<$ready> channel. However, consumers do not incur reading and what is written
+is very small (i.e. atomic write is guaranteed by the OS). Thus, am safely
+choosing the C<Simple> implementation versus C<Mutex>.
+
+ use strict;
+ use warnings;
+
+ use MCE::Flow;
+ use MCE::Channel;
+
+ my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g;
+ my $input_size = shift || 3000;
+
+ unless ($input_size =~ /\A\d+\z/) {
+ print {*STDERR} "usage: $prog_name [ size ]\n";
+ exit 1;
+ }
+
+ my $consumers = 4;
+
+ my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
+
+ my $ready = MCE::Channel->new( impl => 'Simple' );
+
+ sub producer {
+ my $id = 0;
+
+ # send the next input item upon request
+ for ( 0 .. $input_size - 1 ) {
+ my $chnl_num = $ready->recv2;
+ $chnls[ $chnl_num ]->send( ++$id, $_ );
+ }
+
+ # signal no more work
+ $_->send( 0, undef ) for @chnls;
+ }
+
+ sub consumer {
+ my $chnl_num = MCE->task_wid - 1;
+
+ while () {
+ # notify the producer ready for input
+ $ready->send2( $chnl_num );
+
+ # retrieve input data
+ my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
+
+ # leave loop if no more work
+ last unless $id;
+
+ # compute and send the result to the manager process
+ # ordered output requires an id (must be 1st argument)
+ MCE->gather( $id, [ $item, sqrt($item) ] );
+ }
+ }
+
+ # A custom 'ordered' output iterator for MCE's gather facility.
+ # It returns a closure block, expecting an ID for 1st argument.
+
+ sub output_iterator {
+ my %tmp; my $order_id = 1;
+
+ return sub {
+ my ( $id, $result ) = @_;
+ $tmp{ $id } = $result;
+
+ while () {
+ last unless exists $tmp{ $order_id };
+ $result = delete $tmp{ $order_id };
+ printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
+ $order_id++;
+ }
+ };
+ }
+
+ # Run one producer and many consumers.
+ # Output to be sent orderly to STDOUT.
+
+ MCE::Flow->init(
+ gather => output_iterator(),
+ max_workers => [ 1, $consumers ],
+ );
+
+ MCE::Flow->run( \&producer, \&consumer );
+ MCE::Flow->finish;
+
+ __END__
+
+ # Output
+
+ n: 0 sqrt(n): 0.000000
+ n: 1 sqrt(n): 1.000000
+ n: 2 sqrt(n): 1.414214
+ n: 3 sqrt(n): 1.732051
+ n: 4 sqrt(n): 2.000000
+ n: 5 sqrt(n): 2.236068
+ n: 6 sqrt(n): 2.449490
+ n: 7 sqrt(n): 2.645751
+ n: 8 sqrt(n): 2.828427
+ n: 9 sqrt(n): 3.000000
+ ...
+
+=head1 SEE ALSO
+
+=over 3
+
+=item * L<https://github.com/marioroy/mce-examples/tree/master/chameneos>
+
+=item * L<threads::lite>
+
+=back
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright (C) 2019 by Mario E. Roy
+
+MCE::Shared is released under the same license as Perl.
+
+See L<http://dev.perl.org/licenses/> for more information.
+
+=cut
+
diff --git a/lib/MCE/Channel/Mutex.pm b/lib/MCE/Channel/Mutex.pm
new file mode 100644
index 0000000..27696ef
--- /dev/null
+++ b/lib/MCE/Channel/Mutex.pm
@@ -0,0 +1,356 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## Channel for producer(s) and many consumers supporting processes and threads.
+##
+###############################################################################
+
+package MCE::Channel::Mutex;
+
+use strict;
+use warnings;
+
+no warnings qw( uninitialized once );
+
+our $VERSION = '1.843';
+
+use base 'MCE::Channel';
+use MCE::Mutex ();
+use bytes;
+
+my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
+my $freeze = MCE::Channel::_get_freeze();
+my $thaw = MCE::Channel::_get_thaw();
+
+sub new {
+ my ( $class, %obj ) = ( @_, impl => 'Mutex' );
+
+ $obj{init_pid} = MCE::Channel::_pid();
+ MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
+
+ # locking for the consumer side of the channel
+ $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
+
+ # optionally, support many-producers writing and reading
+ $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
+
+ bless \%obj, $class;
+
+ if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
+ MCE::Mutex::Channel::_save_for_global_destruction($obj{c_mutex});
+ MCE::Mutex::Channel::_save_for_global_destruction($obj{p_mutex})
+ if $obj{mp};
+ }
+
+ return \%obj;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Queue-like methods.
+##
+###############################################################################
+
+sub end {
+ my ( $self ) = @_;
+ return if $self->{ended};
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', -1);
+
+ $self->{ended} = 1;
+}
+
+sub enqueue {
+ my $self = shift;
+ return MCE::Channel::_ended('enqueue') if $self->{ended};
+
+ my $p_mutex = $self->{p_mutex};
+ $p_mutex->lock2 if $p_mutex;
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+
+ while ( @_ ) {
+ my $data;
+ if ( ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ shift ]), $data .= '1';
+ } else {
+ $data = shift, $data .= '0';
+ }
+ print { $self->{p_sock} } pack('i', length $data), $data;
+ }
+
+ $p_mutex->unlock2 if $p_mutex;
+
+ return 1;
+}
+
+sub dequeue {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ if ( $count == 1 ) {
+ ( my $c_mutex = $self->{c_mutex} )->lock;
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+ MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end, $c_mutex->unlock;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ $c_mutex->unlock;
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+ }
+ else {
+ my ( $plen, @ret );
+
+ ( my $c_mutex = $self->{c_mutex} )->lock;
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+
+ while ( $count-- ) {
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ last;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+
+ $c_mutex->unlock;
+
+ wantarray ? @ret : $ret[-1];
+ }
+}
+
+sub dequeue_nb {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ my ( $plen, @ret );
+ ( my $c_mutex = $self->{c_mutex} )->lock;
+
+ while ( $count-- ) {
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ last;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+
+ $c_mutex->unlock;
+
+ wantarray ? @ret : $ret[-1];
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; producer to consumer.
+##
+###############################################################################
+
+sub send {
+ my $self = shift;
+ return MCE::Channel::_ended('send') if $self->{ended};
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ my $p_mutex = $self->{p_mutex};
+ $p_mutex->lock2 if $p_mutex;
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', length $data), $data;
+
+ $p_mutex->unlock2 if $p_mutex;
+
+ return 1;
+}
+
+sub recv {
+ my ( $self ) = @_;
+
+ ( my $c_mutex = $self->{c_mutex} )->lock;
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+ MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end, $c_mutex->unlock;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ $c_mutex->unlock;
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv_nb {
+ my ( $self ) = @_;
+
+ ( my $c_mutex = $self->{c_mutex} )->lock;
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+ MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ $c_mutex->unlock;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ $c_mutex->unlock;
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; consumer to producer.
+##
+###############################################################################
+
+sub send2 {
+ my $self = shift;
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ ( my $c_mutex = $self->{c_mutex} )->lock2;
+ MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
+ print { $self->{c_sock} } pack('i', length $data), $data;
+ $c_mutex->unlock2;
+
+ return 1;
+}
+
+sub recv2 {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ my $p_mutex = $self->{p_mutex};
+ $p_mutex->lock if $p_mutex;
+
+ MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
+
+ ( $p_mutex || $is_MSWin32 )
+ ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+
+ ( $p_mutex || $is_MSWin32 )
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+
+ $p_mutex->unlock if $p_mutex;
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv2_nb {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ my $p_mutex = $self->{p_mutex};
+ $p_mutex->lock if $p_mutex;
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 1 );
+
+ ( $p_mutex || $is_MSWin32 )
+ ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len ) {
+ $p_mutex->unlock if $p_mutex;
+ return wantarray ? () : undef;
+ }
+
+ ( $p_mutex || $is_MSWin32 )
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+
+ $p_mutex->unlock if $p_mutex;
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Channel::Mutex - Channel for producer(s) and many consumers
+
+=head1 VERSION
+
+This document describes MCE::Channel::Mutex version 1.843
+
+=head1 DESCRIPTION
+
+A channel class providing queue-like and two-way communication
+for processes and threads. Locking is handled using MCE::Mutex.
+
+ use MCE::Channel;
+
+ # The default is tuned for one producer and many consumers.
+ my $chnl_a = MCE::Channel->new( impl => 'Mutex' );
+
+ # Specify the 'mp' option for safe use by two or more producers
+ # sending or recieving on the left side of the channel.
+ # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb>
+
+ my $chnl_b = MCE::Channel->new( impl => 'Mutex', mp => 1 );
+
+The API is described in L<MCE::Channel>.
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=cut
+
diff --git a/lib/MCE/Channel/Simple.pm b/lib/MCE/Channel/Simple.pm
new file mode 100644
index 0000000..7593a36
--- /dev/null
+++ b/lib/MCE/Channel/Simple.pm
@@ -0,0 +1,332 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## Channel tuned for one producer and one consumer involving no locking.
+##
+###############################################################################
+
+package MCE::Channel::Simple;
+
+use strict;
+use warnings;
+
+no warnings qw( uninitialized once );
+
+our $VERSION = '1.843';
+
+use base 'MCE::Channel';
+use bytes;
+
+my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
+my $freeze = MCE::Channel::_get_freeze();
+my $thaw = MCE::Channel::_get_thaw();
+
+sub new {
+ my ( $class, %obj ) = ( @_, impl => 'Simple' );
+
+ $obj{init_pid} = MCE::Channel::_pid();
+ MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
+
+ return bless \%obj, $class;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Queue-like methods.
+##
+###############################################################################
+
+sub end {
+ my ( $self ) = @_;
+ return if $self->{ended};
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', -1);
+
+ $self->{ended} = 1;
+}
+
+sub enqueue {
+ my $self = shift;
+ return MCE::Channel::_ended('enqueue') if $self->{ended};
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+
+ while ( @_ ) {
+ my $data;
+ if ( ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ shift ]), $data .= '1';
+ } else {
+ $data = shift, $data .= '0';
+ }
+ print { $self->{p_sock} } pack('i', length $data) . $data;
+ }
+
+ return 1;
+}
+
+sub dequeue {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ if ( $count == 1 ) {
+ my ( $plen, $data );
+
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+
+ $is_MSWin32
+ ? sysread( $self->{c_sock}, $plen, 4 )
+ : read( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ return wantarray ? () : undef;
+ }
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
+ : read( $self->{c_sock}, $data, $len );
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+ }
+ else {
+ my ( $plen, @ret );
+
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+
+ while ( $count-- ) {
+ my $data;
+
+ $is_MSWin32
+ ? sysread( $self->{c_sock}, $plen, 4 )
+ : read( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ last;
+ }
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
+ : read( $self->{c_sock}, $data, $len );
+
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+
+ wantarray ? @ret : $ret[-1];
+ }
+}
+
+sub dequeue_nb {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ my ( $plen, @ret );
+
+ while ( $count-- ) {
+ my $data;
+
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+
+ $is_MSWin32
+ ? sysread( $self->{c_sock}, $plen, 4 )
+ : read( $self->{c_sock}, $plen, 4 );
+
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ last;
+ }
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
+ : read( $self->{c_sock}, $data, $len );
+
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+
+ wantarray ? @ret : $ret[-1];
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; producer(s) to consumers.
+##
+###############################################################################
+
+sub send {
+ my $self = shift;
+ return MCE::Channel::_ended('send') if $self->{ended};
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', length $data) . $data;
+
+ return 1;
+}
+
+sub recv {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+
+ $is_MSWin32
+ ? sysread( $self->{c_sock}, $plen, 4 )
+ : read( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ return wantarray ? () : undef;
+ }
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
+ : read( $self->{c_sock}, $data, $len );
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv_nb {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+
+ $is_MSWin32
+ ? sysread( $self->{c_sock}, $plen, 4 )
+ : read( $self->{c_sock}, $plen, 4 );
+
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ return wantarray ? () : undef;
+ }
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
+ : read( $self->{c_sock}, $data, $len );
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; consumers to producer(s).
+##
+###############################################################################
+
+sub send2 {
+ my $self = shift;
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
+ print { $self->{c_sock} } pack('i', length $data) . $data;
+
+ return 1;
+}
+
+sub recv2 {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
+
+ $is_MSWin32
+ ? sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv2_nb {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 1 );
+
+ $is_MSWin32
+ ? sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ return wantarray ? () : undef unless $len;
+
+ $is_MSWin32
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Channel::Simple - Channel tuned for one producer and one consumer
+
+=head1 VERSION
+
+This document describes MCE::Channel::Simple version 1.843
+
+=head1 DESCRIPTION
+
+A channel class providing queue-like and two-way communication
+for one process or thread on either end; no locking needed.
+
+ use MCE::Channel;
+
+ my $chnl = MCE::Channel->new( impl => 'Simple' );
+
+The API is described in L<MCE::Channel>.
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=cut
+
diff --git a/lib/MCE/Channel/Threads.pm b/lib/MCE/Channel/Threads.pm
new file mode 100644
index 0000000..e41b606
--- /dev/null
+++ b/lib/MCE/Channel/Threads.pm
@@ -0,0 +1,361 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## Channel for producer(s) and many consumers supporting threads only.
+##
+###############################################################################
+
+package MCE::Channel::Threads;
+
+use strict;
+use warnings;
+
+no warnings qw( uninitialized once );
+
+our $VERSION = '1.843';
+
+use threads;
+use threads::shared;
+
+use base 'MCE::Channel';
+use bytes;
+
+my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
+my $freeze = MCE::Channel::_get_freeze();
+my $thaw = MCE::Channel::_get_thaw();
+
+sub new {
+ my ( $class, %obj ) = ( @_, impl => 'Threads' );
+
+ $obj{init_pid} = MCE::Channel::_pid();
+ MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
+
+ # locking for the consumer side of the channel
+ $obj{cr_mutex} = threads::shared::share( my $cr_mutex );
+ $obj{cw_mutex} = threads::shared::share( my $cw_mutex );
+
+ # optionally, support many-producers writing and reading
+ $obj{pr_mutex} = threads::shared::share( my $pr_mutex ) if $obj{mp};
+ $obj{pw_mutex} = threads::shared::share( my $pw_mutex ) if $obj{mp};
+
+ return bless \%obj, $class;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Queue-like methods.
+##
+###############################################################################
+
+sub end {
+ my ( $self ) = @_;
+ return if $self->{ended};
+
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', -1);
+
+ $self->{ended} = 1;
+}
+
+sub enqueue {
+ my $self = shift;
+ return MCE::Channel::_ended('enqueue') if $self->{ended};
+
+ {
+ CORE::lock $self->{pw_mutex} if $self->{pw_mutex};
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+
+ while ( @_ ) {
+ my $data;
+ if ( ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ shift ]), $data .= '1';
+ } else {
+ $data = shift, $data .= '0';
+ }
+ print { $self->{p_sock} } pack('i', length $data), $data;
+ }
+ }
+
+ return 1;
+}
+
+sub dequeue {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ if ( $count == 1 ) {
+ my ( $plen, $data );
+
+ {
+ CORE::lock $self->{cr_mutex};
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, $data, $len );
+ }
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+ }
+ else {
+ my ( $plen, @ret );
+
+ {
+ CORE::lock $self->{cr_mutex};
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+
+ while ( $count-- ) {
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ last;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+ }
+
+ wantarray ? @ret : $ret[-1];
+ }
+}
+
+sub dequeue_nb {
+ my ( $self, $count ) = @_;
+ $count = 1 if ( !$count || $count < 1 );
+
+ my ( $plen, @ret );
+
+ {
+ CORE::lock $self->{cr_mutex};
+
+ while ( $count-- ) {
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ last;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, my($data), $len );
+ push @ret, chop($data) ? @{ $thaw->($data) } : $data;
+ }
+ }
+
+ wantarray ? @ret : $ret[-1];
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; producer(s) to consumers.
+##
+###############################################################################
+
+sub send {
+ my $self = shift;
+ return MCE::Channel::_ended('send') if $self->{ended};
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ {
+ CORE::lock $self->{pw_mutex} if $self->{pw_mutex};
+ MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
+ print { $self->{p_sock} } pack('i', length $data), $data;
+ }
+
+ return 1;
+}
+
+sub recv {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ {
+ CORE::lock $self->{cr_mutex};
+ MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+ if ( $len < 0 ) {
+ $self->end;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, $data, $len );
+ }
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv_nb {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ {
+ CORE::lock $self->{cr_mutex};
+ MCE::Util::_nonblocking( $self->{c_sock}, 1 );
+ MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
+ MCE::Util::_nonblocking( $self->{c_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+ if ( !$len || $len < 0 ) {
+ $self->end if defined $len && $len < 0;
+ return wantarray ? () : undef;
+ }
+
+ MCE::Channel::_read( $self->{c_sock}, $data, $len );
+ }
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Methods for two-way communication; consumers to producer(s).
+##
+###############################################################################
+
+sub send2 {
+ my $self = shift;
+
+ my $data;
+ if ( @_ > 1 || ref $_[0] || !defined $_[0] ) {
+ $data = $freeze->([ @_ ]), $data .= '1';
+ } else {
+ $data = $_[0], $data .= '0';
+ }
+
+ {
+ CORE::lock $self->{cw_mutex};
+ MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
+ print { $self->{c_sock} } pack('i', length $data), $data;
+ }
+
+ return 1;
+}
+
+sub recv2 {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ {
+ my $pr_mutex = $self->{pr_mutex};
+ CORE::lock $pr_mutex if $pr_mutex;
+
+ MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
+
+ ( $pr_mutex || $is_MSWin32 )
+ ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ my $len = unpack('i', $plen);
+
+ ( $pr_mutex || $is_MSWin32 )
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+ }
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+sub recv2_nb {
+ my ( $self ) = @_;
+ my ( $plen, $data );
+
+ {
+ my $pr_mutex = $self->{pr_mutex};
+ CORE::lock $pr_mutex if $pr_mutex;
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 1 );
+
+ ( $pr_mutex || $is_MSWin32 )
+ ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
+ : read( $self->{p_sock}, $plen, 4 );
+
+ MCE::Util::_nonblocking( $self->{p_sock}, 0 );
+
+ my $len; $len = unpack('i', $plen) if $plen;
+
+ return wantarray ? () : undef unless $len;
+
+ ( $pr_mutex || $is_MSWin32 )
+ ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
+ : read( $self->{p_sock}, $data, $len );
+ }
+
+ chop( $data )
+ ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1]
+ : wantarray ? ( $data ) : $data;
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Channel::Threads - Channel for producer(s) and many consumers
+
+=head1 VERSION
+
+This document describes MCE::Channel::Threads version 1.843
+
+=head1 DESCRIPTION
+
+A channel class providing queue-like and two-way communication
+for threads only. Locking is handled using threads::shared.
+
+ use MCE::Channel;
+
+ # The default is tuned for one producer and many consumers.
+ my $chnl_a = MCE::Channel->new( impl => 'Threads' );
+
+ # Specify the 'mp' option for safe use by two or more producers
+ # sending or recieving on the left side of the channel.
+ # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb>
+
+ my $chnl_b = MCE::Channel->new( impl => 'Threads', mp => 1 );
+
+The API is described in L<MCE::Channel>.
+
+=head1 LIMITATIONS
+
+The t/04_channel_threads tests are disabled on Unix platforms for Perl
+less than 5.10.1. Basically, the MCE::Channel::Threads implementation
+is not supported on older Perls unless the OS vendor applied upstream
+patches (i.e. works on RedHat/CentOS 5.x running Perl 5.8.x).
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=cut
+
diff --git a/lib/MCE/Child.pm b/lib/MCE/Child.pm
new file mode 100644
index 0000000..38aebf2
--- /dev/null
+++ b/lib/MCE/Child.pm
@@ -0,0 +1,1940 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## A threads-like parallelization module compatible with Perl 5.8.
+##
+###############################################################################
+
+use strict;
+use warnings;
+
+no warnings qw( threads recursion uninitialized once redefine );
+
+package MCE::Child;
+
+our $VERSION = '1.843';
+
+## no critic (BuiltinFunctions::ProhibitStringyEval)
+## no critic (Subroutines::ProhibitExplicitReturnUndef)
+## no critic (Subroutines::ProhibitSubroutinePrototypes)
+## no critic (TestingAndDebugging::ProhibitNoStrict)
+
+use MCE::Signal ();
+use MCE::Channel;
+use Time::HiRes 'sleep';
+use bytes;
+
+use overload (
+ q(==) => \&equal,
+ q(!=) => sub { !equal(@_) },
+ fallback => 1
+);
+
+sub import {
+ no strict 'refs'; no warnings 'redefine';
+ *{ caller().'::mce_child' } = \&child;
+ return;
+}
+
+## The POSIX module has many symbols. Try not loading it simply
+## to have WNOHANG. The following covers most platforms.
+
+use constant {
+ _WNOHANG => ( $INC{'POSIX.pm'} )
+ ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
+};
+
+my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
+
+my $_freeze = MCE::Channel::_get_freeze();
+my $_thaw = MCE::Channel::_get_thaw();
+
+my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
+my $_has_threads = $INC{'threads.pm'} ? 1 : 0;
+my $_tid = $_has_threads ? threads->tid() : 0;
+
+sub CLONE {
+ $_tid = threads->tid(), &_clear() if $_has_threads;
+}
+
+sub _clear {
+ %{ $_LIST } = ();
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Init routine.
+##
+###############################################################################
+
+bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
+
+sub init {
+ shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
+
+ # -- options ----------------------------------------------------------
+ # max_workers child_timeout posix_exit on_start on_finish void_context
+ # ---------------------------------------------------------------------
+
+ my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
+ my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
+
+ @_ = ();
+
+ $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
+ $mngd->{WRK_ID} = $$;
+
+ &_force_reap($pkg), $_DATA->{$pkg}->clear() if exists $_LIST->{$pkg};
+
+ # Start the shared-manager process if not running.
+ MCE::Shared->start() if $INC{'MCE/Shared.pm'};
+
+ if ( !exists $_LIST->{$pkg} ) {
+ my $chnl = MCE::Channel->new( impl => 'Mutex' );
+ $_LIST->{ $pkg } = MCE::Child::_ordhash->new();
+ $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl );
+ $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl );
+ $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
+ $_DATA->{"$pkg:id" } = 0;
+ }
+
+ if ( !exists $mngd->{posix_exit} ) {
+ $mngd->{posix_exit} = 1 if (
+ ( $_has_threads && $_tid ) || $INC{'Mojo/IOLoop.pm'} ||
+ $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
+ $INC{'Prima.pm'} || $INC{'Tk.pm'} || $INC{'Wx.pm'} ||
+ $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'} ||
+ $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} ||
+ $INC{'Win32/GUI.pm'} || $INC{'stfl.pm'}
+ );
+ }
+
+ if ( $mngd->{max_workers} ) {
+ my $cpus = $mngd->{max_workers};
+ $cpus = MCE::Util::get_ncpu() if $cpus eq 'auto';
+ $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
+ $mngd->{max_workers} = int($cpus);
+ }
+
+ if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
+ local $@; eval 'require Net::HTTP; require Net::HTTPS';
+ }
+
+ require POSIX
+ if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
+
+ return;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## 'new', 'child (mce_child)', and 'create' for threads-like similarity.
+##
+###############################################################################
+
+## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
+
+*new = \&create, *tid = \&pid;
+
+## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
+## Tip found in threads::async.
+
+sub child (&;@) {
+ goto &create;
+}
+
+sub create {
+ my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
+ # construct mngd internally on first use unless defined
+ init(); $_MNGD->{ "$$.$_tid.".caller() };
+ };
+
+ shift if ( $_[0] eq __PACKAGE__ );
+
+ # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
+
+ my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
+
+ $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
+ $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
+
+ my $func = shift; $func = caller().'::'.$func
+ if ( !ref $func && length $func && index($func,':') < 0 );
+
+ if ( !defined $func ) {
+ local $\; print {*STDERR} "code function is not specified or valid\n";
+ return undef;
+ }
+
+ my ( $list, $max_workers, $pkg ) = (
+ $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
+ );
+
+ $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) > 2e9 );
+
+ if ( $max_workers ) {
+ local $!;
+
+ # Reap completed child processes.
+ $_DATA->{$pkg}->reapdata;
+
+ for my $wrk_id ( keys %{ $list->[0] } ) {
+ waitpid($wrk_id, _WNOHANG) or next;
+ _reap_child($list->del($wrk_id), 0);
+ }
+
+ # Wait for a slot if saturated.
+ if ( keys(%{ $list->[0] }) >= $max_workers ) {
+ my $count = keys(%{ $list->[0] }) - $max_workers + 1;
+ _wait_one($pkg) for 1 .. $count;
+ }
+ }
+
+ # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
+
+ local $SIG{TTIN} unless $_is_MSWin32;
+ local $SIG{TTOU} unless $_is_MSWin32;
+ local $SIG{WINCH} unless $_is_MSWin32;
+
+ my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
+ my $pid = fork();
+
+ if ( !defined $pid ) { # error
+ local $\; print {*STDERR} "fork error: $!\n";
+
+ return undef;
+ }
+ elsif ( $pid ) { # parent
+ $self->{WRK_ID} = $pid, $list->set($pid, $self);
+ $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
+
+ return $self;
+ }
+
+ %{ $_LIST } = (), $_SELF = $self; # child
+
+ if ( UNIVERSAL::can('Prima', 'cleanup') ) {
+ no warnings 'redefine'; local $@; eval '*Prima::cleanup = sub {}';
+ }
+
+ MCE::Shared::init($id) if $INC{'MCE/Shared.pm'};
+
+ # Sets the seed of the base generator uniquely between workers.
+ # The new seed is computed using the current seed and ID value.
+ # One may set the seed at the application level for predictable
+ # results. Ditto for Math::Prime::Util, Math::Random, and
+ # Math::Random::MT::Auto.
+
+ srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
+
+ if ( $INC{'Math/Prime/Util.pm'} ) {
+ Math::Prime::Util::srand(
+ abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
+ );
+ }
+
+ if ( $INC{'Math/Random.pm'} ) {
+ my $cur_seed = Math::Random::random_get_seed();
+ my $new_seed = ($cur_seed < 1073741781)
+ ? $cur_seed + ((abs($id) * 10000) % 1073741780)
+ : $cur_seed - ((abs($id) * 10000) % 1073741780);
+
+ Math::Random::random_set_seed($new_seed, $new_seed);
+ }
+
+ if ( $INC{'Math/Random/MT/Auto.pm'} ) {
+ my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
+ my $new_seed = ($cur_seed < 1073741781)
+ ? $cur_seed + ((abs($id) * 10000) % 1073741780)
+ : $cur_seed - ((abs($id) * 10000) % 1073741780);
+
+ Math::Random::MT::Auto::set_seed($new_seed);
+ }
+
+ _dispatch($mngd, $func, \@args);
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Public methods.
+##
+###############################################################################
+
+sub equal {
+ return 0 unless ( ref $_[0] && ref $_[1] );
+ $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
+}
+
+sub error {
+ _croak('Usage: $child->error()') unless ref( my $self = $_[0] );
+ $self->join() if ( !exists $self->{JOINED} );
+ $self->{ERROR} || undef;
+}
+
+sub exit {
+ shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
+
+ my ( $self ) = ( ref $_[0] ? shift : $_SELF );
+ my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
+
+ if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
+ MCE::Child->finish('MCE'); CORE::exit(@_);
+ }
+ elsif ( $wrk_id == $$ ) {
+ alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
+ $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : '');
+ die "Child exited ($?)\n";
+ _exit($?); # not reached
+ }
+
+ return $self if ( exists $self->{JOINED} );
+
+ if ( exists $_DATA->{$pkg} ) {
+ sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
+ } else {
+ sleep 0.030;
+ }
+
+ if ($_is_MSWin32) {
+ CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
+ } else {
+ CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
+ }
+
+ $self;
+}
+
+sub finish {
+ _croak('Usage: MCE::Child->finish()') if ref($_[0]);
+ shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
+
+ my $pkg = defined($_[0]) ? $_[0] : caller();
+
+ if ( $pkg eq 'MCE' ) {
+ for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); }
+ }
+ elsif ( exists $_LIST->{$pkg} ) {
+ return if $MCE::Signal::KILLED;
+
+ if ( exists $_DELY->{$pkg} ) {
+ &_force_reap($pkg);
+ delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
+ delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
+ delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
+ }
+ }
+
+ @_ = ();
+
+ return;
+}
+
+sub is_joinable {
+ _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] );
+ my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
+
+ if ( $wrk_id == $$ ) {
+ '';
+ }
+ elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
+ return undef if ( exists $self->{JOINED} );
+ local $!; $_DATA->{$pkg}->reapdata;
+ ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
+ _reap_child($_LIST->{$pkg}->del($self->{WRK_ID}), 0);
+ 1;
+ };
+ }
+ else {
+ return undef if ( exists $self->{JOINED} );
+ $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : '';
+ }
+}
+
+sub is_running {
+ _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] );
+ my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
+
+ if ( $wrk_id == $$ ) {
+ 1;
+ }
+ elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
+ return undef if ( exists $self->{JOINED} );
+ local $!; $_DATA->{$pkg}->reapdata;
+ ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
+ _reap_child($_LIST->{$pkg}->del($self->{WRK_ID}), 0);
+ '';
+ };
+ }
+ else {
+ return undef if ( exists $self->{JOINED} );
+ $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1;
+ }
+}
+
+sub join {
+ _croak('Usage: $child->join()') unless ref( my $self = $_[0] );
+ my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
+
+ if ( exists $self->{JOINED} ) {
+ _croak('Child already joined') unless exists( $self->{RESULT} );
+
+ return ( defined wantarray )
+ ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
+ : ();
+ }
+
+ if ( $wrk_id == $$ ) {
+ _croak('Cannot join self');
+ }
+ elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
+ _reap_child($_LIST->{$pkg}->del($wrk_id), 1);
+ }
+ else {
+ sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) );
+ _reap_child($self, 0);
+ }
+
+ ( defined wantarray )
+ ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
+ : ();
+}
+
+sub kill {
+ _croak('Usage: $child->kill()') unless ref( my $self = $_[0] );
+ my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
+
+ if ( $wrk_id == $$ ) {
+ CORE::kill($signal || 'INT', $$);
+ return $self;
+ }
+ if ( $self->{MGR_ID} eq "$$.$_tid" ) {
+ return $self if ( exists $self->{JOINED} );
+ if ( exists $_DATA->{$pkg} ) {
+ sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
+ } else {
+ sleep 0.030;
+ }
+ }
+
+ CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
+
+ $self;
+}
+
+sub list {
+ _croak('Usage: MCE::Child->list()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
+}
+
+sub list_joinable {
+ _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ return () unless ( my $list = $_LIST->{$pkg} );
+ local ($!, $?, $_);
+
+ $_DATA->{$pkg}->reapdata;
+
+ map {
+ ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
+ _reap_child($list->del($_->{WRK_ID}), 0);
+ $_;
+ };
+ }
+ $list->vals();
+}
+
+sub list_running {
+ _croak('Usage: MCE::Child->list_running()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ return () unless ( my $list = $_LIST->{$pkg} );
+ local ($!, $?, $_);
+
+ $_DATA->{$pkg}->reapdata;
+
+ map {
+ ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
+ _reap_child($list->del($_->{WRK_ID}), 0);
+ ();
+ };
+ }
+ $list->vals();
+}
+
+sub max_workers {
+ _croak('Usage: MCE::Child->max_workers()') if ref($_[0]);
+ my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
+ # construct mngd internally on first use unless defined
+ init(); $_MNGD->{ "$$.$_tid.".caller() };
+ };
+ shift if ( $_[0] eq __PACKAGE__ );
+
+ if ( @_ ) {
+ $mngd->{max_workers} = shift;
+ if ( $mngd->{max_workers} ) {
+ my $cpus = $mngd->{max_workers};
+ $cpus = MCE::Util::get_ncpu() if $cpus eq 'auto';
+ $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
+ $mngd->{max_workers} = int($cpus);
+ }
+ }
+
+ $mngd->{max_workers};
+}
+
+sub pending {
+ _croak('Usage: MCE::Child->pending()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
+}
+
+sub pid {
+ ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
+}
+
+sub result {
+ _croak('Usage: $child->result()') unless ref( my $self = $_[0] );
+ return $self->join() if ( !exists $self->{JOINED} );
+
+ _croak('Child already joined') unless exists( $self->{RESULT} );
+ wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
+}
+
+sub self {
+ ref($_[0]) ? $_[0] : $_SELF;
+}
+
+sub wait_all {
+ _croak('Usage: MCE::Child->wait_all()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ return wantarray ? () : 0
+ if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
+
+ local $_; ( wantarray )
+ ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
+ : map { $_->join(); () } $_LIST->{$pkg}->vals();
+}
+
+*waitall = \&wait_all; # compatibility
+
+sub wait_one {
+ _croak('Usage: MCE::Child->wait_one()') if ref($_[0]);
+ my $pkg = "$$.$_tid.".caller();
+
+ return undef
+ if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
+
+ _wait_one($pkg);
+}
+
+*waitone = \&wait_one; # compatibility
+
+sub yield {
+ _croak('Usage: MCE::Child->yield()') if ref($_[0]);
+ shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
+ my $pkg = $_SELF->{PKG};
+
+ return unless ( my $mngd = $_MNGD->{$pkg} );
+
+ ( $INC{'Coro/AnyEvent.pm'} )
+ ? Coro::AnyEvent::sleep( $_DELY->{$pkg}->seconds(@_) )
+ : sleep $_DELY->{$pkg}->seconds(@_);
+
+ return;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Private methods.
+##
+###############################################################################
+
+sub _croak {
+ if ( $INC{'MCE.pm'} ) {
+ goto &MCE::_croak;
+ }
+ else {
+ $SIG{__DIE__} = \&MCE::Signal::_die_handler;
+ $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
+
+ $\ = undef; goto &Carp::croak;
+ }
+}
+
+sub _dispatch {
+ my ( $mngd, $func, $args ) = @_;
+ $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$;
+
+ $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
+ $SIG{TERM} = $SIG{INT} = $SIG{HUP} = \&_trap;
+ $SIG{QUIT} = \&_quit;
+
+ {
+ local $!;
+ (*STDERR)->autoflush(1) if defined( fileno *STDERR );
+ (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
+ }
+
+ # Run task.
+ $_DATA->{ $_SELF->{PKG} }->set('S'.$$, ''), $? = 0;
+
+ my $child_timeout = ( exists $_SELF->{child_timeout} )
+ ? $_SELF->{child_timeout} : $mngd->{child_timeout};
+
+ my $void_context = ( exists $_SELF->{void_context} )
+ ? $_SELF->{void_context} : $mngd->{void_context};
+
+ my @res; local $SIG{'ALRM'} = sub { alarm 0; die "Child timed out\n" };
+
+ if ( $void_context ) {
+ no strict 'refs';
+ eval {
+ alarm( $child_timeout || 0 );
+ $func->( @{ $args } );
+ };
+ }
+ else {
+ no strict 'refs';
+ @res = eval {
+ alarm( $child_timeout || 0 );
+ $func->( @{ $args } );
+ };
+ }
+
+ alarm 0; _exit($?) if ( $@ && $@ =~ /^Child exited \(\S+\)$/ );
+
+ if ( $@ ) {
+ my $err = $@;
+ $? = 1, $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err);
+ warn "Child $$ terminated abnormally: reason $err\n" if (
+ $err ne "Child timed out" && !$mngd->{on_finish}
+ );
+ }
+
+ $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : '');
+
+ _exit($?);
+}
+
+sub _exit {
+ my ( $exit_status ) = @_;
+
+ # Check for nested workers not yet joined.
+ if ( !$_SELF->{SIGNALED} ) {
+ MCE::Child->finish('MCE') if ( keys %{ $_LIST } > 0 );
+ MCE::Hobo->finish('MCE') if ( $INC{'MCE/Hobo.pm'} );
+ }
+
+ # Exit child process.
+ $SIG{__DIE__} = sub { } unless $_tid;
+ $SIG{__WARN__} = sub { };
+
+ threads->exit($exit_status) if ( $_has_threads && $_is_MSWin32 );
+
+ if ( ! $_tid ) {
+ $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub {
+ $SIG{$_[0]} = $SIG{INT} = sub { };
+ CORE::kill($_[0], getppid()) if ( $_[0] eq 'INT' && !$_is_MSWin32 );
+ CORE::kill('KILL', $$);
+ };
+ }
+
+ my $posix_exit = ( exists $_SELF->{posix_exit} )
+ ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
+
+ if ( $posix_exit && !$_is_MSWin32 ) {
+ eval { MCE::Mutex::Channel::_destroy() };
+ POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
+ CORE::kill('KILL', $$);
+ }
+
+ CORE::exit($exit_status);
+}
+
+sub _force_reap {
+ my ( $count, $pkg ) = ( 0, @_ );
+ return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
+
+ for my $child ( $_LIST->{$pkg}->vals() ) {
+ if ( $child->is_running() ) {
+ CORE::kill('KILL', $child->pid())
+ if CORE::kill('ZERO', $child->pid());
+ $count++;
+ }
+ }
+
+ $_LIST->{$pkg}->clear();
+
+ warn "Finished with active child processes [$pkg] ($count)\n"
+ if ( $count && !$_is_MSWin32 );
+
+ return;
+}
+
+sub _quit {
+ my ( $name ) = @_;
+ $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
+
+ $SIG{$name} = sub {}, CORE::kill($name, -$$)
+ if ( exists $SIG{$name} );
+
+ _exit(0);
+}
+
+sub _reap_child {
+ my ( $child, $wait_flag ) = @_;
+ local @_ = $_DATA->{ $child->{PKG} }->get( $child->{WRK_ID}, $wait_flag );
+
+ ( $child->{ERROR}, $child->{RESULT}, $child->{JOINED} ) =
+ ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 );
+
+ if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) {
+ my ( $exit, $err ) = ( $? || 0, $child->{ERROR} );
+ my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
+
+ if ( ( $code > 100 || $sig == 9 ) && !$err ) {
+ $code = 2, $sig = 1, $err = 'received SIGHUP' if $code == 101;
+ $code = 2, $sig = 2, $err = 'received SIGINT' if $code == 102;
+ $code = 2, $sig = 15, $err = 'received SIGTERM' if $code == 115;
+ $code = 2, $sig = 9, $err = 'received SIGKILL' if $sig == 9;
+ }
+
+ $on_finish->(
+ $child->{WRK_ID}, $code, $child->{ident}, $sig, $err,
+ @{ $child->{RESULT} }
+ );
+ }
+
+ return;
+}
+
+sub _trap {
+ my ( $exit_status, $name ) = ( 2, @_ );
+ $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
+
+ $SIG{$name} = sub {}, CORE::kill($name, -$$)
+ if ( exists $SIG{$name} );
+
+ if ( $name eq 'HUP' ) { $exit_status = 101 }
+ elsif ( $name eq 'INT' ) { $exit_status = 102 }
+ elsif ( $name eq 'TERM' ) { $exit_status = 115 }
+
+ _exit($exit_status);
+}
+
+sub _wait_one {
+ my ( $pkg ) = @_;
+ my ( $list, $data ) = ( $_LIST->{$pkg}, $_DATA->{$pkg} );
+ my ( $self, $wrk_id, $found ); local $!;
+
+ while () {
+ for my $child ( $list->vals ) {
+ $wrk_id = $child->{WRK_ID};
+ $found = $data->exists('R'.$wrk_id);
+ waitpid($wrk_id, 0), $self = $list->del($wrk_id), last if $found;
+
+ $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
+ }
+ last if $self;
+ sleep 0.015;
+ }
+
+ _reap_child($self, 0);
+
+ $self;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Delay implementation suited for MCE::Child.
+##
+###############################################################################
+
+package # hide from rpm
+ MCE::Child::_delay;
+
+sub new {
+ my ( $class, $chnl, $delay ) = @_;
+
+ if ( !defined $delay ) {
+ $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
+ }
+
+ $chnl->send(undef);
+
+ bless [ $delay, $chnl ], $class;
+}
+
+sub seconds {
+ my ( $self, $how_long ) = @_;
+ my ( $delay, $time ) = ( $how_long || $self->[0], Time::HiRes::time() );
+ my ( $lapse ) = $self->[1]->recv();
+
+ if ( !$lapse || $time >= $lapse ) {
+ $self->[1]->send($time + $delay);
+ return $delay;
+ }
+
+ $self->[1]->send( $lapse += $delay );
+
+ return $lapse - $time;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Hash and ordhash implementations suited for MCE::Child.
+##
+###############################################################################
+
+package # hide from rpm
+ MCE::Child::_hash;
+
+use Time::HiRes 'sleep';
+
+use constant {
+ _WNOHANG => ( $INC{'POSIX.pm'} )
+ ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
+};
+
+sub new {
+ my ( $class, $chnl ) = @_;
+
+ bless [ {}, $chnl ], shift;
+}
+
+sub clear {
+ my ( $self ) = @_;
+
+ 1 while ( $self->[1]->recv2_nb() );
+
+ %{ $self->[0] } = ();
+}
+
+sub exists {
+ my ( $self, $key ) = @_;
+
+ while ( my $data = $self->[1]->recv2_nb() ) {
+ $self->[0]{ $data->[0] } = $data->[1];
+ }
+
+ CORE::exists $self->[0]{ $key };
+}
+
+sub get {
+ my ( $self, $wrk_id, $wait_flag ) = @_;
+
+ if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
+ while ( my $data = $self->[1]->recv2_nb() ) {
+ $self->[0]{ $data->[0] } = $data->[1];
+ }
+ }
+
+ if ( $wait_flag ) {
+ local $!;
+ ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do {
+ while () {
+ my $data = $self->[1]->recv2_nb();
+ if ( !defined $data ) {
+ last if waitpid($wrk_id, _WNOHANG);
+ sleep(0.015), next;
+ }
+ $self->[0]{ $data->[0] } = $data->[1];
+ waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id;
+ }
+ };
+ }
+
+ if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } ) {
+ sleep 0.015; # retry
+ while ( my $data = $self->[1]->recv2_nb() ) {
+ $self->[0]{ $data->[0] } = $data->[1];
+ }
+ }
+
+ my $result = delete $self->[0]{ 'R'.$wrk_id };
+ my $error = delete $self->[0]{ 'S'.$wrk_id };
+
+ $result = '' unless defined $result;
+ $error = '' unless defined $error;
+
+ return ( $result, $error );
+}
+
+sub reapdata {
+ my ( $self ) = @_;
+
+ while ( my $data = $self->[1]->recv2_nb() ) {
+ $self->[0]{ $data->[0] } = $data->[1];
+ }
+
+ return;
+}
+
+sub set {
+ $_[0]->[1]->send2([ $_[1], $_[2] ]);
+}
+
+package # hide from rpm
+ MCE::Child::_ordhash;
+
+sub new { my $gcnt = 0; bless [ {}, [], {}, \$gcnt ], shift; }
+sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
+sub get { $_[0]->[0]{ $_[1] }; }
+sub len { scalar keys %{ $_[0]->[0] }; }
+
+sub clear {
+ %{ $_[0]->[0] } = @{ $_[0]->[1] } = %{ $_[0]->[2] } = ();
+ ${ $_[0]->[3] } = 0;
+
+ return;
+}
+
+sub del {
+ my ( $data, $keys, $indx, $gcnt ) = @{ $_[0] };
+ my $pos = delete $indx->{ $_[1] };
+ return undef unless defined $pos;
+
+ $keys->[ $pos ] = undef;
+
+ if ( ++${ $gcnt } > @{ $keys } * 0.667 ) {
+ my $i; $i = ${ $gcnt } = 0;
+ for my $k ( @{ $keys } ) {
+ $keys->[ $i ] = $k, $indx->{ $k } = $i++ if ( defined $k );
+ }
+ splice @{ $keys }, $i;
+ }
+
+ delete $data->{ $_[1] };
+}
+
+sub set {
+ my ( $key, $data, $keys, $indx ) = ( $_[1], @{ $_[0] } );
+ $data->{ $key } = $_[2], $indx->{ $key } = @{ $keys };
+ push @{ $keys }, "$key";
+
+ return;
+}
+
+sub vals {
+ my ( $self ) = @_;
+
+ ${ $self->[3] }
+ ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
+ : @{ $self->[0] }{ @{ $self->[1] } };
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Child - A threads-like parallelization module compatible with Perl 5.8
+
+=head1 VERSION
+
+This document describes MCE::Child version 1.843
+
+=head1 SYNOPSIS
+
+ use MCE::Child;
+
+ MCE::Child->init(
+ max_workers => 'auto', # default undef, unlimited
+ child_timeout => 20, # default undef, no timeout
+ posix_exit => 1, # default undef, CORE::exit
+ void_context => 1, # default undef
+ on_start => sub {
+ my ( $pid, $ident ) = @_;
+ ...
+ },
+ on_finish => sub {
+ my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
+ ...
+ }
+ );
+
+ MCE::Child->create( sub { print "Hello from child\n" } )->join();
+
+ sub parallel {
+ my ($arg1) = @_;
+ print "Hello again, $arg1\n" if defined($arg1);
+ print "Hello again, $_\n"; # same thing
+ }
+
+ MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
+
+ my @procs = MCE::Child->list();
+ my @running = MCE::Child->list_running();
+ my @joinable = MCE::Child->list_joinable();
+ my @count = MCE::Child->pending();
+
+ # Joining is orderly, e.g. child1 is joined first, child2, child3.
+ $_->join() for @procs;
+
+ # Joining occurs immediately as child processes complete execution.
+ 1 while MCE::Child->wait_one();
+
+ my $child = mce_child { foreach (@files) { ... } };
+
+ $child->join();
+
+ if ( my $err = $child->error() ) {
+ warn "Child error: $err\n";
+ }
+
+ # Get a child's object
+ $child = MCE::Child->self();
+
+ # Get a child's ID
+ $pid = MCE::Child->pid(); # $$
+ $pid = $child->pid();
+ $pid = MCE::Child->tid(); # tid is an alias for pid
+ $pid = $child->tid();
+
+ # Test child objects
+ if ( $child1 == $child2 ) {
+ ...
+ }
+
+ # Give other workers a chance to run
+ MCE::Child->yield();
+ MCE::Child->yield(0.05);
+
+ # Return context, wantarray aware
+ my ($value1, $value2) = $child->join();
+ my $value = $child->join();
+
+ # Check child's state
+ if ( $child->is_running() ) {
+ sleep 1;
+ }
+ if ( $child->is_joinable() ) {
+ $child->join();
+ }
+
+ # Send a signal to a child
+ $child->kill('SIGUSR1');
+
+ # Exit a child
+ MCE::Child->exit(0);
+ MCE::Child->exit(0, @ret);
+
+=head1 DESCRIPTION
+
+L<MCE::Child> is a fork of L<MCE::Hobo> for compatibility with Perl 5.8.
+
+A child is a migratory worker inside the machine that carries the asynchronous
+gene. Child processes are equipped with C<threads>-like capability for running
+code asynchronously. Unlike threads, each child is a unique process to the
+underlying OS. The IPC is handled via C<MCE::Channel>, which runs on all the
+major platforms including Cygwin and Strawberry Perl.
+
+C<MCE::Child> may be used as a standalone or together with C<MCE> including
+running alongside C<threads>.
+
+ use MCE::Child;
+ use MCE::Shared;
+
+ # synopsis: head -20 file.txt | perl script.pl
+
+ my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
+ my $ofh = MCE::Shared->handle( ">", \*STDOUT );
+ my $ary = MCE::Shared->array();
+
+ sub parallel_task {
+ my ( $id ) = @_;
+ while ( <$ifh> ) {
+ printf {$ofh} "[ %4d ] %s", $., $_;
+ # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" ); # dereferencing
+ $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
+ }
+ }
+
+ my $child1 = MCE::Child->new( "parallel_task", 1 );
+ my $child2 = MCE::Child->new( \&parallel_task, 2 );
+ my $child3 = MCE::Child->new( sub { parallel_task(3) } );
+
+ $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all();
+
+ # search array (total one round-trip via IPC)
+ my @vals = $ary->vals( "val =~ / ID 2 /" );
+
+ print {*STDERR} join("", @vals);
+
+=head1 API DOCUMENTATION
+
+=over 3
+
+=item $child = MCE::Child->create( FUNCTION, ARGS )
+
+=item $child = MCE::Child->new( FUNCTION, ARGS )
+
+This will create a new child process that will begin execution with function
+as the entry point, and optionally ARGS for list of parameters. It will return
+the corresponding MCE::Child object, or undef if child creation failed.
+
+I<FUNCTION> may either be the name of a function, an anonymous subroutine, or
+a code ref.
+
+ my $child = MCE::Child->create( "func_name", ... );
+ # or
+ my $child = MCE::Child->create( sub { ... }, ... );
+ # or
+ my $child = MCE::Child->create( \&func, ... );
+
+=item $child = MCE::Child->create( { options }, FUNCTION, ARGS )
+
+=item $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
+
+Options, excluding C<ident>, may be specified globally via the C<init> function.
+Otherwise, C<ident>, C<child_timeout>, C<posix_exit>, and C<void_context> may
+be set uniquely.
+
+The C<ident> option is used by callback functions C<on_start> and C<on_finish>
+for identifying the started and finished child process respectively.
+
+ my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
+ ...
+ } );
+
+ $child1->join;
+
+ my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
+ sleep 1 for ( 1 .. 9 );
+ } );
+
+ $child2->join;
+
+ if ( $child2->error() eq "Child timed out\n" ) {
+ ...
+ }
+
+The C<new()> method is an alias for C<create()>.
+
+=item mce_child { BLOCK } ARGS;
+
+=item mce_child { BLOCK };
+
+C<mce_child> runs the block asynchronously similarly to C<MCE::Child->create()>.
+It returns the child object, or undef if child creation failed.
+
+ my $child = mce_child { foreach (@files) { ... } };
+
+ $child->join();
+
+ if ( my $err = $child->error() ) {
+ warn("Child error: $err\n");
+ }
+
+=item $child->join()
+
+This will wait for the corresponding child process to complete its execution.
+In non-voided context, C<join()> will return the value(s) of the entry point
+function.
+
+The context (void, scalar or list) for the return value(s) for C<join> is
+determined at the time of joining and mostly C<wantarray> aware.
+
+ my $child1 = MCE::Child->create( sub {
+ my @res = qw(foo bar baz);
+ return (@res);
+ });
+
+ my @res1 = $child1->join(); # ( foo, bar, baz )
+ my $res1 = $child1->join(); # baz
+
+ my $child2 = MCE::Child->create( sub {
+ return 'foo';
+ });
+
+ my @res2 = $child2->join(); # ( foo )
+ my $res2 = $child2->join(); # foo
+
+=item $child1->equal( $child2 )
+
+Tests if two child objects are the same child or not. Child comparison is based
+on process IDs. This is overloaded to the more natural forms.
+
+ if ( $child1 == $child2 ) {
+ print("Child objects are the same\n");
+ }
+ # or
+ if ( $child1 != $child2 ) {
+ print("Child objects differ\n");
+ }
+
+=item $child->error()
+
+Child processes are executed in an C<eval> context. This method will return
+C<undef> if the child terminates I<normally>. Otherwise, it returns the value
+of C<$@> associated with the child's execution status in its C<eval> context.
+
+=item $child->exit()
+
+This sends C<'SIGQUIT'> to the child process, notifying the child to exit.
+It returns the child object to allow for method chaining. It is important to
+join later if not immediately to not leave a zombie or defunct process.
+
+ $child->exit()->join();
+ ...
+
+ $child->join(); # later
+
+=item MCE::Child->exit( 0 )
+
+=item MCE::Child->exit( 0, @ret )
+
+A child can exit at any time by calling C<MCE::Child->exit()>. Otherwise, the
+behavior is the same as C<exit(status)> when called from the main process.
+The child process may optionally return data, to be sent via IPC.
+
+=item MCE::Child->finish()
+
+This class method is called automatically by C<END>, but may be called
+explicitly. An error is emitted via croak if there are active child
+processes not yet joined.
+
+ MCE::Child->create( 'task1', $_ ) for 1 .. 4;
+ $_->join for MCE::Child->list();
+
+ MCE::Child->create( 'task2', $_ ) for 1 .. 4;
+ $_->join for MCE::Child->list();
+
+ MCE::Child->create( 'task3', $_ ) for 1 .. 4;
+ $_->join for MCE::Child->list();
+
+ MCE::Child->finish();
+
+=item MCE::Child->init( options )
+
+The init function accepts a list of MCE::Child options.
+
+ MCE::Child->init(
+ max_workers => 'auto', # default undef, unlimited
+ child_timeout => 20, # default undef, no timeout
+ posix_exit => 1, # default undef, CORE::exit
+ void_context => 1, # default undef
+ on_start => sub {
+ my ( $pid, $ident ) = @_;
+ ...
+ },
+ on_finish => sub {
+ my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
+ ...
+ }
+ );
+
+ # Identification given as an option or the 1st argument.
+
+ for my $key ( 'aa' .. 'zz' ) {
+ MCE::Child->create( { ident => $key }, sub { ... } );
+ MCE::Child->create( $key, sub { ... } );
+ }
+
+ MCE::Child->wait_all;
+
+Set C<max_workers> if you want to limit the number of workers by waiting
+automatically for an available slot. Specify C<auto> to obtain the number
+of logical cores via C<MCE::Util::get_ncpu()>.
+
+Set C<child_timeout>, in number of seconds, if you want the child process
+to terminate after some time. The default is C<0> for no timeout.
+
+Set C<posix_exit> to avoid all END and destructor processing. Constructing
+MCE::Child inside a thread implies 1 or if present CGI, FCGI, Coro, Curses,
+Gearman::Util, Gearman::XS, LWP::UserAgent, Mojo::IOLoop, Prima, STFL,
+Tk, Wx, or Win32::GUI.
+
+Set C<void_context> to create the child process in void context for the
+return value. Otherwise, the return context is wantarray-aware for
+C<join()> and C<result()> and determined when retrieving the data.
+
+The callback options C<on_start> and C<on_finish> are called in the parent
+process after starting the worker and later when terminated. The arguments
+for the subroutines were inspired by L<Parallel::ForkManager>.
+
+The parameters for C<on_start> are the following:
+
+ - pid of the child process
+ - identification (ident option or 1st arg to create)
+
+The parameters for C<on_finish> are the following:
+
+ - pid of the child process
+ - program exit code
+ - identification (ident option or 1st arg to create)
+ - exit signal id
+ - error message from eval inside MCE::Child
+ - returned data
+
+=item $child->is_running()
+
+Returns true if a child is still running.
+
+=item $child->is_joinable()
+
+Returns true if the child has finished running and not yet joined.
+
+=item $child->kill( 'SIG...' )
+
+Sends the specified signal to the child. Returns the child object to allow for
+method chaining. As with C<exit>, it is important to join eventually if not
+immediately to not leave a zombie or defunct process.
+
+ $child->kill('SIG...')->join();
+
+The following is a parallel demonstration comparing C<MCE::Shared> against
+C<Redis> and C<Redis::Fast> on a Fedora 23 VM. Joining begins after all
+workers have been notified to quit.
+
+ use Time::HiRes qw(time);
+
+ use Redis;
+ use Redis::Fast;
+
+ use MCE::Child;
+ use MCE::Shared;
+
+ my $redis = Redis->new();
+ my $rfast = Redis::Fast->new();
+ my $array = MCE::Shared->array();
+
+ sub parallel_redis {
+ my ($_redis) = @_;
+ my ($count, $quit, $len) = (0, 0);
+
+ # instead, use a flag to exit loop
+ $SIG{'QUIT'} = sub { $quit = 1 };
+
+ while () {
+ $len = $_redis->rpush('list', $count++);
+ last if $quit;
+ }
+
+ $count;
+ }
+
+ sub parallel_array {
+ my ($count, $quit, $len) = (0, 0);
+
+ # do not exit from inside handler
+ $SIG{'QUIT'} = sub { $quit = 1 };
+
+ while () {
+ $len = $array->push($count++);
+ last if $quit;
+ }
+
+ $count;
+ }
+
+ sub benchmark_this {
+ my ($desc, $num_procs, $timeout, $code, @args) = @_;
+ my ($start, $total) = (time(), 0);
+
+ MCE::Child->new($code, @args) for 1..$num_procs;
+ sleep $timeout;
+
+ # joining is not immediate; ok
+ $_->kill('QUIT') for MCE::Child->list();
+
+ # joining later; ok
+ $total += $_->join() for MCE::Child->list();
+
+ printf "$desc <> duration: %0.03f secs, count: $total\n",
+ time() - $start;
+
+ sleep 0.2;
+ }
+
+ benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
+ benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
+ benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
+
+=item MCE::Child->list()
+
+Returns a list of all child objects not yet joined.
+
+ @procs = MCE::Child->list();
+
+=item MCE::Child->list_running()
+
+Returns a list of all child objects that are still running.
+
+ @procs = MCE::Child->list_running();
+
+=item MCE::Child->list_joinable()
+
+Returns a list of all child objects that have completed running.
+Thus, ready to be joined without blocking.
+
+ @procs = MCE::Child->list_joinable();
+
+=item MCE::Child->max_workers([ N ])
+
+Getter and setter for max_workers. Specify a number or 'auto' to acquire the
+total number of cores via MCE::Util::get_ncpu. Specify a false value to set
+back to no limit.
+
+=item MCE::Child->pending()
+
+Returns a count of all child objects not yet joined.
+
+ $count = MCE::Child->pending();
+
+=item $child->result()
+
+Returns the result obtained by C<join>, C<wait_one>, or C<wait_all>. If the
+process has not yet exited, waits for the corresponding child to complete its
+execution.
+
+ use MCE::Child;
+ use Time::HiRes qw(sleep);
+
+ sub task {
+ my ($id) = @_;
+ sleep $id * 0.333;
+ return $id;
+ }
+
+ MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
+
+ # 1 while MCE::Child->wait_one();
+
+ while ( my $child = MCE::Child->wait_one() ) {
+ my $err = $child->error() || 'no error';
+ my $res = $child->result();
+ my $pid = $child->pid();
+
+ print "[$pid] $err : $res\n";
+ }
+
+Like C<join> described above, the context (void, scalar or list) for the
+return value(s) is determined at the time C<result> is called and mostly
+C<wantarray> aware.
+
+ my $child1 = MCE::Child->create( sub {
+ my @res = qw(foo bar baz);
+ return (@res);
+ });
+
+ my @res1 = $child1->result(); # ( foo, bar, baz )
+ my $res1 = $child1->result(); # baz
+
+ my $child2 = MCE::Child->create( sub {
+ return 'foo';
+ });
+
+ my @res2 = $child2->result(); # ( foo )
+ my $res2 = $child2->result(); # foo
+
+=item MCE::Child->self()
+
+Class method that allows a child to obtain it's own I<MCE::Child> object.
+
+=item $child->pid()
+
+=item $child->tid()
+
+Returns the ID of the child.
+
+ pid: $$ process id
+ tid: $$ alias for pid
+
+=item MCE::Child->pid()
+
+=item MCE::Child->tid()
+
+Class methods that allows a child to obtain its own ID.
+
+ pid: $$ process id
+ tid: $$ alias for pid
+
+=item MCE::Child->wait_one()
+
+=item MCE::Child->wait_all()
+
+Meaningful for the manager process only, waits for one or all child processes
+to complete execution. Afterwards, returns the corresponding child objects.
+If a child doesn't exist, returns the C<undef> value or an empty list for
+C<wait_one> and C<wait_all> respectively.
+
+The C<waitone> and C<waitall> methods are aliases respectively.
+
+ use MCE::Child;
+ use Time::HiRes qw(sleep);
+
+ sub task {
+ my $id = shift;
+ sleep $id * 0.333;
+ return $id;
+ }
+
+ MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
+
+ # join, traditional use case
+ $_->join() for MCE::Child->list();
+
+ # wait_one, simplistic use case
+ 1 while MCE::Child->wait_one();
+
+ # wait_one
+ while ( my $child = MCE::Child->wait_one() ) {
+ my $err = $child->error() || 'no error';
+ my $res = $child->result();
+ my $pid = $child->pid();
+
+ print "[$pid] $err : $res\n";
+ }
+
+ # wait_all
+ my @procs = MCE::Child->wait_all();
+
+ for ( @procs ) {
+ my $err = $_->error() || 'no error';
+ my $res = $_->result();
+ my $pid = $_->pid();
+
+ print "[$pid] $err : $res\n";
+ }
+
+=item MCE::Child->yield( [ floating_seconds ] )
+
+Give other workers a chance to run, optionally for given time. Yield behaves
+similarly to MCE's interval option. It throttles workers from running too fast.
+A demonstration is provided in the next section for fetching URLs in parallel.
+
+ # total run time: 1.00 second
+
+ MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
+ MCE::Child->wait_all();
+
+=back
+
+=head1 PARALLEL::FORKMANAGER-like DEMONSTRATION
+
+MCE::Child behaves similarly to threads for the most part. It also provides
+L<Parallel::ForkManager>-like capabilities. The C<Parallel::ForkManager>
+example is shown first followed by a version using C<MCE::Child>.
+
+=over 3
+
+=item Parallel::ForkManager
+
+ use strict;
+ use warnings;
+
+ use Parallel::ForkManager;
+ use Time::HiRes 'time';
+
+ my $start = time;
+
+ my $pm = Parallel::ForkManager->new(10);
+ $pm->set_waitpid_blocking_sleep(0);
+
+ $pm->run_on_finish( sub {
+ my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
+ print "child $pid completed: $ident => ", $resp->[0], "\n";
+ });
+
+ DATA_LOOP:
+ foreach my $data ( 1..2000 ) {
+ # forks and returns the pid for the child
+ my $pid = $pm->start($data) and next DATA_LOOP;
+ my $ret = [ $data * 2 ];
+
+ $pm->finish(0, $ret);
+ }
+
+ $pm->wait_all_children;
+
+ printf STDERR "duration: %0.03f seconds\n", time - $start;
+
+=item MCE::Child
+
+ use strict;
+ use warnings;
+
+ use MCE::Child 1.843;
+ use Time::HiRes 'time';
+
+ my $start = time;
+
+ MCE::Child->init(
+ max_workers => 10,
+ on_finish => sub {
+ my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
+ print "child $pid completed: $ident => ", $resp->[0], "\n";
+ }
+ );
+
+ foreach my $data ( 1..2000 ) {
+ MCE::Child->create( $data, sub {
+ [ $data * 2 ];
+ });
+ }
+
+ MCE::Child->wait_all;
+
+ printf STDERR "duration: %0.03f seconds\n", time - $start;
+
+=item Time to spin 2,000 workers and obtain results (in seconds).
+
+Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo Boost).
+Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again with Moo loaded
+at the top of the script.
+
+ MCE::Hobo uses MCE::Shared to retrieve data during reaping.
+ MCE::Child uses MCE::Channel, no shared-manager.
+
+ Version Cygwin Windows Linux macOS FreeBSD
+
+ MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
+ MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
+ P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
+
+ MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
+ MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
+ P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
+
+=item Set posix_exit to avoid all END and destructor processing.
+
+This is helpful in reducing overhead when workers exit. Ditto if using a Perl
+module not parallel safe. The option is ignored on C<$^O eq 'MSWin32'>.
+
+ MCE::Child->init( posix_exit => 1, ... );
+ MCE::Hobo->init( posix_exit => 1, ... );
+
+ Version Cygwin Windows Linux macOS FreeBSD
+
+ MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
+ MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
+
+=back
+
+=head1 PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT
+
+This demonstration constructs two queues, two handles, starts the
+shared-manager process if needed, and spawns four workers.
+For this demonstration, am chunking 64 URLs per job. In reality,
+one may run with 200 workers and chunk 300 URLs on a 24-way box.
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # perl demo.pl -- all output
+ # perl demo.pl >/dev/null -- mngr/child output
+ # perl demo.pl 2>/dev/null -- show results only
+ #
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ use strict;
+ use warnings;
+
+ use AnyEvent;
+ use AnyEvent::HTTP;
+ use Time::HiRes qw( time );
+
+ use MCE::Child;
+ use MCE::Shared;
+
+ # Construct two queues, input and return.
+
+ my $que = MCE::Shared->queue();
+ my $ret = MCE::Shared->queue();
+
+ # Construct shared handles for serializing output from many workers
+ # writing simultaneously. This prevents garbled output.
+
+ mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
+ mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
+
+ # Spawn workers early for minimum memory consumption.
+
+ MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
+
+ # Obtain or generate input data for workers to process.
+
+ my ( $count, @urls ) = ( 0 );
+
+ push @urls, map { "http://127.0.0.$_/" } 1..254;
+ push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
+
+ while ( @urls ) {
+ my @chunk = splice(@urls, 0, 64);
+ $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
+ }
+
+ # So that workers leave the loop after consuming the queue.
+
+ $que->end();
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # Loop for the manager process. The manager may do other work if
+ # need be and periodically check $ret->pending() not shown here.
+ #
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ my $start = time;
+
+ printf {$ERR} "Mngr - entering loop\n";
+
+ while ( $count ) {
+ my ( $result, $failed ) = $ret->dequeue( 2 );
+
+ # Remove ID from result, so not treated as a URL item.
+
+ printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
+
+ # Display the URL and the size captured.
+
+ foreach my $url ( keys %{ $result } ) {
+ printf {$OUT} "%s: %d\n", $url, length($result->{$url})
+ if $result->{$url}; # url has content
+ }
+
+ # Display URLs could not reach.
+
+ if ( @{ $failed } ) {
+ foreach my $url ( @{ $failed } ) {
+ print {$OUT} "Failed: $url\n";
+ }
+ }
+
+ # Decrement the count.
+
+ $count--;
+ }
+
+ MCE::Child->wait_all();
+
+ printf {$ERR} "Mngr - exiting loop\n\n";
+ printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
+
+ exit;
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # Child processes enqueue two items ( $result and $failed ) per each
+ # job for the manager process. Likewise, the manager process dequeues
+ # two items above. Optionally, child processes may include the ID in
+ # the result.
+ #
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ sub task {
+ my ( $id ) = @_;
+ printf {$ERR} "Child $id entering loop\n";
+
+ while ( my $job = $que->dequeue() ) {
+ my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
+
+ # Walk URLs, provide a hash and array refs for data.
+
+ printf {$ERR} "Child $id running job $job->{ID}\n";
+ walk( $job, $result, $failed );
+
+ # Send results to the manager process.
+
+ $ret->enqueue( $result, $failed );
+ }
+
+ printf {$ERR} "Child $id exiting loop\n";
+ }
+
+ sub walk {
+ my ( $job, $result, $failed ) = @_;
+
+ # Yielding is critical when running an event loop in parallel.
+ # Not doing so means that the app may reach contention points
+ # with the firewall and likely impose unnecessary hardship at
+ # the OS level. The idea here is not to have multiple workers
+ # initiate HTTP requests to a batch of URLs at the same time.
+ # Yielding behaves similarly like scatter to have the child
+ # process run solo for a fraction of time.
+
+ MCE::Child->yield( 0.03 );
+
+ my $cv = AnyEvent->condvar();
+
+ # Populate the hash ref for the URLs it could reach.
+ # Do not mix AnyEvent timeout with child timeout.
+ # Therefore, choose event timeout when available.
+
+ foreach my $url ( @{ $job->{INPUT} } ) {
+ $cv->begin();
+ http_get $url, timeout => 2, sub {
+ my ( $data, $headers ) = @_;
+ $result->{$url} = $data;
+ $cv->end();
+ };
+ }
+
+ $cv->recv();
+
+ # Populate the array ref for URLs it could not reach.
+
+ foreach my $url ( @{ $job->{INPUT} } ) {
+ push @{ $failed }, $url unless (exists $result->{ $url });
+ }
+
+ return;
+ }
+
+ __END__
+
+ $ perl demo.pl
+
+ Child 1 entering loop
+ Child 2 entering loop
+ Child 3 entering loop
+ Mngr - entering loop
+ Child 2 running job 2
+ Child 3 running job 3
+ Child 1 running job 1
+ Child 4 entering loop
+ Child 4 running job 4
+ Child 2 running job 5
+ Mngr - received job 2
+ Child 3 running job 6
+ Mngr - received job 3
+ Child 1 running job 7
+ Mngr - received job 1
+ Child 4 running job 8
+ Mngr - received job 4
+ http://192.168.0.1/: 3729
+ Child 2 exiting loop
+ Mngr - received job 5
+ Child 3 exiting loop
+ Mngr - received job 6
+ Child 1 exiting loop
+ Mngr - received job 7
+ Child 4 exiting loop
+ Mngr - received job 8
+ Mngr - exiting loop
+
+ Duration: 4.131 seconds
+
+=head1 CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE
+
+Making an executable is possible with the L<PAR::Packer> module.
+On the Windows platform, threads, threads::shared, and exiting via
+threads are necessary for the binary to exit successfully.
+
+ # https://metacpan.org/pod/PAR::Packer
+ # https://metacpan.org/pod/pp
+ #
+ # pp -o demo.exe demo.pl
+ # ./demo.exe
+
+ use strict;
+ use warnings;
+
+ use if $^O eq "MSWin32", "threads";
+ use if $^O eq "MSWin32", "threads::shared";
+
+ # Include minimum dependencies for MCE::Child.
+ # Add other modules required by your application here.
+
+ use Storable ();
+ use Time::HiRes ();
+
+ # use IO::FDPass (); # optional: for condvar, handle, queue
+ # use Sereal (); # optional: for faster serialization
+
+ use MCE::Child;
+ use MCE::Shared;
+
+ # For PAR to work on the Windows platform, one must include manually
+ # any shared modules used by the application.
+
+ # use MCE::Shared::Array; # if using MCE::Shared->array
+ # use MCE::Shared::Cache; # if using MCE::Shared->cache
+ # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
+ # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
+ # use MCE::Shared::Hash; # if using MCE::Shared->hash
+ # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
+ # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
+ # use MCE::Shared::Queue; # if using MCE::Shared->queue
+ # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
+
+ # Et cetera. Only load modules needed for your application.
+
+ use MCE::Shared::Sequence; # if using MCE::Shared->sequence
+
+ my $seq = MCE::Shared->sequence( 1, 9 );
+
+ sub task {
+ my ( $id ) = @_;
+ while ( defined ( my $num = $seq->next() ) ) {
+ print "$id: $num\n";
+ sleep 1;
+ }
+ }
+
+ sub main {
+ MCE::Child->new( \&task, $_ ) for 1 .. 3;
+ MCE::Child->wait_all();
+ }
+
+ # Main must run inside a thread on the Windows platform or workers
+ # will fail duing exiting, causing the exe to crash. The reason is
+ # that PAR or a dependency isn't multi-process safe.
+
+ ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
+
+ threads->exit(0) if $INC{"threads.pm"};
+
+=head1 CREDITS
+
+The inspiration for C<MCE::Child> comes from wanting C<threads>-like behavior
+for processes compatible with Perl 5.8. Both can run side-by-side including
+safe-use by MCE workers. Likewise, the documentation resembles C<threads>.
+
+The inspiration for C<wait_all> and C<wait_one> comes from the
+C<Parallel::WorkUnit> module.
+
+=head1 SEE ALSO
+
+=over 3
+
+=item * L<forks>
+
+=item * L<forks::BerkeleyDB>
+
+=item * L<MCE::Hobo>
+
+=item * L<Parallel::ForkManager>
+
+=item * L<Parallel::Loops>
+
+=item * L<Parallel::Prefork>
+
+=item * L<Parallel::WorkUnit>
+
+=item * L<Proc::Fork>
+
+=item * L<Thread::Tie>
+
+=item * L<threads>
+
+=back
+
+=head1 INDEX
+
+L<MCE|MCE>, L<MCE::Channel>, L<MCE::Shared>
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=cut
+
diff --git a/lib/MCE/Core.pod b/lib/MCE/Core.pod
index f85ddc4..c7c8f64 100644
--- a/lib/MCE/Core.pod
+++ b/lib/MCE/Core.pod
@@ -5,7 +5,7 @@ MCE::Core - Documentation describing the core MCE API
=head1 VERSION
-This document describes MCE::Core version 1.838
+This document describes MCE::Core version 1.843
=head1 SYNOPSIS
@@ -219,7 +219,7 @@ Below, a new instance is configured with all available options.
progress => sub { ... }, ## Default undef
# A code block for receiving info on progress made.
- # See section labeled "PROGRESS DEMONSTRATIONS"
+ # See section labeled "MCE PROGRESS DEMONSTRATIONS"
# at the end of this document.
user_args => { env => 'test' }, ## Default undef
@@ -2137,7 +2137,7 @@ uses 2 workers to minimize the output size. Input is from the sequence option.
The interval.pl example above is included with MCE.
-=head1 PROGRESS DEMONSTRATIONS
+=head1 MCE PROGRESS DEMONSTRATIONS
The C<progress> option takes a code block for receiving info on the progress
made while processing input data; e.g. C<input_data> or C<sequence>. To make
@@ -2412,6 +2412,14 @@ can do is report the size completed thus far.
printf "%0.1f kibibytes\n", $completed_size / 1024;
}
+=head1 SEE ALSO
+
+=over 3
+
+=item * L<MCE::Examples>
+
+=back
+
=head1 INDEX
L<MCE|MCE>
diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm
index 9c62cc9..71c2aa2 100644
--- a/lib/MCE/Core/Input/Generator.pm
+++ b/lib/MCE/Core/Input/Generator.pm
@@ -15,7 +15,7 @@ package MCE::Core::Input::Generator;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm
index 15aa365..f570ab1 100644
--- a/lib/MCE/Core/Input/Handle.pm
+++ b/lib/MCE/Core/Input/Handle.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Handle;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
@@ -38,7 +38,7 @@ sub _systell {
# To minimize memory consumption, SEEK_CUR equals 1 on most platforms.
# e.g. use Fcntl qw(SEEK_CUR);
- MCE::Util::_sysseek($_[0], 0, 1);
+ sysseek($_[0], 0, 1);
}
sub _worker_read_handle {
@@ -67,15 +67,15 @@ sub _worker_read_handle {
$_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
# inlined for performance
- if ($self->{_data_channels} > 6) {
- $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )};
+ if ($self->{_data_channels} > 5) {
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )};
}
$_dat_ex = sub {
MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
unless $_DAT_LOCK->{ $_pid };
};
$_dat_un = sub {
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
@@ -117,7 +117,7 @@ sub _worker_read_handle {
($_chunk_id, $_offset_pos) = unpack($_que_template, $_next);
if ($_offset_pos >= $_data_size) {
- MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos));
+ syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos));
$_dat_un->() if $_lock_chn;
close $_IN_FILE; undef $_IN_FILE;
return;
@@ -172,7 +172,7 @@ sub _worker_read_handle {
}
}
- MCE::Util::_syswrite(
+ syswrite(
$_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
);
$_dat_un->() if $_lock_chn;
@@ -181,7 +181,7 @@ sub _worker_read_handle {
local $/ = $_RS if ($/ ne $_RS);
if ($_parallel_io && $_RS eq $LF) {
- MCE::Util::_syswrite(
+ syswrite(
$_QUE_W_SOCK,
pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size)
);
@@ -195,8 +195,8 @@ sub _worker_read_handle {
}
if ($_proc_type == READ_FILE) {
- MCE::Util::_sysseek($_IN_FILE, tell( $_IN_FILE ), 0);
- MCE::Util::_sysread($_IN_FILE, $_, $_tmp_cs, $_p);
+ sysseek($_IN_FILE, tell( $_IN_FILE ), 0);
+ sysread($_IN_FILE, $_, $_tmp_cs, $_p);
seek $_IN_FILE, _systell($_IN_FILE), 0;
}
else {
@@ -207,8 +207,8 @@ sub _worker_read_handle {
}
else {
if ($_proc_type == READ_FILE) {
- MCE::Util::_sysseek($_IN_FILE, $_offset_pos, 0);
- MCE::Util::_sysread($_IN_FILE, $_, $_chunk_size, $_p);
+ sysseek($_IN_FILE, $_offset_pos, 0);
+ sysread($_IN_FILE, $_, $_chunk_size, $_p);
seek $_IN_FILE, _systell($_IN_FILE), 0;
}
else {
@@ -218,7 +218,7 @@ sub _worker_read_handle {
$_ .= <$_IN_FILE>;
- MCE::Util::_syswrite(
+ syswrite(
$_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
);
$_dat_un->() if $_lock_chn;
diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm
index 463fc60..dd2ff4f 100644
--- a/lib/MCE/Core/Input/Iterator.pm
+++ b/lib/MCE/Core/Input/Iterator.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
@@ -59,7 +59,7 @@ sub _worker_user_iterator {
unless $_DAT_LOCK->{ $_pid };
};
$_dat_un = sub {
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm
index 0867a79..e56a6c7 100644
--- a/lib/MCE/Core/Input/Request.pm
+++ b/lib/MCE/Core/Input/Request.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Request;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
@@ -62,7 +62,7 @@ sub _worker_request_chunk {
unless $_DAT_LOCK->{ $_pid };
};
$_dat_un = sub {
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm
index d7c0053..d4d8862 100644
--- a/lib/MCE/Core/Input/Sequence.pm
+++ b/lib/MCE/Core/Input/Sequence.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
@@ -56,15 +56,15 @@ sub _worker_sequence_queue {
$_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
# inlined for performance
- if ($self->{_data_channels} > 6) {
- $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )};
+ if ($self->{_data_channels} > 5) {
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )};
}
$_dat_ex = sub {
MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
unless $_DAT_LOCK->{ $_pid };
};
$_dat_un = sub {
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
@@ -102,12 +102,12 @@ sub _worker_sequence_queue {
($_chunk_id, $_offset) = unpack($_que_template, $_next);
if ($_offset >= $_abort) {
- MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset));
+ syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset));
$_dat_un->() if $_lock_chn;
return;
}
- MCE::Util::_syswrite(
+ syswrite(
$_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1)
);
diff --git a/lib/MCE/Core/Manager.pm b/lib/MCE/Core/Manager.pm
index 1ed32dc..6910360 100644
--- a/lib/MCE/Core/Manager.pm
+++ b/lib/MCE/Core/Manager.pm
@@ -14,7 +14,7 @@ package MCE::Core::Manager;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -88,7 +88,7 @@ sub _output_loop {
$_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output,
$_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag,
$_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt,
- $_BSB_W_SOCK, $_BSE_W_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
+ $_BSB_W_SOCK, $_BSB_R_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
$_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT,
@_delay_wid, $_size_completed, $_win32_ipc
);
@@ -148,7 +148,7 @@ sub _output_loop {
if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
if ($_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
- MCE::Util::_syswrite($_BSB_W_SOCK, $LF);
+ syswrite($_BSB_W_SOCK, $LF);
}
undef $_syn_flag;
}
@@ -180,7 +180,7 @@ sub _output_loop {
if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
if ($_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
- MCE::Util::_syswrite($_BSB_W_SOCK, $LF);
+ syswrite($_BSB_W_SOCK, $LF);
}
undef $_syn_flag;
}
@@ -599,7 +599,7 @@ sub _output_loop {
if (++$_sync_cnt == $_total_running) {
for my $_i (1 .. $_total_running) {
- MCE::Util::_syswrite($_BSB_W_SOCK, $LF);
+ syswrite($_BSB_W_SOCK, $LF);
}
undef $_syn_flag;
}
@@ -614,7 +614,7 @@ sub _output_loop {
: $self->{_total_running};
for my $_i (1 .. $_total_running) {
- MCE::Util::_syswrite($_BSE_W_SOCK, $LF);
+ syswrite($_BSB_R_SOCK, $LF);
}
}
@@ -622,7 +622,7 @@ sub _output_loop {
},
OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC
- MCE::Util::_syswrite($_DAT_R_SOCK, $LF);
+ syswrite($_DAT_R_SOCK, $LF);
$_win32_ipc = 1, goto _LOOP unless $_win32_ipc;
@@ -801,12 +801,17 @@ sub _output_loop {
## Output event loop.
- my $_func; my $_channels = $self->{_dat_r_sock};
+ my $_channels = $self->{_dat_r_sock};
+ my $_func;
- $_win32_ipc = ( $ENV{'PERL_MCE_IPC'} eq 'win32' || $INC{'MCE/Hobo.pm'} );
+ $_win32_ipc = (
+ $ENV{'PERL_MCE_IPC'} eq 'win32' ||
+ $INC{'MCE/Child.pm'} ||
+ $INC{'MCE/Hobo.pm'}
+ );
$_BSB_W_SOCK = $self->{_bsb_w_sock};
- $_BSE_W_SOCK = $self->{_bse_w_sock};
+ $_BSB_R_SOCK = $self->{_bsb_r_sock};
$_DAT_R_SOCK = $self->{_dat_r_sock}->[0];
$_RS = $self->{RS} || $/;
@@ -869,7 +874,7 @@ sub _output_loop {
MCE::Util::_nonblocking($_DAT_R_SOCK, 1) if $_win32_ipc;
while ($self->{_total_running}) {
- MCE::Util::_sysread($_DAT_R_SOCK, $_func, 8);
+ MCE::Util::_sysread2($_DAT_R_SOCK, $_func, 8);
last() unless length($_func) == 8;
$_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
diff --git a/lib/MCE/Core/Validation.pm b/lib/MCE/Core/Validation.pm
index 627b61b..b806be7 100644
--- a/lib/MCE/Core/Validation.pm
+++ b/lib/MCE/Core/Validation.pm
@@ -14,7 +14,7 @@ package MCE::Core::Validation;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## Items below are folded into MCE.
@@ -256,7 +256,7 @@ sub _parse_chunk_size {
# Iterators may optionally use chunk_size to determine how much
# to return per iteration. The default is 1 for MCE Models, same
# as for the Core API. The user_func receives an array_ref
- # regardless if 1 or higher.
+ # regardless if 1 or greater.
#
# sub make_iter {
# ...
@@ -351,7 +351,7 @@ sub _parse_max_workers {
$_ncpu_ul = 8 if ($_ncpu_ul > 8);
if ($1 && $2) {
- local $@; $_max_workers = eval "int($_ncpu_ul $1 $2 + 0.5)";
+ local $@; $_max_workers = eval "int($_ncpu_ul $1 $2 + 0.5)"; ## no critic
$_max_workers = 1 if (!$_max_workers || $_max_workers < 1);
$_max_workers = $_ncpu if ($_max_workers > $_ncpu);
}
diff --git a/lib/MCE/Core/Worker.pm b/lib/MCE/Core/Worker.pm
index f4a0a6d..b952937 100644
--- a/lib/MCE/Core/Worker.pm
+++ b/lib/MCE/Core/Worker.pm
@@ -14,7 +14,7 @@ package MCE::Core::Worker;
use strict;
use warnings;
-our $VERSION = '1.838';
+our $VERSION = '1.843';
my $_has_threads = $INC{'threads.pm'} ? 1 : 0;
my $_tid = $_has_threads ? threads->tid() : 0;
@@ -288,7 +288,7 @@ use bytes;
};
$_dat_un = sub {
my $_pid = $_has_threads ? $$ .'.'. $_tid : $$;
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
@@ -488,8 +488,9 @@ sub _worker_do {
delete $self->{_wuf};
- ## Check nested Hobo workers not yet joined.
- MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
+ ## Check for nested workers not yet joined.
+ MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
+ MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
## Notify the main process a worker has completed.
local $\ = undef if (defined $\);
@@ -585,7 +586,7 @@ sub _worker_loop {
_worker_do($self, {}), next if ($_response eq "_data\n");
## Wait here until MCE completes job submission to all workers.
- MCE::Util::_sysread($self->{_bse_r_sock}, my($_b), 1);
+ MCE::Util::_sysread($self->{_bsb_w_sock}, my($_b), 1);
## Normal request.
if (defined $_job_delay && $_job_delay > 0.0) {
diff --git a/lib/MCE/Examples.pod b/lib/MCE/Examples.pod
index 07d94ed..f6a5b8c 100644
--- a/lib/MCE/Examples.pod
+++ b/lib/MCE/Examples.pod
@@ -5,7 +5,7 @@ MCE::Examples - Various examples and demonstrations
=head1 VERSION
-This document describes MCE::Examples version 1.838
+This document describes MCE::Examples version 1.843
=head1 INCLUDED WITH THE DISTRIBUTION
diff --git a/lib/MCE/Flow.pm b/lib/MCE/Flow.pm
index 5ff8d36..d43f11f 100644
--- a/lib/MCE/Flow.pm
+++ b/lib/MCE/Flow.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -480,7 +480,7 @@ MCE::Flow - Parallel flow model for building creative applications
=head1 VERSION
-This document describes MCE::Flow version 1.838
+This document describes MCE::Flow version 1.843
=head1 DESCRIPTION
@@ -715,7 +715,11 @@ inside the first block. Hence, the block is called once per each item.
## Array or array_ref
mce_flow sub { do_work($_) }, 1..10000;
- mce_flow sub { do_work($_) }, [ 1..10000 ];
+ mce_flow sub { do_work($_) }, \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_flow sub { do_work($_) }, \@deeply_list;
## File_path, glob_ref, or scalar_ref
mce_flow_f sub { chomp; do_work($_) }, "/path/to/file";
@@ -745,10 +749,15 @@ This means having to loop through the chunk from inside the first block.
## Looping inside the block is the same for mce_flow_f and
## mce_flow_s.
+ ## Array or array_ref
mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
+ mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
- ## Same as above, resembles code using the Core API.
+ ## Important; pass an array_ref for deeply input data
+ mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
+ ## Resembles code using the core MCE API
mce_flow sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -791,6 +800,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Flow::init { options }
+=back
+
The init function accepts a hash of MCE options. Unlike with MCE::Stream,
both gather and bounds_only options may be specified when calling init
(not shown below).
@@ -833,8 +844,6 @@ both gather and bounds_only options may be specified when calling init
7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
10000
-=back
-
Like with MCE::Flow::init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name, max_workers,
and use_threads can take an anonymous array for setting uniquely per
@@ -891,6 +900,8 @@ equals 1 in order to demonstrate all the possibilities for providing input data.
=item mce_flow sub { code }, list
+=back
+
Input data may be defined using a list, an array ref, or a hash ref.
Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a
@@ -899,10 +910,15 @@ needed after the block.
# $_ contains the item when chunk_size => 1
- mce_flow sub { $_ }, 1..1000;
- mce_flow sub { $_ }, \@list;
+ mce_flow sub { do_work($_) }, 1..1000;
+ mce_flow sub { do_work($_) }, \@list;
+
+ # Important; pass an array_ref for deeply input data
- # chunking, any chunk_size => 1 or higher
+ mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_flow sub { do_work($_) }, \@deeply_list;
+
+ # Chunking; any chunk_size => 1 or greater
my %res = mce_flow sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -914,7 +930,7 @@ needed after the block.
},
\@list;
- # input hash, current API available since 1.828
+ # Input hash; current API available since 1.828
my %res = mce_flow sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -926,7 +942,7 @@ needed after the block.
},
\%hash;
- # unlike MCE::Loop, MCE::Flow doesn't need input to run
+ # Unlike MCE::Loop, MCE::Flow doesn't need input to run
mce_flow { max_workers => 4 }, sub {
MCE->say( MCE->wid );
@@ -947,7 +963,7 @@ needed after the block.
MCE->say( "consumer: ", MCE->wid );
};
- # here, options are specified via init
+ # Here, options are specified via init
MCE::Flow::init {
max_workers => [ 1, 3 ],
@@ -956,10 +972,14 @@ needed after the block.
mce_flow \&producer, \&consumers;
+=over 3
+
=item MCE::Flow->run_file ( sub { code }, file )
=item mce_flow_f sub { code }, file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -969,7 +989,7 @@ position among themselves with zero interaction by the manager process.
mce_flow_f sub { $_ }, $file_handle;
mce_flow_f sub { $_ }, \$scalar;
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_flow_f sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -981,10 +1001,14 @@ position among themselves with zero interaction by the manager process.
},
"/path/to/file";
+=over 3
+
=item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -1001,7 +1025,7 @@ optional. The format is passed to sprintf (% may be omitted below).
step => $step, format => $fmt
};
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_flow_s sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -1063,10 +1087,14 @@ Time was measured using 1 worker to emphasize the difference.
7500001 .. 8750000
8750001 .. 10000000
+=over 3
+
=item MCE::Flow->run ( { input_data => iterator }, sub { code } )
=item mce_flow { input_data => iterator }, sub { code }
+=back
+
An iterator reference may be specified for input_data. The only other way
is to specify input_data via MCE::Flow::init. This prevents MCE::Flow from
configuring the iterator reference as another user task which will not work.
@@ -1079,8 +1107,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
mce_flow sub { $_ };
-=back
-
=head1 GATHERING DATA
Unlike MCE::Map where gather and output order are done for you automatically,
@@ -1270,6 +1296,8 @@ running.
=item MCE::Flow::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -1284,8 +1312,6 @@ longer needed.
MCE::Flow::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Grep.pm b/lib/MCE/Grep.pm
index acefc81..2bbc8ac 100644
--- a/lib/MCE/Grep.pm
+++ b/lib/MCE/Grep.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -435,7 +435,7 @@ MCE::Grep - Parallel grep model similar to the native grep function
=head1 VERSION
-This document describes MCE::Grep version 1.838
+This document describes MCE::Grep version 1.843
=head1 SYNOPSIS
@@ -444,18 +444,22 @@ This document describes MCE::Grep version 1.838
## Array or array_ref
my @a = mce_grep { $_ % 5 == 0 } 1..10000;
- my @b = mce_grep { $_ % 5 == 0 } [ 1..10000 ];
+ my @b = mce_grep { $_ % 5 == 0 } \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_grep { $_->[1] % 2 == 0 } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_grep { $_->[1] % 2 == 0 } \@deeply_list;
## File_path, glob_ref, or scalar_ref
- my @c = mce_grep_f { /pattern/ } "/path/to/file";
- my @d = mce_grep_f { /pattern/ } $file_handle;
- my @e = mce_grep_f { /pattern/ } \$scalar;
+ my @e = mce_grep_f { /pattern/ } "/path/to/file";
+ my @f = mce_grep_f { /pattern/ } $file_handle;
+ my @g = mce_grep_f { /pattern/ } \$scalar;
## Sequence of numbers (begin, end [, step, format])
- my @f = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5;
- my @g = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ];
+ my @h = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5;
+ my @i = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ];
- my @h = mce_grep_s { %_ * 3 == 0 } {
+ my @j = mce_grep_s { %_ * 3 == 0 } {
begin => 1, end => 10000, step => 5, format => undef
};
@@ -604,6 +608,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Grep::init { options }
+=back
+
The init function accepts a hash of MCE options. The gather option, if
specified, is ignored due to being used internally by the module.
@@ -638,8 +644,6 @@ specified, is ignored due to being used internally by the module.
5 10 15 20 25 30 35 40 45 50 55 60 65 70 75 80 85 90 95 100
-=back
-
=head1 API DOCUMENTATION
=over 3
@@ -648,18 +652,30 @@ specified, is ignored due to being used internally by the module.
=item mce_grep { code } list
+=back
+
Input data may be defined using a list or an array reference. Unlike MCE::Loop,
Flow, and Step, specifying a hash reference as input data isn't allowed.
+ ## Array or array_ref
my @a = mce_grep { /[2357]/ } 1..1000;
my @b = mce_grep { /[2357]/ } \@list;
- my @z = mce_grep { /[2357]/ } \%hash; # not supported
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_grep { $_->[1] =~ /[2357]/ } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_grep { $_->[1] =~ /[2357]/ } \@deeply_list;
+
+ ## Not supported
+ my @z = mce_grep { ... } \%hash;
+
+=over 3
=item MCE::Grep->run_file ( sub { code }, file )
=item mce_grep_f { code } file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -667,10 +683,14 @@ position among themselves with zero interaction by the manager process.
my @d = mce_grep_f { /pattern/ } $file_handle;
my @e = mce_grep_f { /pattern/ } \$scalar;
+=over 3
+
=item MCE::Grep->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_grep_s { code } $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -685,17 +705,19 @@ optional. The format is passed to sprintf (% may be omitted below).
step => $step, format => $fmt
};
+=over 3
+
=item MCE::Grep->run ( sub { code }, iterator )
=item mce_grep { code } iterator
+=back
+
An iterator reference may be specified for input_data. Iterators are described
under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
my @a = mce_grep { $_ % 3 == 0 } make_iterator(10, 30, 2);
-=back
-
=head1 MANUAL SHUTDOWN
=over 3
@@ -704,6 +726,8 @@ under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
=item MCE::Grep::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -718,8 +742,6 @@ longer needed.
MCE::Grep::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Loop.pm b/lib/MCE/Loop.pm
index 22256bf..5b8e7b3 100644
--- a/lib/MCE/Loop.pm
+++ b/lib/MCE/Loop.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -350,7 +350,7 @@ MCE::Loop - MCE model for building parallel loops
=head1 VERSION
-This document describes MCE::Loop version 1.838
+This document describes MCE::Loop version 1.843
=head1 DESCRIPTION
@@ -443,7 +443,11 @@ inside the block. Hence, the block is called once per each item.
## Array or array_ref
mce_loop { do_work($_) } 1..10000;
- mce_loop { do_work($_) } [ 1..10000 ];
+ mce_loop { do_work($_) } \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_loop { do_work($_) } \@deeply_list;
## File_path, glob_ref, or scalar_ref
mce_loop_f { chomp; do_work($_) } "/path/to/file";
@@ -473,10 +477,15 @@ This means having to loop through the chunk from inside the block.
## Looping inside the block is the same for mce_loop_f and
## mce_loop_s.
+ ## Array or array_ref
mce_loop { do_work($_) for (@{ $_ }) } 1..10000;
+ mce_loop { do_work($_) for (@{ $_ }) } \@list;
- ## Same as above, resembles code using the Core API.
+ ## Important; pass an array_ref for deeply input data
+ mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list;
+ ## Resembles code using the core MCE API
mce_loop {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -519,6 +528,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Loop::init { options }
+=back
+
The init function accepts a hash of MCE options.
use MCE::Loop;
@@ -559,8 +570,6 @@ The init function accepts a hash of MCE options.
7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
10000
-=back
-
=head1 API DOCUMENTATION
The following assumes chunk_size equals 1 in order to demonstrate all the
@@ -572,14 +581,21 @@ possibilities for providing input data.
=item mce_loop { code } list
+=back
+
Input data may be defined using a list, an array ref, or a hash ref.
# $_ contains the item when chunk_size => 1
- mce_loop { $_ } 1..1000;
- mce_loop { $_ } \@list;
+ mce_loop { do_work($_) } 1..1000;
+ mce_loop { do_work($_) } \@list;
+
+ # Important; pass an array_ref for deeply input data
- # chunking, any chunk_size => 1 or higher
+ mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_loop { do_work($_) } \@deeply_list;
+
+ # Chunking; any chunk_size => 1 or greater
my %res = mce_loop {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -591,7 +607,7 @@ Input data may be defined using a list, an array ref, or a hash ref.
}
\@list;
- # input hash, current API available since 1.828
+ # Input hash; current API available since 1.828
my %res = mce_loop {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -603,10 +619,14 @@ Input data may be defined using a list, an array ref, or a hash ref.
}
\%hash;
+=over 3
+
=item MCE::Loop->run_file ( sub { code }, file )
=item mce_loop_f { code } file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -616,7 +636,7 @@ position among themselves with zero interaction by the manager process.
mce_loop_f { $_ } $file_handle;
mce_loop_f { $_ } \$scalar;
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_loop_f {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -628,10 +648,14 @@ position among themselves with zero interaction by the manager process.
}
"/path/to/file";
+=over 3
+
=item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_loop_s { code } $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -648,7 +672,7 @@ optional. The format is passed to sprintf (% may be omitted below).
step => $step, format => $fmt
};
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_loop_s {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -710,17 +734,19 @@ Time was measured using 1 worker to emphasize the difference.
7500001 .. 8750000
8750001 .. 10000000
+=over 3
+
=item MCE::Loop->run ( sub { code }, iterator )
=item mce_loop { code } iterator
+=back
+
An iterator reference may be specified for input_data. Iterators are described
under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
mce_loop { $_ } make_iterator(10, 30, 2);
-=back
-
=head1 GATHERING DATA
Unlike MCE::Map where gather and output order are done for you automatically,
@@ -898,6 +924,8 @@ The following does the same thing using the Core API.
=item MCE::Loop::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -912,8 +940,6 @@ longer needed.
MCE::Loop::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Map.pm b/lib/MCE/Map.pm
index 8acab1b..ad90580 100644
--- a/lib/MCE/Map.pm
+++ b/lib/MCE/Map.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -435,7 +435,7 @@ MCE::Map - Parallel map model similar to the native map function
=head1 VERSION
-This document describes MCE::Map version 1.838
+This document describes MCE::Map version 1.843
=head1 SYNOPSIS
@@ -444,18 +444,22 @@ This document describes MCE::Map version 1.838
## Array or array_ref
my @a = mce_map { $_ * $_ } 1..10000;
- my @b = mce_map { $_ * $_ } [ 1..10000 ];
+ my @b = mce_map { $_ * $_ } \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;
## File_path, glob_ref, or scalar_ref
- my @c = mce_map_f { chomp; $_ } "/path/to/file";
- my @d = mce_map_f { chomp; $_ } $file_handle;
- my @e = mce_map_f { chomp; $_ } \$scalar;
+ my @e = mce_map_f { chomp; $_ } "/path/to/file";
+ my @f = mce_map_f { chomp; $_ } $file_handle;
+ my @g = mce_map_f { chomp; $_ } \$scalar;
## Sequence of numbers (begin, end [, step, format])
- my @f = mce_map_s { $_ * $_ } 1, 10000, 5;
- my @g = mce_map_s { $_ * $_ } [ 1, 10000, 5 ];
+ my @h = mce_map_s { $_ * $_ } 1, 10000, 5;
+ my @i = mce_map_s { $_ * $_ } [ 1, 10000, 5 ];
- my @h = mce_map_s { $_ * $_ } {
+ my @j = mce_map_s { $_ * $_ } {
begin => 1, end => 10000, step => 5, format => undef
};
@@ -539,6 +543,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Map::init { options }
+=back
+
The init function accepts a hash of MCE options. The gather option, if
specified, is ignored due to being used internally by the module.
@@ -580,8 +586,6 @@ specified, is ignored due to being used internally by the module.
7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
10000
-=back
-
=head1 API DOCUMENTATION
=over 3
@@ -590,18 +594,30 @@ specified, is ignored due to being used internally by the module.
=item mce_map { code } list
+=back
+
Input data may be defined using a list or an array reference. Unlike MCE::Loop,
Flow, and Step, specifying a hash reference as input data isn't allowed.
+ ## Array or array_ref
my @a = mce_map { $_ * 2 } 1..1000;
my @b = mce_map { $_ * 2 } \@list;
- my @z = mce_map { $_ * 2 } \%hash; # not supported
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;
+
+ ## Not supported
+ my @z = mce_map { ... } \%hash;
+
+=over 3
=item MCE::Map->run_file ( sub { code }, file )
=item mce_map_f { code } file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -609,10 +625,14 @@ position among themselves with zero interaction by the manager process.
my @d = mce_map_f { chomp; $_ . "\r\n" } $file_handle;
my @e = mce_map_f { chomp; $_ . "\r\n" } \$scalar;
+=over 3
+
=item MCE::Map->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_map_s { code } $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -627,17 +647,19 @@ optional. The format is passed to sprintf (% may be omitted below).
step => $step, format => $fmt
};
+=over 3
+
=item MCE::Map->run ( sub { code }, iterator )
=item mce_map { code } iterator
+=back
+
An iterator reference may be specified for input_data. Iterators are described
under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
my @a = mce_map { $_ * 2 } make_iterator(10, 30, 2);
-=back
-
=head1 MANUAL SHUTDOWN
=over 3
@@ -646,6 +668,8 @@ under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
=item MCE::Map::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -660,8 +684,6 @@ longer needed.
MCE::Map::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Mutex.pm b/lib/MCE/Mutex.pm
index 8ec75cf..19f5db7 100644
--- a/lib/MCE/Mutex.pm
+++ b/lib/MCE/Mutex.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -20,18 +20,18 @@ use Carp ();
sub new {
my ($class, %argv) = @_;
+ my $impl = defined($argv{'impl'})
+ ? $argv{'impl'} : defined($argv{'path'}) ? 'Flock' : 'Channel';
- my $pkg = (defined $argv{'impl'})
- ? $argv{'impl'} : (defined $argv{'path'}) ? 'Flock' : 'Channel';
+ $impl = ucfirst( lc $impl );
- $pkg = ucfirst( lc $pkg );
+ eval "require MCE::Mutex::$impl; 1" ||
+ Carp::croak("Could not load Mutex implementation '$impl': $@");
- if (eval "require MCE::Mutex::$pkg; 1") {
- no strict 'refs'; $pkg = 'MCE::Mutex::'.$pkg;
- return $pkg->new(%argv);
- }
+ my $pkg = 'MCE::Mutex::'.$impl;
+ no strict 'refs';
- Carp::croak("Could not load Mutex implementation $pkg: $@");
+ return $pkg->new( %argv );
}
## base class methods
@@ -68,7 +68,7 @@ MCE::Mutex - Locking for Many-Core Engine
=head1 VERSION
-This document describes MCE::Mutex version 1.838
+This document describes MCE::Mutex version 1.843
=head1 SYNOPSIS
diff --git a/lib/MCE/Mutex/Channel.pm b/lib/MCE/Mutex/Channel.pm
index f74d798..f92080a 100644
--- a/lib/MCE/Mutex/Channel.pm
+++ b/lib/MCE/Mutex/Channel.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized once );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
use base 'MCE::Mutex';
use Scalar::Util qw(refaddr weaken);
@@ -29,27 +29,38 @@ sub CLONE {
sub DESTROY {
my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
- MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
- if $obj->{ $pid };
+ syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid };
+ syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};
- if ($obj->{'_init_pid'} eq $pid) {
+ if ( $obj->{_init_pid} eq $pid ) {
my $addr = refaddr $obj;
- ($^O eq 'MSWin32')
+ ($^O eq 'MSWin32' && $obj->{impl} eq 'Channel')
? MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock))
: MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock));
- @MUTEX = map { refaddr($_) == $addr ? () : $_ } @MUTEX;
+ if ( ! $has_threads ) {
+ @MUTEX = map { refaddr($_) == $addr ? () : $_ } @MUTEX;
+ }
}
return;
}
sub _destroy {
- # Called by MCE::_exit && MCE::Hobo::_exit. Must iterate a copy.
+ # Called by { MCE, MCE::Child, and MCE::Hobo }::_exit.
+ # This must iterate a copy.
+
if ( @MUTEX ) { local $_; &DESTROY($_) for @{[ @MUTEX ]}; }
}
+sub _save_for_global_destruction {
+ if ( ! $has_threads ) {
+ push @MUTEX, $_[0];
+ weaken $MUTEX[-1];
+ }
+}
+
###############################################################################
## ----------------------------------------------------------------------------
## Public methods.
@@ -64,13 +75,15 @@ sub new {
? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock))
: MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock));
- MCE::Util::_syswrite($obj{_w_sock}, '0');
+ syswrite $obj{_w_sock}, '0';
+
+ bless \%obj, $class;
- if (caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/) {
- push(@MUTEX, \%obj); weaken($MUTEX[-1]);
+ if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
+ MCE::Mutex::Channel::_save_for_global_destruction(\%obj);
}
- return bless(\%obj, $class);
+ return \%obj;
}
sub lock {
@@ -88,7 +101,7 @@ sub lock {
sub unlock {
my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
- MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
+ syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
if $obj->{ $pid };
return;
@@ -108,7 +121,7 @@ sub synchronize {
? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
: $code->(@_);
- MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0;
+ syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0;
return wantarray ? @ret : $ret[-1];
}
@@ -131,7 +144,7 @@ MCE::Mutex::Channel - Mutex locking via a pipe or socket
=head1 VERSION
-This document describes MCE::Mutex::Channel version 1.838
+This document describes MCE::Mutex::Channel version 1.843
=head1 DESCRIPTION
diff --git a/lib/MCE/Mutex/Channel2.pm b/lib/MCE/Mutex/Channel2.pm
new file mode 100644
index 0000000..eecc8ac
--- /dev/null
+++ b/lib/MCE/Mutex/Channel2.pm
@@ -0,0 +1,162 @@
+###############################################################################
+## ----------------------------------------------------------------------------
+## MCE::Mutex::Channel2 - Provides two mutexes using a single channel.
+##
+###############################################################################
+
+package MCE::Mutex::Channel2;
+
+use strict;
+use warnings;
+
+no warnings qw( threads recursion uninitialized once );
+
+our $VERSION = '1.843';
+
+use base 'MCE::Mutex::Channel';
+use MCE::Util ();
+
+my $has_threads = $INC{'threads.pm'} ? 1 : 0;
+my $tid = $has_threads ? threads->tid() : 0;
+
+sub CLONE {
+ $tid = threads->tid() if $has_threads;
+}
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Public methods.
+##
+###############################################################################
+
+sub new {
+ my ($class, %obj) = (@_, impl => 'Channel2');
+ $obj{'_init_pid'} = $has_threads ? $$ .'.'. $tid : $$;
+
+ MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock));
+
+ syswrite $obj{_r_sock}, '0';
+ syswrite $obj{_w_sock}, '0';
+
+ bless \%obj, $class;
+
+ if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
+ MCE::Mutex::Channel::_save_for_global_destruction(\%obj);
+ }
+
+ return \%obj;
+}
+
+sub lock2 {
+ my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
+
+ MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1
+ unless $obj->{ $pid.'b' };
+
+ return;
+}
+
+*lock_exclusive2 = \&lock2;
+*lock_shared2 = \&lock2;
+
+sub unlock2 {
+ my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
+
+ syswrite($obj->{_r_sock}, '0'), $obj->{ $pid.'b' } = 0
+ if $obj->{ $pid.'b' };
+
+ return;
+}
+
+sub synchronize2 {
+ my ($pid, $obj, $code, @ret) = (
+ $has_threads ? $$ .'.'. $tid : $$, shift, shift
+ );
+ return unless ref($code) eq 'CODE';
+
+ # lock, run, unlock - inlined for performance
+ MCE::Util::_sysread($obj->{_w_sock}, my($b), 1), $obj->{ $pid.'b' } = 1
+ unless $obj->{ $pid.'b' };
+
+ (defined wantarray)
+ ? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
+ : $code->(@_);
+
+ syswrite($obj->{_r_sock}, '0'), $obj->{ $pid.'b' } = 0;
+
+ return wantarray ? @ret : $ret[-1];
+}
+
+*enter2 = \&synchronize2;
+
+sub timedwait2 {
+ my ($obj, $timeout) = @_;
+
+ local $@; local $SIG{'ALRM'} = sub { alarm 0; die "timed out\n" };
+
+ eval { alarm $timeout || 1; $obj->lock_exclusive2 };
+
+ alarm 0;
+
+ ( $@ && $@ eq "timed out\n" ) ? '' : 1;
+}
+
+1;
+
+__END__
+
+###############################################################################
+## ----------------------------------------------------------------------------
+## Module usage.
+##
+###############################################################################
+
+=head1 NAME
+
+MCE::Mutex::Channel2 - Provides two mutexes using a single channel
+
+=head1 VERSION
+
+This document describes MCE::Mutex::Channel2 version 1.843
+
+=head1 DESCRIPTION
+
+A socket implementation based on L<MCE::Mutex>. The secondary lock is accessed
+by calling methods, described in L<MCE::Mutex>, with the C<2> suffix.
+
+ my $mutex = MCE::Mutex->new( impl => 'Channel2' );
+
+=head2 primary lock
+
+=over 3
+
+=item * $mutex->lock
+
+=item * $mutex->unlock
+
+=item * $mutex->synchronize
+
+=item * $mutex->timedwait
+
+=back
+
+=head2 secondary lock
+
+=over 3
+
+=item * $mutex->lock2
+
+=item * $mutex->unlock2
+
+=item * $mutex->synchronize2
+
+=item * $mutex->timedwait2
+
+=back
+
+=head1 AUTHOR
+
+Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
+
+=cut
+
diff --git a/lib/MCE/Mutex/Flock.pm b/lib/MCE/Mutex/Flock.pm
index 5c56c47..bbdaa98 100644
--- a/lib/MCE/Mutex/Flock.pm
+++ b/lib/MCE/Mutex/Flock.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized once );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
use base 'MCE::Mutex';
use Fcntl ':flock';
@@ -36,7 +36,6 @@ sub DESTROY {
sub _open {
my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
-
return if exists $obj->{ $pid };
open $obj->{_fh}, '+>>:raw:stdio', $obj->{path}
@@ -150,8 +149,7 @@ sub synchronize {
my ($pid, $obj, $code, @ret) = (
$has_threads ? $$ .'.'. $tid : $$, shift, shift
);
-
- return if ref($code) ne 'CODE';
+ return unless ref($code) eq 'CODE';
$obj->_open() unless exists $obj->{ $pid };
@@ -186,7 +184,7 @@ MCE::Mutex::Flock - Mutex locking via Fcntl
=head1 VERSION
-This document describes MCE::Mutex::Flock version 1.838
+This document describes MCE::Mutex::Flock version 1.843
=head1 DESCRIPTION
diff --git a/lib/MCE/Queue.pm b/lib/MCE/Queue.pm
index 88a99a1..f15b343 100644
--- a/lib/MCE/Queue.pm
+++ b/lib/MCE/Queue.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (Subroutines::ProhibitExplicitReturnUndef)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -86,7 +86,7 @@ sub import {
use constant {
MAX_DQ_DEPTH => 192, # Maximum dequeue notifications
- MUTEX_LOCKS => 6, # Number of mutex locks for 1st level defense
+ MUTEX_LOCKS => 5, # Number of mutex locks for 1st level defense
# against many workers waiting to dequeue
OUTPUT_W_QUE => 'W~QUE', # Await from the queue
@@ -221,7 +221,7 @@ sub new {
MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock)) if $_Q->{_await};
if (exists $_argv{queue} && scalar @{ $_argv{queue} }) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
return $_Q;
@@ -407,7 +407,7 @@ sub _heap_insert_high {
$_Q->{_tsem} = $_t;
if ($_Q->pending() <= $_t) {
- MCE::Util::_syswrite($_Q->{_aw_sock}, $LF);
+ syswrite($_Q->{_aw_sock}, $LF);
} else {
$_Q->{_asem} += 1;
}
@@ -460,7 +460,7 @@ sub _heap_insert_high {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
push @{ $_Q->{_datq} }, @{ $_MCE->{thaw}($_buf) };
}
@@ -504,7 +504,7 @@ sub _heap_insert_high {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
push @{ $_Q->{_datq} }, $_;
}
@@ -564,7 +564,7 @@ sub _heap_insert_high {
if ($_pending) {
$_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH);
for my $_i (1 .. $_pending) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
}
$_Q->{_dsem} = $_pending;
@@ -575,11 +575,11 @@ sub _heap_insert_high {
}
else {
## Otherwise, never to exceed one byte in the channel
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data();
+ syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data();
}
if (exists $_Q->{_ended} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
if ($_cnt) {
@@ -587,7 +587,7 @@ sub _heap_insert_high {
print {$_DAU_R_SOCK} length($_buf).'1'.$LF, $_buf;
}
elsif (defined $_buf) {
- if (!looks_like_number $_buf && !ref $_buf) {
+ if (!ref $_buf) {
print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf;
} else {
$_buf = $_MCE->{freeze}([ $_buf ]);
@@ -600,7 +600,7 @@ sub _heap_insert_high {
if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
for my $_i (1 .. $_Q->{_asem}) {
- MCE::Util::_syswrite($_Q->{_aw_sock}, $LF);
+ syswrite($_Q->{_aw_sock}, $LF);
}
$_Q->{_asem} = 0;
}
@@ -626,7 +626,7 @@ sub _heap_insert_high {
my $_buf = $_Q->_dequeue();
if (defined $_buf) {
- if (!looks_like_number $_buf && !ref $_buf) {
+ if (!ref $_buf) {
print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf;
} else {
$_buf = $_MCE->{freeze}([ $_buf ]);
@@ -660,7 +660,7 @@ sub _heap_insert_high {
if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
for my $_i (1 .. $_Q->{_asem}) {
- MCE::Util::_syswrite($_Q->{_aw_sock}, $LF);
+ syswrite($_Q->{_aw_sock}, $LF);
}
$_Q->{_asem} = 0;
}
@@ -732,7 +732,7 @@ sub _heap_insert_high {
print {$_DAU_R_SOCK} '-1'.$LF;
}
else {
- if (!looks_like_number $_buf && !ref $_buf) {
+ if (!ref $_buf) {
print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf;
} else {
$_buf = $_MCE->{freeze}([ $_buf ]);
@@ -757,7 +757,7 @@ sub _heap_insert_high {
print {$_DAU_R_SOCK} '-1'.$LF;
}
else {
- if (!looks_like_number $_buf && !ref $_buf) {
+ if (!ref $_buf) {
print {$_DAU_R_SOCK} length($_buf).'0'.$LF, $_buf;
} else {
$_buf = $_MCE->{freeze}([ $_buf ]);
@@ -858,7 +858,7 @@ sub _mce_m_end {
my ($_Q) = @_;
if (!exists $_Q->{_ended}) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) unless $_Q->{_nb_flag};
+ syswrite($_Q->{_qw_sock}, $LF) unless $_Q->{_nb_flag};
$_Q->{_ended} = undef;
}
@@ -877,7 +877,7 @@ sub _mce_m_enqueue {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
## Append item(s) into the queue.
@@ -901,7 +901,7 @@ sub _mce_m_enqueuep {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
$_Q->_enqueuep($_p, @_);
@@ -945,7 +945,7 @@ sub _mce_m_dequeue {
if ($_pending) {
$_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH);
for my $_i (1 .. $_pending) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
}
$_Q->{_dsem} = $_pending;
@@ -956,11 +956,11 @@ sub _mce_m_dequeue {
}
else {
## Otherwise, never to exceed one byte in the channel
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data();
+ syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data();
}
if (exists $_Q->{_ended} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
$_Q->{_nb_flag} = 0;
@@ -1040,7 +1040,7 @@ sub _mce_m_insert {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
if (abs($_i) > scalar @{ $_Q->{_datq} }) {
@@ -1088,7 +1088,7 @@ sub _mce_m_insertp {
return;
}
if (!$_Q->{_nb_flag} && !$_Q->_has_data()) {
- MCE::Util::_syswrite($_Q->{_qw_sock}, $LF);
+ syswrite($_Q->{_qw_sock}, $LF);
}
if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
@@ -1275,7 +1275,7 @@ sub _mce_m_heap {
};
$_dat_un = sub {
my $_pid = $_has_threads ? $$ .'.'. $_tid : $$;
- MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
if $_DAT_LOCK->{ $_pid };
};
}
@@ -1355,7 +1355,7 @@ sub _mce_m_heap {
my $_buf = $_Q->{_id}.$LF . length($_tmp).$LF;
$_req1->(OUTPUT_A_QUE, $_buf, $_tmp);
}
- elsif (!looks_like_number $_[0] && defined $_[0]) {
+ elsif (defined $_[0]) {
my $_buf = $_Q->{_id}.$LF . length($_[0]).$LF;
$_req1->(OUTPUT_S_QUE, $_buf, $_[0]);
}
@@ -1381,7 +1381,7 @@ sub _mce_m_heap {
my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_tmp).$LF;
$_req1->(OUTPUT_A_QUP, $_buf, $_tmp);
}
- elsif (!looks_like_number $_[0] && defined $_[0]) {
+ elsif (defined $_[0]) {
my $_buf = $_Q->{_id}.$LF . $_p.$LF . length($_[0]).$LF;
$_req1->(OUTPUT_S_QUP, $_buf, $_[0]);
}
@@ -1497,7 +1497,7 @@ sub _mce_m_heap {
my ($_buf, $_tmp);
- if (scalar @_ > 1 || looks_like_number $_[0] || ref $_[0] || !defined $_[0]) {
+ if (scalar @_ > 1 || ref $_[0] || !defined $_[0]) {
$_tmp = $_MCE->{freeze}([ @_ ]);
$_buf = $_Q->{_id}.$LF . $_i.$LF . (length($_tmp) + 1).$LF . $_tmp.'1';
} else {
@@ -1523,7 +1523,7 @@ sub _mce_m_heap {
my ($_buf, $_tmp);
- if (scalar @_ > 1 || looks_like_number $_[0] || ref $_[0] || !defined $_[0]) {
+ if (scalar @_ > 1 || ref $_[0] || !defined $_[0]) {
$_tmp = $_MCE->{freeze}([ @_ ]);
$_buf = $_Q->{_id}.$LF . $_p.$LF . $_i.$LF .
(length($_tmp) + 1).$LF . $_tmp.'1';
@@ -1602,7 +1602,7 @@ MCE::Queue - Hybrid (normal and priority) queues
=head1 VERSION
-This document describes MCE::Queue version 1.838
+This document describes MCE::Queue version 1.843
=head1 SYNOPSIS
@@ -2022,21 +2022,21 @@ numbers, not the data.
=over 3
-=item L<List::BinarySearch>
+=item * L<List::BinarySearch>
The bsearch_num_pos method was helpful for accommodating the highest and lowest
order in MCE::Queue.
-=item L<POE::Queue::Array>
+=item * L<POE::Queue::Array>
For extra optimization, two if statements were adopted for checking if the item
belongs at the end or head of the queue.
-=item L<List::Priority>
+=item * L<List::Priority>
MCE::Queue supports both normal and priority queues.
-=item L<Thread::Queue>
+=item * L<Thread::Queue>
Thread::Queue is used as a template for identifying and documenting the methods.
@@ -2051,7 +2051,7 @@ simultaneously; e.g.
$q->pending(); # counts both normal/priority queues
-=item L<Parallel::DataPipe>
+=item * L<Parallel::DataPipe>
The recursion example, in the synopsis above, was largely adopted from this
module.
diff --git a/lib/MCE/Relay.pm b/lib/MCE/Relay.pm
index 5df036c..ca3c8c9 100644
--- a/lib/MCE/Relay.pm
+++ b/lib/MCE/Relay.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -343,7 +343,7 @@ MCE::Relay - Extends Many-Core Engine with relay capabilities
=head1 VERSION
-This document describes MCE::Relay version 1.838
+This document describes MCE::Relay version 1.843
=head1 SYNOPSIS
@@ -412,6 +412,8 @@ across all platforms.
=item MCE::relay { code }
+=back
+
Relay is enabled by specifying the init_relay option which takes a hash or array
reference, or a scalar value. Relaying is orderly and driven by chunk_id when
processing data, otherwise task_wid. Omitting the code block (e.g. MCE::relay)
@@ -519,8 +521,12 @@ Or simply a scalar value.
4: 1003
4012 size
+=over 3
+
=item MCE->relay_final ( void )
+=back
+
Call this method to obtain the final relay value(s) after running. See included
example findnull.pl for another use case.
@@ -550,8 +556,12 @@ example findnull.pl for another use case.
40 : 180
+=over 3
+
=item MCE->relay_recv ( void )
+=back
+
Call this method to obtain the next relay value before relaying. This allows
serial-code to be processed orderly between workers. The following is a parallel
demonstration for the fasta-benchmark on the web.
@@ -718,10 +728,14 @@ demonstration for the fasta-benchmark on the web.
say ">THREE Homo sapiens frequency";
make_random_fasta( $homosapiens, $n * 5 );
+=over 3
+
=item MCE->relay_lock ( void )
=item MCE->relay_unlock ( void )
+=back
+
The C<relay_lock> and C<relay_unlock> methods, added to MCE 1.807, are
aliases for C<relay_recv> and C<relay> respectively. They allow one to
perform an exclusive action prior to actual relaying of data.
@@ -920,8 +934,6 @@ Here, workers write exclusively and orderly to C<STDOUT>.
say ">THREE Homo sapiens frequency";
make_random_fasta( $homosapiens, $n * 5 );
-=back
-
=head1 GATHER AND RELAY DEMONSTRATIONS
I received a request from John Martel to process a large flat file and expand
@@ -959,6 +971,8 @@ while preserving output order.
=item Example One
+=back
+
This example configures a custom function for preserving output order.
Unfortunately, the sprintf function alone involves extra CPU time causing
the manager process to fall behind. Thus, workers may idle while waiting
@@ -1019,8 +1033,12 @@ for the manager process to respond to the gather request.
MCE::Loop::finish();
close $fh_out;
+=over 3
+
=item Example Two
+=back
+
In this example, workers obtain the current ID value and increment/relay for
the next worker, ordered by chunk ID behind the scene. Workers call sprintf
in parallel, allowing the manager process (out_iter_fh) to accommodate up to
@@ -1075,8 +1093,6 @@ not call relay more than once per chunk. Doing so will cause IPC to stall.
MCE::Loop::finish();
close $fh_out;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Signal.pm b/lib/MCE/Signal.pm
index a682e59..64702ae 100644
--- a/lib/MCE/Signal.pm
+++ b/lib/MCE/Signal.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
@@ -428,7 +428,7 @@ MCE::Signal - Temporary directory creation/cleanup and signal handling
=head1 VERSION
-This document describes MCE::Signal version 1.838
+This document describes MCE::Signal version 1.843
=head1 SYNOPSIS
diff --git a/lib/MCE/Step.pm b/lib/MCE/Step.pm
index 6c9c8cd..d867da9 100644
--- a/lib/MCE/Step.pm
+++ b/lib/MCE/Step.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -716,7 +716,7 @@ MCE::Step - Parallel step model for building creative steps
=head1 VERSION
-This document describes MCE::Step version 1.838
+This document describes MCE::Step version 1.843
=head1 DESCRIPTION
@@ -903,7 +903,11 @@ inside the first block. Hence, the block is called once per each item.
## Array or array_ref
mce_step sub { do_work($_) }, 1..10000;
- mce_step sub { do_work($_) }, [ 1..10000 ];
+ mce_step sub { do_work($_) }, \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_step sub { do_work($_) }, \@deeply_list;
## File_path, glob_ref, or scalar_ref
mce_step_f sub { chomp; do_work($_) }, "/path/to/file";
@@ -933,10 +937,15 @@ This means having to loop through the chunk from inside the first block.
## Looping inside the block is the same for mce_step_f and
## mce_step_s.
+ ## Array or array_ref
mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
+ mce_step sub { do_work($_) for (@{ $_ }) }, \@list;
- ## Same as above, resembles code using the Core API.
+ ## Important; pass an array_ref for deeply input data
+ mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
+ ## Resembles code using the core MCE API
mce_step sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -980,6 +989,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Step::init { options }
+=back
+
The init function accepts a hash of MCE options. Unlike with MCE::Stream,
both gather and bounds_only options may be specified when calling init
(not shown below).
@@ -1022,8 +1033,6 @@ both gather and bounds_only options may be specified when calling init
7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
10000
-=back
-
Like with MCE::Step::init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name, max_workers,
and use_threads can take an anonymous array for setting uniquely per
@@ -1088,6 +1097,8 @@ equals 1 in order to demonstrate all the possibilities for providing input data.
=item mce_step sub { code }, list
+=back
+
Input data may be defined using a list, an array ref, or a hash ref.
Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Step takes a
@@ -1096,10 +1107,15 @@ needed after the block.
# $_ contains the item when chunk_size => 1
- mce_step sub { $_ }, 1..1000;
- mce_step sub { $_ }, \@list;
+ mce_step sub { do_work($_) }, 1..1000;
+ mce_step sub { do_work($_) }, \@list;
+
+ # Important; pass an array_ref for deeply input data
+
+ mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ mce_step sub { do_work($_) }, \@deeply_list;
- # chunking, any chunk_size => 1 or higher
+ # Chunking; any chunk_size => 1 or greater
my %res = mce_step sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -1111,7 +1127,7 @@ needed after the block.
},
\@list;
- # input hash, current API available since 1.828
+ # Input hash; current API available since 1.828
my %res = mce_step sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -1123,7 +1139,7 @@ needed after the block.
},
\%hash;
- # unlike MCE::Loop, MCE::Step doesn't need input to run
+ # Unlike MCE::Loop, MCE::Step doesn't need input to run
mce_step { max_workers => 4 }, sub {
MCE->say( MCE->wid );
@@ -1144,7 +1160,7 @@ needed after the block.
MCE->say( "consumer: ", MCE->wid );
};
- # here, options are specified via init
+ # Here, options are specified via init
MCE::Step::init {
max_workers => [ 1, 3 ],
@@ -1153,10 +1169,14 @@ needed after the block.
mce_step \&producer, \&consumers;
+=over 3
+
=item MCE::Step->run_file ( sub { code }, file )
=item mce_step_f sub { code }, file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -1166,7 +1186,7 @@ position among themselves with zero interaction by the manager process.
mce_step_f sub { $_ }, $file_handle;
mce_step_f sub { $_ }, \$scalar;
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_step_f sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -1178,10 +1198,14 @@ position among themselves with zero interaction by the manager process.
},
"/path/to/file";
+=over 3
+
=item MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -1198,7 +1222,7 @@ optional. The format is passed to sprintf (% may be omitted below).
step => $step, format => $fmt
};
- # chunking, any chunk_size => 1 or higher
+ # chunking, any chunk_size => 1 or greater
my %res = mce_step_s sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -1260,10 +1284,14 @@ Time was measured using 1 worker to emphasize the difference.
7500001 .. 8750000
8750001 .. 10000000
+=over 3
+
=item MCE::Step->run ( { input_data => iterator }, sub { code } )
=item mce_step { input_data => iterator }, sub { code }
+=back
+
An iterator reference may be specified for input_data. The only other way
is to specify input_data via MCE::Step::init. This prevents MCE::Step from
configuring the iterator reference as another user task which will not work.
@@ -1276,8 +1304,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
mce_step sub { $_ };
-=back
-
=head1 QUEUE-LIKE FEATURES
=over 3
@@ -1286,6 +1312,8 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
=item MCE->step ( arg1, arg2, argN )
+=back
+
The ->step method is the simplest form for passing elements into the next
sub-task.
@@ -1320,6 +1348,8 @@ sub-task.
2: 18, 0.985448
3: 19, 0.146548
+=over 3
+
=item MCE->enq ( task_name, item )
=item MCE->enq ( task_name, [ arg1, arg2, argN ] )
@@ -1332,6 +1362,8 @@ sub-task.
=item MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
+=back
+
The MCE 1.7 release enables finer control. Unlike ->step, which take multiple
arguments, the ->enq and ->enqp methods push items at the end of the array
internally. Passing multiple arguments is possible by enclosing the arguments
@@ -1390,8 +1422,12 @@ will cause an error.
D6: 28, 0.220465
D8: 29, 0.630111
+=over 3
+
=item MCE->await ( task_name, pending_threshold )
+=back
+
Providers may sometime run faster than consumers. Thus, increasing memory
consumption. MCE 1.7 adds the ->await method for pausing momentarily until
the receiving sub-task reaches the minimum threshold for the number of
@@ -1445,8 +1481,6 @@ items pending in its queue.
2: 28, 0.918173
3: 29, 0.358266
-=back
-
=head1 GATHERING DATA
Unlike MCE::Map where gather and output order are done for you automatically,
@@ -1636,6 +1670,8 @@ running.
=item MCE::Step::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -1650,8 +1686,6 @@ longer needed.
MCE::Step::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Stream.pm b/lib/MCE/Stream.pm
index 8318e43..4ee21de 100644
--- a/lib/MCE/Stream.pm
+++ b/lib/MCE/Stream.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -672,7 +672,7 @@ MCE::Stream - Parallel stream model for chaining multiple maps and greps
=head1 VERSION
-This document describes MCE::Stream version 1.838
+This document describes MCE::Stream version 1.843
=head1 SYNOPSIS
@@ -695,18 +695,22 @@ This document describes MCE::Stream version 1.838
## Array or array_ref
my @a = mce_stream sub { $_ * $_ }, 1..10000;
- my @b = mce_stream sub { $_ * $_ }, [ 1..10000 ];
+ my @b = mce_stream sub { $_ * $_ }, \@list;
+
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
## File_path, glob_ref, or scalar_ref
- my @c = mce_stream_f sub { chomp; $_ }, "/path/to/file";
- my @d = mce_stream_f sub { chomp; $_ }, $file_handle;
- my @e = mce_stream_f sub { chomp; $_ }, \$scalar;
+ my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file";
+ my @f = mce_stream_f sub { chomp; $_ }, $file_handle;
+ my @g = mce_stream_f sub { chomp; $_ }, \$scalar;
## Sequence of numbers (begin, end [, step, format])
- my @f = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
- my @g = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
+ my @h = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
+ my @i = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
- my @h = mce_stream_s sub { $_ * $_ }, {
+ my @j = mce_stream_s sub { $_ * $_ }, {
begin => 1, end => 10000, step => 5, format => undef
};
@@ -791,6 +795,8 @@ Specify C<Sereal => 0> to use Storable instead.
=item MCE::Stream::init { options }
+=back
+
The init function accepts a hash of MCE options. The gather and bounds_only
options, if specified, are ignored due to being used internally by the
module (not shown below).
@@ -833,8 +839,6 @@ module (not shown below).
7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
10000
-=back
-
Like with MCE::Stream::init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how both max_workers and
task_name can take an anonymous array for setting values uniquely
@@ -916,18 +920,30 @@ possibilities for providing input data.
=item mce_stream sub { code }, list
+=back
+
Input data may be defined using a list or an array reference. Unlike MCE::Loop,
Flow, and Step, specifying a hash reference as input data isn't allowed.
+ ## Array or array_ref
my @a = mce_stream sub { $_ * 2 }, 1..1000;
my @b = mce_stream sub { $_ * 2 }, \@list;
- my @z = mce_stream sub { $_ * 2 }, \%hash; # not supported
+ ## Important; pass an array_ref for deeply input data
+ my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
+ my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
+
+ ## Not supported
+ my @z = mce_stream sub { ... }, \%hash;
+
+=over 3
=item MCE::Stream->run_file ( sub { code }, file )
=item mce_stream_f sub { code }, file
+=back
+
The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.
@@ -935,10 +951,14 @@ position among themselves with zero interaction by the manager process.
my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
+=over 3
+
=item MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
=item mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
+=back
+
Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).
@@ -952,10 +972,14 @@ optional. The format is passed to sprintf (% may be omitted below).
begin => $beg, end => $end, step => $step, format => $fmt
};
+=over 3
+
=item MCE::Stream->run ( { input_data => iterator }, sub { code } )
=item mce_stream { input_data => iterator }, sub { code }
+=back
+
An iterator reference may be specified for input_data. The only other way
is to specify input_data via MCE::Stream::init. This prevents MCE::Stream
from configuring the iterator reference as another user task which will
@@ -969,8 +993,6 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
-=back
-
=head1 MANUAL SHUTDOWN
=over 3
@@ -979,6 +1001,8 @@ Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
=item MCE::Stream::finish
+=back
+
Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.
@@ -993,8 +1017,6 @@ longer needed.
MCE::Stream::finish;
-=back
-
=head1 INDEX
L<MCE|MCE>, L<MCE::Core>
diff --git a/lib/MCE/Subs.pm b/lib/MCE/Subs.pm
index 91bcf86..2a82b39 100644
--- a/lib/MCE/Subs.pm
+++ b/lib/MCE/Subs.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -75,7 +75,6 @@ sub mce_status ( ) { return $MCE::MCE->status(); }
## Callable by the worker process only.
-sub mce_do (@) { return $MCE::MCE->do(@_); }
sub mce_exit (@) { return $MCE::MCE->exit(@_); }
sub mce_gather (@) { return $MCE::MCE->gather(@_); }
sub mce_last ( ) { return $MCE::MCE->last(); }
@@ -89,6 +88,7 @@ sub mce_yield ( ) { return $MCE::MCE->yield(); }
## Callable by both the manager and worker processes.
sub mce_abort ( ) { return $MCE::MCE->abort(); }
+sub mce_do (@) { return $MCE::MCE->do(@_); }
sub mce_freeze (@) { return $MCE::MCE->{freeze}(@_); }
sub mce_print (;*@) { return $MCE::MCE->print(@_); }
sub mce_printf (;*@) { return $MCE::MCE->printf(@_); }
@@ -147,7 +147,6 @@ sub _export_subs {
## Callable by the worker process only.
if ($_w_flg) {
- *{ $_package . '::mce_do' } = \&mce_do;
*{ $_package . '::mce_exit' } = \&mce_exit;
*{ $_package . '::mce_gather' } = \&mce_gather;
*{ $_package . '::mce_last' } = \&mce_last;
@@ -163,6 +162,7 @@ sub _export_subs {
if ($_m_flg || $_w_flg) {
*{ $_package . '::mce_abort' } = \&mce_abort;
+ *{ $_package . '::mce_do' } = \&mce_do;
*{ $_package . '::mce_freeze' } = \&mce_freeze;
*{ $_package . '::mce_print' } = \&mce_print;
*{ $_package . '::mce_printf' } = \&mce_printf;
@@ -204,7 +204,7 @@ MCE::Subs - Exports functions mapped directly to MCE methods
=head1 VERSION
-This document describes MCE::Subs version 1.838
+This document describes MCE::Subs version 1.843
=head1 SYNOPSIS
@@ -279,6 +279,8 @@ MCE methods are described in L<MCE::Core>.
=item * mce_abort
+=item * mce_do
+
=item * mce_forchunk
=item * mce_foreach
diff --git a/lib/MCE/Util.pm b/lib/MCE/Util.pm
index b884885..1d35108 100644
--- a/lib/MCE/Util.pm
+++ b/lib/MCE/Util.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized numeric );
-our $VERSION = '1.838';
+our $VERSION = '1.843';
## no critic (BuiltinFunctions::ProhibitStringyEval)
@@ -183,7 +183,7 @@ sub _destroy_socks {
for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
next unless (defined $_obj->{$_p}[$_i]);
if (fileno $_obj->{$_p}[$_i]) {
- MCE::Util::_syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
+ syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) };
close $_obj->{$_p}[$_i];
}
@@ -192,7 +192,7 @@ sub _destroy_socks {
}
else {
if (fileno $_obj->{$_p}) {
- MCE::Util::_syswrite($_obj->{$_p}, '0') if $_is_winenv;
+ syswrite($_obj->{$_p}, '0') if $_is_winenv;
eval q{ CORE::shutdown($_obj->{$_p}, 2) };
close $_obj->{$_p};
}
@@ -262,15 +262,16 @@ sub _sock_pair {
sub _sock_ready {
my ($_socket, $_timeout) = @_;
- return '' if !defined $_timeout && exists $_sock_ready{"$_socket"};
+ return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1;
my $_val_bytes = "\x00\x00\x00\x00";
my $_ptr_bytes = unpack('I', pack('P', $_val_bytes));
my ($_delay, $_start) = (0, time);
if (!defined $_timeout) {
- $_sock_ready{"$_socket"} = undef;
- } else {
+ $_sock_ready{"$_socket"}++;
+ }
+ else {
$_timeout = undef if $_timeout < 0;
$_timeout += $_start if $_timeout;
}
@@ -288,14 +289,46 @@ sub _sock_ready {
}
}
+sub _sock_ready_w {
+ my ($_socket) = @_;
+ return if $_sock_ready{"${_socket}_w"} > 1;
+
+ my $_vec = '';
+ $_sock_ready{"${_socket}_w"}++;
+
+ while (1) {
+ vec($_vec, fileno($_socket), 1) = 1;
+ return if select(undef, $_vec, undef, 0) > 0;
+ sleep 0.045;
+ }
+
+ return;
+}
+
sub _sysread {
- my ($_delay, $_start);
+ my $_bytes;
- SYSREAD: ( @_ == 3
- ? sysread($_[0], $_[1], $_[2])
- : sysread($_[0], $_[1], $_[2], $_[3])
+ SYSREAD: $_bytes = ( @_ == 3
+ ? CORE::sysread($_[0], $_[1], $_[2])
+ : CORE::sysread($_[0], $_[1], $_[2], $_[3])
) or do {
- goto SYSREAD if $! == Errno::EINTR();
+ return $_bytes if (defined $MCE::Signal::KILLED);
+ goto SYSREAD if ($! == Errno::EINTR());
+ };
+
+ return $_bytes;
+}
+
+sub _sysread2 {
+ my ($_bytes, $_delay, $_start);
+ # called by MCE/Core/Manager.pm
+
+ SYSREAD: $_bytes = ( @_ == 3
+ ? CORE::sysread($_[0], $_[1], $_[2])
+ : CORE::sysread($_[0], $_[1], $_[2], $_[3])
+ ) or do {
+ return $_bytes if (defined $MCE::Signal::KILLED);
+ goto SYSREAD if ($! == Errno::EINTR());
# non-blocking operation could not be completed
if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) {
@@ -309,37 +342,18 @@ sub _sysread {
}
};
- return;
-}
-
-sub _sysseek {
- my $_pos;
-
- SYSSEEK: $_pos = sysseek($_[0], $_[1], $_[2]) or do {
- goto SYSSEEK if $! == Errno::EINTR();
- };
-
- return $_pos;
-}
-
-sub _syswrite {
- syswrite($_[0], $_[1]) or do {
- goto \&_syswrite if $! == Errno::EINTR();
- };
- return;
+ return $_bytes;
}
sub _nonblocking {
if ($^O eq 'MSWin32') {
- my $nonblocking = ( $_[1] ) ? "\x00\x00\x00\x01" : "\x00\x00\x00\x00";
-
# MSWin32 FIONBIO - from winsock2.h macro
+ my $nonblocking = $_[1] ? "\x00\x00\x00\x01" : "\x00\x00\x00\x00";
+
ioctl($_[0], 0x8004667e, unpack("I", pack('P', $nonblocking)));
}
else {
- my $nonblocking = ( $_[1] ) ? 0 : 1;
-
- $_[0]->blocking($nonblocking);
+ $_[0]->blocking( $_[1] ? 0 : 1 );
}
return;
@@ -361,7 +375,7 @@ MCE::Util - Utility functions
=head1 VERSION
-This document describes MCE::Util version 1.838
+This document describes MCE::Util version 1.843
=head1 SYNOPSIS