diff options
author | Angel Abad <angelabad@gmail.com> | 2017-10-01 17:37:25 +0200 |
---|---|---|
committer | Angel Abad <angelabad@gmail.com> | 2017-10-01 17:37:25 +0200 |
commit | aa403938ff08fa6be0e35d7dd66c33be3396d647 (patch) | |
tree | 788785fa791e98a8a09bdd3f01b66febefda3150 /lib/MCE/Core/Input | |
parent | 80444252bf6887593c90843f61e601d0b86829d7 (diff) |
New upstream version 1.830
Diffstat (limited to 'lib/MCE/Core/Input')
-rw-r--r-- | lib/MCE/Core/Input/Generator.pm | 18 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Handle.pm | 71 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Iterator.pm | 23 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Request.pm | 37 | ||||
-rw-r--r-- | lib/MCE/Core/Input/Sequence.pm | 54 |
5 files changed, 153 insertions, 50 deletions
diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm index 55b162b..549637c 100644 --- a/lib/MCE/Core/Input/Generator.pm +++ b/lib/MCE/Core/Input/Generator.pm @@ -15,7 +15,7 @@ package MCE::Core::Input::Generator; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -73,7 +73,7 @@ sub _worker_sequence_generator { if ($_wid == 1) { $self->{_next_jmp} = sub { goto _WORKER_SEQ_GEN__LAST; }; - $_ = (defined $_fmt) ? sprintf("%$_fmt", $_next) : $_next; + $_ = (defined $_fmt) ? _sprintf("%$_fmt", $_next) : $_next; if ($_chunk_size > 1 || $_bounds_only) { $_ = ($_bounds_only) ? [ $_, $_ ] : [ $_ ]; @@ -92,7 +92,7 @@ sub _worker_sequence_generator { return if ( $_flag && $_next > $_end); return if (!$_flag && $_next < $_end); - $_ = (defined $_fmt) ? sprintf("%$_fmt", $_next) : $_next; + $_ = (defined $_fmt) ? _sprintf("%$_fmt", $_next) : $_next; $_ = [ $_, $_ ] if ($_bounds_only); $_wuf->($self, $_, $_chunk_id); @@ -119,6 +119,9 @@ sub _worker_sequence_generator { if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == 1) { + $_tmp_e = $_end if ($_next <= $_end); + } else { for my $_i (1 .. $_chunk_size) { last if ($_next > $_end); @@ -131,6 +134,9 @@ sub _worker_sequence_generator { if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == -1) { + $_tmp_e = $_end if ($_next >= $_end); + } else { for my $_i (1 .. $_chunk_size) { last if ($_next < $_end); @@ -143,7 +149,7 @@ sub _worker_sequence_generator { return unless (defined $_tmp_e); @_n = (defined $_fmt) - ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) ) + ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) ) : ( $_tmp_b, $_tmp_e ); } @@ -161,7 +167,7 @@ sub _worker_sequence_generator { last if ($_next > $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_next) : $_next; + ? _sprintf("%$_fmt", $_next) : $_next; $_next = $_step * $_i + $_n_begin; } @@ -172,7 +178,7 @@ sub _worker_sequence_generator { last if ($_next < $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_next) : $_next; + ? _sprintf("%$_fmt", $_next) : $_next; $_next = $_step * $_i + $_n_begin; } diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm index 5ccec89..8d9dfed 100644 --- a/lib/MCE/Core/Input/Handle.pm +++ b/lib/MCE/Core/Input/Handle.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Handle; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -23,7 +23,6 @@ package # hide from rpm no warnings qw( threads recursion uninitialized ); -use Fcntl qw( SEEK_CUR ); use bytes; my $_que_read_size = $MCE::_que_read_size; @@ -36,7 +35,10 @@ my $_que_template = $MCE::_que_template; ############################################################################### sub _systell { - sysseek($_[0], 0, SEEK_CUR); + # To minimize memory consumption, SEEK_CUR equals 1 on most platforms. + # e.g. use Fcntl qw(SEEK_CUR); + + sysseek($_[0], 0, 1); } sub _worker_read_handle { @@ -48,17 +50,35 @@ sub _worker_read_handle { _croak('MCE::_worker_read_handle: (user_func) is not specified') unless (defined $self->{user_func}); + my $_DAT_LOCK = $self->{_dat_lock}; my $_QUE_R_SOCK = $self->{_que_r_sock}; my $_QUE_W_SOCK = $self->{_que_w_sock}; + my $_lock_chn = $self->{_lock_chn}; my $_chunk_size = $self->{chunk_size}; my $_use_slurpio = $self->{use_slurpio}; my $_parallel_io = $self->{parallel_io}; my $_RS = $self->{RS} || $/; - my $_RS_FLG = (!$_RS || $_RS ne $LF); my $_wuf = $self->{_wuf}; my ($_data_size, $_next, $_chunk_id, $_offset_pos, $_IN_FILE, $_tmp_cs); - my ($_chop_len, $_chop_str, $_p); + my ($_dat_ex, $_dat_un, $_pid, $_chop_len, $_chop_str, $_p); + + if ($_lock_chn) { + $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; + + # inlined for performance + if ($self->{_data_channels} > 6) { + $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; + } + $_dat_ex = sub { + sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 + unless $_DAT_LOCK->{ $_pid }; + }; + $_dat_un = sub { + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + if $_DAT_LOCK->{ $_pid }; + }; + } if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") { $_chop_str = substr($_RS, 1); @@ -91,11 +111,19 @@ sub _worker_read_handle { $_ = ''; ## Obtain the next chunk_id and offset position. - sysread $_QUE_R_SOCK, $_next, $_que_read_size; + $_dat_ex->() if $_lock_chn; + + 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'}); + ($_chunk_id, $_offset_pos) = unpack($_que_template, $_next); if ($_offset_pos >= $_data_size) { - syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos); + 1 until syswrite ( $_QUE_W_SOCK, + pack($_que_template, 0, $_offset_pos) + ) || ($! && !$!{'EINTR'}); + + $_dat_un->() if $_lock_chn; + close $_IN_FILE; undef $_IN_FILE; return; } @@ -108,7 +136,7 @@ sub _worker_read_handle { ## Read data. if ($_chunk_size <= MAX_RECS_SIZE) { # One or many records. - local $/ = $_RS if ($_RS_FLG); + local $/ = $_RS if ($/ ne $_RS); seek $_IN_FILE, $_offset_pos, 0; if ($_chunk_size == 1) { @@ -149,15 +177,21 @@ sub _worker_read_handle { } } - syswrite $_QUE_W_SOCK, - pack($_que_template, $_chunk_id, tell $_IN_FILE); + 1 until syswrite ( $_QUE_W_SOCK, + pack($_que_template, $_chunk_id, tell $_IN_FILE) + ) || ($! && !$!{'EINTR'}); + + $_dat_un->() if $_lock_chn; } else { # Large chunk. - local $/ = $_RS if ($_RS_FLG); + local $/ = $_RS if ($/ ne $_RS); + + if ($_parallel_io && $_RS eq $LF) { + 1 until syswrite ( $_QUE_W_SOCK, + pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size) + ) || ($! && !$!{'EINTR'}); - if ($_parallel_io && ! $_RS_FLG) { - syswrite $_QUE_W_SOCK, - pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size); + $_dat_un->() if $_lock_chn; $_tmp_cs = $_chunk_size; seek $_IN_FILE, $_offset_pos, 0; @@ -190,8 +224,11 @@ sub _worker_read_handle { $_ .= <$_IN_FILE>; - syswrite $_QUE_W_SOCK, - pack($_que_template, $_chunk_id, tell $_IN_FILE); + 1 until syswrite ( $_QUE_W_SOCK, + pack($_que_template, $_chunk_id, tell $_IN_FILE) + ) || ($! && !$!{'EINTR'}); + + $_dat_un->() if $_lock_chn; } } @@ -212,7 +249,7 @@ sub _worker_read_handle { } else { if ($_chunk_size > MAX_RECS_SIZE) { - local $/ = $_RS if ($_RS_FLG); + local $/ = $_RS if ($/ ne $_RS); _sync_buffer_to_array(\$_, \@_recs, $_chop_str); undef $_; } diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm index 252914e..5f8d50e 100644 --- a/lib/MCE/Core/Input/Iterator.pm +++ b/lib/MCE/Core/Input/Iterator.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -46,14 +46,22 @@ sub _worker_user_iterator { my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; my $_lock_chn = $self->{_lock_chn}; my $_chunk_size = $self->{chunk_size}; - my $_I_FLG = (!$/ || $/ ne $LF); my $_wuf = $self->{_wuf}; - my ($_dat_ex, $_dat_un); + my ($_dat_ex, $_dat_un, $_pid); if ($_lock_chn) { - $_dat_ex = sub { sysread ( $_DAT_LOCK->{_r_sock}, my $_b, 1 ) }; - $_dat_un = sub { syswrite ( $_DAT_LOCK->{_w_sock}, '0' ) }; + $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; + + # inlined for performance + $_dat_ex = sub { + sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 + unless $_DAT_LOCK->{ $_pid }; + }; + $_dat_un = sub { + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + if $_DAT_LOCK->{ $_pid }; + }; } my ($_chunk_id, $_len, $_is_ref); @@ -74,10 +82,11 @@ sub _worker_user_iterator { ## Obtain the next chunk of data. { - local $\ = undef if (defined $\); local $/ = $LF if ($_I_FLG); + local $\ = undef if (defined $\); + local $/ = $LF if ($/ ne $LF ); $_dat_ex->() if $_lock_chn; - print {$_DAT_W_SOCK} OUTPUT_U_ITR . $LF . $_chn . $LF; + print {$_DAT_W_SOCK} OUTPUT_I_REF . $LF . $_chn . $LF; chomp($_len = <$_DAU_W_SOCK>); if ($_len < 0) { diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm index 831635e..f80bce7 100644 --- a/lib/MCE/Core/Input/Request.pm +++ b/lib/MCE/Core/Input/Request.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Request; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -49,26 +49,37 @@ sub _worker_request_chunk { my $_chunk_size = $self->{chunk_size}; my $_use_slurpio = $self->{use_slurpio}; my $_RS = $self->{RS} || $/; - my $_RS_FLG = (!$_RS || $_RS ne $LF); - my $_I_FLG = (!$/ || $/ ne $LF); my $_wuf = $self->{_wuf}; - my ($_dat_ex, $_dat_un); + my ($_dat_ex, $_dat_un, $_pid); if ($_lock_chn) { - $_dat_ex = sub { sysread ( $_DAT_LOCK->{_r_sock}, my $_b, 1 ) }; - $_dat_un = sub { syswrite ( $_DAT_LOCK->{_w_sock}, '0' ) }; + $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; + + # inlined for performance + $_dat_ex = sub { + sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 + unless $_DAT_LOCK->{ $_pid }; + }; + $_dat_un = sub { + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + if $_DAT_LOCK->{ $_pid }; + }; } my ($_chunk_id, $_len, $_output_tag); my ($_chop_len, $_chop_str, $_p); if ($_proc_type == REQUEST_ARRAY) { - $_output_tag = OUTPUT_A_ARY; + $_output_tag = OUTPUT_A_REF; + $_chop_len = 0; + } + elsif ($_proc_type == REQUEST_HASH) { + $_output_tag = OUTPUT_H_REF; $_chop_len = 0; } else { - $_output_tag = OUTPUT_S_GLB; + $_output_tag = OUTPUT_G_REF; if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") { $_chop_str = substr($_RS, 1); $_chop_len = length $_chop_str; @@ -94,7 +105,8 @@ sub _worker_request_chunk { ## Obtain the next chunk of data. { - local $\ = undef if (defined $\); local $/ = $LF if ($_I_FLG); + local $\ = undef if (defined $\); + local $/ = $LF if ($/ ne $LF ); $_dat_ex->() if $_lock_chn; print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF; @@ -129,6 +141,11 @@ sub _worker_request_chunk { $_wuf->($self, $_chunk_ref, $_chunk_id); } } + elsif ($_proc_type == REQUEST_HASH) { + my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_; + $_ = $_chunk_ref; + $_wuf->($self, $_chunk_ref, $_chunk_id); + } else { if ($_use_slurpio) { if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) { @@ -147,7 +164,7 @@ sub _worker_request_chunk { else { my @_recs; { - local $/ = $_RS if ($_RS_FLG); + local $/ = $_RS if ($/ ne $_RS); _sync_buffer_to_array(\$_, \@_recs, $_chop_str); undef $_; } diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm index 92bc2e8..baea8bc 100644 --- a/lib/MCE/Core/Input/Sequence.pm +++ b/lib/MCE/Core/Input/Sequence.pm @@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence; use strict; use warnings; -our $VERSION = '1.810'; +our $VERSION = '1.830'; ## Items below are folded into MCE. @@ -41,14 +41,33 @@ sub _worker_sequence_queue { _croak('MCE::_worker_sequence_queue: (user_func) is not specified') unless (defined $self->{user_func}); + my $_DAT_LOCK = $self->{_dat_lock}; my $_QUE_R_SOCK = $self->{_que_r_sock}; my $_QUE_W_SOCK = $self->{_que_w_sock}; + my $_lock_chn = $self->{_lock_chn}; my $_bounds_only = $self->{bounds_only} || 0; my $_chunk_size = $self->{chunk_size}; my $_wuf = $self->{_wuf}; my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt); - my ($_abort, $_offset); + my ($_dat_ex, $_dat_un, $_pid, $_abort, $_offset); + + if ($_lock_chn) { + $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$; + + # inlined for performance + if ($self->{_data_channels} > 6) { + $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; + } + $_dat_ex = sub { + sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 + unless $_DAT_LOCK->{ $_pid }; + }; + $_dat_un = sub { + syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 + if $_DAT_LOCK->{ $_pid }; + }; + } if (ref $self->{sequence} eq 'ARRAY') { ($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} }; @@ -77,23 +96,32 @@ sub _worker_sequence_queue { while (1) { ## Obtain the next chunk_id and sequence number. - sysread $_QUE_R_SOCK, $_next, $_que_read_size; + $_dat_ex->() if $_lock_chn; + + 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'}); + ($_chunk_id, $_offset) = unpack($_que_template, $_next); if ($_offset >= $_abort) { - syswrite $_QUE_W_SOCK, pack($_que_template, 0, $_offset); + 1 until syswrite ( + $_QUE_W_SOCK, pack($_que_template, 0, $_offset) + ) || ($! && !$!{'EINTR'}); + + $_dat_un->() if $_lock_chn; return; } - syswrite $_QUE_W_SOCK, - pack($_que_template, $_chunk_id + 1, $_offset + 1); + 1 until syswrite ( + $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1) + ) || ($! && !$!{'EINTR'}); + $_dat_un->() if $_lock_chn; $_chunk_id++; ## Call user function. if ($_chunk_size == 1) { $_ = $_offset * $_step + $_begin; - $_ = sprintf("%$_fmt", $_) if (defined $_fmt); + $_ = _sprintf("%$_fmt", $_) if (defined $_fmt); $_ = [ $_, $_ ] if ($_bounds_only); $_wuf->($self, $_, $_chunk_id); } @@ -110,6 +138,9 @@ sub _worker_sequence_queue { if ($_step * ($_chunk_size - 1) + $_n_begin <= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == 1) { + $_tmp_e = $_end; + } else { for my $_i (1 .. $_chunk_size) { last if ($_seq_n > $_end); @@ -122,6 +153,9 @@ sub _worker_sequence_queue { if ($_step * ($_chunk_size - 1) + $_n_begin >= $_end) { $_tmp_e = $_step * ($_chunk_size - 1) + $_n_begin; } + elsif ($_step == -1) { + $_tmp_e = $_end; + } else { for my $_i (1 .. $_chunk_size) { last if ($_seq_n < $_end); @@ -132,7 +166,7 @@ sub _worker_sequence_queue { } @_n = (defined $_fmt) - ? ( sprintf("%$_fmt",$_tmp_b), sprintf("%$_fmt",$_tmp_e) ) + ? ( _sprintf("%$_fmt",$_tmp_b), _sprintf("%$_fmt",$_tmp_e) ) : ( $_tmp_b, $_tmp_e ); } @@ -153,7 +187,7 @@ sub _worker_sequence_queue { last if ($_seq_n > $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_seq_n) : $_seq_n; + ? _sprintf("%$_fmt", $_seq_n) : $_seq_n; $_seq_n = $_step * $_i + $_n_begin; } @@ -164,7 +198,7 @@ sub _worker_sequence_queue { last if ($_seq_n < $_end); push @_n, (defined $_fmt) - ? sprintf("%$_fmt", $_seq_n) : $_seq_n; + ? _sprintf("%$_fmt", $_seq_n) : $_seq_n; $_seq_n = $_step * $_i + $_n_begin; } |