1. 功能說明
2. 多線程任務示例
2.1 線程池
2.2 單個任務
2.3 任務入口
2.4 結果分析
2.5 源碼地址
3. 寫在最后
大家好,今天教大家擼一個 Java 的多線程永動任務,這個示例的原型是公司自研的多線程異步任務項目 ,我把里面涉及到多線程的代碼抽離出來,然后進行一定的改造。
里面涉及的知識點非常多,特別適合有一定工作經驗 的同學學習,或者可以直接拿到項目中使用。
文章結構非常簡單:

1. 功能說明
做這個多線程異步任務,主要是因為我們有很多永動的異步任務,什么是永動呢?就是任務跑起來后,需要一直跑下去。
比如消息 Push 任務,因為一直有消息過來,所以需要一直去消費 DB 中的未推送消息,就需要整一個 Push 的永動異步任務。
我們的需求其實不難,簡單總結一下:
能同時執行多個永動的異步任務 ;
每個異步任務,支持開多個線程 去消費這個任務的數據;
支持永動異步任務的優雅關閉 ,即關閉后,需要把所有的數據消費完畢后,再關閉。
完成上面的需求,需要注意幾個點:
每個永動任務 ,可以開一個線程去執行;
每個子任務 ,因為需要支持并發,需要用線程池控制;
永動任務的關閉,需要通知子任務的并發線程,并支持永動任務和并發子任務的優雅關閉 。
2. 多線程任務示例
2.1 線程池
對于子任務,需要支持并發,如果每個并發都開一個線程,用完就關閉,對資源消耗太大,所以引入線程池:
publicclassTaskProcessUtil{ //每個任務,都有自己單獨的線程池 privatestaticMapexecutors=newConcurrentHashMap<>(); //初始化一個線程池 privatestaticExecutorServiceinit(StringpoolName,intpoolSize){ returnnewThreadPoolExecutor(poolSize,poolSize, 0L,TimeUnit.MILLISECONDS, newLinkedBlockingQueue (), newThreadFactoryBuilder().setNameFormat("Pool-"+poolName).setDaemon(false).build(), newThreadPoolExecutor.CallerRunsPolicy()); } //獲取線程池 publicstaticExecutorServicegetOrInitExecutors(StringpoolName,intpoolSize){ ExecutorServiceexecutorService=executors.get(poolName); if(null==executorService){ synchronized(TaskProcessUtil.class){ executorService=executors.get(poolName); if(null==executorService){ executorService=init(poolName,poolSize); executors.put(poolName,executorService); } } } returnexecutorService; } //回收線程資源 publicstaticvoidreleaseExecutors(StringpoolName){ ExecutorServiceexecutorService=executors.remove(poolName); if(executorService!=null){ executorService.shutdown(); } } }
這是一個線程池的工具類,這里初始化線程池和回收線程資源很簡單,我們主要討論獲取線程池。
獲取線程池可能會存在并發情況,所以需要加一個 synchronized 鎖,然后鎖住后,需要對 executorService 進行二次判空校驗。
2.2 單個任務
為了更好講解單個任務的實現方式,我們的任務主要就是把 Cat 的數據打印出來,Cat 定義如下:
@Data
@Service
publicclassCat{
privateStringcatName;
publicCatsetCatName(Stringname){
this.catName=name;
returnthis;
}
}
單個任務主要包括以下功能:
獲取永動任務數據 :這里一般都是掃描 DB,我直接就簡單用 queryData() 代替。
多線程執行任務 :需要把數據拆分成 4 份,然后分別由多線程并發執行,這里可以通過線程池支持;
永動任務優雅停機 :當外面通知任務需要停機,需要執行完剩余任務數據,并回收線程資源,退出任務;
永動執行 :如果未收到停機命令,任務需要一直執行下去。
直接看代碼:
publicclassChildTask{
privatefinalintPOOL_SIZE=3;//線程池大小
privatefinalintSPLIT_SIZE=4;//數據拆分大小
privateStringtaskName;
//接收jvm關閉信號,實現優雅停機
protectedvolatilebooleanterminal=false;
publicChildTask(StringtaskName){
this.taskName=taskName;
}
//程序執行入口
publicvoiddoExecute(){
inti=0;
while(true){
System.out.println(taskName+":Cycle-"+i+"-Begin");
//獲取數據
Listdatas=queryData();
//處理數據
taskExecute(datas);
System.out.println(taskName+":Cycle-"+i+"-End");
if(terminal){
//只有應用關閉,才會走到這里,用于實現優雅的下線
break;
}
i++;
}
//回收線程池資源
TaskProcessUtil.releaseExecutors(taskName);
}
//優雅停機
publicvoidterminal(){
//關機
terminal=true;
System.out.println(taskName+"shutdown");
}
//處理數據
privatevoiddoProcessData(Listdatas,CountDownLatchlatch){
try{
for(Catcat:datas){
System.out.println(taskName+":"+cat.toString()+",ThreadName:"+Thread.currentThread().getName());
Thread.sleep(1000L);
}
}catch(Exceptione){
System.out.println(e.getStackTrace());
}finally{
if(latch!=null){
latch.countDown();
}
}
}
//處理單個任務數據
privatevoidtaskExecute(ListsourceDatas){
if(CollectionUtils.isEmpty(sourceDatas)){
return;
}
//將數據拆成4份
List>splitDatas=Lists.partition(sourceDatas,SPLIT_SIZE);
finalCountDownLatchlatch=newCountDownLatch(splitDatas.size());
//并發處理拆分的數據,共用一個線程池
for(finalListdatas:splitDatas){
ExecutorServiceexecutorService=TaskProcessUtil.getOrInitExecutors(taskName,POOL_SIZE);
executorService.submit(newRunnable(){
@Override
publicvoidrun(){
doProcessData(datas,latch);
}
});
}
try{
latch.await();
}catch(Exceptione){
System.out.println(e.getStackTrace());
}
}
//獲取永動任務數據
privateListqueryData(){
Listdatas=newArrayList<>();
for(inti=0;i5;?i?++)?{
????????????datas.add(new?Cat().setCatName("羅小黑"?+?i));
????????}
????????return?datas;
????}
}
簡單解釋一下:
queryData :用于獲取數據,實際應用中其實是需要把 queryData 定為抽象方法,然后由各個任務實現自己的方法。
doProcessData :數據處理邏輯,實際應用中其實是需要把 doProcessData 定為抽象方法,然后由各個任務實現自己的方法。
taskExecute :將數據拆分成 4 份,獲取該任務的線程池,并交給線程池并發執行,然后通過 latch.await() 阻塞。當這 4 份數據都執行成功后,阻塞結束,該方法才返回。
terminal :僅用于接受停機命令,這里該變量定義為 volatile,所以多線程內存可見;
doExecute :程序執行入口,封裝了每個任務執行的流程,當 terminal=true 時,先執行完任務數據,然后回收線程池,最后退出。
2.3 任務入口
直接上代碼:
publicclassLoopTask{
privateListchildTasks;
publicvoidinitLoopTask(){
childTasks=newArrayList();
childTasks.add(newChildTask("childTask1"));
childTasks.add(newChildTask("childTask2"));
for(finalChildTaskchildTask:childTasks){
newThread(newRunnable(){
@Override
publicvoidrun(){
childTask.doExecute();
}
}).start();
}
}
publicvoidshutdownLoopTask(){
if(!CollectionUtils.isEmpty(childTasks)){
for(ChildTaskchildTask:childTasks){
childTask.terminal();
}
}
}
publicstaticvoidmain(Stringargs[])throwsException{
LoopTaskloopTask=newLoopTask();
loopTask.initLoopTask();
Thread.sleep(5000L);
loopTask.shutdownLoopTask();
}
}
每個任務都開一個單獨的 Thread,這里我初始化了 2 個永動任務,分別為 childTask1 和 childTask2,然后分別執行,后面 Sleep 了 5 秒后,再關閉任務,我們可以看看是否可以按照我們的預期優雅退出。
2.4 結果分析
執行結果如下:
childTask1:Cycle-0-Begin childTask2:Cycle-0-Begin childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cycle-0-End childTask2:Cycle-1-Begin childTask1:Cycle-0-End childTask1:Cycle-1-Begin childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1shutdown childTask2shutdown childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cycle-1-End childTask2:Cycle-1-End
輸出數據:
“Pool-childTask” 是線程池名稱;
“childTask” 是任務名稱;
“Cat(catName=羅小黑)” 是執行的結果;
“childTask shut down” 是關閉標記;
“childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一輪循環的開始和結束標記。
我們分析一下執行結果:
childTask1 和 childTask2 分別執行,在第一輪循環中都正常輸出了 5 條羅小黑數據;
第二輪執行過程中,我啟動了關閉指令,這次第二輪執行沒有直接停止,而是先執行完任務中的數據,再執行退出,所以完全符合我們的優雅退出結論。
2.5 源碼地址
GitHub 地址:
https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc
3. 寫在最后
對于這個經典的線程池使用示例,原項目是我好友一灰 寫的,技術水平阿里 P7級別,實現得也非常優雅,涉及的知識點非常多 ,非常值得大家學習。
-
JAVA
+關注
關注
20文章
3001瀏覽量
116419 -
編程
+關注
關注
90文章
3716瀏覽量
97178 -
多線程
+關注
關注
0文章
279瀏覽量
21027 -
代碼
+關注
關注
30文章
4967瀏覽量
73956 -
Thread
+關注
關注
2文章
93瀏覽量
27467
原文標題:新來個阿里 P7,僅花 2 小時,擼出一個多線程永動任務,看完直接跪了,真牛逼!
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
什么時候要使用多線程
java多線程編程實例 (源程序)
java多線程設計模式_結城浩
Java多線程總結之Queue
Java多線程永動任務 多線程異步任務項目解讀
評論