package com.example.demo;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Nullable;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <b><code>Thread</code></b>
* <p>
* Description
* <p>
* <b>Create Time:</b> 2021/9/16 16:39.
*
* @author qd.yt
* @version 0.0.1
* @since cola-demo v0.0.1
*/
public class Thread1 {
// 线程数
public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();;
public static void main(String[] args) throws InterruptedException {
double startT = System.currentTimeMillis();
// 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
//ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(“hyn-demo-pool-%d”).build();
ThreadFactory namedThreadFactory = new SelfDefinedThreadFactory(“demo-thread”);
// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
THREAD_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// 新建 1000 个任务,每个任务是打印当前线程名称
for (int i = 0; i < 1000; i++) {
executor.execute(() -> System.out.println(Thread.currentThread().getName()));
}
// 优雅关闭线程池
executor.shutdown();
executor.awaitTermination(1000L, TimeUnit.SECONDS);
// 任务执行完毕后打印”Done”
System.out.println(“Done”);
double startE = System.currentTimeMillis();
System.out.println(“”+(startE-startT));
}
static class SelfDefinedThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
SelfDefinedThreadFactory(String namePrefix) {
this.namePrefix = namePrefix+”-“;
}
@Override
public Thread newThread(@Nullable Runnable r) {
Thread t = new Thread( r,namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
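也可以使用谷歌 Guava 提供的工具类 ThreadFactoryBuilder,需要增加如下 Maven 依赖:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1.1-jre</version>
</dependency>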
执行改进版,可以增加详细日志
package com.example.demo; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.Nullable; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * <b><code>Thread</code></b> * <p> * Description * <p> * <b>Create Time:</b> 2021/9/16 16:39. * * @author qd.yt * @version 0.0.1 * @since cola-demo v0.0.1 */ public class Thread1 { // 线程数 public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();; public static void main(String[] args) throws InterruptedException { double startT = System.currentTimeMillis(); // 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory //ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("hyn-demo-pool-%d").build(); ThreadFactory namedThreadFactory = new SelfDefinedThreadFactory("demo-thread"); // 创建线程池,其中任务队列需要结合实际情况设置合理的容量 ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, //队列改小后,会出现rejected,不接受执行任务的日志 new LinkedBlockingQueue<>(2), namedThreadFactory, new MyIgnorePolicy()); executor.prestartAllCoreThreads(); // 预启动所有核心线程 // 新建 1000 个任务,每个任务是打印当前线程名称 for (int i = 0; i < 1000; i++) { MyTask task = new MyTask(String.valueOf(i)); executor.execute(task); } // 优雅关闭线程池 executor.shutdown(); executor.awaitTermination(1000L, TimeUnit.SECONDS); // 任务执行完毕后打印"Done" System.out.println("Done"); double startE = System.currentTimeMillis(); System.out.println(""+(startE-startT)); } public static class MyIgnorePolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { doLog(r, e); } private void doLog(Runnable r, ThreadPoolExecutor e) { // 可做日志记录等 System.err.println( r.toString() + " rejected"); //System.out.println("completedTaskCount: " + e.getCompletedTaskCount()); } } static class MyTask implements Runnable { private String name; public MyTask(String 
name) { this.name = name; } @Override public void run() { try { System.out.println(this.toString() + " is running!"); Thread.sleep(3000); //让任务执行慢点 } catch (InterruptedException e) { e.printStackTrace(); } } public String getName() { return name; } @Override public String toString() { return "MyTask [name=" + name + "]"; } } /** * 自定义线程名 */ static class SelfDefinedThreadFactory implements ThreadFactory { final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; SelfDefinedThreadFactory(String namePrefix) { this.namePrefix = namePrefix+"-"; } @Override public Thread newThread(@Nullable Runnable r) { Thread t = new Thread( r,namePrefix + threadNumber.getAndIncrement()); if (t.isDaemon()) { t.setDaemon(true); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } System.out.println(t.getName() + " has been created"); return t; } } }
扩展阅读:
两种情况会拒绝处理任务:
1):当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务;
2):当线程池调用shutdown()后,会等待队列里的任务执行完毕,再关闭。如果在调用shutdown()和线程池真正关闭之间提交任务,会拒绝新任务;
线程池会调用 RejectedExecutionHandler 来处理被拒绝的任务。如果没有显式设置,默认使用 AbortPolicy,会抛出 RejectedExecutionException。
线程池提供的拒绝策略:
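ThreadPoolExecutor.AbortPolicy:拒绝任务,并抛出运行时异常 RejectedExecutionException(默认策略)
ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程(即调用 execute 的线程)直接执行该任务;如果线程池已关闭,则丢弃该任务
ThreadPoolExecutor.DiscardPolicy:静默丢弃被拒绝的任务,不抛异常,也不做任何其他处理
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最早进入(位于队头、本应下一个被执行)的任务,然后重新尝试提交当前任务;如果线程池已关闭,则丢弃当前任务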
实现RejectedExecutionHandler接口,可自定义处理器