`
g21121
  • 浏览: 685909 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Worker

 
阅读更多

        Worker是线程池中实际工作的线程,Worker是 ThreadPoolExecutor的私有内部类,它实现了 Runnable接口,所以可以作为一个线程来使用。

private final class Worker implements Runnable {
        //锁
	private final ReentrantLock runLock = new ReentrantLock();
        //任务
	private Runnable firstTask;
        //完成任务数
	volatile long completedTasks;
        //Worker运行的线程
	Thread thread;
        //构造方法,firstTask为要执行的任务
	Worker(Runnable firstTask) {
		this.firstTask = firstTask;
	}

	boolean isActive() {
		return runLock.isLocked();
	}
        //如果没有任务就中断线程
	void interruptIfIdle() {
		final ReentrantLock runLock = this.runLock;
		if (runLock.tryLock()) {
			try {
				if (thread != Thread.currentThread())
					thread.interrupt();
			} finally {
				runLock.unlock();
			}
		}
	}
        //立即中断线程,即使正在运行任务
	void interruptNow() {
		thread.interrupt();
	}
        //执行任务,在beforeExecute方法之后,afterExecute方法之前执行
	private void runTask(Runnable task) {
		final ReentrantLock runLock = this.runLock;
		runLock.lock();
		try {
			if (runState < STOP && Thread.interrupted() && runState >= STOP)
				thread.interrupt();
			boolean ran = false;
			beforeExecute(thread, task);
			try {
				task.run();
				ran = true;
				afterExecute(task, null);
				++completedTasks;
			} catch (RuntimeException ex) {
				if (!ran)
					afterExecute(task, ex);
				throw ex;
			}
		} finally {
			runLock.unlock();
		}
	}

	/**
	 * 任务执行主循环方法
	 */
	public void run() {
		try {
			Runnable task = firstTask;
			firstTask = null;
			while (task != null || (task = getTask()) != null) {
				runTask(task);
				task = null;
			}
		} finally {
			workerDone(this);
		}
	}
}

 

        1.成员变量

        Worker有四个成员变量:

/**
 * 可重入的互斥锁
 */
private final ReentrantLock runLock = new ReentrantLock();

/**
 * 初始任务在进入run循环之前运行,可能为空的
 */
private Runnable firstTask;

/**
 * 每个线程完成任务的计数器
 */
volatile long completedTasks;

/**
 * Worker运行时的线程
 */
Thread thread;

 

        2.run方法

        其中最为核心的部分就是 Worker的run方法:

	public void run() {
		try {
			Runnable task = firstTask;
			firstTask = null;
			while (task != null || (task = getTask()) != null) {
				runTask(task);
				task = null;
			}
		} finally {
			workerDone(this);
		}
	}

        从run方法的实现可以看出,它首先从成员变量中获取 firstTask(new时传进来的),然后利用while循环不断执行 runTask()方法,在 runTask()执行完之后,循环依然不断通过 getTask()去取新的任务来执行,其中 getTask方法是从任务缓存队列中获取任务。

 

        3.runTask方法

        run方法中获取到新任务后实际上调用的是 runTask方法来执行任务,以下就是 runTask方法的源代码:

private void runTask(Runnable task) {
	
	final ReentrantLock runLock = this.runLock;
	//获取锁
	runLock.lock();
	try {
		if (runState < STOP && Thread.interrupted() && runState >= STOP)
			thread.interrupt();
		boolean ran = false;
		//前置操作
		beforeExecute(thread, task);
		try {
			task.run();
			ran = true;
			//后置操作
			afterExecute(task, null);
			++completedTasks;
		} catch (RuntimeException ex) {
			if (!ran)
				afterExecute(task, ex);
			throw ex;
		}
	} finally {
		//释放锁
		runLock.unlock();
	}
}

        1)首先获得锁。

        2)获得锁后判断线程池的状态如果是 RUNNING或 SHUTDOWN则终止该线程;然后调用Thread.interrupted() 方法,Thread.interrupted() 内部实现为:

public static boolean interrupted() {
	return currentThread().isInterrupted(true);
}

        interrupted方法的作用是清除该线程的中断标志位。当线程处于中断状态时,其标志位为true。

        举一个例子来说明将更加清晰:

public class ThreadB implements Runnable {

	public void run() {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().isInterrupted());
	}

	public static void main(String[] args) {
		Thread t = new Thread(new ThreadB());
		t.start();
		t.interrupt();
	}
}
//结果:
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at ThreadB.run(ThreadB.java:5)
	at java.lang.Thread.run(Thread.java:662)
false

        发现当线程处于中断状态时,再调用sleep,wait等方法会抛出 InterruptedException异常,InterruptedException异常的抛出就是根据这个中断标志位来判断的。我们修改一下代码,利用 interrupted方法人为的清除中断标志位:

public class ThreadB implements Runnable {

	public void run() {
		System.out.println(Thread.currentThread().isInterrupted());
		System.out.println(Thread.currentThread().interrupted());
		System.out.println(Thread.currentThread().interrupted());
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		Thread t = new Thread(new ThreadB());
		t.start();
		t.interrupt();
	}
}
//结果:
true
true
false

         发现已经没有异常抛出了。但是再调用两次 interrupted方法后返回的结果就都是false了,是因为标志位已经被清除,所以再次调用就无效了。

        最后再判断线程池的状态,终止线程。

        3)声明ran变量,用于标志任务是否执行完成。

        4)执行 beforeExecute方法,在任务开始执行之前会首先执行 beforeExecute方法:

 

protected void beforeExecute(Thread t, Runnable r) { }
        beforeExecute是 ThreadPoolExecutor的方法,从上面源代码中发现 ThreadPoolExecutor类并没有实现任何代码,因为 ThreadPoolExecutor时可以扩展的,所以 beforeExecute与 afterExecute一样可在子类中定制,如:
@Override
protected void beforeExecute(Thread t, Runnable r) {
	super.beforeExecute(t, r);
	// 其他操作,如日志等
}
        beforeExecute与 afterExecute可以在任务执行的前后添加日志、计时、监控或统计信息收集等功能。

 

        5)task.run();执行任务,执行成功后ran标志会赋值为true,证明任务已经完成,如果任务执行过程中抛出异常则会跳转到 catch代码中,ran的值仍为false。

        6)完成任务执行后调用 afterExecute方法,并增加完成任务数。

        7)释放锁。

 

        4.interruptNow方法

        interruptNow用于终止正在运行的任务,其源代码为:

void interruptNow() {
	thread.interrupt();
}

        interruptNow中调用的是 interrupt方法。

 

        5.interruptIfIdle方法

        interruptIfIdle方法用于中断空闲的线程。

/**
 * 如果没有运行任务则中止线程
 */
void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {
        try {
    if (thread != Thread.currentThread())
	thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

 

        除了 Worker类中的这些方法外 ThreadPoolExecutor有几个与 Worker相关的实用方法。

 

        6.getTask方法

        getTask 是 ThreadPoolExecutor类中的方法,并不是 Worker类中的方法,下面是 getTask方法的实现:

Runnable getTask() {
	for (;;) {
		try {
			int state = runState;
			if (state > SHUTDOWN)
				return null;
			Runnable r;
			if (state == SHUTDOWN) // Help drain queue
				r = workQueue.poll();
			else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
				r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
			else
				r = workQueue.take();
			if (r != null)
				return r;
			if (workerCanExit()) {
				if (runState >= SHUTDOWN) // Wake up others
					interruptIdleWorkers();
				return null;
			}
			// Else retry
		} catch (InterruptedException ie) {
			// On interruption, re-check runState
		}
	}
}

        在getTask中,首先判断当前线程池状态,如果 runState大于SHUTDOWN(STOP或者TERMINATED),则直接返回null。

   如果 runState为 SHUTDOWN或者 RUNNING,则从任务缓存队列取任务。

   如果当前线程池的线程数大于核心池大小 corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用 poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

        然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出。

        也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许 worker退出。如果允许 worker退出,则调用 interruptIdleWorkers()中断处于空闲状态的 worker。实际上 interruptIdleWorkers方法调用的是 Worker的 interruptIfIdle()方法。

 

        7.workerCanExit方法

        workerCanExit方法判断Worker是否可以退出。

/**
 * 判断Worker是否可以退出
 */
private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

        其中判断语句是关键。

        首先判断运行状态为 STOP或 TERMINATED;

        然后判断worker队列workQueue如果为空;

        之后判断 allowCoreThreadTimeOut属性是否为true;

        最后判断poolSize(线程数)大于1或核心线程数。满足其中任何一个条件都说明该 Worker可以退出。

 

        8.interruptIdleWorkers

        interruptIdleWorkers方法利用循环调用每个 Worker的 interruptIfIdle方法。

/**
 * 唤醒所有可能正在等待任务的线程,这样他们就可以检查终止。注:这种方法也被称为 scheduledthreadpoolexecutor。
 */
void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics