-
Notifications
You must be signed in to change notification settings - Fork 5.8k
/
Copy pathSubmissionPublisher.java
1503 lines (1421 loc) · 58.6 KB
/
SubmissionPublisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import jdk.internal.invoke.MhUtil;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import static java.util.concurrent.Flow.Publisher;
import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;
/**
* A {@link Flow.Publisher} that asynchronously issues submitted
* (non-null) items to current subscribers until it is closed. Each
* current subscriber receives newly submitted items in the same order
* unless drops or exceptions are encountered. Using a
* SubmissionPublisher allows item generators to act as compliant <a
* href="http://www.reactive-streams.org/"> reactive-streams</a>
* Publishers relying on drop handling and/or blocking for flow
* control.
*
* <p>A SubmissionPublisher uses the {@link Executor} supplied in its
* constructor for delivery to subscribers. The best choice of
* Executor depends on expected usage. If the generator(s) of
* submitted items run in separate threads, and the number of
* subscribers can be estimated, consider using a {@link
* Executors#newFixedThreadPool}. Otherwise consider using the
* default, normally the {@link ForkJoinPool#commonPool}.
*
* <p>Buffering allows producers and consumers to transiently operate
* at different rates. Each subscriber uses an independent buffer.
* Buffers are created upon first use and expanded as needed up to the
* given maximum. (The enforced capacity may be rounded up to the
* nearest power of two and/or bounded by the largest value supported
* by this implementation.) Invocations of {@link
* Flow.Subscription#request(long) request} do not directly result in
* buffer expansion, but risk saturation if unfilled requests exceed
* the maximum capacity. The default value of {@link
* Flow#defaultBufferSize()} may provide a useful starting point for
* choosing a capacity based on expected rates, resources, and usages.
*
* <p>A single SubmissionPublisher may be shared among multiple
* sources. Actions in a source thread prior to publishing an item or
* issuing a signal <a href="package-summary.html#MemoryVisibility">
* <i>happen-before</i></a> actions subsequent to the corresponding
* access by each subscriber. But reported estimates of lag and demand
* are designed for use in monitoring, not for synchronization
* control, and may reflect stale or inaccurate views of progress.
*
* <p>Publication methods support different policies about what to do
* when buffers are saturated. Method {@link #submit(Object) submit}
* blocks until resources are available. This is simplest, but least
* responsive. The {@code offer} methods may drop items (either
* immediately or with bounded timeout), but provide an opportunity to
* interpose a handler and then retry.
*
* <p>If any Subscriber method throws an exception, its subscription
* is cancelled. If a handler is supplied as a constructor argument,
* it is invoked before cancellation upon an exception in method
* {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
* {@link Flow.Subscriber#onSubscribe onSubscribe},
* {@link Flow.Subscriber#onError(Throwable) onError} and
* {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
* handled before cancellation. If the supplied Executor throws
* {@link RejectedExecutionException} (or any other RuntimeException
* or Error) when attempting to execute a task, or a drop handler
* throws an exception when processing a dropped item, then the
* exception is rethrown. In these cases, not all subscribers will
* have been issued the published item. It is usually good practice to
* {@link #closeExceptionally closeExceptionally} in these cases.
*
* <p>Method {@link #consume(Consumer)} simplifies support for a
* common case in which the only action of a subscriber is to request
* and process all items using a supplied function.
*
* <p>This class may also serve as a convenient base for subclasses
* that generate items, and use the methods in this class to publish
* them. For example here is a class that periodically publishes the
* items generated from a supplier. (In practice you might add methods
* to independently start and stop generation, to share Executors
* among publishers, and so on, or use a SubmissionPublisher as a
* component rather than a superclass.)
*
* <pre> {@code
* class PeriodicPublisher<T> extends SubmissionPublisher<T> {
* final ScheduledFuture<?> periodicTask;
* final ScheduledExecutorService scheduler;
* PeriodicPublisher(Executor executor, int maxBufferCapacity,
* Supplier<? extends T> supplier,
* long period, TimeUnit unit) {
* super(executor, maxBufferCapacity);
* scheduler = new ScheduledThreadPoolExecutor(1);
* periodicTask = scheduler.scheduleAtFixedRate(
* () -> submit(supplier.get()), 0, period, unit);
* }
* public void close() {
* periodicTask.cancel(false);
* scheduler.shutdown();
* super.close();
* }
* }}</pre>
*
* <p>Here is an example of a {@link Flow.Processor} implementation.
* It uses single-step requests to its publisher for simplicity of
* illustration. A more adaptive version could monitor flow using the
* lag estimate returned from {@code submit}, along with other utility
* methods.
*
* <pre> {@code
* class TransformProcessor<S,T> extends SubmissionPublisher<T>
* implements Flow.Processor<S,T> {
* final Function<? super S, ? extends T> function;
* Flow.Subscription subscription;
* TransformProcessor(Executor executor, int maxBufferCapacity,
* Function<? super S, ? extends T> function) {
* super(executor, maxBufferCapacity);
* this.function = function;
* }
* public void onSubscribe(Flow.Subscription subscription) {
* (this.subscription = subscription).request(1);
* }
* public void onNext(S item) {
* subscription.request(1);
* submit(function.apply(item));
* }
* public void onError(Throwable ex) { closeExceptionally(ex); }
* public void onComplete() { close(); }
* }}</pre>
*
* @param <T> the published item type
* @author Doug Lea
* @since 9
*/
public class SubmissionPublisher<T> implements Publisher<T>,
AutoCloseable {
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
* locks across public methods, to ensure thread-safety in the
* presence of multiple sources and maintain acquire-release
* ordering around user operations. However, we also track whether
* there is only a single source, and if so streamline some buffer
* operations by avoiding some atomics.
*/
/** Upper bound on any buffer: the largest power of two usable as an array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
/**
 * Size of the first buffer allocated for a subscriber whenever
 * maxBufferCapacity is larger than this value. Must be a power of two.
 */
static final int INITIAL_CAPACITY = 32;
/**
 * Rounds the requested capacity up to a power of two, returning at
 * least 1 and at most BUFFER_CAPACITY_LIMIT.
 */
static final int roundCapacity(int cap) {
    // Smear the highest set bit of (cap - 1) into every lower position,
    // doubling the shift distance each step (1, 2, 4, 8, 16).
    int bits = cap - 1;
    for (int shift = 1; shift <= 16; shift <<= 1)
        bits |= bits >>> shift;
    if (bits <= 0)
        return 1;                       // at least 1
    if (bits >= BUFFER_CAPACITY_LIMIT)
        return BUFFER_CAPACITY_LIMIT;   // clamp to the supported maximum
    return bits + 1;
}
/**
* Head of the list of clients. Clients (BufferedSubscriptions) are
* maintained in a linked list (via their "next" fields). This works
* well for publish loops. It requires O(n) traversal to check for
* duplicate subscribers, but we expect that subscribing is much less
* common than publishing. Unsubscribing occurs only during traversal
* loops, when BufferedSubscription methods return negative values
* signifying that they have been closed. To reduce
* head-of-line blocking, submit and offer methods first call
* BufferedSubscription.offer on each subscriber, and place
* saturated ones in retries list (using nextRetry field), and
* retry, possibly blocking or dropping.
*/
BufferedSubscription<T> clients;
/** Lock for exclusion across multiple sources; guards the client list */
final ReentrantLock lock;
/**
* Run status, updated only within locks (by close and
* closeExceptionally). Volatile so that it can also be read
* without the lock, e.g. by isClosed.
*/
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
boolean subscribed;
/**
* The first caller thread to subscribe, or null if thread ever changed:
* doOffer clears it when a different thread publishes, and closing
* the publisher clears it as well.
*/
Thread owner;
/**
* If non-null, the exception passed to closeExceptionally; it is
* also signalled to subscribers that arrive after that close.
*/
volatile Throwable closedException;
// Parameters for constructing BufferedSubscriptions (handed to each
// new BufferedSubscription created in subscribe)
final Executor executor;
// Optional handler for exceptions thrown by onNext; may be null
final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
// Stored after rounding by roundCapacity in the constructor
final int maxBufferCapacity;
/**
* Creates a new SubmissionPublisher using the given Executor for
* async delivery to subscribers, with the given maximum buffer size
* for each subscriber, and, if non-null, the given handler invoked
* when any Subscriber throws an exception in method {@link
* Flow.Subscriber#onNext(Object) onNext}.
*
* @param executor the executor to use for async delivery,
* supporting creation of at least one independent thread
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer (the enforced capacity may be rounded up to
* the nearest power of two and/or bounded by the largest value
* supported by this implementation; method {@link #getMaxBufferCapacity}
* returns the actual value)
* @param handler if non-null, procedure to invoke upon exception
* thrown in method {@code onNext}
* @throws NullPointerException if executor is null
* @throws IllegalArgumentException if maxBufferCapacity not
* positive
*/
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                           BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Validation order is part of the contract: a null executor is
    // reported before a non-positive capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    this.executor = executor;
    // The stored capacity is the rounded (power of two, bounded) value,
    // which is what getMaxBufferCapacity reports.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler; // may be null: no onNext exception handler
    this.lock = new ReentrantLock();
}
/**
* Creates a new SubmissionPublisher using the given Executor for
* async delivery to subscribers, with the given maximum buffer size
* for each subscriber, and no handler for Subscriber exceptions in
* method {@link Flow.Subscriber#onNext(Object) onNext}.
*
* @param executor the executor to use for async delivery,
* supporting creation of at least one independent thread
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer (the enforced capacity may be rounded up to
* the nearest power of two and/or bounded by the largest value
* supported by this implementation; method {@link #getMaxBufferCapacity}
* returns the actual value)
* @throws NullPointerException if executor is null
* @throws IllegalArgumentException if maxBufferCapacity not
* positive
*/
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
// Delegates to the three-argument constructor (which performs all
// argument validation) with a null onNext exception handler.
this(executor, maxBufferCapacity, null);
}
/**
* Creates a new SubmissionPublisher using the {@link
* ForkJoinPool#commonPool()} for async delivery to subscribers
* (unless it does not support a parallelism level of at least two,
* in which case, a new Thread is created to run each task), with
* maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
* handler for Subscriber exceptions in method {@link
* Flow.Subscriber#onNext(Object) onNext}.
*/
public SubmissionPublisher() {
// Delegates with the default executor, the default Flow buffer size
// and no onNext handler. ForkJoinPool.asyncCommonPool() is not defined
// in this file; NOTE(review): per the Javadoc on this constructor it is
// expected to supply the common pool or a thread-per-task fallback.
this(ForkJoinPool.asyncCommonPool(), Flow.defaultBufferSize(), null);
}
/**
* Adds the given Subscriber unless already subscribed. If already
* subscribed, the Subscriber's {@link
* Flow.Subscriber#onError(Throwable) onError} method is invoked on
* the existing subscription with an {@link IllegalStateException}.
* Otherwise, upon success, the Subscriber's {@link
* Flow.Subscriber#onSubscribe onSubscribe} method is invoked
* asynchronously with a new {@link Flow.Subscription}. If {@link
* Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
* subscription is cancelled. Otherwise, if this SubmissionPublisher
* was closed exceptionally, then the subscriber's {@link
* Flow.Subscriber#onError onError} method is invoked with the
* corresponding exception, or if closed without exception, the
* subscriber's {@link Flow.Subscriber#onComplete() onComplete}
* method is invoked. Subscribers may enable receiving items by
* invoking the {@link Flow.Subscription#request(long) request}
* method of the new Subscription, and may unsubscribe by invoking
* its {@link Flow.Subscription#cancel() cancel} method.
*
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
ReentrantLock lock = this.lock;
// The initial buffer (capped at INITIAL_CAPACITY) and the subscription
// object are created before the lock is acquired, so no allocation is
// performed while holding it.
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
lock.lock();
try {
// The thread making the very first subscribe call is recorded as
// the tentative owner; doOffer clears it if another thread publishes.
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
// Single pass over the client list: unlink closed entries, stop at a
// duplicate subscriber, otherwise append once the end is reached.
// pred tracks the last live node kept in the list.
for (BufferedSubscription<T> b = clients, pred = null;;) {
if (b == null) {
// Reached the end without finding a duplicate. onSubscribe is
// always issued first; if the publisher is already closed the
// terminal signal follows and the subscription is NOT linked in.
Throwable ex;
subscription.onSubscribe();
if ((ex = closedException) != null)
subscription.onError(ex);
else if (closed)
subscription.onComplete();
else if (pred == null)
clients = subscription;
else
pred.next = subscription;
break;
}
BufferedSubscription<T> next = b.next;
if (b.isClosed()) { // remove
b.next = null; // detach
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber)) {
// Duplicate: the error is signalled on the EXISTING subscription
// and the newly created one is simply discarded.
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
}
else
pred = b;
b = next;
}
} finally {
lock.unlock();
}
}
/**
* Common implementation for all three forms of submit and offer.
* Acts as submit if nanos == Long.MAX_VALUE, else offer.
*/
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
// Result: the largest positive offer status seen, or whatever
// retryOffer returns (which may be a negative drop count).
int lag = 0;
boolean complete, unowned;
ReentrantLock lock = this.lock;
lock.lock();
try {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
// unowned is true when the caller is not the recorded owner thread;
// if an owner was recorded and differs, the owner is cleared.
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
// With no subscribers nothing is done with the item; all that
// remains is to report whether the publisher has been closed.
if (b == null)
complete = closed;
else {
complete = false;
boolean cleanMe = false;
BufferedSubscription<T> retries = null, rtail = null, next;
// First pass: offer to every subscriber, deferring saturated ones
// to a retry list (linked through nextRetry, in list order) so one
// full buffer does not hold up delivery to the others.
do {
next = b.next;
int stat = b.offer(item, unowned);
if (stat == 0) { // saturated; add to retry list
b.nextRetry = null; // avoid garbage on exceptions
if (rtail == null)
retries = b;
else
rtail.nextRetry = b;
rtail = b;
}
else if (stat < 0) // closed
cleanMe = true; // remove later
else if (stat > lag)
lag = stat;
} while ((b = next) != null);
// Second phase, only if needed: retry saturated subscribers and/or
// purge closed ones from the client list.
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
} finally {
lock.unlock();
}
// The exception is raised only after the lock has been released.
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
}
/**
* Helps, (timed) waits for, and/or drops buffers on list; returns
* lag or negative drops (for use in offer).
*/
private int retryOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop,
BufferedSubscription<T> retries, int lag,
boolean cleanMe) {
// Called from doOffer while the lock is held. Walks the retry list
// built there, unlinking each entry's nextRetry as it goes.
for (BufferedSubscription<T> r = retries; r != null;) {
BufferedSubscription<T> nextRetry = r.nextRetry;
r.nextRetry = null;
// A positive nanos (timed offer or submit) waits before retrying;
// awaitSpace is defined outside this view - NOTE(review): presumably
// it blocks until space is available or the time elapses.
if (nanos > 0L)
r.awaitSpace(nanos);
int stat = r.retryOffer(item);
// Still saturated: consult the drop handler (if any); a true result
// buys exactly one more attempt.
if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
stat = r.retryOffer(item);
// A drop turns lag into a negative drop count: -1 for the first
// drop, then one lower for each further drop.
if (stat == 0)
lag = (lag >= 0) ? -1 : lag - 1;
else if (stat < 0)
cleanMe = true;
// Positive lag values are only tracked while nothing has been dropped.
else if (lag >= 0 && stat > lag)
lag = stat;
r = nextRetry;
}
// Purge closed subscriptions noticed here or in the caller's first pass.
if (cleanMe)
cleanAndCount();
return lag;
}
/**
* Returns current list count after removing closed subscribers.
* Call only while holding lock. Used mainly by retryOffer for
* cleanup.
*/
private int cleanAndCount() {
    int live = 0;
    BufferedSubscription<T> lastLive = null; // most recent node kept in the list
    BufferedSubscription<T> cur = clients;
    while (cur != null) {
        final BufferedSubscription<T> succ = cur.next;
        if (!cur.isClosed()) {
            lastLive = cur;
            live++;
        }
        else {
            // Unlink the closed node and detach it from its successor.
            cur.next = null;
            if (lastLive != null)
                lastLive.next = succ;
            else
                clients = succ;
        }
        cur = succ;
    }
    return live;
}
/**
* Publishes the given item to each current subscriber by
* asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
* onNext} method, blocking uninterruptibly while resources for any
* subscriber are unavailable. This method returns an estimate of
* the maximum lag (number of items submitted but not yet consumed)
* among all current subscribers. This value is at least one
* (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers,
* then this exception is rethrown, in which case not all
* subscribers will have been issued this item.
*
* @param item the (non-null) item to publish
* @return the estimated maximum lag among subscribers
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int submit(T item) {
    // Long.MAX_VALUE is the value doOffer treats as "submit" rather than
    // "offer"; submit never uses a drop handler.
    final long untimed = Long.MAX_VALUE;
    return doOffer(item, untimed, null);
}
/**
* Publishes the given item, if possible, to each current subscriber
* by asynchronously invoking its {@link
* Flow.Subscriber#onNext(Object) onNext} method. The item may be
* dropped by one or more subscribers if resource limits are
* exceeded, in which case the given handler (if non-null) is
* invoked, and if it returns true, retried once. Other calls to
* methods in this class by other threads are blocked while the
* handler is invoked. Unless recovery is assured, options are
* usually limited to logging the error and/or issuing an {@link
* Flow.Subscriber#onError(Throwable) onError} signal to the
* subscriber.
*
* <p>This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
* @return if negative, the (negative) number of drops; otherwise
* an estimate of maximum lag
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int offer(T item,
                 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
    // A zero wait means saturated subscribers go straight to the drop
    // handler (if any) without waiting for space.
    final long noWait = 0L;
    return doOffer(item, noWait, onDrop);
}
/**
* Publishes the given item, if possible, to each current subscriber
* by asynchronously invoking its {@link
* Flow.Subscriber#onNext(Object) onNext} method, blocking while
* resources for any subscription are unavailable, up to the
* specified timeout or until the caller thread is interrupted, at
* which point the given handler (if non-null) is invoked, and if it
* returns true, retried once. (The drop handler may distinguish
* timeouts from interrupts by checking whether the current thread
* is interrupted.) Other calls to methods in this class by other
* threads are blocked while the handler is invoked. Unless
* recovery is assured, options are usually limited to logging the
* error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
* onError} signal to the subscriber.
*
* <p>This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param timeout how long to wait for resources for any subscriber
* before giving up, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
* @return if negative, the (negative) number of drops; otherwise
* an estimate of maximum lag
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
public int offer(T item, long timeout, TimeUnit unit,
                 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
    final long requested = unit.toNanos(timeout);
    // Long.MAX_VALUE is reserved to mean the untimed submit form, so a
    // timed offer that saturates to it is nudged down by one nanosecond
    // (the two differ only with respect to interrupt policy).
    final long nanos =
        (requested == Long.MAX_VALUE) ? Long.MAX_VALUE - 1 : requested;
    return doOffer(item, nanos, onDrop);
}
/**
* Unless already closed, issues {@link
* Flow.Subscriber#onComplete() onComplete} signals to current
* subscribers, and disallows subsequent attempts to publish.
* Upon return, this method does <em>NOT</em> guarantee that all
* subscribers have yet completed.
*/
public void close() {
ReentrantLock lock = this.lock;
// Unlocked fast path on the volatile flag: nothing to do once closed.
if (!closed) {
BufferedSubscription<T> b;
lock.lock();
try {
// no need to re-check closed here
// (a racing close or closeExceptionally has already set clients to
// null, so b is null and the loop below signals nobody)
b = clients;
clients = null;
owner = null;
closed = true;
} finally {
lock.unlock();
}
// onComplete is issued after the lock is released, on the list that
// was detached above; each node is unlinked as it is signalled.
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onComplete();
b = next;
}
}
}
/**
* Unless already closed, issues {@link
* Flow.Subscriber#onError(Throwable) onError} signals to current
* subscribers with the given error, and disallows subsequent
* attempts to publish. Future subscribers also receive the given
* error. Upon return, this method does <em>NOT</em> guarantee
* that all subscribers have yet completed.
*
* @param error the {@code onError} argument sent to subscribers
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
if (error == null)
throw new NullPointerException();
ReentrantLock lock = this.lock;
// Unlocked fast path on the volatile flag: nothing to do once closed.
if (!closed) {
BufferedSubscription<T> b;
lock.lock();
try {
// The list head is captured before the re-check; if another close
// won the race, clients is already null and nobody is signalled.
b = clients;
if (!closed) { // don't clobber racing close
// Record the error so that later subscribers receive it too.
closedException = error;
clients = null;
owner = null;
closed = true;
}
} finally {
lock.unlock();
}
// onError is issued after the lock is released, on the detached
// list; each node is unlinked as it is signalled.
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onError(error);
b = next;
}
}
}
/**
* Returns true if this publisher is not accepting submissions.
*
* @return true if closed
*/
public boolean isClosed() {
// Plain read of the volatile flag; the lock is not taken.
return closed;
}
/**
* Returns the exception associated with {@link
* #closeExceptionally(Throwable) closeExceptionally}, or null if
* not closed or if closed normally.
*
* @return the exception, or null if none
*/
public Throwable getClosedException() {
// Plain read of the volatile field set by closeExceptionally; the
// lock is not taken.
return closedException;
}
/**
* Returns true if this publisher has any subscribers.
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        // Pop closed subscriptions off the head of the list until a live
        // one turns up; entries behind the first live one are not examined.
        BufferedSubscription<T> head = clients;
        while (head != null) {
            final BufferedSubscription<T> rest = head.next;
            if (!head.isClosed())
                return true;
            head.next = null;       // detach the closed node
            clients = rest;
            head = rest;
        }
        return false;
    } finally {
        mutex.unlock();
    }
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        // cleanAndCount must run under the lock; it also prunes closed
        // subscriptions as a side effect.
        return cleanAndCount();
    } finally {
        mutex.unlock();
    }
}
/**
* Returns the Executor used for asynchronous delivery.
*
* @return the Executor used for asynchronous delivery
*/
public Executor getExecutor() {
// Final field assigned in the constructor; no locking required.
return executor;
}
/**
* Returns the maximum per-subscriber buffer capacity.
*
* @return the maximum per-subscriber buffer capacity
*/
public int getMaxBufferCapacity() {
// Final field holding the capacity after rounding by roundCapacity in
// the constructor, so it may differ from the value originally requested.
return maxBufferCapacity;
}
/**
* Returns a list of current subscribers for monitoring and
* tracking purposes, not for invoking {@link Flow.Subscriber}
* methods on the subscribers.
*
* @return list of current subscribers
*/
public List<Subscriber<? super T>> getSubscribers() {
    final List<Subscriber<? super T>> result = new ArrayList<>();
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        BufferedSubscription<T> lastLive = null; // most recent node kept
        BufferedSubscription<T> cur = clients;
        while (cur != null) {
            final BufferedSubscription<T> succ = cur.next;
            if (!cur.isClosed()) {
                result.add(cur.subscriber);
                lastLive = cur;
            }
            else {
                // Closed entries are pruned from the list while collecting.
                cur.next = null;
                if (lastLive != null)
                    lastLive.next = succ;
                else
                    clients = succ;
            }
            cur = succ;
        }
    } finally {
        mutex.unlock();
    }
    return result;
}
/**
* Returns true if the given Subscriber is currently subscribed.
*
* @param subscriber the subscriber
* @return true if currently subscribed
* @throws NullPointerException if subscriber is null
*/
public boolean isSubscribed(Subscriber<? super T> subscriber) {
    if (subscriber == null)
        throw new NullPointerException();
    // Unlocked fast path: closing empties the client list, so a closed
    // publisher never reports a subscriber.
    if (closed)
        return false;
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        BufferedSubscription<T> lastLive = null; // most recent node kept
        BufferedSubscription<T> cur = clients;
        while (cur != null) {
            final BufferedSubscription<T> succ = cur.next;
            if (cur.isClosed()) {
                // Prune closed entries encountered before a match.
                cur.next = null;
                if (lastLive == null)
                    clients = succ;
                else
                    lastLive.next = succ;
            }
            else if (subscriber.equals(cur.subscriber))
                return true;
            else
                lastLive = cur;
            cur = succ;
        }
        return false;
    } finally {
        mutex.unlock();
    }
}
/**
* Returns an estimate of the minimum number of items requested
* (via {@link Flow.Subscription#request(long) request}) but not
* yet produced, among all current subscribers.
*
* @return the estimate, or zero if no subscribers
*/
public long estimateMinimumDemand() {
    long smallest = Long.MAX_VALUE;
    boolean sawLive = false;
    final ReentrantLock mutex = this.lock;
    mutex.lock();
    try {
        BufferedSubscription<T> lastLive = null; // most recent node kept
        BufferedSubscription<T> cur = clients;
        while (cur != null) {
            final BufferedSubscription<T> succ = cur.next;
            final int lagEstimate = cur.estimateLag();
            if (lagEstimate < 0) {
                // A negative estimate marks a closed subscription: unlink it.
                cur.next = null;
                if (lastLive == null)
                    clients = succ;
                else
                    lastLive.next = succ;
            }
            else {
                // Outstanding demand = requested items minus those already
                // buffered for this subscriber.
                final long unmet = cur.demand - lagEstimate;
                smallest = Math.min(smallest, unmet);
                sawLive = true;
                lastLive = cur;
            }
            cur = succ;
        }
    } finally {
        mutex.unlock();
    }
    return sawLive ? smallest : 0;
}
/**
 * Returns an estimate of the maximum number of items produced but
 * not yet consumed among all current subscribers.
 *
 * <p>The client list is walked under this publisher's lock, taking
 * the largest non-negative {@code estimateLag()} value. A
 * subscription reporting a negative lag is unlinked from the list
 * rather than counted. With no remaining subscriptions the result
 * is zero.
 *
 * @return the estimate
 */
