diff options
Diffstat (limited to 'lib/MCE/Core/Input/Sequence.pm')
-rw-r--r-- | lib/MCE/Core/Input/Sequence.pm | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm index 92bc2e8..baea8bc 100644 --- a/lib/MCE/Core/Input/Sequence.pm +++ b/lib/MCE/Core/Input/Sequence.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -41,14 +41,33 @@ sub _worker_sequence_queue { _croak('MCE::_worker_sequence_queue: (user_func) is not specified') unless (defined $self->{user_func}); + my $_DAT_LOCK = $self->{_dat_lock}; my $_QUE_R_SOCK = $self->{_que_r_sock}; my $_QUE_W_SOCK = $self->{_que_w_sock}; + my $_lock_chn = $self->{_lock_chn}; my $_bounds_only = $self->{bounds_only} || 0; my $_chunk_size = $self->{chunk_size}; my $_wuf = $self->{_wuf}; my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt); - my ($_abort, $_offset); + my ($_dat_ex, $_dat_un, $_pid, $_abort, $_offset); + + if ($_lock_chn) { + $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; + + # inlined for performance + if ($self->{_data_channels} > 6) { + $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; + } + $_dat_ex = sub { + sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 + unless $_DAT_LOCK->{ $_pid }; + }; + $_dat_un = sub { + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + if $_DAT_LOCK->{ $_pid }; + }; + } if (ref $self->{sequence} eq 'ARRAY') { ($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} }; @@ -77,23 +96,32 @@ sub _worker_sequence_queue { while (1) { ## Obtain the next chunk_id and sequence number. - sysread $_QUE_R_SOCK, $_next, $_que_read_size; + $_dat_ex->() if $_lock_chn; + + 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'}); + ($_chunk_id, $_offset) = unpack($_que_template, $_next); if ($_offset >= $_abort) { - syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset); + 1 until syswrite ( + $_QUE_W_SOCK, pack($_que_template, 0, $_offset) + ) || ($! && !$!{'EINTR'}); + + $_dat_un->() if $_lock_chn; return; } - syswrite $_QUE_W_SOCK, - pack($_que_template, $_chunk_id + 1, $_offset + 1); + 1 until syswrite ( + $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1) + ) || ($! && !$!{'EINTR'}); + $_dat_un->() if $_lock_chn; $_chunk_id++; ## Call user function. if ($_chunk_size == 1) { $_ = $_offset * $_step + $_begin; - $_ = sprintf("%$_fmt", $_) if (defined $_fmt); + $_ = _sprintf("%$_fmt", $_) if (defined $_fmt); $_ = [ $_, $_ ] if ($_bounds_only); $_wuf->($self, $_, $_chunk_id); } @@ -110,6 +138,9 @@ sub _worker_sequence_queue { if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == 1) { + $_tmp_e = $_end; + } else { for my $_i (1 .. $_chunk_size) { last if ($_seq_n > $_end); @@ -122,6 +153,9 @@ sub _worker_sequence_queue { if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == -1) { + $_tmp_e = $_end; + } else { for my $_i (1 .. $_chunk_size) { last if ($_seq_n < $_end); @@ -132,7 +166,7 @@ sub _worker_sequence_queue { } @_n = (defined $_fmt) - ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) ) + ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) ) : ( $_tmp_b, $_tmp_e ); } @@ -153,7 +187,7 @@ sub _worker_sequence_queue { last if ($_seq_n > $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_seq_n) : $_seq_n; + ? _sprintf("%$_fmt", $_seq_n) : $_seq_n; $_seq_n = $_step * $_i + $_n_begin; } @@ -164,7 +198,7 @@ sub _worker_sequence_queue { last if ($_seq_n < $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_seq_n) : $_seq_n; + ? _sprintf("%$_fmt", $_seq_n) : $_seq_n; $_seq_n = $_step * $_i + $_n_begin; } |