summaryrefslogtreecommitdiff
path: root/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java')
-rwxr-xr-xsrc/main/java/com/zaxxer/hikari/util/ConcurrentBag.java32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
index ac0ccb1..9822563 100755
--- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
+++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
@@ -15,20 +15,23 @@
*/
package com.zaxxer.hikari.util;
+import static java.lang.Thread.yield;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.locks.LockSupport.parkNanos;
+
import static com.zaxxer.hikari.util.ClockSource.currentTime;
import static com.zaxxer.hikari.util.ClockSource.elapsedNanos;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_IN_USE;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_NOT_IN_USE;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_REMOVED;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_RESERVED;
-import static java.lang.Thread.yield;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +76,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
private final SynchronousQueue<T> handoffQueue;
- public static interface IConcurrentBagEntry
+ public interface IConcurrentBagEntry
{
int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1;
@@ -85,9 +88,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
int getState();
}
- public static interface IBagStateListener
+ public interface IBagStateListener
{
- Future<Boolean> addBagItem(int waiting);
+ void addBagItem(int waiting);
}
/**
@@ -147,7 +150,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
}
listener.addBagItem(waiting);
-
+
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
@@ -179,11 +182,16 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
{
bagEntry.setState(STATE_NOT_IN_USE);
- while (waiters.get() > 0) {
- if (handoffQueue.offer(bagEntry)) {
+ for (int i = 0; waiters.get() > 0; i++) {
+ if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
- yield();
+ else if ((i & 0xff) == 0xff) {
+ parkNanos(MICROSECONDS.toNanos(10));
+ }
+ else {
+ yield();
+ }
}
final List<Object> threadLocalList = threadList.get();
@@ -254,7 +262,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
*/
public List<T> values(final int state)
{
- return sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
+ final List<T> list = sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
+ Collections.reverse(list);
+ return list;
}
/**