summaryrefslogtreecommitdiff
path: root/lib/MCE/Core/Input
diff options
context:
space:
mode:
authorAngel Abad <angelabad@gmail.com>2017-10-01 17:37:25 +0200
committerAngel Abad <angelabad@gmail.com>2017-10-01 17:37:25 +0200
commitaa403938ff08fa6be0e35d7dd66c33be3396d647 (patch)
tree788785fa791e98a8a09bdd3f01b66febefda3150 /lib/MCE/Core/Input
parent80444252bf6887593c90843f61e601d0b86829d7 (diff)
New upstream version 1.830
Diffstat (limited to 'lib/MCE/Core/Input')
-rw-r--r--lib/MCE/Core/Input/Generator.pm18
-rw-r--r--lib/MCE/Core/Input/Handle.pm71
-rw-r--r--lib/MCE/Core/Input/Iterator.pm23
-rw-r--r--lib/MCE/Core/Input/Request.pm37
-rw-r--r--lib/MCE/Core/Input/Sequence.pm54
5 files changed, 153 insertions, 50 deletions
diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm
index 55b162b..549637c 100644
--- a/lib/MCE/Core/Input/Generator.pm
+++ b/lib/MCE/Core/Input/Generator.pm
@@ -15,7 +15,7 @@ package MCE::Core::Input::Generator;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -73,7 +73,7 @@ sub _worker_sequence_generator {
if ($_wid == 1) {
$self->{_next_jmp} = sub { goto _WORKER_SEQ_GEN__LAST; };
- $_ = (defined $_fmt) ? sprintf("%$_fmt", $_next) : $_next;
+ $_ = (defined $_fmt) ? _sprintf("%$_fmt", $_next) : $_next;
if ($_chunk_size > 1 || $_bounds_only) {
$_ = ($_bounds_only) ? [ $_, $_ ] : [ $_ ];
@@ -92,7 +92,7 @@ sub _worker_sequence_generator {
return if ( $_flag && $_next > $_end);
return if (!$_flag && $_next < $_end);
- $_ = (defined $_fmt) ? sprintf("%$_fmt", $_next) : $_next;
+ $_ = (defined $_fmt) ? _sprintf("%$_fmt", $_next) : $_next;
$_ = [ $_, $_ ] if ($_bounds_only);
$_wuf->($self, $_, $_chunk_id);
@@ -119,6 +119,9 @@ sub _worker_sequence_generator {
if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == 1) {
+ $_tmp_e = $_end if ($_next <= $_end);
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_next > $_end);
@@ -131,6 +134,9 @@ sub _worker_sequence_generator {
if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == -1) {
+ $_tmp_e = $_end if ($_next >= $_end);
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_next < $_end);
@@ -143,7 +149,7 @@ sub _worker_sequence_generator {
return unless (defined $_tmp_e);
@_n = (defined $_fmt)
- ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) )
+ ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) )
: ( $_tmp_b, $_tmp_e );
}
@@ -161,7 +167,7 @@ sub _worker_sequence_generator {
last if ($_next > $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_next) : $_next;
+ ? _sprintf("%$_fmt", $_next) : $_next;
$_next = $_step * $_i + $_n_begin;
}
@@ -172,7 +178,7 @@ sub _worker_sequence_generator {
last if ($_next < $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_next) : $_next;
+ ? _sprintf("%$_fmt", $_next) : $_next;
$_next = $_step * $_i + $_n_begin;
}
diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm
index 5ccec89..8d9dfed 100644
--- a/lib/MCE/Core/Input/Handle.pm
+++ b/lib/MCE/Core/Input/Handle.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Handle;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -23,7 +23,6 @@ package # hide from rpm
no warnings qw( threads recursion uninitialized );
-use Fcntl qw( SEEK_CUR );
use bytes;
my $_que_read_size = $MCE::_que_read_size;
@@ -36,7 +35,10 @@ my $_que_template = $MCE::_que_template;
###############################################################################
sub _systell {
- sysseek($_[0], 0, SEEK_CUR);
+ # To minimize memory consumption, SEEK_CUR equals 1 on most platforms.
+ # e.g. use Fcntl qw(SEEK_CUR);
+
+ sysseek($_[0], 0, 1);
}
sub _worker_read_handle {
@@ -48,17 +50,35 @@ sub _worker_read_handle {
_croak('MCE::_worker_read_handle: (user_func) is not specified')
unless (defined $self->{user_func});
+ my $_DAT_LOCK = $self->{_dat_lock};
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
+ my $_lock_chn = $self->{_lock_chn};
my $_chunk_size = $self->{chunk_size};
my $_use_slurpio = $self->{use_slurpio};
my $_parallel_io = $self->{parallel_io};
my $_RS = $self->{RS} || $/;
- my $_RS_FLG = (!$_RS || $_RS ne $LF);
my $_wuf = $self->{_wuf};
my ($_data_size, $_next, $_chunk_id, $_offset_pos, $_IN_FILE, $_tmp_cs);
- my ($_chop_len, $_chop_str, $_p);
+ my ($_dat_ex, $_dat_un, $_pid, $_chop_len, $_chop_str, $_p);
+
+ if ($_lock_chn) {
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ if ($self->{_data_channels} > 6) {
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )};
+ }
+ $_dat_ex = sub {
+ sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
+ }
if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
$_chop_str = substr($_RS, 1);
@@ -91,11 +111,19 @@ sub _worker_read_handle {
$_ = '';
## Obtain the next chunk_id and offset position.
- sysread $_QUE_R_SOCK, $_next, $_que_read_size;
+ $_dat_ex->() if $_lock_chn;
+
+ 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'});
+
($_chunk_id, $_offset_pos) = unpack($_que_template, $_next);
if ($_offset_pos >= $_data_size) {
- syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos);
+ 1 until syswrite ( $_QUE_W_SOCK,
+ pack($_que_template, 0, $_offset_pos)
+ ) || ($! && !$!{'EINTR'});
+
+ $_dat_un->() if $_lock_chn;
+
close $_IN_FILE; undef $_IN_FILE;
return;
}
@@ -108,7 +136,7 @@ sub _worker_read_handle {
## Read data.
if ($_chunk_size <= MAX_RECS_SIZE) { # One or many records.
- local $/ = $_RS if ($_RS_FLG);
+ local $/ = $_RS if ($/ ne $_RS);
seek $_IN_FILE, $_offset_pos, 0;
if ($_chunk_size == 1) {
@@ -149,15 +177,21 @@ sub _worker_read_handle {
}
}
- syswrite $_QUE_W_SOCK,
- pack($_que_template, $_chunk_id, tell $_IN_FILE);
+ 1 until syswrite ( $_QUE_W_SOCK,
+ pack($_que_template, $_chunk_id, tell $_IN_FILE)
+ ) || ($! && !$!{'EINTR'});
+
+ $_dat_un->() if $_lock_chn;
}
else { # Large chunk.
- local $/ = $_RS if ($_RS_FLG);
+ local $/ = $_RS if ($/ ne $_RS);
+
+ if ($_parallel_io && $_RS eq $LF) {
+ 1 until syswrite ( $_QUE_W_SOCK,
+ pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size)
+ ) || ($! && !$!{'EINTR'});
- if ($_parallel_io && ! $_RS_FLG) {
- syswrite $_QUE_W_SOCK,
- pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size);
+ $_dat_un->() if $_lock_chn;
$_tmp_cs = $_chunk_size;
seek $_IN_FILE, $_offset_pos, 0;
@@ -190,8 +224,11 @@ sub _worker_read_handle {
$_ .= <$_IN_FILE>;
- syswrite $_QUE_W_SOCK,
- pack($_que_template, $_chunk_id, tell $_IN_FILE);
+ 1 until syswrite ( $_QUE_W_SOCK,
+ pack($_que_template, $_chunk_id, tell $_IN_FILE)
+ ) || ($! && !$!{'EINTR'});
+
+ $_dat_un->() if $_lock_chn;
}
}
@@ -212,7 +249,7 @@ sub _worker_read_handle {
}
else {
if ($_chunk_size > MAX_RECS_SIZE) {
- local $/ = $_RS if ($_RS_FLG);
+ local $/ = $_RS if ($/ ne $_RS);
_sync_buffer_to_array(\$_, \@_recs, $_chop_str);
undef $_;
}
diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm
index 252914e..5f8d50e 100644
--- a/lib/MCE/Core/Input/Iterator.pm
+++ b/lib/MCE/Core/Input/Iterator.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -46,14 +46,22 @@ sub _worker_user_iterator {
my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
my $_lock_chn = $self->{_lock_chn};
my $_chunk_size = $self->{chunk_size};
- my $_I_FLG = (!$/ || $/ ne $LF);
my $_wuf = $self->{_wuf};
- my ($_dat_ex, $_dat_un);
+ my ($_dat_ex, $_dat_un, $_pid);
if ($_lock_chn) {
- $_dat_ex = sub { sysread ( $_DAT_LOCK->{_r_sock}, my $_b, 1 ) };
- $_dat_un = sub { syswrite ( $_DAT_LOCK->{_w_sock}, '0' ) };
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ $_dat_ex = sub {
+ sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
}
my ($_chunk_id, $_len, $_is_ref);
@@ -74,10 +82,11 @@ sub _worker_user_iterator {
## Obtain the next chunk of data.
{
- local $\ = undef if (defined $\); local $/ = $LF if ($_I_FLG);
+ local $\ = undef if (defined $\);
+ local $/ = $LF if ($/ ne $LF );
$_dat_ex->() if $_lock_chn;
- print {$_DAT_W_SOCK} OUTPUT_U_ITR . $LF . $_chn . $LF;
+ print {$_DAT_W_SOCK} OUTPUT_I_REF . $LF . $_chn . $LF;
chomp($_len = <$_DAU_W_SOCK>);
if ($_len < 0) {
diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm
index 831635e..f80bce7 100644
--- a/lib/MCE/Core/Input/Request.pm
+++ b/lib/MCE/Core/Input/Request.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Request;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -49,26 +49,37 @@ sub _worker_request_chunk {
my $_chunk_size = $self->{chunk_size};
my $_use_slurpio = $self->{use_slurpio};
my $_RS = $self->{RS} || $/;
- my $_RS_FLG = (!$_RS || $_RS ne $LF);
- my $_I_FLG = (!$/ || $/ ne $LF);
my $_wuf = $self->{_wuf};
- my ($_dat_ex, $_dat_un);
+ my ($_dat_ex, $_dat_un, $_pid);
if ($_lock_chn) {
- $_dat_ex = sub { sysread ( $_DAT_LOCK->{_r_sock}, my $_b, 1 ) };
- $_dat_un = sub { syswrite ( $_DAT_LOCK->{_w_sock}, '0' ) };
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ $_dat_ex = sub {
+ sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
}
my ($_chunk_id, $_len, $_output_tag);
my ($_chop_len, $_chop_str, $_p);
if ($_proc_type == REQUEST_ARRAY) {
- $_output_tag = OUTPUT_A_ARY;
+ $_output_tag = OUTPUT_A_REF;
+ $_chop_len = 0;
+ }
+ elsif ($_proc_type == REQUEST_HASH) {
+ $_output_tag = OUTPUT_H_REF;
$_chop_len = 0;
}
else {
- $_output_tag = OUTPUT_S_GLB;
+ $_output_tag = OUTPUT_G_REF;
if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
$_chop_str = substr($_RS, 1);
$_chop_len = length $_chop_str;
@@ -94,7 +105,8 @@ sub _worker_request_chunk {
## Obtain the next chunk of data.
{
- local $\ = undef if (defined $\); local $/ = $LF if ($_I_FLG);
+ local $\ = undef if (defined $\);
+ local $/ = $LF if ($/ ne $LF );
$_dat_ex->() if $_lock_chn;
print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
@@ -129,6 +141,11 @@ sub _worker_request_chunk {
$_wuf->($self, $_chunk_ref, $_chunk_id);
}
}
+ elsif ($_proc_type == REQUEST_HASH) {
+ my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_;
+ $_ = $_chunk_ref;
+ $_wuf->($self, $_chunk_ref, $_chunk_id);
+ }
else {
if ($_use_slurpio) {
if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
@@ -147,7 +164,7 @@ sub _worker_request_chunk {
else {
my @_recs;
{
- local $/ = $_RS if ($_RS_FLG);
+ local $/ = $_RS if ($/ ne $_RS);
_sync_buffer_to_array(\$_, \@_recs, $_chop_str);
undef $_;
}
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm
index 92bc2e8..baea8bc 100644
--- a/lib/MCE/Core/Input/Sequence.pm
+++ b/lib/MCE/Core/Input/Sequence.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence;
use strict;
use warnings;
-our $VERSION = '1.810';
+our $VERSION = '1.830';
## Items below are folded into MCE.
@@ -41,14 +41,33 @@ sub _worker_sequence_queue {
_croak('MCE::_worker_sequence_queue: (user_func) is not specified')
unless (defined $self->{user_func});
+ my $_DAT_LOCK = $self->{_dat_lock};
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
+ my $_lock_chn = $self->{_lock_chn};
my $_bounds_only = $self->{bounds_only} || 0;
my $_chunk_size = $self->{chunk_size};
my $_wuf = $self->{_wuf};
my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt);
- my ($_abort, $_offset);
+ my ($_dat_ex, $_dat_un, $_pid, $_abort, $_offset);
+
+ if ($_lock_chn) {
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ if ($self->{_data_channels} > 6) {
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )};
+ }
+ $_dat_ex = sub {
+ sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
+ }
if (ref $self->{sequence} eq 'ARRAY') {
($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} };
@@ -77,23 +96,32 @@ sub _worker_sequence_queue {
while (1) {
## Obtain the next chunk_id and sequence number.
- sysread $_QUE_R_SOCK, $_next, $_que_read_size;
+ $_dat_ex->() if $_lock_chn;
+
+ 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'});
+
($_chunk_id, $_offset) = unpack($_que_template, $_next);
if ($_offset >= $_abort) {
- syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset);
+ 1 until syswrite (
+ $_QUE_W_SOCK, pack($_que_template, 0, $_offset)
+ ) || ($! && !$!{'EINTR'});
+
+ $_dat_un->() if $_lock_chn;
return;
}
- syswrite $_QUE_W_SOCK,
- pack($_que_template, $_chunk_id + 1, $_offset + 1);
+ 1 until syswrite (
+ $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1)
+ ) || ($! && !$!{'EINTR'});
+ $_dat_un->() if $_lock_chn;
$_chunk_id++;
## Call user function.
if ($_chunk_size == 1) {
$_ = $_offset * $_step + $_begin;
- $_ = sprintf("%$_fmt", $_) if (defined $_fmt);
+ $_ = _sprintf("%$_fmt", $_) if (defined $_fmt);
$_ = [ $_, $_ ] if ($_bounds_only);
$_wuf->($self, $_, $_chunk_id);
}
@@ -110,6 +138,9 @@ sub _worker_sequence_queue {
if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == 1) {
+ $_tmp_e = $_end;
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_seq_n > $_end);
@@ -122,6 +153,9 @@ sub _worker_sequence_queue {
if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) {
$_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin;
}
+ elsif ($_step == -1) {
+ $_tmp_e = $_end;
+ }
else {
for my $_i (1 .. $_chunk_size) {
last if ($_seq_n < $_end);
@@ -132,7 +166,7 @@ sub _worker_sequence_queue {
}
@_n = (defined $_fmt)
- ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) )
+ ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) )
: ( $_tmp_b, $_tmp_e );
}
@@ -153,7 +187,7 @@ sub _worker_sequence_queue {
last if ($_seq_n > $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_seq_n) : $_seq_n;
+ ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
$_seq_n = $_step * $_i + $_n_begin;
}
@@ -164,7 +198,7 @@ sub _worker_sequence_queue {
last if ($_seq_n < $_end);
push @_n, (defined $_fmt)
- ? sprintf("%$_fmt", $_seq_n) : $_seq_n;
+ ? _sprintf("%$_fmt", $_seq_n) : $_seq_n;
$_seq_n = $_step * $_i + $_n_begin;
}