publicvoidadd(final T bagEntry) { if (closed) { LOGGER.info("ConcurrentBag has been closed, ignoring add()"); thrownew IllegalStateException("ConcurrentBag has been closed, ignoring add()"); } // 添加新资源到 sharedList sharedList.add(bagEntry);
// 如果存在等待的线程,则直接交给等待线程 // spin until a thread takes it or none are waiting while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) { Thread.yield(); } }
删除资源对象,ConcurrentBag#remove:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
publicbooleanremove(final T bagEntry) { // 如果不是使用中,或被预定状态,则返回失败 if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) { LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry); returnfalse; } // 从 sharedList 中移除 finalboolean removed = sharedList.remove(bagEntry); if (!removed && !closed) { LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry); } // 从 threadList 中移除 threadList.get().remove(bagEntry);
public T borrow(long timeout, final TimeUnit timeUnit)throws InterruptedException { // 优先从当前线程本地的 threadList 中获取 // Try the thread-local list first final List<Object> list = threadList.get(); for (int i = list.size() - 1; i >= 0; i--) { final Object entry = list.remove(i); @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; // CAS 获取 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } }
// 等待获取资源的线程数+1 // Otherwise, scan the shared list ... then poll the handoff queue finalint waiting = waiters.incrementAndGet(); try { // 从 sharedList 中获取 for (T bagEntry : sharedList) { // cas 乐观锁获取 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. // 如果从 sharedList 中获取到了资源,但是存在等待线程,则是抢占了其他线程的资源,需要再申请补充一个资源 if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } }
publicfinalvoidclose()throws SQLException { // 关闭 statements // Closing statements can cause connection eviction, so this must run before the conditional below closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) { leakTask.cancel();
try { // 存在未提交的事务,并且未开启自动提交,则进行回滚 if (isCommitStateDirty && !isAutoCommit) { delegate.rollback(); lastAccess = currentTime(); LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate); }
delegate.clearWarnings(); } catch (SQLException e) { // when connections are aborted, exceptions are often thrown that should not reach the application if (!poolEntry.isMarkedEvicted()) { throw checkException(e); } } finally { delegate = ClosedConnection.CLOSED_CONNECTION; // 回收连接 poolEntry.recycle(lastAccess); } } }
最终在回收连接时,是通过 ConcurrentBag#requite 方法完成的。
总结
HikariCP 数据库连接池对数据结构的使用可谓是“知人善用”,ConcurrentBag 和 FastList 都非常适合资源的池化分配,前者其通过 CAS 替换了传统的重量级锁,并通过 ThreadLocal 将资源本地化,减少了共享资源的竞争;另外就是“细微之处见真章”,虽然可能都是一些不被人关注和在乎的小优化,但累加起来对性能却能有很大的提升。