Using java.util.concurrent.locks.LockSupport

java

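While reading the internals of AQS, I noticed that much of it relies on the java.util.concurrent.locks.LockSupport class. For example, CountDownLatch.await uses it to block, and so do blocking-queue methods such as take when they block the calling thread. This post looks at its main usage.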

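1. A quick look at thread states

The thread states usually introduced when first learning about threads are:

1. NEW

The thread has been created but has not started running. For example, after Thread t1 = new Thread(); t1 is a thread in the NEW state.

2. RUNNABLE

Once start() is called on a new thread, it is in the RUNNABLE state. A RUNNABLE thread may be running in the Java virtual machine, or it may be waiting for processor resources: a thread must obtain CPU time before it can execute the body of its run() method, and otherwise it waits in the queue.

3. BLOCKED

A thread that is waiting for a monitor lock in order to enter a synchronized block or method is in the BLOCKED state.

4. WAITING

A thread is in the WAITING state after calling Object.wait() without a timeout, Thread.join() without a timeout, or LockSupport.park().

5. TIMED_WAITING

A thread is in the TIMED_WAITING state after calling one of the following with a specified positive waiting time: Object.wait(), Thread.join(), Thread.sleep(), LockSupport.parkNanos(), or LockSupport.parkUntil().

6. TERMINATED

A thread is in the TERMINATED state once it has been terminated or its run() method has finished. A terminated thread cannot run again.

As the list shows, a thread enters the WAITING state after calling park (or TIMED_WAITING for the timed variants).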
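
The states listed above can be observed from a second thread with Thread.getState(). The following is a minimal sketch, not part of the original post: the class name ThreadStateDemo, the stage flag and the awaitState helper are assumptions made for illustration. It polls instead of sleeping for fixed intervals, and the worker re-checks a flag after every park because park may return for no reason.

```java
import java.util.concurrent.locks.LockSupport;

class ThreadStateDemo {
    // Hypothetical helper flag: tells the worker which waiting stage it may leave.
    static volatile int stage = 0;

    // Poll until the thread is observed in the expected state, then return that state.
    static Thread.State awaitState(Thread t, Thread.State expected) throws InterruptedException {
        Thread.State s;
        while ((s = t.getState()) != expected) {
            Thread.sleep(1);
        }
        return s;
    }

    static String observe() throws InterruptedException {
        stage = 0;
        Object monitor = new Object();
        StringBuilder seen = new StringBuilder();

        Thread t = new Thread(() -> {
            synchronized (monitor) {
                // Entered only after main releases the monitor.
            }
            while (stage < 1) {
                LockSupport.park();                     // untimed park
            }
            while (stage < 2) {
                LockSupport.parkNanos(1_000_000_000L);  // timed park, 1 s per round
            }
        });

        seen.append(t.getState());                      // created but not started
        synchronized (monitor) {
            t.start();
            // The worker cannot enter the monitor while main holds it.
            seen.append(' ').append(awaitState(t, Thread.State.BLOCKED));
        }
        seen.append(' ').append(awaitState(t, Thread.State.WAITING));
        stage = 1;
        LockSupport.unpark(t);
        seen.append(' ').append(awaitState(t, Thread.State.TIMED_WAITING));
        stage = 2;
        LockSupport.unpark(t);
        t.join();
        seen.append(' ').append(t.getState());          // run() has finished
        return seen.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: NEW BLOCKED WAITING TIMED_WAITING TERMINATED
        System.out.println(observe());
    }
}
```

RUNNABLE is left out on purpose: it is the state between the other ones and is hard to pin down by polling.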

2. Main API

1. The main API is as follows:

Source code:

/*

* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*

*/

/*

*

*

*

*

*

* Written by Doug Lea with assistance from members of JCP JSR-166

* Expert Group and released to the public domain, as explained at

* http://creativecommons.org/publicdomain/zero/1.0/

*/

package java.util.concurrent.locks;

import sun.misc.Unsafe;

/**

* Basic thread blocking primitives for creating locks and other

* synchronization classes.

*

* <p>This class associates, with each thread that uses it, a permit

* (in the sense of the {@link java.util.concurrent.Semaphore

* Semaphore} class). A call to {@code park} will return immediately

* if the permit is available, consuming it in the process; otherwise

* it <em>may</em> block. A call to {@code unpark} makes the permit

* available, if it was not already available. (Unlike with Semaphores

* though, permits do not accumulate. There is at most one.)

*

* <p>Methods {@code park} and {@code unpark} provide efficient

* means of blocking and unblocking threads that do not encounter the

* problems that cause the deprecated methods {@code Thread.suspend}

* and {@code Thread.resume} to be unusable for such purposes: Races

* between one thread invoking {@code park} and another thread trying

* to {@code unpark} it will preserve liveness, due to the

* permit. Additionally, {@code park} will return if the caller's

* thread was interrupted, and timeout versions are supported. The

* {@code park} method may also return at any other time, for "no

* reason", so in general must be invoked within a loop that rechecks

* conditions upon return. In this sense {@code park} serves as an

* optimization of a "busy wait" that does not waste as much time

* spinning, but must be paired with an {@code unpark} to be

* effective.

*

* <p>The three forms of {@code park} each also support a

* {@code blocker} object parameter. This object is recorded while

* the thread is blocked to permit monitoring and diagnostic tools to

* identify the reasons that threads are blocked. (Such tools may

* access blockers using method {@link #getBlocker(Thread)}.)

* The use of these forms rather than the original forms without this

* parameter is strongly encouraged. The normal argument to supply as

* a {@code blocker} within a lock implementation is {@code this}.

*

* <p>These methods are designed to be used as tools for creating

* higher-level synchronization utilities, and are not in themselves

* useful for most concurrency control applications. The {@code park}

* method is designed for use only in constructions of the form:

*

* <pre> {@code

* while (!canProceed()) { ... LockSupport.park(this); }}</pre>

*

* where neither {@code canProceed} nor any other actions prior to the

* call to {@code park} entail locking or blocking. Because only one

* permit is associated with each thread, any intermediary uses of

* {@code park} could interfere with its intended effects.

*

* <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out

* non-reentrant lock class:

* <pre> {@code

* class FIFOMutex {

* private final AtomicBoolean locked = new AtomicBoolean(false);

* private final Queue<Thread> waiters

* = new ConcurrentLinkedQueue<Thread>();

*

* public void lock() {

* boolean wasInterrupted = false;

* Thread current = Thread.currentThread();

* waiters.add(current);

*

* // Block while not first in queue or cannot acquire lock

* while (waiters.peek() != current ||

* !locked.compareAndSet(false, true)) {

* LockSupport.park(this);

* if (Thread.interrupted()) // ignore interrupts while waiting

* wasInterrupted = true;

* }

*

* waiters.remove();

* if (wasInterrupted) // reassert interrupt status on exit

* current.interrupt();

* }

*

* public void unlock() {

* locked.set(false);

* LockSupport.unpark(waiters.peek());

* }

* }}</pre>

*/

public class LockSupport {

private LockSupport() {} // Cannot be instantiated.

private static void setBlocker(Thread t, Object arg) {

// Even though volatile, hotspot doesn't need a write barrier here.

UNSAFE.putObject(t, parkBlockerOffset, arg);

}

/**

* Makes available the permit for the given thread, if it

* was not already available. If the thread was blocked on

* {@code park} then it will unblock. Otherwise, its next call

* to {@code park} is guaranteed not to block. This operation

* is not guaranteed to have any effect at all if the given

* thread has not been started.

*

* @param thread the thread to unpark, or {@code null}, in which case

* this operation has no effect

*/

public static void unpark(Thread thread) {

if (thread != null)

UNSAFE.unpark(thread);

}

/**

* Disables the current thread for thread scheduling purposes unless the

* permit is available.

*

* <p>If the permit is available then it is consumed and the call returns

* immediately; otherwise

* the current thread becomes disabled for thread scheduling

* purposes and lies dormant until one of three things happens:

*

* <ul>

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread upon return.

*

* @param blocker the synchronization object responsible for this

* thread parking

* @since 1.6

*/

public static void park(Object blocker) {

Thread t = Thread.currentThread();

setBlocker(t, blocker);

UNSAFE.park(false, 0L);

setBlocker(t, null);

}

/**

* Disables the current thread for thread scheduling purposes, for up to

* the specified waiting time, unless the permit is available.

*

* <p>If the permit is available then it is consumed and the call

* returns immediately; otherwise the current thread becomes disabled

* for thread scheduling purposes and lies dormant until one of four

* things happens:

*

* <ul>

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread; or

*

* <li>The specified waiting time elapses; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread, or the elapsed time

* upon return.

*

* @param blocker the synchronization object responsible for this

* thread parking

* @param nanos the maximum number of nanoseconds to wait

* @since 1.6

*/

public static void parkNanos(Object blocker, long nanos) {

if (nanos > 0) {

Thread t = Thread.currentThread();

setBlocker(t, blocker);

UNSAFE.park(false, nanos);

setBlocker(t, null);

}

}

/**

* Disables the current thread for thread scheduling purposes, until

* the specified deadline, unless the permit is available.

*

* <p>If the permit is available then it is consumed and the call

* returns immediately; otherwise the current thread becomes disabled

* for thread scheduling purposes and lies dormant until one of four

* things happens:

*

* <ul>

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts} the

* current thread; or

*

* <li>The specified deadline passes; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread, or the current time

* upon return.

*

* @param blocker the synchronization object responsible for this

* thread parking

* @param deadline the absolute time, in milliseconds from the Epoch,

* to wait until

* @since 1.6

*/

public static void parkUntil(Object blocker, long deadline) {

Thread t = Thread.currentThread();

setBlocker(t, blocker);

UNSAFE.park(true, deadline);

setBlocker(t, null);

}

/**

* Returns the blocker object supplied to the most recent

* invocation of a park method that has not yet unblocked, or null

* if not blocked. The value returned is just a momentary

* snapshot -- the thread may have since unblocked or blocked on a

* different blocker object.

*

* @param t the thread

* @return the blocker

* @throws NullPointerException if argument is null

* @since 1.6

*/

public static Object getBlocker(Thread t) {

if (t == null)

throw new NullPointerException();

return UNSAFE.getObjectVolatile(t, parkBlockerOffset);

}

/**

* Disables the current thread for thread scheduling purposes unless the

* permit is available.

*

* <p>If the permit is available then it is consumed and the call

* returns immediately; otherwise the current thread becomes disabled

* for thread scheduling purposes and lies dormant until one of three

* things happens:

*

* <ul>

*

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread upon return.

*/

public static void park() {

UNSAFE.park(false, 0L);

}

/**

* Disables the current thread for thread scheduling purposes, for up to

* the specified waiting time, unless the permit is available.

*

* <p>If the permit is available then it is consumed and the call

* returns immediately; otherwise the current thread becomes disabled

* for thread scheduling purposes and lies dormant until one of four

* things happens:

*

* <ul>

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread; or

*

* <li>The specified waiting time elapses; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread, or the elapsed time

* upon return.

*

* @param nanos the maximum number of nanoseconds to wait

*/

public static void parkNanos(long nanos) {

if (nanos > 0)

UNSAFE.park(false, nanos);

}

/**

* Disables the current thread for thread scheduling purposes, until

* the specified deadline, unless the permit is available.

*

* <p>If the permit is available then it is consumed and the call

* returns immediately; otherwise the current thread becomes disabled

* for thread scheduling purposes and lies dormant until one of four

* things happens:

*

* <ul>

* <li>Some other thread invokes {@link #unpark unpark} with the

* current thread as the target; or

*

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread; or

*

* <li>The specified deadline passes; or

*

* <li>The call spuriously (that is, for no reason) returns.

* </ul>

*

* <p>This method does <em>not</em> report which of these caused the

* method to return. Callers should re-check the conditions which caused

* the thread to park in the first place. Callers may also determine,

* for example, the interrupt status of the thread, or the current time

* upon return.

*

* @param deadline the absolute time, in milliseconds from the Epoch,

* to wait until

*/

public static void parkUntil(long deadline) {

UNSAFE.park(true, deadline);

}

/**

* Returns the pseudo-randomly initialized or updated secondary seed.

* Copied from ThreadLocalRandom due to package access restrictions.

*/

static final int nextSecondarySeed() {

int r;

Thread t = Thread.currentThread();

if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {

r ^= r << 13; // xorshift

r ^= r >>> 17;

r ^= r << 5;

}

else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)

r = 1; // avoid zero

UNSAFE.putInt(t, SECONDARY, r);

return r;

}

// Hotspot implementation via intrinsics API

private static final sun.misc.Unsafe UNSAFE;

private static final long parkBlockerOffset;

private static final long SEED;

private static final long PROBE;

private static final long SECONDARY;

static {

try {

UNSAFE = sun.misc.Unsafe.getUnsafe();

Class<?> tk = Thread.class;

parkBlockerOffset = UNSAFE.objectFieldOffset

(tk.getDeclaredField("parkBlocker"));

SEED = UNSAFE.objectFieldOffset

(tk.getDeclaredField("threadLocalRandomSeed"));

PROBE = UNSAFE.objectFieldOffset

(tk.getDeclaredField("threadLocalRandomProbe"));

SECONDARY = UNSAFE.objectFieldOffset

(tk.getDeclaredField("threadLocalRandomSecondarySeed"));

} catch (Exception ex) { throw new Error(ex); }

}

}


2. Usage

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(() -> {

System.out.println("111222");

LockSupport.park();

System.out.println("222333");

try {

Thread.sleep(3*1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("333444");

});

thread.start();

Thread.sleep(1*1000);

System.out.println("thread.getState(): " + thread.getState() + "\t1");

LockSupport.unpark(thread);

Thread.sleep(1*1000);

System.out.println("thread.getState(): " + thread.getState() + "\t2");

Thread.sleep(3*1000);

System.out.println("thread.getState(): " + thread.getState() + "\t3");

}

}

Result:

111222

thread.getState(): WAITING 1

222333

thread.getState(): TIMED_WAITING 2

333444

thread.getState(): TERMINATED 3

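2. The park method

park suspends the current thread, which then enters the WAITING or TIMED_WAITING state. The thread resumes when another thread calls unpark on it, when another thread interrupts it, or, for the timed variants, when the specified time is reached. As the Javadoc above notes, park may also return spuriously, so callers should re-check their condition after it returns.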
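
Of the wake-up conditions above, interruption is the one not shown elsewhere in this post. The sketch below is an illustration only (the class name ParkInterruptDemo and its method name are made up). It parks a thread, interrupts it from main, and reports the interrupt status the thread sees after park returns. Unlike Object.wait() or Thread.sleep(), park does not throw InterruptedException.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    static boolean interruptStatusAfterPark() throws InterruptedException {
        AtomicBoolean statusSeen = new AtomicBoolean(false);

        Thread t = new Thread(() -> {
            // park() does not report why it returned, so re-check the condition in a loop.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // Record the interrupt status as the parked thread sees it after park() returns.
            statusSeen.set(Thread.currentThread().isInterrupted());
        });
        t.start();

        // Wait until the worker is actually parked before interrupting it.
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        t.interrupt();
        t.join();
        return statusSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: interrupt status after park: true
        System.out.println("interrupt status after park: " + interruptStatusAfterPark());
    }
}
```

Because the status stays set, a following park call in the same thread would return immediately until the flag is cleared, for example with Thread.interrupted().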

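1. park can be called with or without a blocker argument. When a blocker is supplied, tools can report what the thread is blocked on.

The blocker is written through Unsafe into java.lang.Thread#parkBlocker (parkBlockerOffset is the offset of the parkBlocker field within the Thread object).

java.lang.Thread#parkBlocker: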

    /**

* The argument supplied to the current call to

* java.util.concurrent.locks.LockSupport.park.

* Set by (private) java.util.concurrent.locks.LockSupport.setBlocker

* Accessed using java.util.concurrent.locks.LockSupport.getBlocker

*/

volatile Object parkBlocker;

(1) Without a blocker:

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

public static void main(String[] args) {

LockSupport.park();

}

}

Thread information from jstack:

"main" #1 prio=5 os_prio=0 tid=0x00000224f146d000 nid=0x3524 waiting on condition [0x00000081e89fe000]

java.lang.Thread.State: WAITING (parking)

at sun.misc.Unsafe.park(Native Method)

at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)

at PlainTest.main(PlainTest.java:6)

(2) With a blocker

Code:

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

public static void main(String[] args) {

LockSupport.park(new Object());

}

}

Result:

"main" #1 prio=5 os_prio=0 tid=0x000002402c6ad800 nid=0x4a14 waiting on condition [0x000000d0ed6ff000]

java.lang.Thread.State: WAITING (parking)

at sun.misc.Unsafe.park(Native Method)

- parking to wait for <0x000000076caeab80> (a java.lang.Object)

at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)

at PlainTest.main(PlainTest.java:6)

(3) With a timeout

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

public static void main(String[] args) {

LockSupport.parkNanos(Long.MAX_VALUE);

}

}

Result:

"main" #1 prio=5 os_prio=0 tid=0x000001e39c82c800 nid=0x3e70 waiting on condition [0x000000215d5ff000]

java.lang.Thread.State: TIMED_WAITING (parking)

at sun.misc.Unsafe.park(Native Method)

at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)

at PlainTest.main(PlainTest.java:6)
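
The blocker from case (2) can also be read programmatically with LockSupport.getBlocker, the same information jstack prints as "parking to wait for". The following is a small sketch under assumptions: the class name BlockerDemo and the released flag are hypothetical. It checks that the blocker is visible while the thread is parked and is cleared again once park returns.

```java
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {
    // Hypothetical flag the worker re-checks after every return from park.
    static volatile boolean released = false;

    static boolean run() throws InterruptedException {
        released = false;
        Object blocker = new Object();

        Thread t = new Thread(() -> {
            while (!released) {
                LockSupport.park(blocker);   // records blocker in Thread#parkBlocker while parked
            }
        });
        t.start();

        // Poll until the worker has recorded the blocker.
        while (LockSupport.getBlocker(t) != blocker) {
            Thread.sleep(1);
        }
        boolean recordedWhileParked = true;

        released = true;
        LockSupport.unpark(t);
        t.join();

        // park(Object) resets the field to null before returning.
        boolean clearedAfterReturn = LockSupport.getBlocker(t) == null;
        return recordedWhileParked && clearedAfterReturn;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("blocker recorded and cleared: " + run());
    }
}
```

As the Javadoc says, the value returned by getBlocker is only a momentary snapshot, which is why the sketch polls for it rather than reading it once.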

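3. unpark takes the thread to unblock

unpark releases a blocked thread. It may also be called before park: in that case the subsequent park does not block. Calling unpark several times has the same effect as calling it once, so it satisfies only a single park.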
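
The "unpark first, park later" behaviour and the fact that permits do not accumulate can be tried on the current thread. This is a sketch for illustration (the class name PermitDemo is made up). It measures how long two consecutive park calls take after two unpark calls.

```java
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    // Returns {nanoseconds spent in the first park, nanoseconds spent in the second park}.
    static long[] run() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // makes the permit available
        LockSupport.unpark(self);   // no further effect: there is at most one permit

        long t0 = System.nanoTime();
        LockSupport.park();         // consumes the permit and returns without blocking
        long first = System.nanoTime() - t0;

        long t1 = System.nanoTime();
        // No permit is left, so this call waits for up to 200 ms
        // (it may return earlier, since park is allowed to return spuriously).
        LockSupport.parkNanos(200_000_000L);
        long second = System.nanoTime() - t1;

        return new long[] { first, second };
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("first park:  " + r[0] / 1_000_000 + " ms");
        System.out.println("second park: " + r[1] / 1_000_000 + " ms");
    }
}
```

The second call uses parkNanos rather than park so that the program cannot hang: an untimed park at that point would wait until someone else unparks or interrupts the thread.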

The next section looks at how this works.

3. How it works

1. How park works

park ultimately calls sun.misc.Unsafe#park, which is a native method:

    public native void park(boolean var1, long var2);

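It takes two parameters: the first (isAbsolute) says whether the time is absolute, and the second is the time itself.

Within LockSupport, only the parkUntil methods, such as java.util.concurrent.locks.LockSupport#parkUntil(java.lang.Object, long), pass true, meaning the time is an absolute deadline in milliseconds since the epoch. All other methods pass false, meaning the time is relative and given in nanoseconds; park itself passes 0.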
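
At the Java level, the two kinds of time argument look like this. The sketch is illustrative only (the class name ParkTimeDemo and its method names are assumptions). It wraps parkNanos, which takes a relative duration in nanoseconds, and parkUntil, which takes an absolute deadline in milliseconds since the epoch, and returns how long each call took.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class ParkTimeDemo {
    // Relative wait: the argument passed to parkNanos is a duration in nanoseconds.
    static long parkRelative(long millis) {
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Absolute wait: the argument passed to parkUntil is a deadline in epoch milliseconds.
    static long parkAbsolute(long millisFromNow) {
        long start = System.nanoTime();
        LockSupport.parkUntil(System.currentTimeMillis() + millisFromNow);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("parkNanos, 100 ms:         " + parkRelative(100) + " ms");
        System.out.println("parkUntil, now + 100 ms:   " + parkAbsolute(100) + " ms");
        // A deadline that has already passed does not block.
        System.out.println("parkUntil, now - 1000 ms:  " + parkAbsolute(-1000) + " ms");
    }
}
```

The measured times are upper-bounded by the timeout only approximately, and either call may return early, so real code should compare against its own deadline after the call returns.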

Next, the C++ code that this call reaches.

1. The method in \openjdk\hotspot\src\share\vm\prims\unsafe.cpp:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))

UnsafeWrapper("Unsafe_Park");

EventThreadPark event;

#ifndef USDT2

HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);

#else /* USDT2 */

HOTSPOT_THREAD_PARK_BEGIN(

(uintptr_t) thread->parker(), (int) isAbsolute, time);

#endif /* USDT2 */

JavaThreadParkedState jtps(thread, time != 0);

thread->parker()->park(isAbsolute != 0, time);

#ifndef USDT2

HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());

#else /* USDT2 */

HOTSPOT_THREAD_PARK_END(

(uintptr_t) thread->parker());

#endif /* USDT2 */

if (event.should_commit()) {

oop obj = thread->current_park_blocker();

event.set_klass((obj != NULL) ? obj->klass() : NULL);

event.set_timeout(time);

event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);

event.commit();

}

UNSAFE_END

The core is the line thread->parker()->park(isAbsolute != 0, time); it obtains the thread's Parker through parker() and then calls park on it. (Every thread object has its own Parker object.)

The Parker class: openjdk\hotspot\src\share\vm\runtime\park.hpp

class Parker : public os::PlatformParker {

private:

volatile int _counter ;

Parker * FreeNext ;

JavaThread * AssociatedWith ; // Current association

public:

Parker() : PlatformParker() {

_counter = 0 ;

FreeNext = NULL ;

AssociatedWith = NULL ;

}

protected:

~Parker() { ShouldNotReachHere(); }

public:

// For simplicity of interface with Java, all forms of park (indefinite,

// relative, and absolute) are multiplexed into one call.

void park(bool isAbsolute, jlong time);

void unpark();

// Lifecycle operators

static Parker * Allocate (JavaThread * t) ;

static void Release (Parker * e) ;

private:

static Parker * volatile FreeList ;

static volatile int ListLock ;

};

The _counter field is the one that matters most.

Its parent class holds the mutex and the condition variables: \openjdk\hotspot\src\os\linux\vm\os_linux.hpp

class PlatformParker : public CHeapObj<mtInternal> {

protected:

enum {

REL_INDEX = 0,

ABS_INDEX = 1

};

int _cur_index; // which cond is in use: -1, 0, 1

pthread_mutex_t _mutex [1] ;

pthread_cond_t _cond [2] ; // one for relative times and one for abs.

public: // TODO-FIXME: make dtor private

~PlatformParker() { guarantee (0, "invariant") ; }

public:

PlatformParker() {

int status;

status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());

assert_status(status == 0, status, "cond_init rel");

status = pthread_cond_init (&_cond[ABS_INDEX], NULL);

assert_status(status == 0, status, "cond_init abs");

status = pthread_mutex_init (_mutex, NULL);

assert_status(status == 0, status, "mutex_init");

_cur_index = -1; // mark as unused

}

};

#endif // OS_LINUX_VM_OS_LINUX_HPP

2. The park method is delegated to the implementation for the current operating system:

For example: openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp

void Parker::park(bool isAbsolute, jlong time) {

// Ideally we'd do something useful while spinning, such

// as calling unpackTime().

// Optional fast-path check:

// Return immediately if a permit is available.

// We depend on Atomic::xchg() having full barrier semantics

// since we are doing a lock-free update to _counter.

// If _counter is greater than 0, a permit is available: consume it and return immediately

if (Atomic::xchg(0, &_counter) > 0) return;

// Optional fast-exit: Check interrupt before trying to wait

Thread* thread = Thread::current();

assert(thread->is_Java_thread(), "Must be JavaThread");

JavaThread *jt = (JavaThread *)thread;

if (Thread::is_interrupted(thread, false)) {

return;

}

// First, demultiplex/decode time arguments

timespec absTime;

// If time is negative, or the time is absolute and equal to 0, return immediately

if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all

return;

}

if (time > 0) {

// Warning: this code might be exposed to the old Solaris time

// round-down bugs. Grep "roundingFix" for details.

// Convert the time and store it in absTime

unpackTime(&absTime, isAbsolute, time);

}

// Enter safepoint region

// Beware of deadlocks such as 6317397.

// The per-thread Parker:: _mutex is a classic leaf-lock.

// In particular a thread must never block on the Threads_lock while

// holding the Parker:: mutex. If safepoints are pending both the

// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.

ThreadBlockInVM tbivm(jt);

// Don't wait if cannot get lock since interference arises from

// unblocking. Also. check interrupt before trying wait

// Return immediately if the thread has been interrupted, or if trylock on the mutex fails (for example because another thread holds it)

if (Thread::is_interrupted(thread, false) ||

os::Solaris::mutex_trylock(_mutex) != 0) {

return;

}

int status ;

// The mutex is now held: if _counter is greater than 0, a permit is available, so reset it to 0 and return without waiting

if (_counter > 0) { // no wait needed

_counter = 0;

// Unlock the mutex

status = os::Solaris::mutex_unlock(_mutex);

assert (status == 0, "invariant") ;

// Paranoia to ensure our locked and lock-free paths interact

// correctly with each other and Java-level accesses.

OrderAccess::fence();

return;

}

#ifdef ASSERT

// Don't catch signals while blocked; let the running threads have the signals.

// (This allows a debugger to break into the running thread.)

sigset_t oldsigs;

sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();

thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

#endif

OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);

jt->set_suspend_equivalent();

// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

// Do this the hard way by blocking ...

// See http://monaco.sfbay/detail.jsf?cr=5094058.

// TODO-FIXME: for Solaris SPARC set fprs.FEF=0 prior to parking.

// Only for SPARC >= V8PlusA

#if defined(__sparc) && defined(COMPILER2)

if (ClearFPUAtPark) { _mark_fpu_nosave() ; }

#endif

if (time == 0) {

status = os::Solaris::cond_wait (_cond, _mutex) ;

} else {

status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);

}

// Note that an untimed cond_wait() can sometimes return ETIME on older

// versions of the Solaris.

assert_status(status == 0 || status == EINTR ||

status == ETIME || status == ETIMEDOUT,

status, "cond_timedwait");

#ifdef ASSERT

thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);

#endif

_counter = 0 ;

status = os::Solaris::mutex_unlock(_mutex);

assert_status(status == 0, status, "mutex_unlock") ;

// Paranoia to ensure our locked and lock-free paths interact

// correctly with each other and Java-level accesses.

OrderAccess::fence();

// If externally suspended while waiting, re-suspend

if (jt->handle_special_suspend_equivalent_condition()) {

jt->java_suspend_self();

}

}

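(1) ThreadBlockInVM tbivm(jt); changes the thread state to blocked. The statement constructs a ThreadBlockInVM object named tbivm with jt as its argument; its destructor switches the state back when the object goes out of scope.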

\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp

class ThreadBlockInVM : public ThreadStateTransition {

public:

ThreadBlockInVM(JavaThread *thread)

: ThreadStateTransition(thread) {

// Once we are blocked vm expects stack to be walkable

thread->frame_anchor()->make_walkable(thread);

trans_and_fence(_thread_in_vm, _thread_blocked);

}

~ThreadBlockInVM() {

trans_and_fence(_thread_blocked, _thread_in_vm);

// We don't need to clear_walkable because it will happen automagically when we return to java

}

};

This in turn calls transition_and_fence in D:\study\sourcecode\openjdk\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp:

  // transition_and_fence must be used on any thread state transition

// where there might not be a Java call stub on the stack, in

// particular on Windows where the Structured Exception Handler is

// set up in the call stub. os::write_memory_serialize_page() can

// fault and we can't recover from it on Windows without a SEH in

// place.

static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {

assert(thread->thread_state() == from, "coming from wrong thread state");

assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");

// Change to transition state (assumes total store ordering! -Urs)

thread->set_thread_state((JavaThreadState)(from + 1));

// Make sure new state is seen by VM thread

if (os::is_MP()) {

if (UseMembar) {

// Force a fence between the write above and read below

OrderAccess::fence();

} else {

// Must use this rather than serialization page in particular on Windows

InterfaceSupport::serialize_memory(thread);

}

}

if (SafepointSynchronize::do_call_back()) {

SafepointSynchronize::block(thread);

}

thread->set_thread_state(to);

CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)

}

The thread states are defined in: openjdk\hotspot\src\share\vm\utilities\globalDefinitions.hpp

enum JavaThreadState {

_thread_uninitialized = 0, // should never happen (missing initialization)

_thread_new = 2, // just starting up, i.e., in process of being initialized

_thread_new_trans = 3, // corresponding transition state (not used, included for completness)

_thread_in_native = 4, // running in native code

_thread_in_native_trans = 5, // corresponding transition state

_thread_in_vm = 6, // running in VM

_thread_in_vm_trans = 7, // corresponding transition state

_thread_in_Java = 8, // running in Java or in stub code

_thread_in_Java_trans = 9, // corresponding transition state (not used, included for completness)

_thread_blocked = 10, // blocked in vm

_thread_blocked_trans = 11, // corresponding transition state

_thread_max_state = 12 // maximum thread state+1 - used for statistics allocation

};

2. How unpark works

The unpark method in \openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp is as follows:

void Parker::unpark() {

int s, status ;

// Acquire the mutex

status = os::Solaris::mutex_lock (_mutex) ;

assert (status == 0, "invariant") ;

// s records the previous value of _counter

s = _counter;

// Set _counter to 1

_counter = 1;

// Release the mutex

status = os::Solaris::mutex_unlock (_mutex) ;

assert (status == 0, "invariant") ;

// If the previous _counter was 0 (no permit pending), a thread may be waiting in park: signal the condition so that it unblocks and continues with the code after park

if (s < 1) {

status = os::Solaris::cond_signal (_cond) ;

assert (status == 0, "invariant") ;

}

}

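unpark itself sets _counter to 1 and signals the thread blocked on the condition that it can stop waiting. If unpark is called several times in a row, s < 1 no longer holds after the first call, so the signalling code below it is skipped.

cond_signal refers to the following code in \openjdk\hotspot\src\os\solaris\vm\os_solaris.hpp: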

  static int cond_signal(cond_t *cv)            { return _cond_signal(cv); }

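Summary: every thread has a Parker object whose _counter field can be viewed as the permit. Each park waits for that permit (waits for the field to become 1) and consumes it by resetting the field to 0; unpark sets the field to 1.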
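
The summary can be restated as a toy model in Java. This is not the HotSpot implementation, only a sketch under simplifying assumptions: the class ToyParker and its permits() accessor are hypothetical, a Java monitor stands in for the mutex and condition variable, timeouts and the lock-free fast path are left out, and the object is not tied to a particular thread.

```java
class ToyParker {
    private int counter = 0;   // 0 = no permit, 1 = permit available (plays the role of _counter)

    // Untimed park: take the permit if there is one, otherwise wait once for a signal.
    public synchronized void park() {
        if (counter == 0) {
            try {
                wait();        // stands in for cond_wait; may also wake without a notify
            } catch (InterruptedException e) {
                // Like park, return normally and leave the interrupt status set.
                Thread.currentThread().interrupt();
            }
        }
        counter = 0;           // consume the permit on the way out
    }

    // unpark: set the permit to 1 and signal only if there was none before.
    public synchronized void unpark() {
        int old = counter;
        counter = 1;
        if (old < 1) {
            notify();          // stands in for cond_signal
        }
    }

    public synchronized int permits() {
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyParker p = new ToyParker();
        p.unpark();
        p.unpark();                        // still a single permit
        p.park();                          // returns immediately
        System.out.println(p.permits());   // prints 0

        Thread t = new Thread(p::park);    // parks until unparked (or finds the permit already set)
        t.start();
        p.unpark();
        t.join();
        System.out.println(p.permits());   // prints 0
    }
}
```

One visible difference from the C++ code above: notify() has to be called while holding the monitor, whereas Parker::unpark signals the condition after releasing the mutex.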

Source: utcz.com/z/392776.html
