国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術文章
文章詳情頁

java中的forkjoin框架的使用

瀏覽:3日期:2022-09-04 10:14:40

fork join框架是java 7中引入框架,這個框架的引入主要是為了提升并行計算的能力。

fork join主要有兩個步驟,第一就是fork,將一個大任務分成很多個小任務,第二就是join,將第一個任務的結果join起來,生成最后的結果。如果第一步中并沒有任何返回值,join將會等到所有的小任務都結束。

還記得之前的文章我們講到了thread pool的基本結構嗎?

ExecutorService - ForkJoinPool 用來調用任務執(zhí)行。 workerThread - ForkJoinWorkerThread 工作線程,用來執(zhí)行具體的任務。 task - ForkJoinTask 用來定義要執(zhí)行的任務。

下面我們從這三個方面來詳細講解fork join框架。

ForkJoinPool

ForkJoinPool是一個ExecutorService的一個實現(xiàn),它提供了對工作線程和線程池的一些便利管理方法。

public class ForkJoinPool extends AbstractExecutorService

一個work thread一次只能處理一個任務,但是ForkJoinPool并不會為每個任務都創(chuàng)建一個單獨的線程,它會使用一個特殊的數(shù)據(jù)結構double-ended queue來存儲任務。這樣的結構可以方便的進行工作竊取(work-stealing)。

什么是work-stealing呢?

默認情況下,work thread從分配給自己的那個隊列頭中取出任務。如果這個隊列是空的,那么這個work thread會從其他的任務隊列尾部取出任務來執(zhí)行,或者從全局隊列中取出。這樣的設計可以充分利用work thread的性能,提升并發(fā)能力。

下面看下怎么創(chuàng)建一個ForkJoinPool。

最常見的方法就是使用ForkJoinPool.commonPool()來創(chuàng)建,commonPool()為所有的ForkJoinTask提供了一個公共默認的線程池。

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一種方式是使用構造函數(shù):

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

這里的參數(shù)是并行級別,2指的是線程池將會使用2個處理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。

public class ForkJoinWorkerThread extends Thread}

和一般的線程不一樣的是它定義了兩個變量:

final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一個是該worker thread所屬的ForkJoinPool。 另外一個是支持 work-stealing機制的Queue。

再看一下它的run方法:

public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }

簡單點講就是從Queue中取出任務執(zhí)行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中運行的任務類型。通常我們會用到它的兩個子類:RecursiveAction和RecursiveTask<V>。

他們都定義了一個需要實現(xiàn)的compute()方法用來實現(xiàn)具體的業(yè)務邏輯。不同的是RecursiveAction只是用來執(zhí)行任務,而RecursiveTask<V>可以有返回值。

既然兩個類都帶了Recursive,那么具體的實現(xiàn)邏輯也會跟遞歸有關,我們舉個使用RecursiveAction來打印字符串的例子:

public class CustomRecursiveAction extends RecursiveAction { private String workload = ''; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List<CustomRecursiveAction> createSubtasks() { List<CustomRecursiveAction> subtasks = new ArrayList<>(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info('This result - (' + result + ') - was processed by ' + Thread.currentThread().getName()); }}

上面的例子使用了二分法來打印字符串。

我們再看一個RecursiveTask<V>的例子:

public class CustomRecursiveTask extends RecursiveTask<Integer> { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection<CustomRecursiveTask> createSubtasks() { List<CustomRecursiveTask> dividedTasks = new ArrayList<>(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a < 27) .map(a -> a * 10) .sum(); }}

和上面的例子很像,不過這里我們需要有返回值。

在ForkJoinPool中提交Task

有了上面的兩個任務,我們就可以在ForkJoinPool中提交了:

int[] intArray= {12,12,13,14,15}; CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray); int result = forkJoinPool.invoke(customRecursiveTask); System.out.println(result);

上面的例子中,我們使用invoke來提交,invoke將會等待任務的執(zhí)行結果。

如果不使用invoke,我們也可以將其替換成fork()和join():

customRecursiveTask.fork(); int result2= customRecursiveTask.join(); System.out.println(result2);

fork() 是將任務提交給pool,但是并不觸發(fā)執(zhí)行, join()將會真正的執(zhí)行并且得到返回結果。

本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin

到此這篇關于java中的fork join框架的使用的文章就介紹到這了,更多相關java fork join框架內容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持好吧啦網(wǎng)!

標簽: Java
相關文章:
主站蜘蛛池模板: 99久久99久久久99精品齐 | 中文国产成人精品久久久 | 国产欧美日韩一区 | 欧美成人免费全部观看天天性色 | 新婚第一次一级毛片 | 精品国产高清久久久久久小说 | 在线亚洲欧美日韩 | 国内久久精品 | 91视频最新网站 | 欧美a欧美1级 | 久久精品亚瑟全部免费观看 | www.亚洲视频| 亚洲人成在线精品 | 日韩一区二区在线免费观看 | 成年女人免费又黄又爽视频 | 久久狠狠躁免费观看2020 | 日韩精品一二三区 | 欧美特黄高清免费观看的 | 国产一区在线观看免费 | 天天看片天天爽 | 久久伊人成人网 | 深夜爽爽爽福利动态图 | 亚洲第一欧美 | 女人张腿让男桶免费视频网站 | 欧美成人久久 | 青青草国产免费久久久91 | 成人伊人 | www.日本高清.com | 大狠狠大臿蕉香蕉大视频 | 又黄又www | 欧美一级大片免费看 | 人成精品视频三区二区一区 | 97精品国产福利一区二区三区 | 亚洲 欧美 丝袜 | 日朝欧美亚洲精品 | 亚洲午夜精品一级在线播放放 | 亚洲天堂免费在线视频 | 亚洲国产午夜精品理论片的软件 | 欧美一级大片免费观看 | 三级网站在线 | 免费观看成年人视频 |