summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAngel Abad <angelabad@gmail.com>2016-11-03 08:37:59 +0100
committerAngel Abad <angelabad@gmail.com>2016-11-03 08:37:59 +0100
commitafb6328aa0d67bd9c1e56b6c89adf956c35d1362 (patch)
treef46d4dda005d92999532d60fbf39a43a34b9566d /lib
parentf14f07a71464d263f46e14ebafbe4894ec19a3e2 (diff)
New upstream version 1.807
Diffstat (limited to 'lib')
-rw-r--r--lib/MCE.pm21
-rw-r--r--lib/MCE.pod2
-rw-r--r--lib/MCE/Candy.pm4
-rw-r--r--lib/MCE/Core.pod2
-rw-r--r--lib/MCE/Core/Input/Generator.pm2
-rw-r--r--lib/MCE/Core/Input/Handle.pm2
-rw-r--r--lib/MCE/Core/Input/Iterator.pm2
-rw-r--r--lib/MCE/Core/Input/Request.pm2
-rw-r--r--lib/MCE/Core/Input/Sequence.pm2
-rw-r--r--lib/MCE/Core/Manager.pm4
-rw-r--r--lib/MCE/Core/Validation.pm2
-rw-r--r--lib/MCE/Core/Worker.pm13
-rw-r--r--lib/MCE/Examples.pod111
-rw-r--r--lib/MCE/Flow.pm4
-rw-r--r--lib/MCE/Grep.pm4
-rw-r--r--lib/MCE/Loop.pm4
-rw-r--r--lib/MCE/Map.pm4
-rw-r--r--lib/MCE/Mutex.pm4
-rw-r--r--lib/MCE/Queue.pm4
-rw-r--r--lib/MCE/Relay.pm446
-rw-r--r--lib/MCE/Signal.pm4
-rw-r--r--lib/MCE/Step.pm4
-rw-r--r--lib/MCE/Stream.pm4
-rw-r--r--lib/MCE/Subs.pm4
-rw-r--r--lib/MCE/Util.pm4
25 files changed, 553 insertions, 106 deletions
diff --git a/lib/MCE.pm b/lib/MCE.pm
index e6694ec..289e017 100644
--- a/lib/MCE.pm
+++ b/lib/MCE.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -86,7 +86,7 @@ BEGIN {
## _chunk_id _mce_sid _mce_tid _pids _run_mode _single_dim _thrs _tids _wid
## _exiting _exit_pid _total_exited _total_running _total_workers _task_wid
## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
- ## _init_pid _init_total_workers _last_sref _mgr_live _rla_data _rla_return
+ ## _init_pid _init_total_workers _last_sref _mgr_live _rla_data
##
## _bsb_r_sock _bsb_w_sock _bse_r_sock _bse_w_sock _com_r_sock _com_w_sock
## _dat_r_sock _dat_w_sock _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock
@@ -280,7 +280,7 @@ sub DESTROY {
END {
if ( defined $MCE ) {
- if ( !$_has_threads ) {
+ if ( !$_has_threads || (defined $TOP_HDLR && !$TOP_HDLR->{use_threads}) ) {
MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'};
MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'};
MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'};
@@ -764,6 +764,9 @@ sub relay (;&) {
unless (defined $MCE->{init_relay});
}
+*relay_lock = \&relay_recv;
+*relay_unlock = \&relay;
+
###############################################################################
## ----------------------------------------------------------------------------
## Restart worker method.
@@ -1366,14 +1369,7 @@ sub abort {
my $_lock_chn = $self->{_lock_chn};
$_DAT_LOCK->lock() if $_lock_chn;
-
- if (exists $self->{_rla_return}) {
- print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF;
- print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF;
- }
-
print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF;
-
$_DAT_LOCK->unlock() if $_lock_chn;
}
}
@@ -1414,11 +1410,6 @@ sub exit {
$_DAT_LOCK->lock() if $_lock_chn;
- if (exists $self->{_rla_return}) {
- print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF;
- print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF;
- }
-
print {$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF;
print {$_DAU_W_SOCK}
$_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
diff --git a/lib/MCE.pod b/lib/MCE.pod
index cc69af5..787c71d 100644
--- a/lib/MCE.pod
+++ b/lib/MCE.pod
@@ -5,7 +5,7 @@ MCE - Many-Core Engine for Perl providing parallel processing capabilities
=head1 VERSION
-This document describes MCE version 1.806
+This document describes MCE version 1.807
Many-Core Engine (MCE) for Perl helps enable a new level of performance by
maximizing all available cores.
diff --git a/lib/MCE/Candy.pm b/lib/MCE/Candy.pm
index 8e52735..eef1dee 100644
--- a/lib/MCE/Candy.pm
+++ b/lib/MCE/Candy.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
our @CARP_NOT = qw( MCE );
@@ -210,7 +210,7 @@ MCE::Candy - Sugar methods and output iterators
=head1 VERSION
-This document describes MCE::Candy version 1.806
+This document describes MCE::Candy version 1.807
=head1 DESCRIPTION
diff --git a/lib/MCE/Core.pod b/lib/MCE/Core.pod
index 40b7273..e8a3c7c 100644
--- a/lib/MCE/Core.pod
+++ b/lib/MCE/Core.pod
@@ -5,7 +5,7 @@ MCE::Core - Documentation describing the core MCE API
=head1 VERSION
-This document describes MCE::Core version 1.806
+This document describes MCE::Core version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm
index 19ee15d..c08c90b 100644
--- a/lib/MCE/Core/Input/Generator.pm
+++ b/lib/MCE/Core/Input/Generator.pm
@@ -15,7 +15,7 @@ package MCE::Core::Input::Generator;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm
index 52622a3..2d277cb 100644
--- a/lib/MCE/Core/Input/Handle.pm
+++ b/lib/MCE/Core/Input/Handle.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Handle;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm
index f5cc2ec..32c7be1 100644
--- a/lib/MCE/Core/Input/Iterator.pm
+++ b/lib/MCE/Core/Input/Iterator.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm
index 16fb6b0..4280eb5 100644
--- a/lib/MCE/Core/Input/Request.pm
+++ b/lib/MCE/Core/Input/Request.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Request;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm
index 8803300..727b9b8 100644
--- a/lib/MCE/Core/Input/Sequence.pm
+++ b/lib/MCE/Core/Input/Sequence.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Manager.pm b/lib/MCE/Core/Manager.pm
index 9cf6304..5b43592 100644
--- a/lib/MCE/Core/Manager.pm
+++ b/lib/MCE/Core/Manager.pm
@@ -14,7 +14,7 @@ package MCE::Core::Manager;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -838,7 +838,7 @@ sub _output_loop {
unless ($_nbytes = unpack('I', $_val_bytes)) {
if ($_count) {
# delay after a while to not consume a CPU core
- $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.005;
+ $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
} else {
sleep 0.030;
}
diff --git a/lib/MCE/Core/Validation.pm b/lib/MCE/Core/Validation.pm
index 0dbac72..b9a4901 100644
--- a/lib/MCE/Core/Validation.pm
+++ b/lib/MCE/Core/Validation.pm
@@ -14,7 +14,7 @@ package MCE::Core::Validation;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
diff --git a/lib/MCE/Core/Worker.pm b/lib/MCE/Core/Worker.pm
index 66514cd..41cecf7 100644
--- a/lib/MCE/Core/Worker.pm
+++ b/lib/MCE/Core/Worker.pm
@@ -14,7 +14,7 @@ package MCE::Core::Worker;
use strict;
use warnings;
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## Items below are folded into MCE.
@@ -399,7 +399,9 @@ sub _worker_do {
## Call user_begin if defined.
if (defined $self->{user_begin}) {
+ $self->{_chunk_id} = 0;
$self->{user_begin}($self, $_task_id, $_task_name);
+ $self->sync if ($_task_id == 0 && defined $self->{init_relay});
}
## Retry chunk if previous attempt died.
@@ -446,7 +448,7 @@ sub _worker_do {
_worker_read_handle($self, READ_MEMORY, $self->{input_data});
}
elsif (defined $self->{user_func}) {
- $self->{_chunk_id} = $self->{_task_wid};
+ $self->{_chunk_id} = 0;
$self->{user_func}->($self);
}
@@ -456,6 +458,8 @@ sub _worker_do {
## Call user_end if defined.
if (defined $self->{user_end}) {
+ $self->{_chunk_id} = 0;
+ $self->sync if ($_task_id == 0 && defined $self->{init_relay});
$self->{user_end}($self, $_task_id, $_task_name);
}
@@ -467,11 +471,6 @@ sub _worker_do {
$_DAT_LOCK->lock() if $_lock_chn;
- if (exists $self->{_rla_return}) {
- print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF;
- print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF;
- }
-
print {$_DAT_W_SOCK} OUTPUT_W_DNE.$LF . $_chn.$LF;
print {$_DAU_W_SOCK} $_task_id.$LF;
diff --git a/lib/MCE/Examples.pod b/lib/MCE/Examples.pod
index 0feac0b..58cfe03 100644
--- a/lib/MCE/Examples.pod
+++ b/lib/MCE/Examples.pod
@@ -5,7 +5,7 @@ MCE::Examples - Various examples and demonstrations
=head1 VERSION
-This document describes MCE::Examples version 1.806
+This document describes MCE::Examples version 1.807
=head1 INCLUDED WITH THE DISTRIBUTION
@@ -465,6 +465,115 @@ user_begin and mce_loop. Take notice of the comma after \&_func though.
2: 3
2: 4
+=head1 MANDELBROT DEMONSTRATION
+
+For the next demonstration, L<MCE::Relay> allows a section of code to run
+serially and orderly between workers. Relay capabilities is enabled with
+the C<init_relay> option, which loads MCE::Relay.
+
+ # perl mandelbrot.pl 16000 > image.pbm
+ # outputs a pbm binary to STDOUT
+
+ # The Computer Language Benchmarks Game
+ # http://benchmarksgame.alioth.debian.org/
+ #
+ # Started with:
+ # C# : Adapted by Antti Lankila from Isaac Gouy's implementation
+ # Perl: Contributed by Mykola Zubach
+ #
+ # MCE::Loop version by Mario Roy
+ # requires MCE 1.807+
+
+ use strict;
+ use warnings;
+
+ use MCE::Loop;
+
+ use constant MAXITER => 50;
+ use constant LIMIT => 4.0;
+ use constant XMIN => -1.5;
+ use constant YMIN => -1.0;
+
+ my ( $w, $h, $m, $invN );
+
+ sub draw_lines {
+ my ( $y1, $y2 ) = @_;
+ my @result;
+
+ # Workers run simultaneously, in parallel.
+
+ for my $y ( $y1 .. $y2 ) {
+ my ( $bits, $xcounter, @line ) = ( 0, 0 );
+ my $Ci = $y * $invN + YMIN;
+
+ for my $x ( 0 .. $w - 1 ) {
+ my ( $Zr, $Zi, $Tr, $Ti ) = ( 0, 0, 0, 0 );
+ my $Cr = $x * $invN + XMIN;
+
+ $bits = $bits << 1;
+
+ for ( 1 .. MAXITER ) {
+ $Zi = $Zi * 2 * $Zr + $Ci;
+ $Zr = $Tr - $Ti + $Cr;
+ $Ti = $Zi * $Zi, $Tr = $Zr * $Zr;
+
+ $bits |= 1, last if ( $Tr + $Ti > LIMIT );
+ }
+
+ if ( ++$xcounter == 8 ) {
+ push @line, $bits ^ 0xff;
+ $bits = $xcounter = 0;
+ }
+ }
+
+ if ( $xcounter ) {
+ push @line, ( $bits << ( 8 - $xcounter ) ) ^ 0xff;
+ }
+
+ push @result, pack 'C*', @line;
+ }
+
+ # Statements between lock & unlock are processed serially & orderly.
+
+ MCE->relay_lock;
+
+ print @result; # Workers display upper-half only.
+ MCE->gather( @result ); # Gather lines for the manager-process.
+
+ MCE->relay_unlock;
+ }
+
+ ## MAIN()
+
+ # Important, must flush output immediately.
+
+ $| = 1; binmode STDOUT;
+
+ $w = $h = shift || 200;
+ $m = int( $h / 2 );
+ $invN = 2 / $w;
+
+ print "P4\n$w $h\n"; # PBM image header.
+
+ # Workers display upper-half only. Also, lines are gathered to be
+ # displayed later by the manager-process after running.
+
+ MCE::Loop->init(
+ init_relay => 0, # Enables MCE::Relay capabilities if defined.
+ max_workers => 4,
+ bounds_only => 1,
+ );
+
+ my @upper = mce_loop_s { draw_lines( $_[1][0], $_[1][1] ) } 0, $m;
+
+ MCE::Loop->finish;
+
+ # Remove first and last lines from the upper half.
+ # Then, output bottom half.
+
+ shift @upper, pop @upper;
+ print reverse @upper;
+
=head1 MONTE CARLO SIMULATION
There is an article on the web (search for comp.lang.perl.misc MCE) suggesting
diff --git a/lib/MCE/Flow.pm b/lib/MCE/Flow.pm
index 342015f..0e1bf33 100644
--- a/lib/MCE/Flow.pm
+++ b/lib/MCE/Flow.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -499,7 +499,7 @@ MCE::Flow - Parallel flow model for building creative applications
=head1 VERSION
-This document describes MCE::Flow version 1.806
+This document describes MCE::Flow version 1.807
=head1 DESCRIPTION
diff --git a/lib/MCE/Grep.pm b/lib/MCE/Grep.pm
index 9355fcb..ea1a4a5 100644
--- a/lib/MCE/Grep.pm
+++ b/lib/MCE/Grep.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -449,7 +449,7 @@ MCE::Grep - Parallel grep model similar to the native grep function
=head1 VERSION
-This document describes MCE::Grep version 1.806
+This document describes MCE::Grep version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Loop.pm b/lib/MCE/Loop.pm
index a2444e7..5fe184e 100644
--- a/lib/MCE/Loop.pm
+++ b/lib/MCE/Loop.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -369,7 +369,7 @@ MCE::Loop - Parallel loop model for building creative loops
=head1 VERSION
-This document describes MCE::Loop version 1.806
+This document describes MCE::Loop version 1.807
=head1 DESCRIPTION
diff --git a/lib/MCE/Map.pm b/lib/MCE/Map.pm
index 81efe8f..d7832ab 100644
--- a/lib/MCE/Map.pm
+++ b/lib/MCE/Map.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -449,7 +449,7 @@ MCE::Map - Parallel map model similar to the native map function
=head1 VERSION
-This document describes MCE::Map version 1.806
+This document describes MCE::Map version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Mutex.pm b/lib/MCE/Mutex.pm
index 6364ba5..ffd6edc 100644
--- a/lib/MCE/Mutex.pm
+++ b/lib/MCE/Mutex.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
use MCE::Util qw( $LF );
@@ -111,7 +111,7 @@ MCE::Mutex - Locking for Many-Core Engine
=head1 VERSION
-This document describes MCE::Mutex version 1.806
+This document describes MCE::Mutex version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Queue.pm b/lib/MCE/Queue.pm
index d9b56d2..8d2822a 100644
--- a/lib/MCE/Queue.pm
+++ b/lib/MCE/Queue.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (Subroutines::ProhibitExplicitReturnUndef)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -1637,7 +1637,7 @@ MCE::Queue - Hybrid (normal and priority) queues
=head1 VERSION
-This document describes MCE::Queue version 1.806
+This document describes MCE::Queue version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Relay.pm b/lib/MCE/Relay.pm
index 174878c..3ffa186 100644
--- a/lib/MCE/Relay.pm
+++ b/lib/MCE/Relay.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -55,21 +55,15 @@ sub import {
###############################################################################
{
- my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_chunkid, $_rla_nextid);
+ my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_nextid, $_max_workers);
my %_output_function = (
OUTPUT_W_RLA.$LF => sub { # Worker has relayed
- $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
+ # $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
- my ($_chunk_id, $_next_id) = split(':', <$_DAU_R_SOCK>);
-
- if ($_chunk_id > $_rla_chunkid) {
- chomp $_next_id;
- $_rla_chunkid = $_chunk_id;
- $_rla_nextid = $_next_id;
- }
+ $_rla_nextid = 0 if ( ++$_rla_nextid == $_max_workers );
return;
},
@@ -82,6 +76,10 @@ sub import {
my $_caller = $_MCE->{_caller};
+ $_max_workers = (exists $_MCE->{user_tasks})
+ ? $_MCE->{user_tasks}[0]{max_workers}
+ : $_MCE->{max_workers};
+
## Write initial relay data.
if (defined $_MCE->{init_relay}) {
my $_ref = ref $_MCE->{init_relay};
@@ -104,7 +102,7 @@ sub import {
print {$_RLA_W_SOCK} length($_init_relay) . $LF . $_init_relay;
- $_rla_chunkid = $_rla_nextid = 0;
+ $_rla_nextid = 0;
}
delete $MCE::RLA->{$_caller};
@@ -130,8 +128,8 @@ sub import {
}
## Clear variables.
- $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = $_rla_chunkid = $_rla_nextid =
- undef;
+ $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = undef;
+ $_rla_nextid = $_max_workers = undef;
return;
}
@@ -197,17 +195,22 @@ sub relay_recv {
my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
- _croak('MCE::relay: (init_relay) is not specified')
+ _croak('MCE::relay_recv: (init_relay) is not specified')
unless (defined $self->{init_relay});
- _croak('MCE::relay: method is not allowed by the manager process')
+ _croak('MCE::relay_recv: method is not allowed by the manager process')
unless ($self->{_wid});
- _croak('MCE::relay: method is not allowed by this sub task')
+ _croak('MCE::relay_recv: method is not allowed by task_id > 0')
if ($self->{_task_id} > 0);
- my $_chn = ($self->{_chunk_id} - 1) % $self->{max_workers};
- my $_rdr = $self->{_rla_r_sock}->[$_chn];
+ my ($_chn, $_nxt, $_rdr, $_len, $_ref); local $_;
+
+ $_chn = $self->{_chunk_id} || $self->{_wid};
+ $_chn = ($_chn - 1) % $self->{max_workers};
+ $_nxt = $_chn + 1;
+ $_nxt = 0 if ($_nxt == $self->{max_workers});
+ $_rdr = $self->{_rla_r_sock}->[$_chn];
- my ($_len, $_ref); local $_;
+ print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
chomp($_len = <$_rdr>);
read $_rdr, $_, $_len;
@@ -247,7 +250,7 @@ sub relay (;&) {
unless (defined $self->{init_relay});
_croak('MCE::relay: method is not allowed by the manager process')
unless ($self->{_wid});
- _croak('MCE::relay: method is not allowed by this sub task')
+ _croak('MCE::relay: method is not allowed by task_id > 0')
if ($self->{_task_id} > 0);
if (ref $_code ne 'CODE') {
@@ -256,12 +259,14 @@ sub relay (;&) {
weaken $_code;
}
- my $_chn = ($self->{_chunk_id} - 1) % $self->{max_workers};
- my $_nxt = $_chn + 1; $_nxt = 0 if ($_nxt == $self->{max_workers});
- my $_rdr = $self->{_rla_r_sock}->[$_chn];
- my $_wtr = $self->{_rla_w_sock}->[$_nxt];
+ my ($_chn, $_nxt, $_rdr, $_wtr);
- $self->{_rla_return} = $self->{_chunk_id} .':'. $_nxt;
+ $_chn = $self->{_chunk_id} || $self->{_wid};
+ $_chn = ($_chn - 1) % $self->{max_workers};
+ $_nxt = $_chn + 1;
+ $_nxt = 0 if ($_nxt == $self->{max_workers});
+ $_rdr = $self->{_rla_r_sock}->[$_chn];
+ $_wtr = $self->{_rla_w_sock}->[$_nxt];
if (exists $self->{_rla_data}) {
local $_ = delete $self->{_rla_data};
@@ -283,6 +288,8 @@ sub relay (;&) {
else {
my ($_len, $_ref); local $_;
+ print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
+
chomp($_len = <$_rdr>);
read $_rdr, $_, $_len;
$_ref = chop $_;
@@ -315,6 +322,11 @@ sub relay (;&) {
return;
}
+## Aliases.
+
+*relay_lock = \&relay_recv;
+*relay_unlock = \&relay;
+
1;
__END__
@@ -331,7 +343,7 @@ MCE::Relay - Extends Many-Core Engine with relay capabilities
=head1 VERSION
-This document describes MCE::Relay version 1.806
+This document describes MCE::Relay version 1.807
=head1 SYNOPSIS
@@ -342,7 +354,9 @@ This document describes MCE::Relay version 1.806
## Line Count #######################################
mce_flow_f {
- use_slurpio => 1, init_relay => 0,
+ max_workers => 4,
+ use_slurpio => 1,
+ init_relay => 0,
},
sub {
my ($mce, $slurp_ref, $chunk_id) = @_;
@@ -359,14 +373,22 @@ This document describes MCE::Relay version 1.806
## Orderly Action ###################################
+ $| = 1; # Important, must flush output immediately.
+
mce_flow_f {
- use_slurpio => 1, init_relay => 0,
+ max_workers => 2,
+ use_slurpio => 1,
+ init_relay => 0,
},
sub {
my ($mce, $slurp_ref, $chunk_id) = @_;
- ## Exclusive access to STDOUT. Relays 0.
- MCE::relay { print $$slurp_ref };
+ ## The relay value is relayed and remains 0.
+ ## Writes to STDOUT orderly.
+
+ MCE->relay_lock;
+ print $$slurp_ref;
+ MCE->relay_unlock;
}, $file;
@@ -374,13 +396,12 @@ This document describes MCE::Relay version 1.806
This module enables workers to receive and pass on information orderly with
zero involvement by the manager process while running. The module is loaded
-automatically when init_relay is specified.
+automatically when MCE option C<init_relay> is specified.
-All workers must participate when relaying data. Calling relay more than once
-is not recommended inside the block. Doing so will stall the application.
+All workers (belonging to task_id 0) must participate when relaying data.
-Relaying is not meant for passing big data. The last worker will likely stall
-if exceeding the buffer size for the socket. Not exceeding 16 KiB - 7 is safe
+Relaying is not meant for passing big data. The last worker will stall if
+exceeding the buffer size for the socket. Not exceeding 16 KiB - 7 is safe
across all platforms.
=head1 API DOCUMENTATION
@@ -500,7 +521,7 @@ Or simply a scalar value.
=item MCE->relay_final ( void )
-Call this method to obtain the final relay values after running. See included
+Call this method to obtain the final relay value(s) after running. See included
example findnull.pl for another use case.
use MCE max_workers => 4;
@@ -531,9 +552,183 @@ example findnull.pl for another use case.
=item MCE->relay_recv ( void )
-The relay_recv method allows one to perform an exclusive action prior to
-relaying. Below, the user_func is taken from the cat.pl example. Relaying
-is chunk_id driven (or task_wid when not processing input), thus orderly.
+Call this method to obtain the next relay value before relaying. This allows
+serial-code to be processed orderly between workers. The following is a parallel
+demonstration for the fasta-benchmark on the web.
+
+ # perl fasta.pl 25000000
+
+ # The Computer Language Benchmarks game
+ # http://benchmarksgame.alioth.debian.org/
+ #
+ # contributed by Barry Walsh
+ # port of fasta.rb #6
+ #
+ # MCE::Flow version by Mario Roy
+ # requires MCE 1.807+
+ # requires MCE::Shared 1.806+
+
+ use strict;
+ use warnings;
+ use feature 'say';
+
+ use MCE::Flow;
+ use MCE::Shared;
+ use MCE::Candy;
+
+ use constant IM => 139968;
+ use constant IA => 3877;
+ use constant IC => 29573;
+
+ my $LAST = MCE::Shared->scalar( 42 );
+
+ my $alu =
+ 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
+ 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
+ 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
+ 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
+ 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
+ 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
+ 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
+
+ my $iub = [
+ [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
+ [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
+ [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
+ [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
+ [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
+ ];
+
+ my $homosapiens = [
+ [ 'a', 0.3029549426680 ],
+ [ 'c', 0.1979883004921 ],
+ [ 'g', 0.1975473066391 ],
+ [ 't', 0.3015094502008 ]
+ ];
+
+ sub make_repeat_fasta {
+ my ( $src, $n ) = @_;
+ my $width = qr/(.{1,60})/;
+ my $l = length $src;
+ my $s = $src x ( ($n / $l) + 1 );
+ substr( $s, $n, $l ) = '';
+
+ while ( $s =~ m/$width/g ) { say $1 }
+ }
+
+ sub make_random_fasta {
+ my ( $table, $n ) = @_;
+ my $rand = undef;
+ my $width = 60;
+ my $prob = 0.0;
+ my $output = '';
+ my ( $c1, $c2, $last );
+
+ $_->[1] = ( $prob += $_->[1] ) for @$table;
+
+ $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
+ $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
+
+ my $seq = MCE::Shared->sequence(
+ { chunk_size => 2000, bounds_only => 1 },
+ 1, $n / $width
+ );
+
+ my $code1 = q{
+ while ( 1 ) {
+ # --------------------------------------------
+ # Process code orderly between workers.
+ # --------------------------------------------
+
+ my $chunk_id = MCE->relay_recv;
+ my ( $begin, $end ) = $seq->next;
+
+ MCE->relay, last if ( !defined $begin );
+
+ my $last = $LAST->get;
+ my $temp = $last;
+
+ # Pre-compute $LAST value for the next worker
+ for ( 1 .. ( $end - $begin + 1 ) * $width ) {
+ $temp = ( $temp * IA + IC ) % IM;
+ }
+
+ $LAST->set( $temp );
+
+ # Increment chunk_id value
+ MCE->relay( sub { $_ += 1 } );
+
+ # --------------------------------------------
+ # Also run code in parallel between workers.
+ # --------------------------------------------
+
+ for ( $begin .. $end ) {
+ for ( 1 .. $width ) { !C! }
+ $output .= "\n";
+ }
+
+ # --------------------------------------------
+ # Display orderly.
+ # --------------------------------------------
+
+ MCE->gather( $chunk_id, $output );
+
+ $output = '';
+ }
+ };
+
+ $code1 =~ s/!C!/$c1/g;
+
+ MCE::Flow->init(
+ max_workers => 4, ## MCE::Util->get_ncpu || 4,
+ gather => MCE::Candy::out_iter_fh( \*STDOUT ),
+ init_relay => 1,
+ use_threads => 0,
+ );
+
+ MCE::Flow->run( sub { eval $code1 } );
+ MCE::Flow->finish;
+
+ $last = $LAST->get;
+
+ $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
+ $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
+
+ my $code2 = q{
+ if ( $n % $width != 0 ) {
+ for ( 1 .. $n % $width ) { !C! }
+ print "\n";
+ }
+ };
+
+ $code2 =~ s/!C!/$c2/g;
+ eval $code2;
+
+ $LAST->set( $last );
+ }
+
+ my $n = $ARGV[0] || 27;
+
+ say ">ONE Homo sapiens alu";
+ make_repeat_fasta( $alu, $n * 2 );
+
+ say ">TWO IUB ambiguity codes";
+ make_random_fasta( $iub, $n * 3 );
+
+ say ">THREE Homo sapiens frequency";
+ make_random_fasta( $homosapiens, $n * 5 );
+
+=item MCE->relay_lock ( void )
+
+=item MCE->relay_unlock ( void )
+
+The C<relay_lock> and C<relay_unlock> methods, added to MCE 1.807, are
+aliases for C<relay_recv> and C<relay> respectively. They allow one to
+perform an exclusive action prior to actual relaying of data.
+
+Below, C<user_func> is taken from the C<cat.pl> MCE example. Relaying is
+driven by C<chunk_id> or C<task_wid> when not processing input, thus
+occurs orderly.
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
@@ -554,24 +749,177 @@ is chunk_id driven (or task_wid when not processing input), thus orderly.
else {
## The following is another way to have ordered output. Workers
## write directly to STDOUT exclusively without any involvement
- ## from the manager process. The statements between relay_recv
- ## and relay run serially and most important orderly.
-
- ## STDERR/OUT flush automatically inside worker threads and
- ## processes. Disable buffering on file handles otherwise.
+ ## from the manager process. The statement(s) between relay_lock
+ ## and relay_unlock run serially and most important orderly.
- MCE->relay_recv; ## my $val = MCE->relay_recv;
- ## relay simply forwards 0 below
+ MCE->relay_lock; # alias for MCE->relay_recv
- print $$chunk_ref; ## exclusive access to STDOUT
- ## important, flush immediately
+ print $$chunk_ref; # ensure $| = 1 in script
- MCE->relay;
+ MCE->relay_unlock; # alias for MCE->relay
}
return;
}
+The following is a variant of the fasta-benchmark demonstration shown above.
+Here, workers write exclusively and orderly to C<STDOUT>.
+
+ # perl fasta.pl 25000000
+
+ # The Computer Language Benchmarks game
+ # http://benchmarksgame.alioth.debian.org/
+ #
+ # contributed by Barry Walsh
+ # port of fasta.rb #6
+ #
+ # MCE::Flow version by Mario Roy
+ # requires MCE 1.807+
+ # requires MCE::Shared 1.806+
+
+ use strict;
+ use warnings;
+ use feature 'say';
+
+ use MCE::Flow;
+ use MCE::Shared;
+
+ use constant IM => 139968;
+ use constant IA => 3877;
+ use constant IC => 29573;
+
+ my $LAST = MCE::Shared->scalar( 42 );
+
+ my $alu =
+ 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
+ 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
+ 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
+ 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
+ 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
+ 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
+ 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
+
+ my $iub = [
+ [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
+ [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
+ [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
+ [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
+ [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
+ ];
+
+ my $homosapiens = [
+ [ 'a', 0.3029549426680 ],
+ [ 'c', 0.1979883004921 ],
+ [ 'g', 0.1975473066391 ],
+ [ 't', 0.3015094502008 ]
+ ];
+
+ sub make_repeat_fasta {
+ my ( $src, $n ) = @_;
+ my $width = qr/(.{1,60})/;
+ my $l = length $src;
+ my $s = $src x ( ($n / $l) + 1 );
+ substr( $s, $n, $l ) = '';
+
+ while ( $s =~ m/$width/g ) { say $1 }
+ }
+
+ sub make_random_fasta {
+ my ( $table, $n ) = @_;
+ my $rand = undef;
+ my $width = 60;
+ my $prob = 0.0;
+ my $output = '';
+ my ( $c1, $c2, $last );
+
+ $_->[1] = ( $prob += $_->[1] ) for @$table;
+
+ $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
+ $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
+
+ my $seq = MCE::Shared->sequence(
+ { chunk_size => 2000, bounds_only => 1 },
+ 1, $n / $width
+ );
+
+ my $code1 = q{
+ $| = 1; # Important, must flush output immediately.
+
+ while ( 1 ) {
+ # --------------------------------------------
+ # Process code orderly between workers.
+ # --------------------------------------------
+
+ MCE->relay_lock;
+
+ my ( $begin, $end ) = $seq->next;
+ print( $output ), $output = '' if ( length $output );
+
+ MCE->relay_unlock, last if ( !defined $begin );
+
+ my $last = $LAST->get;
+ my $temp = $last;
+
+ # Pre-compute $LAST value for the next worker
+ for ( 1 .. ( $end - $begin + 1 ) * $width ) {
+ $temp = ( $temp * IA + IC ) % IM;
+ }
+
+ $LAST->set( $temp );
+
+ MCE->relay_unlock;
+
+ # --------------------------------------------
+ # Also run code in parallel.
+ # --------------------------------------------
+
+ for ( $begin .. $end ) {
+ for ( 1 .. $width ) { !C! }
+ $output .= "\n";
+ }
+ }
+ };
+
+ $code1 =~ s/!C!/$c1/g;
+
+ MCE::Flow->init(
+ max_workers => 4, ## MCE::Util->get_ncpu || 4,
+ init_relay => 0,
+ use_threads => 0,
+ );
+
+ MCE::Flow->run( sub { eval $code1 } );
+ MCE::Flow->finish;
+
+ $last = $LAST->get;
+
+ $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
+ $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
+
+ my $code2 = q{
+ if ( $n % $width != 0 ) {
+ for ( 1 .. $n % $width ) { !C! }
+ print "\n";
+ }
+ };
+
+ $code2 =~ s/!C!/$c2/g;
+ eval $code2;
+
+ $LAST->set( $last );
+ }
+
+ my $n = $ARGV[0] || 27;
+
+ say ">ONE Homo sapiens alu";
+ make_repeat_fasta( $alu, $n * 2 );
+
+ say ">TWO IUB ambiguity codes";
+ make_random_fasta( $iub, $n * 3 );
+
+ say ">THREE Homo sapiens frequency";
+ make_random_fasta( $homosapiens, $n * 5 );
+
=back
=head1 INDEX
diff --git a/lib/MCE/Signal.pm b/lib/MCE/Signal.pm
index e51bb2e..73b6ea4 100644
--- a/lib/MCE/Signal.pm
+++ b/lib/MCE/Signal.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
@@ -451,7 +451,7 @@ MCE::Signal - Temporary directory creation/cleanup and signal handling
=head1 VERSION
-This document describes MCE::Signal version 1.806
+This document describes MCE::Signal version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Step.pm b/lib/MCE/Step.pm
index 4a38e03..d9eca59 100644
--- a/lib/MCE/Step.pm
+++ b/lib/MCE/Step.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -730,7 +730,7 @@ MCE::Step - Parallel step model for building creative steps
=head1 VERSION
-This document describes MCE::Step version 1.806
+This document describes MCE::Step version 1.807
=head1 DESCRIPTION
diff --git a/lib/MCE/Stream.pm b/lib/MCE/Stream.pm
index 8990dc2..a21e6c0 100644
--- a/lib/MCE/Stream.pm
+++ b/lib/MCE/Stream.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
@@ -679,7 +679,7 @@ MCE::Stream - Parallel stream model for chaining multiple maps and greps
=head1 VERSION
-This document describes MCE::Stream version 1.806
+This document describes MCE::Stream version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Subs.pm b/lib/MCE/Subs.pm
index aa09dd9..e6fb8da 100644
--- a/lib/MCE/Subs.pm
+++ b/lib/MCE/Subs.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)
@@ -204,7 +204,7 @@ MCE::Subs - Exports functions mapped directly to MCE methods
=head1 VERSION
-This document describes MCE::Subs version 1.806
+This document describes MCE::Subs version 1.807
=head1 SYNOPSIS
diff --git a/lib/MCE/Util.pm b/lib/MCE/Util.pm
index d29d554..637775c 100644
--- a/lib/MCE/Util.pm
+++ b/lib/MCE/Util.pm
@@ -11,7 +11,7 @@ use warnings;
no warnings qw( threads recursion uninitialized );
-our $VERSION = '1.806';
+our $VERSION = '1.807';
## no critic (BuiltinFunctions::ProhibitStringyEval)
@@ -428,7 +428,7 @@ MCE::Util - Utility functions
=head1 VERSION
-This document describes MCE::Util version 1.806
+This document describes MCE::Util version 1.807
=head1 SYNOPSIS