diff options
author | Angel Abad <angelabad@gmail.com> | 2016-11-03 08:37:59 +0100 |
---|---|---|
committer | Angel Abad <angelabad@gmail.com> | 2016-11-03 08:37:59 +0100 |
commit | afb6328aa0d67bd9c1e56b6c89adf956c35d1362 (patch) | |
tree | f46d4dda005d92999532d60fbf39a43a34b9566d /lib | |
parent | f14f07a71464d263f46e14ebafbe4894ec19a3e2 (diff) |
New upstream version 1.807
Diffstat (limited to 'lib')
-rw-r--r-- | lib/MCE.pm | 21 | ||||
-rw-r--r-- | lib/MCE.pod | 2 | ||||
-rw-r--r-- | lib/MCE/Candy.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Core.pod | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Generator.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Handle.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Iterator.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Request.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Sequence.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Manager.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Core/Validation.pm | 2 | ||||
-rw-r--r-- | lib/MCE/Core/Worker.pm | 13 | ||||
-rw-r--r-- | lib/MCE/Examples.pod | 111 | ||||
-rw-r--r-- | lib/MCE/Flow.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Grep.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Loop.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Map.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Mutex.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Queue.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Relay.pm | 446 | ||||
-rw-r--r-- | lib/MCE/Signal.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Step.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Stream.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Subs.pm | 4 | ||||
-rw-r--r-- | lib/MCE/Util.pm | 4 |
25 files changed, 553 insertions, 106 deletions
@@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -86,7 +86,7 @@ BEGIN { ## _chunk_id _mce_sid _mce_tid _pids _run_mode _single_dim _thrs _tids _wid ## _exiting _exit_pid _total_exited _total_running _total_workers _task_wid ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status - ## _init_pid _init_total_workers _last_sref _mgr_live _rla_data _rla_return + ## _init_pid _init_total_workers _last_sref _mgr_live _rla_data ## ## _bsb_r_sock _bsb_w_sock _bse_r_sock _bse_w_sock _com_r_sock _com_w_sock ## _dat_r_sock _dat_w_sock _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock @@ -280,7 +280,7 @@ sub DESTROY { END { if ( defined $MCE ) { - if ( !$_has_threads ) { + if ( !$_has_threads || (defined $TOP_HDLR && !$TOP_HDLR->{use_threads}) ) { MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'}; MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'}; MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'}; @@ -764,6 +764,9 @@ sub relay (;&) { unless (defined $MCE->{init_relay}); } +*relay_lock = \&relay_recv; +*relay_unlock = \&relay; + ############################################################################### ## ---------------------------------------------------------------------------- ## Restart worker method. @@ -1366,14 +1369,7 @@ sub abort { my $_lock_chn = $self->{_lock_chn}; $_DAT_LOCK->lock() if $_lock_chn; - - if (exists $self->{_rla_return}) { - print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF; - print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF; - } - print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF; - $_DAT_LOCK->unlock() if $_lock_chn; } } @@ -1414,11 +1410,6 @@ sub exit { $_DAT_LOCK->lock() if $_lock_chn; - if (exists $self->{_rla_return}) { - print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF; - print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF; - } - print {$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF; print {$_DAU_W_SOCK} $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF . diff --git a/lib/MCE.pod b/lib/MCE.pod index cc69af5..787c71d 100644 --- a/lib/MCE.pod +++ b/lib/MCE.pod @@ -5,7 +5,7 @@ MCE - Many-Core Engine for Perl providing parallel processing capabilities =head1 VERSION -This document describes MCE version 1.806 +This document describes MCE version 1.807 Many-Core Engine (MCE) for Perl helps enable a new level of performance by maximizing all available cores. diff --git a/lib/MCE/Candy.pm b/lib/MCE/Candy.pm index 8e52735..eef1dee 100644 --- a/lib/MCE/Candy.pm +++ b/lib/MCE/Candy.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; our @CARP_NOT = qw( MCE ); @@ -210,7 +210,7 @@ MCE::Candy - Sugar methods and output iterators =head1 VERSION -This document describes MCE::Candy version 1.806 +This document describes MCE::Candy version 1.807 =head1 DESCRIPTION diff --git a/lib/MCE/Core.pod b/lib/MCE/Core.pod index 40b7273..e8a3c7c 100644 --- a/lib/MCE/Core.pod +++ b/lib/MCE/Core.pod @@ -5,7 +5,7 @@ MCE::Core - Documentation describing the core MCE API =head1 VERSION -This document describes MCE::Core version 1.806 +This document describes MCE::Core version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm index 19ee15d..c08c90b 100644 --- a/lib/MCE/Core/Input/Generator.pm +++ b/lib/MCE/Core/Input/Generator.pm @@ -15,7 +15,7 @@ package MCE::Core::Input::Generator; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm index 52622a3..2d277cb 100644 --- a/lib/MCE/Core/Input/Handle.pm +++ b/lib/MCE/Core/Input/Handle.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Handle; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm index f5cc2ec..32c7be1 100644 --- a/lib/MCE/Core/Input/Iterator.pm +++ b/lib/MCE/Core/Input/Iterator.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm index 16fb6b0..4280eb5 100644 --- a/lib/MCE/Core/Input/Request.pm +++ b/lib/MCE/Core/Input/Request.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Request; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm index 8803300..727b9b8 100644 --- a/lib/MCE/Core/Input/Sequence.pm +++ b/lib/MCE/Core/Input/Sequence.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Manager.pm b/lib/MCE/Core/Manager.pm index 9cf6304..5b43592 100644 --- a/lib/MCE/Core/Manager.pm +++ b/lib/MCE/Core/Manager.pm @@ -14,7 +14,7 @@ package MCE::Core::Manager; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -838,7 +838,7 @@ sub _output_loop { unless ($_nbytes = unpack('I', $_val_bytes)) { if ($_count) { # delay after a while to not consume a CPU core - $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.005; + $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030; } else { sleep 0.030; } diff --git a/lib/MCE/Core/Validation.pm b/lib/MCE/Core/Validation.pm index 0dbac72..b9a4901 100644 --- a/lib/MCE/Core/Validation.pm +++ b/lib/MCE/Core/Validation.pm @@ -14,7 +14,7 @@ package MCE::Core::Validation; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. diff --git a/lib/MCE/Core/Worker.pm b/lib/MCE/Core/Worker.pm index 66514cd..41cecf7 100644 --- a/lib/MCE/Core/Worker.pm +++ b/lib/MCE/Core/Worker.pm @@ -14,7 +14,7 @@ package MCE::Core::Worker; use strict; use warnings; -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## Items below are folded into MCE. @@ -399,7 +399,9 @@ sub _worker_do { ## Call user_begin if defined. if (defined $self->{user_begin}) { + $self->{_chunk_id} = 0; $self->{user_begin}($self, $_task_id, $_task_name); + $self->sync if ($_task_id == 0 && defined $self->{init_relay}); } ## Retry chunk if previous attempt died. @@ -446,7 +448,7 @@ sub _worker_do { _worker_read_handle($self, READ_MEMORY, $self->{input_data}); } elsif (defined $self->{user_func}) { - $self->{_chunk_id} = $self->{_task_wid}; + $self->{_chunk_id} = 0; $self->{user_func}->($self); } @@ -456,6 +458,8 @@ sub _worker_do { ## Call user_end if defined. if (defined $self->{user_end}) { + $self->{_chunk_id} = 0; + $self->sync if ($_task_id == 0 && defined $self->{init_relay}); $self->{user_end}($self, $_task_id, $_task_name); } @@ -467,11 +471,6 @@ sub _worker_do { $_DAT_LOCK->lock() if $_lock_chn; - if (exists $self->{_rla_return}) { - print {$_DAT_W_SOCK} OUTPUT_W_RLA.$LF . $_chn.$LF; - print {$_DAU_W_SOCK} (delete $self->{_rla_return}).$LF; - } - print {$_DAT_W_SOCK} OUTPUT_W_DNE.$LF . $_chn.$LF; print {$_DAU_W_SOCK} $_task_id.$LF; diff --git a/lib/MCE/Examples.pod b/lib/MCE/Examples.pod index 0feac0b..58cfe03 100644 --- a/lib/MCE/Examples.pod +++ b/lib/MCE/Examples.pod @@ -5,7 +5,7 @@ MCE::Examples - Various examples and demonstrations =head1 VERSION -This document describes MCE::Examples version 1.806 +This document describes MCE::Examples version 1.807 =head1 INCLUDED WITH THE DISTRIBUTION @@ -465,6 +465,115 @@ user_begin and mce_loop. Take notice of the comma after \&_func though. 2: 3 2: 4 +=head1 MANDELBROT DEMONSTRATION + +For the next demonstration, L<MCE::Relay> allows a section of code to run +serially and orderly between workers. Relay capabilities is enabled with +the C<init_relay> option, which loads MCE::Relay. + + # perl mandelbrot.pl 16000 > image.pbm + # outputs a pbm binary to STDOUT + + # The Computer Language Benchmarks Game + # http://benchmarksgame.alioth.debian.org/ + # + # Started with: + # C# : Adapted by Antti Lankila from Isaac Gouy's implementation + # Perl: Contributed by Mykola Zubach + # + # MCE::Loop version by Mario Roy + # requires MCE 1.807+ + + use strict; + use warnings; + + use MCE::Loop; + + use constant MAXITER => 50; + use constant LIMIT => 4.0; + use constant XMIN => -1.5; + use constant YMIN => -1.0; + + my ( $w, $h, $m, $invN ); + + sub draw_lines { + my ( $y1, $y2 ) = @_; + my @result; + + # Workers run simultaneously, in parallel. + + for my $y ( $y1 .. $y2 ) { + my ( $bits, $xcounter, @line ) = ( 0, 0 ); + my $Ci = $y * $invN + YMIN; + + for my $x ( 0 .. $w - 1 ) { + my ( $Zr, $Zi, $Tr, $Ti ) = ( 0, 0, 0, 0 ); + my $Cr = $x * $invN + XMIN; + + $bits = $bits << 1; + + for ( 1 .. MAXITER ) { + $Zi = $Zi * 2 * $Zr + $Ci; + $Zr = $Tr - $Ti + $Cr; + $Ti = $Zi * $Zi, $Tr = $Zr * $Zr; + + $bits |= 1, last if ( $Tr + $Ti > LIMIT ); + } + + if ( ++$xcounter == 8 ) { + push @line, $bits ^ 0xff; + $bits = $xcounter = 0; + } + } + + if ( $xcounter ) { + push @line, ( $bits << ( 8 - $xcounter ) ) ^ 0xff; + } + + push @result, pack 'C*', @line; + } + + # Statements between lock & unlock are processed serially & orderly. + + MCE->relay_lock; + + print @result; # Workers display upper-half only. + MCE->gather( @result ); # Gather lines for the manager-process. + + MCE->relay_unlock; + } + + ## MAIN() + + # Important, must flush output immediately. + + $| = 1; binmode STDOUT; + + $w = $h = shift || 200; + $m = int( $h / 2 ); + $invN = 2 / $w; + + print "P4\n$w $h\n"; # PBM image header. + + # Workers display upper-half only. Also, lines are gathered to be + # displayed later by the manager-process after running. + + MCE::Loop->init( + init_relay => 0, # Enables MCE::Relay capabilities if defined. + max_workers => 4, + bounds_only => 1, + ); + + my @upper = mce_loop_s { draw_lines( $_[1][0], $_[1][1] ) } 0, $m; + + MCE::Loop->finish; + + # Remove first and last lines from the upper half. + # Then, output bottom half. + + shift @upper, pop @upper; + print reverse @upper; + =head1 MONTE CARLO SIMULATION There is an article on the web (search for comp.lang.perl.misc MCE) suggesting diff --git a/lib/MCE/Flow.pm b/lib/MCE/Flow.pm index 342015f..0e1bf33 100644 --- a/lib/MCE/Flow.pm +++ b/lib/MCE/Flow.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -499,7 +499,7 @@ MCE::Flow - Parallel flow model for building creative applications =head1 VERSION -This document describes MCE::Flow version 1.806 +This document describes MCE::Flow version 1.807 =head1 DESCRIPTION diff --git a/lib/MCE/Grep.pm b/lib/MCE/Grep.pm index 9355fcb..ea1a4a5 100644 --- a/lib/MCE/Grep.pm +++ b/lib/MCE/Grep.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -449,7 +449,7 @@ MCE::Grep - Parallel grep model similar to the native grep function =head1 VERSION -This document describes MCE::Grep version 1.806 +This document describes MCE::Grep version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Loop.pm b/lib/MCE/Loop.pm index a2444e7..5fe184e 100644 --- a/lib/MCE/Loop.pm +++ b/lib/MCE/Loop.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -369,7 +369,7 @@ MCE::Loop - Parallel loop model for building creative loops =head1 VERSION -This document describes MCE::Loop version 1.806 +This document describes MCE::Loop version 1.807 =head1 DESCRIPTION diff --git a/lib/MCE/Map.pm b/lib/MCE/Map.pm index 81efe8f..d7832ab 100644 --- a/lib/MCE/Map.pm +++ b/lib/MCE/Map.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -449,7 +449,7 @@ MCE::Map - Parallel map model similar to the native map function =head1 VERSION -This document describes MCE::Map version 1.806 +This document describes MCE::Map version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Mutex.pm b/lib/MCE/Mutex.pm index 6364ba5..ffd6edc 100644 --- a/lib/MCE/Mutex.pm +++ b/lib/MCE/Mutex.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; use MCE::Util qw( $LF ); @@ -111,7 +111,7 @@ MCE::Mutex - Locking for Many-Core Engine =head1 VERSION -This document describes MCE::Mutex version 1.806 +This document describes MCE::Mutex version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Queue.pm b/lib/MCE/Queue.pm index d9b56d2..8d2822a 100644 --- a/lib/MCE/Queue.pm +++ b/lib/MCE/Queue.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (Subroutines::ProhibitExplicitReturnUndef) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -1637,7 +1637,7 @@ MCE::Queue - Hybrid (normal and priority) queues =head1 VERSION -This document describes MCE::Queue version 1.806 +This document describes MCE::Queue version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Relay.pm b/lib/MCE/Relay.pm index 174878c..3ffa186 100644 --- a/lib/MCE/Relay.pm +++ b/lib/MCE/Relay.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -55,21 +55,15 @@ sub import { ############################################################################### { - my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_chunkid, $_rla_nextid); + my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_nextid, $_max_workers); my %_output_function = ( OUTPUT_W_RLA.$LF => sub { # Worker has relayed - $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; + # $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; - my ($_chunk_id, $_next_id) = split(':', <$_DAU_R_SOCK>); - - if ($_chunk_id > $_rla_chunkid) { - chomp $_next_id; - $_rla_chunkid = $_chunk_id; - $_rla_nextid = $_next_id; - } + $_rla_nextid = 0 if ( ++$_rla_nextid == $_max_workers ); return; }, @@ -82,6 +76,10 @@ sub import { my $_caller = $_MCE->{_caller}; + $_max_workers = (exists $_MCE->{user_tasks}) + ? $_MCE->{user_tasks}[0]{max_workers} + : $_MCE->{max_workers}; + ## Write initial relay data. if (defined $_MCE->{init_relay}) { my $_ref = ref $_MCE->{init_relay}; @@ -104,7 +102,7 @@ sub import { print {$_RLA_W_SOCK} length($_init_relay) . $LF . $_init_relay; - $_rla_chunkid = $_rla_nextid = 0; + $_rla_nextid = 0; } delete $MCE::RLA->{$_caller}; @@ -130,8 +128,8 @@ sub import { } ## Clear variables. - $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = $_rla_chunkid = $_rla_nextid = - undef; + $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = undef; + $_rla_nextid = $_max_workers = undef; return; } @@ -197,17 +195,22 @@ sub relay_recv { my $x = shift; my $self = ref($x) ? $x : $MCE::MCE; - _croak('MCE::relay: (init_relay) is not specified') + _croak('MCE::relay_recv: (init_relay) is not specified') unless (defined $self->{init_relay}); - _croak('MCE::relay: method is not allowed by the manager process') + _croak('MCE::relay_recv: method is not allowed by the manager process') unless ($self->{_wid}); - _croak('MCE::relay: method is not allowed by this sub task') + _croak('MCE::relay_recv: method is not allowed by task_id > 0') if ($self->{_task_id} > 0); - my $_chn = ($self->{_chunk_id} - 1) % $self->{max_workers}; - my $_rdr = $self->{_rla_r_sock}->[$_chn]; + my ($_chn, $_nxt, $_rdr, $_len, $_ref); local $_; + + $_chn = $self->{_chunk_id} || $self->{_wid}; + $_chn = ($_chn - 1) % $self->{max_workers}; + $_nxt = $_chn + 1; + $_nxt = 0 if ($_nxt == $self->{max_workers}); + $_rdr = $self->{_rla_r_sock}->[$_chn]; - my ($_len, $_ref); local $_; + print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF; chomp($_len = <$_rdr>); read $_rdr, $_, $_len; @@ -247,7 +250,7 @@ sub relay (;&) { unless (defined $self->{init_relay}); _croak('MCE::relay: method is not allowed by the manager process') unless ($self->{_wid}); - _croak('MCE::relay: method is not allowed by this sub task') + _croak('MCE::relay: method is not allowed by task_id > 0') if ($self->{_task_id} > 0); if (ref $_code ne 'CODE') { @@ -256,12 +259,14 @@ sub relay (;&) { weaken $_code; } - my $_chn = ($self->{_chunk_id} - 1) % $self->{max_workers}; - my $_nxt = $_chn + 1; $_nxt = 0 if ($_nxt == $self->{max_workers}); - my $_rdr = $self->{_rla_r_sock}->[$_chn]; - my $_wtr = $self->{_rla_w_sock}->[$_nxt]; + my ($_chn, $_nxt, $_rdr, $_wtr); - $self->{_rla_return} = $self->{_chunk_id} .':'. $_nxt; + $_chn = $self->{_chunk_id} || $self->{_wid}; + $_chn = ($_chn - 1) % $self->{max_workers}; + $_nxt = $_chn + 1; + $_nxt = 0 if ($_nxt == $self->{max_workers}); + $_rdr = $self->{_rla_r_sock}->[$_chn]; + $_wtr = $self->{_rla_w_sock}->[$_nxt]; if (exists $self->{_rla_data}) { local $_ = delete $self->{_rla_data}; @@ -283,6 +288,8 @@ sub relay (;&) { else { my ($_len, $_ref); local $_; + print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF; + chomp($_len = <$_rdr>); read $_rdr, $_, $_len; $_ref = chop $_; @@ -315,6 +322,11 @@ sub relay (;&) { return; } +## Aliases. + +*relay_lock = \&relay_recv; +*relay_unlock = \&relay; + 1; __END__ @@ -331,7 +343,7 @@ MCE::Relay - Extends Many-Core Engine with relay capabilities =head1 VERSION -This document describes MCE::Relay version 1.806 +This document describes MCE::Relay version 1.807 =head1 SYNOPSIS @@ -342,7 +354,9 @@ This document describes MCE::Relay version 1.806 ## Line Count ####################################### mce_flow_f { - use_slurpio => 1, init_relay => 0, + max_workers => 4, + use_slurpio => 1, + init_relay => 0, }, sub { my ($mce, $slurp_ref, $chunk_id) = @_; @@ -359,14 +373,22 @@ This document describes MCE::Relay version 1.806 ## Orderly Action ################################### + $| = 1; # Important, must flush output immediately. + mce_flow_f { - use_slurpio => 1, init_relay => 0, + max_workers => 2, + use_slurpio => 1, + init_relay => 0, }, sub { my ($mce, $slurp_ref, $chunk_id) = @_; - ## Exclusive access to STDOUT. Relays 0. - MCE::relay { print $$slurp_ref }; + ## The relay value is relayed and remains 0. + ## Writes to STDOUT orderly. + + MCE->relay_lock; + print $$slurp_ref; + MCE->relay_unlock; }, $file; @@ -374,13 +396,12 @@ This document describes MCE::Relay version 1.806 This module enables workers to receive and pass on information orderly with zero involvement by the manager process while running. The module is loaded -automatically when init_relay is specified. +automatically when MCE option C<init_relay> is specified. -All workers must participate when relaying data. Calling relay more than once -is not recommended inside the block. Doing so will stall the application. +All workers (belonging to task_id 0) must participate when relaying data. -Relaying is not meant for passing big data. The last worker will likely stall -if exceeding the buffer size for the socket. Not exceeding 16 KiB - 7 is safe +Relaying is not meant for passing big data. The last worker will stall if +exceeding the buffer size for the socket. Not exceeding 16 KiB - 7 is safe across all platforms. =head1 API DOCUMENTATION @@ -500,7 +521,7 @@ Or simply a scalar value. =item MCE->relay_final ( void ) -Call this method to obtain the final relay values after running. See included +Call this method to obtain the final relay value(s) after running. See included example findnull.pl for another use case. use MCE max_workers => 4; @@ -531,9 +552,183 @@ example findnull.pl for another use case. =item MCE->relay_recv ( void ) -The relay_recv method allows one to perform an exclusive action prior to -relaying. Below, the user_func is taken from the cat.pl example. Relaying -is chunk_id driven (or task_wid when not processing input), thus orderly. +Call this method to obtain the next relay value before relaying. This allows +serial-code to be processed orderly between workers. The following is a parallel +demonstration for the fasta-benchmark on the web. + + # perl fasta.pl 25000000 + + # The Computer Language Benchmarks game + # http://benchmarksgame.alioth.debian.org/ + # + # contributed by Barry Walsh + # port of fasta.rb #6 + # + # MCE::Flow version by Mario Roy + # requires MCE 1.807+ + # requires MCE::Shared 1.806+ + + use strict; + use warnings; + use feature 'say'; + + use MCE::Flow; + use MCE::Shared; + use MCE::Candy; + + use constant IM => 139968; + use constant IA => 3877; + use constant IC => 29573; + + my $LAST = MCE::Shared->scalar( 42 ); + + my $alu = + 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' . + 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' . + 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' . + 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' . + 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' . + 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' . + 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA'; + + my $iub = [ + [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ], + [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ], + [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ], + [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ], + [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ] + ]; + + my $homosapiens = [ + [ 'a', 0.3029549426680 ], + [ 'c', 0.1979883004921 ], + [ 'g', 0.1975473066391 ], + [ 't', 0.3015094502008 ] + ]; + + sub make_repeat_fasta { + my ( $src, $n ) = @_; + my $width = qr/(.{1,60})/; + my $l = length $src; + my $s = $src x ( ($n / $l) + 1 ); + substr( $s, $n, $l ) = ''; + + while ( $s =~ m/$width/g ) { say $1 } + } + + sub make_random_fasta { + my ( $table, $n ) = @_; + my $rand = undef; + my $width = 60; + my $prob = 0.0; + my $output = ''; + my ( $c1, $c2, $last ); + + $_->[1] = ( $prob += $_->[1] ) for @$table; + + $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;'; + $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table; + + my $seq = MCE::Shared->sequence( + { chunk_size => 2000, bounds_only => 1 }, + 1, $n / $width + ); + + my $code1 = q{ + while ( 1 ) { + # -------------------------------------------- + # Process code orderly between workers. + # -------------------------------------------- + + my $chunk_id = MCE->relay_recv; + my ( $begin, $end ) = $seq->next; + + MCE->relay, last if ( !defined $begin ); + + my $last = $LAST->get; + my $temp = $last; + + # Pre-compute $LAST value for the next worker + for ( 1 .. ( $end - $begin + 1 ) * $width ) { + $temp = ( $temp * IA + IC ) % IM; + } + + $LAST->set( $temp ); + + # Increment chunk_id value + MCE->relay( sub { $_ += 1 } ); + + # -------------------------------------------- + # Also run code in parallel between workers. + # -------------------------------------------- + + for ( $begin .. $end ) { + for ( 1 .. $width ) { !C! } + $output .= "\n"; + } + + # -------------------------------------------- + # Display orderly. + # -------------------------------------------- + + MCE->gather( $chunk_id, $output ); + + $output = ''; + } + }; + + $code1 =~ s/!C!/$c1/g; + + MCE::Flow->init( + max_workers => 4, ## MCE::Util->get_ncpu || 4, + gather => MCE::Candy::out_iter_fh( \*STDOUT ), + init_relay => 1, + use_threads => 0, + ); + + MCE::Flow->run( sub { eval $code1 } ); + MCE::Flow->finish; + + $last = $LAST->get; + + $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;'; + $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table; + + my $code2 = q{ + if ( $n % $width != 0 ) { + for ( 1 .. $n % $width ) { !C! } + print "\n"; + } + }; + + $code2 =~ s/!C!/$c2/g; + eval $code2; + + $LAST->set( $last ); + } + + my $n = $ARGV[0] || 27; + + say ">ONE Homo sapiens alu"; + make_repeat_fasta( $alu, $n * 2 ); + + say ">TWO IUB ambiguity codes"; + make_random_fasta( $iub, $n * 3 ); + + say ">THREE Homo sapiens frequency"; + make_random_fasta( $homosapiens, $n * 5 ); + +=item MCE->relay_lock ( void ) + +=item MCE->relay_unlock ( void ) + +The C<relay_lock> and C<relay_unlock> methods, added to MCE 1.807, are +aliases for C<relay_recv> and C<relay> respectively. They allow one to +perform an exclusive action prior to actual relaying of data. + +Below, C<user_func> is taken from the C<cat.pl> MCE example. Relaying is +driven by C<chunk_id> or C<task_wid> when not processing input, thus +occurs orderly. user_func => sub { my ($mce, $chunk_ref, $chunk_id) = @_; @@ -554,24 +749,177 @@ is chunk_id driven (or task_wid when not processing input), thus orderly. else { ## The following is another way to have ordered output. Workers ## write directly to STDOUT exclusively without any involvement - ## from the manager process. The statements between relay_recv - ## and relay run serially and most important orderly. - - ## STDERR/OUT flush automatically inside worker threads and - ## processes. Disable buffering on file handles otherwise. + ## from the manager process. The statement(s) between relay_lock + ## and relay_unlock run serially and most important orderly. - MCE->relay_recv; ## my $val = MCE->relay_recv; - ## relay simply forwards 0 below + MCE->relay_lock; # alias for MCE->relay_recv - print $$chunk_ref; ## exclusive access to STDOUT - ## important, flush immediately + print $$chunk_ref; # ensure $| = 1 in script - MCE->relay; + MCE->relay_unlock; # alias for MCE->relay } return; } +The following is a variant of the fasta-benchmark demonstration shown above. +Here, workers write exclusively and orderly to C<STDOUT>. + + # perl fasta.pl 25000000 + + # The Computer Language Benchmarks game + # http://benchmarksgame.alioth.debian.org/ + # + # contributed by Barry Walsh + # port of fasta.rb #6 + # + # MCE::Flow version by Mario Roy + # requires MCE 1.807+ + # requires MCE::Shared 1.806+ + + use strict; + use warnings; + use feature 'say'; + + use MCE::Flow; + use MCE::Shared; + + use constant IM => 139968; + use constant IA => 3877; + use constant IC => 29573; + + my $LAST = MCE::Shared->scalar( 42 ); + + my $alu = + 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' . + 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' . + 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' . + 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' . + 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' . + 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' . + 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA'; + + my $iub = [ + [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ], + [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ], + [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ], + [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ], + [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ] + ]; + + my $homosapiens = [ + [ 'a', 0.3029549426680 ], + [ 'c', 0.1979883004921 ], + [ 'g', 0.1975473066391 ], + [ 't', 0.3015094502008 ] + ]; + + sub make_repeat_fasta { + my ( $src, $n ) = @_; + my $width = qr/(.{1,60})/; + my $l = length $src; + my $s = $src x ( ($n / $l) + 1 ); + substr( $s, $n, $l ) = ''; + + while ( $s =~ m/$width/g ) { say $1 } + } + + sub make_random_fasta { + my ( $table, $n ) = @_; + my $rand = undef; + my $width = 60; + my $prob = 0.0; + my $output = ''; + my ( $c1, $c2, $last ); + + $_->[1] = ( $prob += $_->[1] ) for @$table; + + $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;'; + $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table; + + my $seq = MCE::Shared->sequence( + { chunk_size => 2000, bounds_only => 1 }, + 1, $n / $width + ); + + my $code1 = q{ + $| = 1; # Important, must flush output immediately. + + while ( 1 ) { + # -------------------------------------------- + # Process code orderly between workers. + # -------------------------------------------- + + MCE->relay_lock; + + my ( $begin, $end ) = $seq->next; + print( $output ), $output = '' if ( length $output ); + + MCE->relay_unlock, last if ( !defined $begin ); + + my $last = $LAST->get; + my $temp = $last; + + # Pre-compute $LAST value for the next worker + for ( 1 .. ( $end - $begin + 1 ) * $width ) { + $temp = ( $temp * IA + IC ) % IM; + } + + $LAST->set( $temp ); + + MCE->relay_unlock; + + # -------------------------------------------- + # Also run code in parallel. + # -------------------------------------------- + + for ( $begin .. $end ) { + for ( 1 .. $width ) { !C! } + $output .= "\n"; + } + } + }; + + $code1 =~ s/!C!/$c1/g; + + MCE::Flow->init( + max_workers => 4, ## MCE::Util->get_ncpu || 4, + init_relay => 0, + use_threads => 0, + ); + + MCE::Flow->run( sub { eval $code1 } ); + MCE::Flow->finish; + + $last = $LAST->get; + + $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;'; + $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table; + + my $code2 = q{ + if ( $n % $width != 0 ) { + for ( 1 .. $n % $width ) { !C! } + print "\n"; + } + }; + + $code2 =~ s/!C!/$c2/g; + eval $code2; + + $LAST->set( $last ); + } + + my $n = $ARGV[0] || 27; + + say ">ONE Homo sapiens alu"; + make_repeat_fasta( $alu, $n * 2 ); + + say ">TWO IUB ambiguity codes"; + make_random_fasta( $iub, $n * 3 ); + + say ">THREE Homo sapiens frequency"; + make_random_fasta( $homosapiens, $n * 5 ); + =back =head1 INDEX diff --git a/lib/MCE/Signal.pm b/lib/MCE/Signal.pm index e51bb2e..73b6ea4 100644 --- a/lib/MCE/Signal.pm +++ b/lib/MCE/Signal.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) @@ -451,7 +451,7 @@ MCE::Signal - Temporary directory creation/cleanup and signal handling =head1 VERSION -This document describes MCE::Signal version 1.806 +This document describes MCE::Signal version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Step.pm b/lib/MCE/Step.pm index 4a38e03..d9eca59 100644 --- a/lib/MCE/Step.pm +++ b/lib/MCE/Step.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -730,7 +730,7 @@ MCE::Step - Parallel step model for building creative steps =head1 VERSION -This document describes MCE::Step version 1.806 +This document describes MCE::Step version 1.807 =head1 DESCRIPTION diff --git a/lib/MCE/Stream.pm b/lib/MCE/Stream.pm index 8990dc2..a21e6c0 100644 --- a/lib/MCE/Stream.pm +++ b/lib/MCE/Stream.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) ## no critic (Subroutines::ProhibitSubroutinePrototypes) @@ -679,7 +679,7 @@ MCE::Stream - Parallel stream model for chaining multiple maps and greps =head1 VERSION -This document describes MCE::Stream version 1.806 +This document describes MCE::Stream version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Subs.pm b/lib/MCE/Subs.pm index aa09dd9..e6fb8da 100644 --- a/lib/MCE/Subs.pm +++ b/lib/MCE/Subs.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (Subroutines::ProhibitSubroutinePrototypes) ## no critic (TestingAndDebugging::ProhibitNoStrict) @@ -204,7 +204,7 @@ MCE::Subs - Exports functions mapped directly to MCE methods =head1 VERSION -This document describes MCE::Subs version 1.806 +This document describes MCE::Subs version 1.807 =head1 SYNOPSIS diff --git a/lib/MCE/Util.pm b/lib/MCE/Util.pm index d29d554..637775c 100644 --- a/lib/MCE/Util.pm +++ b/lib/MCE/Util.pm @@ -11,7 +11,7 @@ use warnings; no warnings qw( threads recursion uninitialized ); -our $VERSION = '1.806'; +our $VERSION = '1.807'; ## no critic (BuiltinFunctions::ProhibitStringyEval) @@ -428,7 +428,7 @@ MCE::Util - Utility functions =head1 VERSION -This document describes MCE::Util version 1.806 +This document describes MCE::Util version 1.807 =head1 SYNOPSIS |