diff options
author | intrigeri <intrigeri@boum.org> | 2019-07-25 02:23:34 +0000 |
---|---|---|
committer | intrigeri <intrigeri@boum.org> | 2019-07-25 02:23:34 +0000 |
commit | a0567bc8c39a9f006202a716ca0da1e5b12771a5 (patch) | |
tree | 086bfbe53d72fe8722339cd762a8670ed4897419 /lib/MCE/Channel | |
parent | ced942ca55cb90ddce354f56c6696bb34ba8c2ff (diff) |
New upstream version 1.843
Diffstat (limited to 'lib/MCE/Channel')
-rw-r--r-- | lib/MCE/Channel/Mutex.pm | 356 | ||||
-rw-r--r-- | lib/MCE/Channel/Simple.pm | 332 | ||||
-rw-r--r-- | lib/MCE/Channel/Threads.pm | 361 |
3 files changed, 1049 insertions, 0 deletions
diff --git a/lib/MCE/Channel/Mutex.pm b/lib/MCE/Channel/Mutex.pm new file mode 100644 index 0000000..27696ef --- /dev/null +++ b/lib/MCE/Channel/Mutex.pm @@ -0,0 +1,356 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel for producer(s) and many consumers supporting processes and threads. +## +############################################################################### + +package MCE::Channel::Mutex; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use base 'MCE::Channel'; +use MCE::Mutex (); +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Mutex' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + # locking for the consumer side of the channel + $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' ); + + # optionally, support many-producers writing and reading + $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp}; + + bless \%obj, $class; + + if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) { + MCE::Mutex::Channel::_save_for_global_destruction($obj{c_mutex}); + MCE::Mutex::Channel::_save_for_global_destruction($obj{p_mutex}) + if $obj{mp}; + } + + return \%obj; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock2 if $p_mutex; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data), $data; + } + + $p_mutex->unlock2 if $p_mutex; + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end, $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + $c_mutex->unlock; + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + ( my $c_mutex = $self->{c_mutex} )->lock; + + while ( $count-- ) { + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + $c_mutex->unlock; + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer to consumer. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock2 if $p_mutex; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data), $data; + + $p_mutex->unlock2 if $p_mutex; + + return 1; +} + +sub recv { + my ( $self ) = @_; + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end, $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + + ( my $c_mutex = $self->{c_mutex} )->lock; + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + $c_mutex->unlock; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + $c_mutex->unlock; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumer to producer. +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + ( my $c_mutex = $self->{c_mutex} )->lock2; + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data), $data; + $c_mutex->unlock2; + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock if $p_mutex; + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + $p_mutex->unlock if $p_mutex; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + my $p_mutex = $self->{p_mutex}; + $p_mutex->lock if $p_mutex; + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len ) { + $p_mutex->unlock if $p_mutex; + return wantarray ? () : undef; + } + + ( $p_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + $p_mutex->unlock if $p_mutex; + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Mutex - Channel for producer(s) and many consumers + +=head1 VERSION + +This document describes MCE::Channel::Mutex version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for processes and threads. Locking is handled using MCE::Mutex. + + use MCE::Channel; + + # The default is tuned for one producer and many consumers. + my $chnl_a = MCE::Channel->new( impl => 'Mutex' ); + + # Specify the 'mp' option for safe use by two or more producers + # sending or recieving on the left side of the channel. + # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb> + + my $chnl_b = MCE::Channel->new( impl => 'Mutex', mp => 1 ); + +The API is described in L<MCE::Channel>. + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Channel/Simple.pm b/lib/MCE/Channel/Simple.pm new file mode 100644 index 0000000..7593a36 --- /dev/null +++ b/lib/MCE/Channel/Simple.pm @@ -0,0 +1,332 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel tuned for one producer and one consumer involving no locking. +## +############################################################################### + +package MCE::Channel::Simple; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use base 'MCE::Channel'; +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Simple' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + return bless \%obj, $class; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data) . $data; + } + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + my $data; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + + while ( $count-- ) { + my $data; + + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer(s) to consumers. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data) . $data; + + return 1; +} + +sub recv { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{c_sock}, $plen, 4 ) + : read( $self->{c_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + return wantarray ? () : undef; + } + + $is_MSWin32 + ? MCE::Channel::_read( $self->{c_sock}, $data, $len ) + : read( $self->{c_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumers to producer(s). +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data) . $data; + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + $is_MSWin32 + ? sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + $is_MSWin32 + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + $is_MSWin32 + ? sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + return wantarray ? () : undef unless $len; + + $is_MSWin32 + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Simple - Channel tuned for one producer and one consumer + +=head1 VERSION + +This document describes MCE::Channel::Simple version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for one process or thread on either end; no locking needed. + + use MCE::Channel; + + my $chnl = MCE::Channel->new( impl => 'Simple' ); + +The API is described in L<MCE::Channel>. + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + diff --git a/lib/MCE/Channel/Threads.pm b/lib/MCE/Channel/Threads.pm new file mode 100644 index 0000000..e41b606 --- /dev/null +++ b/lib/MCE/Channel/Threads.pm @@ -0,0 +1,361 @@ +############################################################################### +## ---------------------------------------------------------------------------- +## Channel for producer(s) and many consumers supporting threads only. +## +############################################################################### + +package MCE::Channel::Threads; + +use strict; +use warnings; + +no warnings qw( uninitialized once ); + +our $VERSION = '1.843'; + +use threads; +use threads::shared; + +use base 'MCE::Channel'; +use bytes; + +my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; +my $freeze = MCE::Channel::_get_freeze(); +my $thaw = MCE::Channel::_get_thaw(); + +sub new { + my ( $class, %obj ) = ( @_, impl => 'Threads' ); + + $obj{init_pid} = MCE::Channel::_pid(); + MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' ); + + # locking for the consumer side of the channel + $obj{cr_mutex} = threads::shared::share( my $cr_mutex ); + $obj{cw_mutex} = threads::shared::share( my $cw_mutex ); + + # optionally, support many-producers writing and reading + $obj{pr_mutex} = threads::shared::share( my $pr_mutex ) if $obj{mp}; + $obj{pw_mutex} = threads::shared::share( my $pw_mutex ) if $obj{mp}; + + return bless \%obj, $class; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Queue-like methods. +## +############################################################################### + +sub end { + my ( $self ) = @_; + return if $self->{ended}; + + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', -1); + + $self->{ended} = 1; +} + +sub enqueue { + my $self = shift; + return MCE::Channel::_ended('enqueue') if $self->{ended}; + + { + CORE::lock $self->{pw_mutex} if $self->{pw_mutex}; + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + + while ( @_ ) { + my $data; + if ( ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ shift ]), $data .= '1'; + } else { + $data = shift, $data .= '0'; + } + print { $self->{p_sock} } pack('i', length $data), $data; + } + } + + return 1; +} + +sub dequeue { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + if ( $count == 1 ) { + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; + } + else { + my ( $plen, @ret ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + + while ( $count-- ) { + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + } + + wantarray ? @ret : $ret[-1]; + } +} + +sub dequeue_nb { + my ( $self, $count ) = @_; + $count = 1 if ( !$count || $count < 1 ); + + my ( $plen, @ret ); + + { + CORE::lock $self->{cr_mutex}; + + while ( $count-- ) { + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + last; + } + + MCE::Channel::_read( $self->{c_sock}, my($data), $len ); + push @ret, chop($data) ? @{ $thaw->($data) } : $data; + } + } + + wantarray ? @ret : $ret[-1]; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; producer(s) to consumers. +## +############################################################################### + +sub send { + my $self = shift; + return MCE::Channel::_ended('send') if $self->{ended}; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + { + CORE::lock $self->{pw_mutex} if $self->{pw_mutex}; + MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32; + print { $self->{p_sock} } pack('i', length $data), $data; + } + + return 1; +} + +sub recv { + my ( $self ) = @_; + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32; + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + if ( $len < 0 ) { + $self->end; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + { + CORE::lock $self->{cr_mutex}; + MCE::Util::_nonblocking( $self->{c_sock}, 1 ); + MCE::Util::_sysread( $self->{c_sock}, $plen, 4 ); + MCE::Util::_nonblocking( $self->{c_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + if ( !$len || $len < 0 ) { + $self->end if defined $len && $len < 0; + return wantarray ? () : undef; + } + + MCE::Channel::_read( $self->{c_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +############################################################################### +## ---------------------------------------------------------------------------- +## Methods for two-way communication; consumers to producer(s). +## +############################################################################### + +sub send2 { + my $self = shift; + + my $data; + if ( @_ > 1 || ref $_[0] || !defined $_[0] ) { + $data = $freeze->([ @_ ]), $data .= '1'; + } else { + $data = $_[0], $data .= '0'; + } + + { + CORE::lock $self->{cw_mutex}; + MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32; + print { $self->{c_sock} } pack('i', length $data), $data; + } + + return 1; +} + +sub recv2 { + my ( $self ) = @_; + my ( $plen, $data ); + + { + my $pr_mutex = $self->{pr_mutex}; + CORE::lock $pr_mutex if $pr_mutex; + + MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32; + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + my $len = unpack('i', $plen); + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +sub recv2_nb { + my ( $self ) = @_; + my ( $plen, $data ); + + { + my $pr_mutex = $self->{pr_mutex}; + CORE::lock $pr_mutex if $pr_mutex; + + MCE::Util::_nonblocking( $self->{p_sock}, 1 ); + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 ) + : read( $self->{p_sock}, $plen, 4 ); + + MCE::Util::_nonblocking( $self->{p_sock}, 0 ); + + my $len; $len = unpack('i', $plen) if $plen; + + return wantarray ? () : undef unless $len; + + ( $pr_mutex || $is_MSWin32 ) + ? MCE::Channel::_read( $self->{p_sock}, $data, $len ) + : read( $self->{p_sock}, $data, $len ); + } + + chop( $data ) + ? wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1] + : wantarray ? ( $data ) : $data; +} + +1; + +__END__ + +############################################################################### +## ---------------------------------------------------------------------------- +## Module usage. +## +############################################################################### + +=head1 NAME + +MCE::Channel::Threads - Channel for producer(s) and many consumers + +=head1 VERSION + +This document describes MCE::Channel::Threads version 1.843 + +=head1 DESCRIPTION + +A channel class providing queue-like and two-way communication +for threads only. Locking is handled using threads::shared. + + use MCE::Channel; + + # The default is tuned for one producer and many consumers. + my $chnl_a = MCE::Channel->new( impl => 'Threads' ); + + # Specify the 'mp' option for safe use by two or more producers + # sending or recieving on the left side of the channel. + # E.g. C<->enqueue/->send> or C<->recv2/->recv2_nb> + + my $chnl_b = MCE::Channel->new( impl => 'Threads', mp => 1 ); + +The API is described in L<MCE::Channel>. + +=head1 LIMITATIONS + +The t/04_channel_threads tests are disabled on Unix platforms for Perl +less than 5.10.1. Basically, the MCE::Channel::Threads implementation +is not supported on older Perls unless the OS vendor applied upstream +patches (i.e. works on RedHat/CentOS 5.x running Perl 5.8.x). + +=head1 AUTHOR + +Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> + +=cut + |