summaryrefslogtreecommitdiff
path: root/lib/MCE/Core/Input/Sequence.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/MCE/Core/Input/Sequence.pm')
-rw-r--r--lib/MCE/Core/Input/Sequence.pm54
1 files changed, 44 insertions, 10 deletions
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm
index 92bc2e8..baea8bc 100644
--- a/lib/MCE/Core/Input/Sequence.pm
+++ b/lib/MCE/Core/Input/Sequence.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -41,14 +41,33 @@ sub _worker_sequence_queue {
_croak('MCE::_worker_sequence_queue: (user_func) is not specified')
unless (defined $self->{user_func});
+ my $_DAT_LOCK = $self->{_dat_lock};
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
+ my $_lock_chn = $self->{_lock_chn};
my $_bounds_only = $self->{bounds_only} || 0;
my $_chunk_size = $self->{chunk_size};
my $_wuf = $self->{_wuf};
my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt);
- my ($_abort, $_offset);
+ my ($_dat_ex, $_dat_un, $_pid, $_abort, $_offset);
+
+ if ($_lock_chn) {
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ if ($self->{_data_channels} > 6) {
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )};
+ }
+ $_dat_ex = sub {
+ sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
+ }
if (ref $self->{sequence} eq 'ARRAY') {
($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} };
@@ -77,23 +96,32 @@ sub _worker_sequence_queue {
while (1) {
## Obtain the next chunk_id and sequence number.
- sysread $_QUE_R_SOCK, $_next, $_que_read_size;
+ $_dat_ex->() if $_lock_chn;
+
+ 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'});
+
($_chunk_id, $_offset) = unpack($_que_template, $_next);
if ($_offset >= $_abort) {
- syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset);
+ 1 until syswrite (
+ $_QUE_W_SOCK, pack($_que_template, 0, $_offset)
+ ) || ($! && !$!{'EINTR'});
+
+ $_dat_un->() if $_lock_chn;
return;
}
- syswrite $_QUE_W_SOCK,
- pack($_que_template, $_chunk_id + 1, $_offset + 1);
+ 1 until syswrite (
+ $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1)
+ ) || ($! && !$!{'EINTR'});
+ $_dat_un->() if $_lock_chn;
$_chunk_id++;
## Call user function.
if ($_chunk_size == 1) {
$_ = $_offset * $_step + $_begin;
- $_ = sprintf("%$_fmt", $_) if (defined $_fmt);
+ $_ = _sprintf("%$_fmt", $_) if (defined $_fmt);
$_ = [ $_, $_ ] if ($_bounds_only);
$_wuf->($self, $_, $_chunk_id);
}
@@ -110,6 +138,9 @@ sub _worker_sequence_queue {
if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == 1) {
+ $_tmp_e = $_end;
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_seq_n > $_end);
@@ -122,6 +153,9 @@ sub _worker_sequence_queue {
if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == -1) {
+ $_tmp_e = $_end;
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_seq_n < $_end);
@@ -132,7 +166,7 @@ sub _worker_sequence_queue {
}
@_n = (defined $_fmt)
- ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) )
+ ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) )
: ( $_tmp_b, $_tmp_e );
}
@@ -153,7 +187,7 @@ sub _worker_sequence_queue {
last if ($_seq_n > $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_seq_n) : $_seq_n;
+ ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
$_seq_n = $_step * $_i + $_n_begin;
}
@@ -164,7 +198,7 @@ sub _worker_sequence_queue {
last if ($_seq_n < $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_seq_n) : $_seq_n;
+ ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
$_seq_n = $_step * $_i + $_n_begin;
}