多线程任务批处理通用工具类
多线程任务批处理通用工具类
需求
使用线程池批量发送短信,当短信发送完毕之后,方法继续向下走。
技术点
线程池:ExecutorService
CountDownLatch:可以让一个或者多个线程等待一批任务执行完毕之后,继续向下走
简易代码实现
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
*
* description: 简单的任务批处理
*
*/
public class SimpleBatchTask {
public static void main(String[] args) {
batchTaskTest();
}
/**
* 使用线程池批量发送短信,发送完毕后,方法继续向下走
*/
public static void batchTaskTest() {
long startTime = System.currentTimeMillis();
//待发送的短信列表
List<String> taskList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
taskList.add("短信-" + i);
}
//使用线程池批量处理任务
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建CountDownLatch,构造器参数为任务数量
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (String task : taskList) {
executorService.execute(() -> {
try {
//交个线程池处理任务
disposeTask(task);
} finally {
//处理完成后调用 countDownLatch.countDown()
countDownLatch.countDown();
}
});
}
try {
//阻塞当前线程池
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务处理完毕,耗时(ms):" + (System.currentTimeMillis() - startTime));
executorService.shutdown();
}
public static void disposeTask(String task) {
System.out.println(String.format("【%s】发送成功", task));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
通用场景如何实现
这个场景属于通用型的场景,很多业务都会用到,将通用的代码提取出来,可以丢到一个工具类中来实现这个功能。
通用代码实现
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
*
* description: 任务批处理通用工具类
*
*/
public class TaskDisposeUtils {
/**
* 使用线程池批处理文件,当所有任务处理完毕后才会返回
*
* @param taskList 任务列表
* @param consumer 处理任务的方法
* @param executor 线程池
* @param <T>
* @throws InterruptedException
*/
public static <T> void dispose(List<T> taskList, Consumer<? super T> consumer, Executor executor) throws InterruptedException {
if (taskList == null || taskList.size() == 0) {
return;
}
Objects.nonNull(consumer);
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (T item : taskList) {
executor.execute(() -> {
try {
consumer.accept(item);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
//任务列表
List<String> taskList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
taskList.add("短信-" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(10);
//调用工具类批处理任务
TaskDisposeUtils.dispose(taskList, TaskDisposeUtils::disposeTask, executorService);
System.out.println("任务处理完毕,耗时(ms):" + (System.currentTimeMillis() - startTime));
executorService.shutdown();
}
public static void disposeTask(String task) {
System.out.println(String.format("【%s】发送成功", task));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
重点在于下面2行代码,简化了很多
ExecutorService executorService = Executors.newFixedThreadPool(10);
//调用工具类批处理任务
TaskDisposeUtils.dispose(taskList, TaskDisposeUtils::disposeTask, executorService);