diff options
Diffstat (limited to 'src/main/java/com/zaxxer/hikari/pool')
10 files changed, 284 insertions, 168 deletions
diff --git a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index e9b7364..94e4e43 100755 --- a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -16,11 +16,6 @@ package com.zaxxer.hikari.pool; -import static java.util.Collections.unmodifiableCollection; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - -import static com.zaxxer.hikari.pool.PoolEntry.LASTACCESS_REVERSE_COMPARABLE; import static com.zaxxer.hikari.util.ClockSource.currentTime; import static com.zaxxer.hikari.util.ClockSource.elapsedDisplayString; import static com.zaxxer.hikari.util.ClockSource.elapsedMillis; @@ -29,17 +24,19 @@ import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_IN_ import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_NOT_IN_USE; import static com.zaxxer.hikari.util.UtilityElf.createThreadPoolExecutor; import static com.zaxxer.hikari.util.UtilityElf.quietlySleep; +import static java.util.Collections.unmodifiableCollection; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTimeoutException; import java.sql.SQLTransientConnectionException; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -48,6 +45,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; +import com.zaxxer.hikari.metrics.micrometer.MicrometerMetricsTrackerFactory; +import io.micrometer.core.instrument.MeterRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,13 +83,14 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag private final long HOUSEKEEPING_PERIOD_MS = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30)); private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator(null); + private final PoolEntryCreator POST_FILL_POOL_ENTRY_CREATOR = new PoolEntryCreator("After adding "); private final Collection<Runnable> addConnectionQueue; private final ThreadPoolExecutor addConnectionExecutor; private final ThreadPoolExecutor closeConnectionExecutor; private final ConcurrentBag<PoolEntry> connectionBag; - private final ProxyLeakTask leakTask; + private final ProxyLeakTaskFactory leakTaskFactory; private final SuspendResumeLock suspendResumeLock; private ScheduledExecutorService houseKeepingExecutorService; @@ -108,7 +108,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag this.connectionBag = new ConcurrentBag<>(this); this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; - initializeHouseKeepingExecutorService(); + this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); checkFailFast(); @@ -130,9 +130,9 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); - this.leakTask = new ProxyLeakTask(config.getLeakDetectionThreshold(), houseKeepingExecutorService); + this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); - this.houseKeeperTask = this.houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS); + this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS); } /** @@ -175,9 +175,11 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag } else { metricsTracker.recordBorrowStats(poolEntry, startTime); - return poolEntry.createProxyConnection(leakTask.schedule(poolEntry), now); + return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); } } while (timeout > 0L); + + metricsTracker.recordBorrowTimeoutStats(startTime); } catch (InterruptedException e) { if (poolEntry != null) { @@ -219,7 +221,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag softEvictConnections(); addConnectionExecutor.shutdown(); - addConnectionExecutor.awaitTermination(5L, SECONDS); + addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS); destroyHouseKeepingExecutorService(); @@ -232,16 +234,16 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag do { abortActiveConnections(assassinExecutor); softEvictConnections(); - } while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(5)); + } while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10)); } finally { assassinExecutor.shutdown(); - assassinExecutor.awaitTermination(5L, SECONDS); + assassinExecutor.awaitTermination(10L, SECONDS); } shutdownNetworkTimeoutExecutor(); closeConnectionExecutor.shutdown(); - closeConnectionExecutor.awaitTermination(5L, SECONDS); + closeConnectionExecutor.awaitTermination(10L, SECONDS); } finally { logPoolState("After shutdown "); @@ -270,9 +272,12 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag public void setMetricRegistry(Object metricRegistry) { - if (metricRegistry != null) { + if (metricRegistry instanceof MetricRegistry) { setMetricsTrackerFactory(new CodahaleMetricsTrackerFactory((MetricRegistry) metricRegistry)); } + else if (metricRegistry instanceof MeterRegistry) { + setMetricsTrackerFactory(new MicrometerMetricsTrackerFactory((MeterRegistry) metricRegistry)); + } else { setMetricsTrackerFactory(null); } @@ -301,14 +306,14 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag /** {@inheritDoc} */ @Override - public Future<Boolean> addBagItem(final int waiting) + public void addBagItem(final int waiting) { final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional. if (shouldAdd) { - return addConnectionExecutor.submit(POOL_ENTRY_CREATOR); + addConnectionExecutor.submit(POOL_ENTRY_CREATOR); } - return CompletableFuture.completedFuture(Boolean.TRUE); + CompletableFuture.completedFuture(Boolean.TRUE); } // *********************************************************************** @@ -424,6 +429,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag } } + @SuppressWarnings("unused") int[] getPoolStateCounts() { return connectionBag.getStateCounts(); @@ -447,7 +453,11 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long lifetime = maxLifetime - variance; poolEntry.setFutureEol(houseKeepingExecutorService.schedule( - () -> softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */), + () -> { + if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) { + addBagItem(connectionBag.getWaitingThreadCount()); + } + }, lifetime, MILLISECONDS)); } @@ -469,7 +479,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) - addConnectionQueue.size(); for (int i = 0; i < connectionsToAdd; i++) { - addConnectionExecutor.submit((i < connectionsToAdd - 1) ? POOL_ENTRY_CREATOR : new PoolEntryCreator("After adding ")); + addConnectionExecutor.submit((i < connectionsToAdd - 1) ? POOL_ENTRY_CREATOR : POST_FILL_POOL_ENTRY_CREATOR); } } @@ -499,8 +509,12 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag */ private void checkFailFast() { + final long initializationTimeout = config.getInitializationFailTimeout(); + if (initializationTimeout < 0) { + return; + } + final long startTime = currentTime(); - Throwable throwable = new SQLTimeoutException("HikariCP was unable to initialize connections in pool " + poolName); do { final PoolEntry poolEntry = createPoolEntry(); if (poolEntry != null) { @@ -509,23 +523,21 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag LOGGER.debug("{} - Added connection {}", poolName, poolEntry.connection); } else { - final Connection connection = poolEntry.close(); - quietlyCloseConnection(connection, "(initialization check complete and minimumIdle is zero)"); + quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)"); } return; } - throwable = getLastConnectionFailure(); - if (throwable instanceof ConnectionSetupException) { - throwPoolInitializationException(throwable.getCause()); + if (getLastConnectionFailure() instanceof ConnectionSetupException) { + throwPoolInitializationException(getLastConnectionFailure().getCause()); } quietlySleep(1000L); - } while (elapsedMillis(startTime) < config.getInitializationFailTimeout()); + } while (elapsedMillis(startTime) < initializationTimeout); - if (config.getInitializationFailTimeout() > 0) { - throwPoolInitializationException(throwable); + if (initializationTimeout > 0) { + throwPoolInitializationException(getLastConnectionFailure()); } } @@ -536,25 +548,28 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag throw new PoolInitializationException(t); } - private void softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) + private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) { poolEntry.markEvicted(); if (owner || connectionBag.reserve(poolEntry)) { closeConnection(poolEntry, reason); + return true; } + + return false; } - private void initializeHouseKeepingExecutorService() + private ScheduledExecutorService initializeHouseKeepingExecutorService() { if (config.getScheduledExecutor() == null) { final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true)); final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy()); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); executor.setRemoveOnCancelPolicy(true); - this.houseKeepingExecutorService = executor; + return executor; } else { - this.houseKeepingExecutorService = config.getScheduledExecutor(); + return config.getScheduledExecutor(); } } @@ -605,11 +620,11 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag */ private final class PoolEntryCreator implements Callable<Boolean> { - private final String afterPrefix; + private final String loggingPrefix; - PoolEntryCreator(String afterPrefix) + PoolEntryCreator(String loggingPrefix) { - this.afterPrefix = afterPrefix; + this.loggingPrefix = loggingPrefix; } @Override @@ -621,8 +636,8 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag if (poolEntry != null) { connectionBag.add(poolEntry); LOGGER.debug("{} - Added connection {}", poolName, poolEntry.connection); - if (afterPrefix != null) { - logPoolState(afterPrefix); + if (loggingPrefix != null) { + logPoolState(loggingPrefix); } return Boolean.TRUE; } @@ -657,7 +672,7 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag // refresh timeouts in case they changed via MBean connectionTimeout = config.getConnectionTimeout(); validationTimeout = config.getValidationTimeout(); - leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); + leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); final long idleTimeout = config.getIdleTimeout(); final long now = currentTime(); @@ -683,14 +698,16 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag logPoolState("Before cleanup "); afterPrefix = "After cleanup "; - connectionBag - .values(STATE_NOT_IN_USE) - .stream() - .sorted(LASTACCESS_REVERSE_COMPARABLE) - .skip(config.getMinimumIdle()) - .filter(p -> elapsedMillis(p.lastAccessed, now) > idleTimeout) - .filter(p -> connectionBag.reserve(p)) - .forEachOrdered(p -> closeConnection(p, "(connection has passed idleTimeout)")); + final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE); + int removed = 0; + for (PoolEntry entry : notInUse) { + if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) { + closeConnection(entry, "(connection has passed idleTimeout)"); + if (++removed > config.getMinimumIdle()) { + break; + } + } + } } logPoolState(afterPrefix); diff --git a/src/main/java/com/zaxxer/hikari/pool/PoolBase.java b/src/main/java/com/zaxxer/hikari/pool/PoolBase.java index b463dae..9063d81 100755 --- a/src/main/java/com/zaxxer/hikari/pool/PoolBase.java +++ b/src/main/java/com/zaxxer/hikari/pool/PoolBase.java @@ -16,6 +16,7 @@ package com.zaxxer.hikari.pool; +import static com.zaxxer.hikari.pool.ProxyConnection.DIRTY_BIT_SCHEMA; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -29,12 +30,16 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.management.MBeanServer; import javax.management.ObjectName; +import javax.naming.InitialContext; +import javax.naming.NamingException; import javax.sql.DataSource; +import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,11 +66,11 @@ abstract class PoolBase protected final HikariConfig config; protected final String poolName; - protected long connectionTimeout; - protected long validationTimeout; - protected IMetricsTrackerDelegate metricsTracker; + long connectionTimeout; + long validationTimeout; + IMetricsTrackerDelegate metricsTracker; - private static final String[] RESET_STATES = {"readOnly", "autoCommit", "isolation", "catalog", "netTimeout"}; + private static final String[] RESET_STATES = {"readOnly", "autoCommit", "isolation", "catalog", "netTimeout", "schema"}; private static final int UNINITIALIZED = -1; private static final int TRUE = 1; private static final int FALSE = 0; @@ -79,6 +84,7 @@ abstract class PoolBase private DataSource dataSource; private final String catalog; + private final String schema; private final boolean isReadOnly; private final boolean isAutoCommit; @@ -94,6 +100,7 @@ abstract class PoolBase this.networkTimeout = UNINITIALIZED; this.catalog = config.getCatalog(); + this.schema = config.getSchema(); this.isReadOnly = config.isReadOnly(); this.isAutoCommit = config.isAutoCommit(); this.transactionIsolation = UtilityElf.getTransactionIsolation(config.getTransactionIsolation()); @@ -146,28 +153,30 @@ abstract class PoolBase { try { try { + setNetworkTimeout(connection, validationTimeout); + + final long validationSeconds = (int) Math.max(1000L, validationTimeout) / 1000; + if (isUseJdbc4Validation) { - return connection.isValid((int) MILLISECONDS.toSeconds(Math.max(1000L, validationTimeout))); + return connection.isValid((int) validationSeconds); } - setNetworkTimeout(connection, validationTimeout); - try (Statement statement = connection.createStatement()) { if (isNetworkTimeoutSupported != TRUE) { - setQueryTimeout(statement, (int) MILLISECONDS.toSeconds(Math.max(1000L, validationTimeout))); + setQueryTimeout(statement, (int) validationSeconds); } statement.execute(config.getConnectionTestQuery()); } } finally { + setNetworkTimeout(connection, networkTimeout); + if (isIsolateInternalQueries && !isAutoCommit) { connection.rollback(); } } - setNetworkTimeout(connection, networkTimeout); - return true; } catch (Exception e) { @@ -225,6 +234,11 @@ abstract class PoolBase resetBits |= DIRTY_BIT_NETTIMEOUT; } + if ((dirtyBits & DIRTY_BIT_SCHEMA) != 0 && schema != null && !schema.equals(proxyConnection.getSchemaState())) { + connection.setSchema(schema); + resetBits |= DIRTY_BIT_SCHEMA; + } + if (resetBits != 0 && LOGGER.isDebugEnabled()) { LOGGER.debug("{} - Reset ({}) on connection {}", poolName, stringFromResetBits(resetBits), connection); } @@ -237,6 +251,15 @@ abstract class PoolBase } } + long getLoginTimeout() + { + try { + return (dataSource != null) ? dataSource.getLoginTimeout() : SECONDS.toSeconds(5); + } catch (SQLException e) { + return SECONDS.toSeconds(5); + } + } + // *********************************************************************** // JMX methods // *********************************************************************** @@ -244,7 +267,7 @@ abstract class PoolBase /** * Register MBeans for HikariConfig and HikariPool. * - * @param pool a HikariPool instance + * @param hikariPool a HikariPool instance */ void registerMBeans(final HikariPool hikariPool) { @@ -300,8 +323,6 @@ abstract class PoolBase /** * Create/initialize the underlying DataSource. - * - * @return a DataSource instance */ private void initializeDataSource() { @@ -310,6 +331,7 @@ abstract class PoolBase final String password = config.getPassword(); final String dsClassName = config.getDataSourceClassName(); final String driverClassName = config.getDriverClassName(); + final String dataSourceJNDI = config.getDataSourceJNDI(); final Properties dataSourceProperties = config.getDataSourceProperties(); DataSource dataSource = config.getDataSource(); @@ -320,6 +342,14 @@ abstract class PoolBase else if (jdbcUrl != null && dataSource == null) { dataSource = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password); } + else if (dataSourceJNDI != null && dataSource == null) { + try { + InitialContext ic = new InitialContext(); + dataSource = (DataSource) ic.lookup(dataSourceJNDI); + } catch (NamingException e) { + throw new PoolInitializationException(e); + } + } if (dataSource != null) { setLoginTimeout(dataSource); @@ -334,7 +364,7 @@ abstract class PoolBase * * @return a Connection connection */ - Connection newConnection() throws Exception + private Connection newConnection() throws Exception { final long start = currentTime(); @@ -375,7 +405,7 @@ abstract class PoolBase * Setup a connection initial state. * * @param connection a Connection - * @throws SQLException thrown from driver + * @throws ConnectionSetupException thrown if any exception is encountered */ private void setupConnection(final Connection connection) throws ConnectionSetupException { @@ -400,6 +430,10 @@ abstract class PoolBase connection.setCatalog(catalog); } + if (schema != null) { + connection.setSchema(schema); + } + executeSql(connection, config.getConnectionInitSql(), true); setNetworkTimeout(connection, networkTimeout); @@ -438,10 +472,12 @@ abstract class PoolBase } catch (SQLException e) { LOGGER.warn("{} - Default transaction isolation level detection failed ({}).", poolName, e.getMessage()); + if (e.getSQLState() != null && !e.getSQLState().startsWith("08")) { + throw e; + } } - finally { - isValidChecked = true; - } + + isValidChecked = true; } } @@ -610,7 +646,7 @@ abstract class PoolBase { private static final long serialVersionUID = 929872118275916521L; - public ConnectionSetupException(Throwable t) + ConnectionSetupException(Throwable t) { super(t); } @@ -635,12 +671,14 @@ abstract class PoolBase } } - static interface IMetricsTrackerDelegate extends AutoCloseable + interface IMetricsTrackerDelegate extends AutoCloseable { default void recordConnectionUsage(PoolEntry poolEntry) {} default void recordConnectionCreated(long connectionCreatedMillis) {} + default void recordBorrowTimeoutStats(long startTime) {} + default void recordBorrowStats(final PoolEntry poolEntry, final long startTime) {} default void recordConnectionTimeout() {} @@ -676,6 +714,12 @@ abstract class PoolBase } @Override + public void recordBorrowTimeoutStats(long startTime) + { + tracker.recordConnectionAcquiredNanos(elapsedNanos(startTime)); + } + + @Override public void recordBorrowStats(final PoolEntry poolEntry, final long startTime) { final long now = currentTime(); diff --git a/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java b/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java index 5b16047..2b45256 100644 --- a/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java +++ b/src/main/java/com/zaxxer/hikari/pool/PoolEntry.java @@ -15,22 +15,18 @@ */ package com.zaxxer.hikari.pool; -import static com.zaxxer.hikari.util.ClockSource.currentTime; -import static com.zaxxer.hikari.util.ClockSource.elapsedDisplayString; -import static com.zaxxer.hikari.util.ClockSource.elapsedMillis; +import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry; +import com.zaxxer.hikari.util.FastList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.util.Comparator; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry; -import com.zaxxer.hikari.util.FastList; +import static com.zaxxer.hikari.util.ClockSource.*; /** * Entry used in the ConcurrentBag to track Connection instances. @@ -42,12 +38,12 @@ final class PoolEntry implements IConcurrentBagEntry private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class); private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater; - static final Comparator<PoolEntry> LASTACCESS_REVERSE_COMPARABLE; - Connection connection; long lastAccessed; long lastBorrowed; - private volatile int state; + + @SuppressWarnings("FieldCanBeLocal") + private volatile int state = 0; private volatile boolean evict; private volatile ScheduledFuture<?> endOfLife; @@ -60,13 +56,6 @@ final class PoolEntry implements IConcurrentBagEntry static { - LASTACCESS_REVERSE_COMPARABLE = new Comparator<PoolEntry>() { - @Override - public int compare(final PoolEntry entryOne, final PoolEntry entryTwo) { - return Long.compare(entryTwo.lastAccessed, entryOne.lastAccessed); - } - }; - stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state"); } @@ -94,7 +83,9 @@ final class PoolEntry implements IConcurrentBagEntry } /** - * @param endOfLife + * Set the end of life {@link ScheduledFuture}. + * + * @param endOfLife this PoolEntry/Connection's end of life {@link ScheduledFuture} */ void setFutureEol(final ScheduledFuture<?> endOfLife) { diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyConnection.java b/src/main/java/com/zaxxer/hikari/pool/ProxyConnection.java index 7143bae..2d07e31 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyConnection.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyConnection.java @@ -19,10 +19,10 @@ package com.zaxxer.hikari.pool; import static com.zaxxer.hikari.util.ClockSource.currentTime; import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.CallableStatement; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Savepoint; @@ -44,16 +44,18 @@ import com.zaxxer.hikari.util.FastList; */ public abstract class ProxyConnection implements Connection { - static final int DIRTY_BIT_READONLY = 0b00001; - static final int DIRTY_BIT_AUTOCOMMIT = 0b00010; - static final int DIRTY_BIT_ISOLATION = 0b00100; - static final int DIRTY_BIT_CATALOG = 0b01000; - static final int DIRTY_BIT_NETTIMEOUT = 0b10000; + static final int DIRTY_BIT_READONLY = 0b000001; + static final int DIRTY_BIT_AUTOCOMMIT = 0b000010; + static final int DIRTY_BIT_ISOLATION = 0b000100; + static final int DIRTY_BIT_CATALOG = 0b001000; + static final int DIRTY_BIT_NETTIMEOUT = 0b010000; + static final int DIRTY_BIT_SCHEMA = 0b100000; private static final Logger LOGGER; private static final Set<String> ERROR_STATES; private static final Set<Integer> ERROR_CODES; + @SuppressWarnings("WeakerAccess") protected Connection delegate; private final PoolEntry poolEntry; @@ -69,6 +71,7 @@ public abstract class ProxyConnection implements Connection private int networkTimeout; private int transactionIsolation; private String dbcatalog; + private String dbschema; // static initializer static { @@ -101,10 +104,7 @@ public abstract class ProxyConnection implements Connection @Override public final String toString() { - return new StringBuilder(64) - .append(this.getClass().getSimpleName()).append('@').append(System.identityHashCode(this)) - .append(" wrapping ") - .append(delegate).toString(); + return this.getClass().getSimpleName() + '@' + System.identityHashCode(this) + " wrapping " + delegate; } // *********************************************************************** @@ -121,6 +121,11 @@ public abstract class ProxyConnection implements Connection return dbcatalog; } + final String getSchemaState() + { + return dbschema; + } + final int getTransactionIsolationState() { return transactionIsolation; @@ -186,29 +191,32 @@ public abstract class ProxyConnection implements Connection leakTask.cancel(); } - private final synchronized <T extends Statement> T trackStatement(final T statement) + private synchronized <T extends Statement> T trackStatement(final T statement) { openStatements.add(statement); return statement; } - private final void closeStatements() + @SuppressWarnings("EmptyTryBlock") + private synchronized void closeStatements() { final int size = openStatements.size(); if (size > 0) { for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) { - try (Statement statement = openStatements.get(i)) { + try (Statement ignored = openStatements.get(i)) { // automatic resource cleanup } catch (SQLException e) { - checkException(e); + LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()", + poolEntry.getPoolName(), delegate); + leakTask.cancel(); + poolEntry.evict("(exception closing Statements during Connection.close())"); + delegate = ClosedConnection.CLOSED_CONNECTION; } } - synchronized (this) { - openStatements.clear(); - } + openStatements.clear(); } } @@ -346,6 +354,14 @@ public abstract class ProxyConnection implements Connection /** {@inheritDoc} */ @Override + public DatabaseMetaData getMetaData() throws SQLException + { + markCommitStateDirty(); + return delegate.getMetaData(); + } + + /** {@inheritDoc} */ + @Override public void commit() throws SQLException { delegate.commit(); @@ -419,6 +435,15 @@ public abstract class ProxyConnection implements Connection /** {@inheritDoc} */ @Override + public void setSchema(String schema) throws SQLException + { + delegate.setSchema(schema); + dbschema = schema; + dirtyBits |= DIRTY_BIT_SCHEMA; + } + + /** {@inheritDoc} */ + @Override public final boolean isWrapperFor(Class<?> iface) throws SQLException { return iface.isInstance(delegate) || (delegate instanceof Wrapper && delegate.isWrapperFor(iface)); @@ -449,24 +474,19 @@ public abstract class ProxyConnection implements Connection private static Connection getClosedConnection() { - InvocationHandler handler = new InvocationHandler() { - - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable - { - final String methodName = method.getName(); - if ("abort".equals(methodName)) { - return Void.TYPE; - } - else if ("isValid".equals(methodName)) { - return Boolean.FALSE; - } - else if ("toString".equals(methodName)) { - return ClosedConnection.class.getCanonicalName(); - } - - throw new SQLException("Connection is closed"); + InvocationHandler handler = (proxy, method, args) -> { + final String methodName = method.getName(); + if ("abort".equals(methodName)) { + return Void.TYPE; + } + else if ("isValid".equals(methodName)) { + return Boolean.FALSE; } + else if ("toString".equals(methodName)) { + return ClosedConnection.class.getCanonicalName(); + } + + throw new SQLException("Connection is closed"); }; return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler); diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyFactory.java b/src/main/java/com/zaxxer/hikari/pool/ProxyFactory.java index 026debb..4a074da 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyFactory.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyFactory.java @@ -30,6 +30,7 @@ import com.zaxxer.hikari.util.FastList; * * @author Brett Wooldridge */ +@SuppressWarnings("unused") public final class ProxyFactory { private ProxyFactory() @@ -39,13 +40,13 @@ public final class ProxyFactory /** * Create a proxy for the specified {@link Connection} instance. - * @param poolEntry - * @param connection - * @param openStatements - * @param leakTask - * @param now - * @param isReadOnly - * @param isAutoCommit + * @param poolEntry the PoolEntry holding pool state + * @param connection the raw database Connection + * @param openStatements a reusable list to track open Statement instances + * @param leakTask the ProxyLeakTask for this connection + * @param now the current timestamp + * @param isReadOnly the default readOnly state of the connection + * @param isAutoCommit the default autoCommit state of the connection * @return a proxy that wraps the specified {@link Connection} */ static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit) diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTask.java b/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTask.java index 0fdc93e..f72615e 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTask.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTask.java @@ -32,10 +32,8 @@ import org.slf4j.LoggerFactory; class ProxyLeakTask implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class); - private static final ProxyLeakTask NO_LEAK; + static final ProxyLeakTask NO_LEAK; - private ScheduledExecutorService executorService; - private long leakDetectionThreshold; private ScheduledFuture<?> scheduledFuture; private String connectionName; private Exception exception; @@ -45,35 +43,29 @@ class ProxyLeakTask implements Runnable { NO_LEAK = new ProxyLeakTask() { @Override + void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {} + + @Override + public void run() {} + + @Override public void cancel() {} }; } - ProxyLeakTask(final long leakDetectionThreshold, final ScheduledExecutorService executorService) - { - this.executorService = executorService; - this.leakDetectionThreshold = leakDetectionThreshold; - } - - private ProxyLeakTask(final ProxyLeakTask parent, final PoolEntry poolEntry) + ProxyLeakTask(final PoolEntry poolEntry) { this.exception = new Exception("Apparent connection leak detected"); this.connectionName = poolEntry.connection.toString(); - scheduledFuture = parent.executorService.schedule(this, parent.leakDetectionThreshold, TimeUnit.MILLISECONDS); } private ProxyLeakTask() { } - - ProxyLeakTask schedule(final PoolEntry bagEntry) - { - return (leakDetectionThreshold == 0) ? NO_LEAK : new ProxyLeakTask(this, bagEntry); - } - void updateLeakDetectionThreshold(final long leakDetectionThreshold) + void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) { - this.leakDetectionThreshold = leakDetectionThreshold; + scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS); } /** {@inheritDoc} */ diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTaskFactory.java b/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTaskFactory.java new file mode 100644 index 0000000..fde6074 --- /dev/null +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyLeakTaskFactory.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2013, 2014 Brett Wooldridge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.zaxxer.hikari.pool; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks. + * + * @author Brett Wooldridge + * @author Andreas Brenk + */ +class ProxyLeakTaskFactory +{ + private ScheduledExecutorService executorService; + private long leakDetectionThreshold; + + ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService) + { + this.executorService = executorService; + this.leakDetectionThreshold = leakDetectionThreshold; + } + + ProxyLeakTask schedule(final PoolEntry poolEntry) + { + return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry); + } + + void updateLeakDetectionThreshold(final long leakDetectionThreshold) + { + this.leakDetectionThreshold = leakDetectionThreshold; + } + + private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) { + ProxyLeakTask task = new ProxyLeakTask(poolEntry); + task.schedule(executorService, leakDetectionThreshold); + + return task; + } +} diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyPreparedStatement.java b/src/main/java/com/zaxxer/hikari/pool/ProxyPreparedStatement.java index e2d96c9..68e47af 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyPreparedStatement.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyPreparedStatement.java @@ -27,7 +27,7 @@ import java.sql.SQLException; */ public abstract class ProxyPreparedStatement extends ProxyStatement implements PreparedStatement { - protected ProxyPreparedStatement(ProxyConnection connection, PreparedStatement statement) + ProxyPreparedStatement(ProxyConnection connection, PreparedStatement statement) { super(connection, statement); } @@ -50,7 +50,7 @@ public abstract class ProxyPreparedStatement extends ProxyStatement implements P { connection.markCommitStateDirty(); ResultSet resultSet = ((PreparedStatement) delegate).executeQuery(); - return ProxyFactory.getProxyResultSet(connection, this, resultSet); + return ProxyFactory.getProxyResultSet(connection, this, resultSet); } /** {@inheritDoc} */ diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyResultSet.java b/src/main/java/com/zaxxer/hikari/pool/ProxyResultSet.java index 1933979..e2c4950 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyResultSet.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyResultSet.java @@ -19,7 +19,6 @@ package com.zaxxer.hikari.pool; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Wrapper; /** * This is the proxy class for java.sql.ResultSet. @@ -30,7 +29,7 @@ public abstract class ProxyResultSet implements ResultSet { protected final ProxyConnection connection; protected final ProxyStatement statement; - protected final ResultSet delegate; + final ResultSet delegate; protected ProxyResultSet(ProxyConnection connection, ProxyStatement statement, ResultSet resultSet) { @@ -39,6 +38,7 @@ public abstract class ProxyResultSet implements ResultSet this.delegate = resultSet; } + @SuppressWarnings("unused") final SQLException checkException(SQLException e) { return connection.checkException(e); @@ -48,10 +48,7 @@ public abstract class ProxyResultSet implements ResultSet @Override public String toString() { - return new StringBuilder(64) - .append(this.getClass().getSimpleName()).append('@').append(System.identityHashCode(this)) - .append(" wrapping ") - .append(delegate).toString(); + return this.getClass().getSimpleName() + '@' + System.identityHashCode(this) + " wrapping " + delegate; } // ********************************************************************** @@ -97,7 +94,7 @@ public abstract class ProxyResultSet implements ResultSet if (iface.isInstance(delegate)) { return (T) delegate; } - else if (delegate instanceof Wrapper) { + else if (delegate != null) { return delegate.unwrap(iface); } diff --git a/src/main/java/com/zaxxer/hikari/pool/ProxyStatement.java b/src/main/java/com/zaxxer/hikari/pool/ProxyStatement.java index 1d92cd8..bb5ac69 100644 --- a/src/main/java/com/zaxxer/hikari/pool/ProxyStatement.java +++ b/src/main/java/com/zaxxer/hikari/pool/ProxyStatement.java @@ -20,7 +20,6 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Wrapper; /** * This is the proxy class for java.sql.Statement. @@ -30,17 +29,18 @@ import java.sql.Wrapper; public abstract class ProxyStatement implements Statement { protected final ProxyConnection connection; - protected final Statement delegate; + final Statement delegate; private boolean isClosed; private ResultSet proxyResultSet; - protected ProxyStatement(ProxyConnection connection, Statement statement) + ProxyStatement(ProxyConnection connection, Statement statement) { this.connection = connection; this.delegate = statement; } + @SuppressWarnings("unused") final SQLException checkException(SQLException e) { return connection.checkException(e); @@ -51,10 +51,7 @@ public abstract class ProxyStatement implements Statement public final String toString() { final String delegateToString = delegate.toString(); - return new StringBuilder(64 + delegateToString.length()) - .append(this.getClass().getSimpleName()).append('@').append(System.identityHashCode(this)) - .append(" wrapping ") - .append(delegateToString).toString(); + return this.getClass().getSimpleName() + '@' + System.identityHashCode(this) + " wrapping " + delegateToString; } // ********************************************************************** @@ -65,11 +62,14 @@ public abstract class ProxyStatement implements Statement @Override public final void close() throws SQLException { - if (isClosed) { - return; + synchronized (this) { + if (isClosed) { + return; + } + + isClosed = true; } - isClosed = true; connection.untrackStatement(delegate); try { @@ -231,7 +231,7 @@ public abstract class ProxyStatement implements Statement if (iface.isInstance(delegate)) { return (T) delegate; } - else if (delegate instanceof Wrapper) { + else if (delegate != null) { return delegate.unwrap(iface); } |