public int estimateMaximumLag() {
    int largest = 0;
    final ReentrantLock mainLock = this.lock;
    mainLock.lock();
    try {
        // Most recent node that was kept; null while none has been seen.
        BufferedSubscription<T> lastLive = null;
        BufferedSubscription<T> cur = clients;
        while (cur != null) {
            // Capture the successor first: cur.next is cleared when unlinking.
            final BufferedSubscription<T> following = cur.next;
            final int lag = cur.estimateLag();
            if (lag >= 0) {
                largest = Math.max(largest, lag);
                lastLive = cur;
            } else {
                // Negative lag: splice this node out of the list.
                cur.next = null;
                if (lastLive != null)
                    lastLive.next = following;
                else
                    clients = following;
            }
            cur = following;
        }
    } finally {
        mainLock.unlock();
    }
    return largest;
}
/**
 * Processes all published items using the given Consumer function.
 * The returned CompletableFuture is completed normally when this
 * publisher signals {@link Flow.Subscriber#onComplete()
 * onComplete}. It is completed exceptionally upon any error or if
 * the Consumer throws an exception. If the returned
 * CompletableFuture is cancelled, no further items are processed.
 *
 * @param consumer the function applied to each onNext item
 * @return a CompletableFuture that is completed normally
 * when the publisher signals onComplete, and exceptionally
 * upon any error or cancellation
 * @throws NullPointerException if consumer is null
 */
