summaryrefslogtreecommitdiff
path: root/lib/MCE/Core/Input
diff options
context:
space:
mode:
authorgregor herrmann <gregoa@debian.org>2022-12-17 17:56:58 +0100
committergregor herrmann <gregoa@debian.org>2022-12-17 17:56:58 +0100
commit77ee6d443c4d1831e776a45040752642dcc2995b (patch)
tree6d5c6ccffe331c328883a203591b8db1c4a824b3 /lib/MCE/Core/Input
parentd0d9d54f0466b8c620498dc0646518699a62d2d0 (diff)
New upstream version 1.882
Diffstat (limited to 'lib/MCE/Core/Input')
-rw-r--r--lib/MCE/Core/Input/Generator.pm4
-rw-r--r--lib/MCE/Core/Input/Handle.pm50
-rw-r--r--lib/MCE/Core/Input/Iterator.pm4
-rw-r--r--lib/MCE/Core/Input/Request.pm4
-rw-r--r--lib/MCE/Core/Input/Sequence.pm46
5 files changed, 48 insertions, 60 deletions
diff --git a/lib/MCE/Core/Input/Generator.pm b/lib/MCE/Core/Input/Generator.pm
index 718aea8..c089a70 100644
--- a/lib/MCE/Core/Input/Generator.pm
+++ b/lib/MCE/Core/Input/Generator.pm
@@ -15,7 +15,7 @@ package MCE::Core::Input::Generator;
use strict;
use warnings;
-our $VERSION = '1.881';
+our $VERSION = '1.882';
## Items below are folded into MCE.
@@ -220,7 +220,7 @@ MCE::Core::Input::Generator - Sequence of numbers (for task_id > 0)
=head1 VERSION
-This document describes MCE::Core::Input::Generator version 1.881
+This document describes MCE::Core::Input::Generator version 1.882
=head1 DESCRIPTION
diff --git a/lib/MCE/Core/Input/Handle.pm b/lib/MCE/Core/Input/Handle.pm
index 41e9a2d..f144d9f 100644
--- a/lib/MCE/Core/Input/Handle.pm
+++ b/lib/MCE/Core/Input/Handle.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Handle;
use strict;
use warnings;
-our $VERSION = '1.881';
+our $VERSION = '1.882';
## Items below are folded into MCE.
@@ -49,10 +49,8 @@ sub _worker_read_handle {
unless (defined $self->{user_func});
my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
- my $_DAT_LOCK = $self->{_dat_lock};
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
- my $_lock_chn = $self->{_lock_chn};
my $_chunk_size = $self->{chunk_size};
my $_use_slurpio = $self->{use_slurpio};
my $_parallel_io = $self->{parallel_io};
@@ -60,25 +58,21 @@ sub _worker_read_handle {
my $_wuf = $self->{_wuf};
my ($_data_size, $_next, $_chunk_id, $_offset_pos, $_IN_FILE, $_tmp_cs);
- my ($_dat_ex, $_dat_un, $_pid, $_chop_len, $_chop_str, $_p);
-
- if ($_lock_chn) {
- $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
-
- # inlined for performance
- if ($self->{_data_channels} > 5) {
- $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )};
- }
- $_dat_ex = sub {
- MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock}) if $_is_MSWin32;
- MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
- unless $_DAT_LOCK->{ $_pid };
- };
- $_dat_un = sub {
- syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
- if $_DAT_LOCK->{ $_pid };
- };
- }
+ my ($_DAT_LOCK, $_dat_ex, $_dat_un, $_pid, $_chop_len, $_chop_str, $_p);
+
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 2 + 10 )};
+ $_dat_ex = sub {
+ MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock}) if $_is_MSWin32;
+ MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
$_chop_str = substr($_RS, 1);
@@ -111,7 +105,7 @@ sub _worker_read_handle {
$_ = '';
## Obtain the next chunk_id and offset position.
- $_dat_ex->() if $_lock_chn;
+ $_dat_ex->();
MCE::Util::_sock_ready($_QUE_R_SOCK) if $_is_MSWin32;
MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size);
@@ -119,7 +113,7 @@ sub _worker_read_handle {
if ($_offset_pos >= $_data_size) {
syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos));
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
close $_IN_FILE; undef $_IN_FILE;
return;
}
@@ -176,7 +170,7 @@ sub _worker_read_handle {
syswrite(
$_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
);
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
}
else { # Large chunk.
local $/ = $_RS if ($/ ne $_RS);
@@ -186,7 +180,7 @@ sub _worker_read_handle {
$_QUE_W_SOCK,
pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size)
);
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
$_tmp_cs = $_chunk_size;
seek $_IN_FILE, $_offset_pos, 0;
@@ -222,7 +216,7 @@ sub _worker_read_handle {
syswrite(
$_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE)
);
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
}
}
@@ -283,7 +277,7 @@ MCE::Core::Input::Handle - File path and Scalar reference input reader
=head1 VERSION
-This document describes MCE::Core::Input::Handle version 1.881
+This document describes MCE::Core::Input::Handle version 1.882
=head1 DESCRIPTION
diff --git a/lib/MCE/Core/Input/Iterator.pm b/lib/MCE/Core/Input/Iterator.pm
index 5ff2047..87c8531 100644
--- a/lib/MCE/Core/Input/Iterator.pm
+++ b/lib/MCE/Core/Input/Iterator.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Iterator;
use strict;
use warnings;
-our $VERSION = '1.881';
+our $VERSION = '1.882';
## Items below are folded into MCE.
@@ -128,7 +128,7 @@ MCE::Core::Input::Iterator - Iterator reader
=head1 VERSION
-This document describes MCE::Core::Input::Iterator version 1.881
+This document describes MCE::Core::Input::Iterator version 1.882
=head1 DESCRIPTION
diff --git a/lib/MCE/Core/Input/Request.pm b/lib/MCE/Core/Input/Request.pm
index 451ba2b..da96d15 100644
--- a/lib/MCE/Core/Input/Request.pm
+++ b/lib/MCE/Core/Input/Request.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Request;
use strict;
use warnings;
-our $VERSION = '1.881';
+our $VERSION = '1.882';
## Items below are folded into MCE.
@@ -199,7 +199,7 @@ MCE::Core::Input::Request - Array reference and Glob reference input reader
=head1 VERSION
-This document describes MCE::Core::Input::Request version 1.881
+This document describes MCE::Core::Input::Request version 1.882
=head1 DESCRIPTION
diff --git a/lib/MCE/Core/Input/Sequence.pm b/lib/MCE/Core/Input/Sequence.pm
index 5125c1f..1a49c74 100644
--- a/lib/MCE/Core/Input/Sequence.pm
+++ b/lib/MCE/Core/Input/Sequence.pm
@@ -14,7 +14,7 @@ package MCE::Core::Input::Sequence;
use strict;
use warnings;
-our $VERSION = '1.881';
+our $VERSION = '1.882';
## Items below are folded into MCE.
@@ -42,34 +42,28 @@ sub _worker_sequence_queue {
unless (defined $self->{user_func});
my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
- my $_DAT_LOCK = $self->{_dat_lock};
my $_QUE_R_SOCK = $self->{_que_r_sock};
my $_QUE_W_SOCK = $self->{_que_w_sock};
- my $_lock_chn = $self->{_lock_chn};
my $_bounds_only = $self->{bounds_only} || 0;
my $_chunk_size = $self->{chunk_size};
my $_wuf = $self->{_wuf};
my ($_next, $_chunk_id, $_seq_n, $_begin, $_end, $_step, $_fmt);
- my ($_dat_ex, $_dat_un, $_pid, $_abort, $_offset);
-
- if ($_lock_chn) {
- $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
-
- # inlined for performance
- if ($self->{_data_channels} > 5) {
- $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 5 + 1 )};
- }
- $_dat_ex = sub {
- MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock}) if $_is_MSWin32;
- MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
- unless $_DAT_LOCK->{ $_pid };
- };
- $_dat_un = sub {
- syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
- if $_DAT_LOCK->{ $_pid };
- };
- }
+ my ($_DAT_LOCK, $_dat_ex, $_dat_un, $_pid, $_abort, $_offset);
+
+ $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
+
+ # inlined for performance
+ $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 2 + 10 )};
+ $_dat_ex = sub {
+ MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock}) if $_is_MSWin32;
+ MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
+ unless $_DAT_LOCK->{ $_pid };
+ };
+ $_dat_un = sub {
+ syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
+ if $_DAT_LOCK->{ $_pid };
+ };
if (ref $self->{sequence} eq 'ARRAY') {
($_begin, $_end, $_step, $_fmt) = @{ $self->{sequence} };
@@ -98,7 +92,7 @@ sub _worker_sequence_queue {
while (1) {
## Obtain the next chunk_id and sequence number.
- $_dat_ex->() if $_lock_chn;
+ $_dat_ex->();
MCE::Util::_sock_ready($_QUE_R_SOCK) if $_is_MSWin32;
MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size);
@@ -106,7 +100,7 @@ sub _worker_sequence_queue {
if ($_offset >= $_abort) {
syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset));
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
return;
}
@@ -114,7 +108,7 @@ sub _worker_sequence_queue {
$_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1)
);
- $_dat_un->() if $_lock_chn;
+ $_dat_un->();
$_chunk_id++;
## Call user function.
@@ -234,7 +228,7 @@ MCE::Core::Input::Sequence - Sequence of numbers (for task_id == 0)
=head1 VERSION
-This document describes MCE::Core::Input::Sequence version 1.881
+This document describes MCE::Core::Input::Sequence version 1.882
=head1 DESCRIPTION