summaryrefslogtreecommitdiff
path: root/lib/Future/Queue.pm
blob: c0e8cdc6f80f147a60660c70f1b1b6637a336014 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk

package Future::Queue 0.51;

use v5.14;
use warnings;

use Carp;

=head1 NAME

C<Future::Queue> - a FIFO queue of values that uses L<Future>s

=head1 SYNOPSIS

   use Future::Queue;
   use Future::AsyncAwait;

   my $queue = Future::Queue->new;

   async sub process_queue
   {
      while(1) {
         my $thing = await $queue->shift;
         ...
      }
   }

   my $f = process_queue();
   $queue->push( "a thing" );

=head1 DESCRIPTION

Objects in this class provide a simple FIFO queue the stores arbitrary perl
values. Values may be added into the queue using the L</push> method, and
retrieved from it using the L</shift> method.

Values may be stored within the queue object for C<shift> to retrieve later,
or if the queue is empty then the future that C<shift> returns will be
completed once an item becomes available.

=cut

=head1 CONSTRUCTOR

=cut

=head2 new

   $queue = Future::Queue->new( %params );

Returns a new C<Future::Queue> instance.

Takes the following named arguments:

=over 4

=item max_items => INT

I<Since version 0.50.>

Optional. If defined, there can be at most the given number of items in the
queue. Attempts to call L</push> beyond that will yield a future that remains
pending, until a subsequent L</shift> operation makes enough space.

=item prototype => STRING or OBJECT or CODE

I<Since verison 0.51.>

Optional. If defined, gives either a class name, an object instance to clone
or a code reference to invoke when a new pending C<Future> instance is needed
by the C<shift> or C<push> methods when they cannot complete immediately.

   $f = $prototype->();    # if CODE reference
   $f = $prototype->new;   # otherwise

If not provided, a default of C<Future> will be used.

=back

=cut

sub new
{
   my $class = shift;
   my %params = @_;

   my $prototype = $params{prototype};

   return bless {
      items => [],
      max_items => $params{max_items},
      shift_waiters => [],
      ( ref $prototype eq "CODE" ) ?
         ( f_factory => $prototype ) :
         ( f_prototype => $prototype // "Future" ),
   }, $class;
}

=head2 push

   $queue->push( @items );

   await $queue->push( @items );

Adds more items into the queue. If the queue was previously empty and there is
at least one C<shift> future waiting, then the next one will be completed by
this method.

I<Since version 0.50> this can take multiple items; earlier versions can only
take one value at once.

This method always returns a L<Future> instance. If C<max_items> is defined
then it is possible that this future will be in a still-pending state;
indicating that there was not yet space in the queue to add the items. It will
become completed once enough L</shift> calls have been made to make space for
them.

If C<max_items> is not defined then these instances will always be immediately
complete; it is safe to drop or ignore it, or call the method in void context.

If the queue has been finished then more items cannot be pushed and an
exception will be thrown.

=cut

sub _manage_shift_waiters
{
   my $self = shift;

   my $items = $self->{items};
   my $shift_waiters = $self->{shift_waiters};

   ( shift @$shift_waiters )->()
      while @$shift_waiters and @$items;
}

sub push :method
{
   my $self = shift;
   my @more = @_;

   $self->{finished} and
      croak "Cannot ->push more items to a Future::Queue that has been finished";

   my $items = $self->{items};
   my $max = $self->{max_items};

   if( defined $max ) {
      my $count = $max - @$items;
      push @$items, splice @more, 0, $count;
   }
   else {
      push @$items, @more;
      @more = ();
   }

   $self->_manage_shift_waiters;
   return Future->done if !@more;

   my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
   push @{ $self->{push_waiters} //= [] }, sub {
      my $count = $max - @$items;
      push @$items, splice @more, 0, $count;
      $self->_manage_push_waiters;

      return 0 if @more;

      $f->done;
      return 1;
   };
   return $f;
}

=head2 shift

   $item = await $queue->shift;

Returns a C<Future> that will yield the next item from the queue. If there is
already an item then this will be taken and the returned future will be
immediate. If not, then the returned future will be pending, and the next
C<push> method will complete it.

If the queue has been finished then the future will yield an empty list, or
C<undef> in scalar context.

If C<undef> is a valid item in your queue, make sure to test this condition
carefully. For example:

   while( ( my $item ) = await $queue->shift ) {
      ...
   }

Here, the C<await> expression and the assignment are in list context, so the
loop will continue to iterate while I<any> value is assigned, even if that
value is C<undef>. The loop will only stop once no items are returned,
indicating the end of the queue.

=cut

sub _manage_push_waiters
{
   my $self = shift;

   my $items = $self->{items};
   my $max_items = $self->{max_items};
   my $push_waiters = $self->{push_waiters} || [];

   shift @$push_waiters
      while @$push_waiters and
         ( !defined $max_items or @$items < $max_items )
         and $push_waiters->[0]->();
}

sub shift :method
{
   my $self = shift;

   my $items = $self->{items};

   if( @$items ) {
      my @more = shift @$items;
      $self->_manage_push_waiters;
      return Future->done( @more );
   }

   return Future->done if $self->{finished};

   my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
   push @{ $self->{shift_waiters} }, sub {
      return $f->done if !@$items and $self->{finished};
      $f->done( shift @$items );
      $self->_manage_push_waiters;
   };
   return $f;
}

=head2 shift_atmost

   @items = await $queue->shift_atmost( $count );

I<Since version 0.50.>

A bulk version of L</shift> that can return multiple items at once.

Returns a C<Future> that will yield the next few items from the queue. If
there is already at least one item in the queue then up to C<$count> items
will be taken, and the returned future will be immediate. If not, then the
returned future will be pending and the next C<push> method will complete it.

=cut

sub shift_atmost
{
   my $self = shift;
   my ( $count ) = @_;

   my $items = $self->{items};

   if( @$items ) {
      my @more = splice @$items, 0, $count;
      $self->_manage_push_waiters;
      return Future->done( @more );
   }

   return Future->done if $self->{finished};

   my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
   push @{ $self->{shift_waiters} }, sub {
      return $f->done if !@$items and $self->{finished};
      $f->done( splice @$items, 0, $count );
      $self->_manage_push_waiters;
   };
   return $f;
}

=head2 finish

   $queue->finish;

I<Since version 0.50.>

Marks that the queue is now finished. Once the current list of items has been
exhausted, any further attempts to C<shift> more will yield empty.

=cut

sub finish
{
   my $self = shift;
   $self->{finished}++;

   ( shift @{ $self->{shift_waiters} } )->() while @{ $self->{shift_waiters} };
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;