public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
    if (consumer == null) {
        throw new NullPointerException();
    }
    final CompletableFuture<Void> completion = new CompletableFuture<>();
    // The subscriber shares the future so it can report termination through it.
    final ConsumerSubscriber<T> subscriber =
        new ConsumerSubscriber<T>(completion, consumer);
    subscribe(subscriber);
    return completion;
}
/**
 * Subscriber for method consume. Feeds each item to a Consumer and
 * reports termination through a shared CompletableFuture: normal
 * completion on onComplete, exceptional completion on onError or
 * when the Consumer throws.
 */
static final class ConsumerSubscriber<T> implements Subscriber<T> {
    final CompletableFuture<Void> status;
    final Consumer<? super T> consumer;
    Subscription subscription;

    ConsumerSubscriber(CompletableFuture<Void> statusFuture,
                       Consumer<? super T> itemConsumer) {
        this.status = statusFuture;
        this.consumer = itemConsumer;
    }

    @Override
    public final void onSubscribe(Subscription s) {
        subscription = s;
        // However the status future completes, cancel the subscription.
        status.whenComplete((result, failure) -> s.cancel());
        if (status.isDone()) {
            // Already finished: do not request any items.
            return;
        }
        s.request(Long.MAX_VALUE);
    }

    @Override
    public final void onNext(T item) {
        try {
            consumer.accept(item);
        } catch (Throwable failure) {
            // Cancel first, then publish the failure through the future.
            subscription.cancel();
            status.completeExceptionally(failure);
        }
    }

