summaryrefslogtreecommitdiff
path: root/lib/MCE/Core/Input/Iterator.pm
blob: 07346505a3c01fd81f0255d81e16b7971ca543a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
###############################################################################
## ----------------------------------------------------------------------------
## Iterator reader.
##
## This package, used internally by the worker process, provides support for
## user-specified iterators assigned to input_data.
##
## There is no public API.
##
###############################################################################

package MCE::Core::Input::Iterator;

use strict;
use warnings;

our $VERSION = '1.889';

## Items below are folded into MCE.

package # hide from rpm
   MCE;

no warnings qw( threads recursion uninitialized );

###############################################################################
## ----------------------------------------------------------------------------
## Worker process -- User Iterator.
##
###############################################################################

## Worker-side read loop for iterator input.
##
## Repeatedly requests a chunk over the data sockets, thaws it, and passes it
## to the function stored in $self->{_wuf}. Returns (bare return) when the
## peer replies with a negative length or when the _last_jmp closure is
## invoked.
##
## Parameters:
##    $self - MCE instance hash; reads user_func, _chn, _dat_lock,
##            _dat_w_sock, _lock_chn, chunk_size, _wuf and thaw, and
##            writes _next_jmp and _last_jmp.
##
## Croaks when $self->{user_func} is not defined.
##
## NOTE(review): _croak, OUTPUT_I_REF, $LF and MAX_GC_SIZE are not defined in
## this file; presumably they come from the MCE package this code is folded
## into -- confirm.

sub _worker_user_iterator {

   my ($self) = @_;

   # Empty the argument list once $self has been captured.
   @_ = ();

   _croak('MCE::_worker_user_iterator: (user_func) is not specified')
      unless (defined $self->{user_func});

   # Copy frequently used fields into lexicals. The request is written to
   # the socket at index 0 and the reply is read from the socket at index
   # $_chn (see the loop below).
   my $_is_MSWin32  = ($^O eq 'MSWin32') ? 1 : 0;
   my $_chn         = $self->{_chn};
   my $_DAT_LOCK    = $self->{_dat_lock};
   my $_DAT_W_SOCK  = $self->{_dat_w_sock}->[0];
   my $_DAU_W_SOCK  = $self->{_dat_w_sock}->[$_chn];
   my $_lock_chn    = $self->{_lock_chn};
   my $_chunk_size  = $self->{chunk_size};
   my $_wuf         = $self->{_wuf};

   my ($_dat_ex, $_dat_un, $_pid);

   # The lock/unlock closures are only built when channel locking is enabled;
   # every call site below is guarded by the same $_lock_chn test.
   if ($_lock_chn) {
      # Key for the "held" flag in $_DAT_LOCK: the process id, suffixed with
      # the thread id when threads.pm has been loaded.
      $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;

      # inlined for performance
      # Acquire: read one byte from the lock's read socket and mark the lock
      # as held, unless this pid already holds it. On MSWin32 a thread lock
      # is taken and socket readiness is awaited first.
      $_dat_ex = sub {
         CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
            if $_is_MSWin32;
         MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
            unless $_DAT_LOCK->{ $_pid };
      };
      # Release: write one byte to the lock's write socket and clear the
      # held flag, but only when this pid currently holds the lock.
      $_dat_un = sub {
         syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
            if $_DAT_LOCK->{ $_pid };
      };
   }

   my ($_chunk_id, $_len);

   ## -------------------------------------------------------------------------

   # Jump closures stored on $self. They goto the labels in this sub, so they
   # are only usable while this sub is executing.
   # NOTE(review): presumably called by MCE's next/last handling from within
   # the user function -- the callers are not visible here; confirm.
   $self->{_next_jmp} = sub { goto _WORKER_USER_ITERATOR__NEXT; };
   $self->{_last_jmp} = sub { goto _WORKER_USER_ITERATOR__LAST; };

   local $_;

   # Target of _next_jmp: re-enters the loop to request the next chunk.
   _WORKER_USER_ITERATOR__NEXT:

   while (1) {
      # Undefine $_ when its length exceeds MAX_GC_SIZE, then reset it to an
      # empty string to serve as the read buffer.
      # NOTE(review): the undef presumably frees an oversized buffer rather
      # than reusing it -- confirm.
      undef $_ if (length > MAX_GC_SIZE);

      $_ = '';

      ## Obtain the next chunk of data.
      {
         # Scoped to this bare block: clear the output record separator when
         # one is set, and set the input record separator to $LF for the
         # readline calls below.
         local $\ = undef if (defined $\);
         local $/ = $LF   if ($/ ne $LF );

         # Request: send OUTPUT_I_REF and the channel number, each followed
         # by $LF, while holding the channel lock (when enabled).
         $_dat_ex->() if $_lock_chn;
         print {$_DAT_W_SOCK} OUTPUT_I_REF . $LF . $_chn . $LF;
         MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
         # Reply, first line: length of the chunk payload.
         chomp($_len = <$_DAU_W_SOCK>);

         # A negative length ends the loop: release the lock and return.
         if ($_len < 0) {
            $_dat_un->() if $_lock_chn;
            return;
         }

         # Reply, second line: the chunk id, followed by $_len bytes of
         # payload read into $_.
         chomp($_chunk_id = <$_DAU_W_SOCK>);
         read $_DAU_W_SOCK, $_, $_len;

         $_dat_un->() if $_lock_chn;
      }

      ## Call user function.
      # Thaw the payload, then point $_ at the first element when chunk_size
      # is 1, otherwise at the thawed reference itself. The function in $_wuf
      # always receives the full reference and the chunk id.
      my $_chunk_ref = $self->{thaw}($_); undef $_;
      $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
      $_wuf->($self, $_chunk_ref, $_chunk_id);
   }

   # Target of _last_jmp: leaves the loop and returns.
   _WORKER_USER_ITERATOR__LAST:

   return;
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Core::Input::Iterator - Iterator reader

=head1 VERSION

This document describes MCE::Core::Input::Iterator version 1.889

=head1 DESCRIPTION

This package, used internally by the worker process, provides support for
user-specified iterators assigned to C<input_data>.

There is no public API.

=head1 SEE ALSO

The syntax for the C<input_data> option is described in L<MCE::Core>.

=head1 AUTHOR

Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>

=cut