    @Override
    public final void onError(Throwable ex) {
        status.completeExceptionally(ex);
    }

    @Override
    public final void onComplete() {
        status.complete(null);
    }
}
/**
 * A task for consuming buffer items and signals, created and
 * executed whenever they become available. Each task consumes as
 * many items/signals as it can before terminating; another task is
 * created later when needed. Declaring the class as both a
 * Runnable and a ForkJoinTask saves overhead when executed by
 * ForkJoinPools, without impacting other kinds of Executors.
 * Both entry points simply delegate to the subscription's
 * {@code consume()} method.
 */
@SuppressWarnings("serial")
static final class ConsumerTask<T> extends ForkJoinTask<Void>
    implements Runnable, CompletableFuture.AsynchronousCompletionTask {

    final BufferedSubscription<T> consumer;

    ConsumerTask(BufferedSubscription<T> consumer) {
        this.consumer = consumer;
    }

    /** Always null: this task carries no result. */
    @Override
    public final Void getRawResult() {
        return null;
    }

    /** No-op: there is no result to store. */
    @Override
    public final void setRawResult(Void v) {
    }

    /** ForkJoinTask entry point; runs the consume loop and returns false. */
    @Override
    public final boolean exec() {
        consumer.consume();
        return false;
    }

    /** Runnable entry point; runs the same consume loop. */
    @Override
    public final void run() {
        consumer.consume();
    }
}
/**
* A resizable array-based ring buffer with integrated control to
* start a consumer task whenever items are available. The buffer
* algorithm is specialized for the case of at most one concurrent
* producer and consumer, and power of two buffer sizes. It relies
* primarily on atomic operations (CAS or getAndSet) at the next
* array slot to put or take an element, at the "tail" and "head"
* indices written only by the producer and consumer respectively.
*
* We ensure internally that there is at most one active consumer
* task at any given time. The publisher guarantees a single
* producer via its lock. Sync among producers and consumers
* relies on volatile fields "ctl", "demand", and "waiting" (along
* with element access). Other variables are accessed in plain
* mode, relying on outer ordering and exclusion, and/or enclosing
* them within other volatile accesses. Some atomic operations are
* avoided by tracking single threaded ownership by producers (in
* the style of biased locking).
*
* Execution control and protocol state are managed using field
* "ctl". Methods to subscribe, close, request, and cancel set
* ctl bits (mostly using atomic boolean method getAndBitwiseOr),
* and ensure that a task is running. (The corresponding consumer
* side actions are in method consume.) To avoid starting a new
* task on each action, ctl also includes a keep-alive bit
* (ACTIVE) that is refreshed if needed on producer actions.
* (Maintaining agreement about keep-alives requires most atomic
* updates to be full SC/Volatile strength, which is still much
* cheaper than using one task per item.) Error signals
* additionally null out items and/or fields to reduce termination
* latency. The cancel() method is supported by treating as ERROR
* but suppressing onError signal.
*
* Support for blocking also exploits the fact that there is only