亚洲av成人无遮挡网站在线观看,少妇性bbb搡bbb爽爽爽,亚洲av日韩精品久久久久久,兔费看少妇性l交大片免费,无码少妇一区二区三区

  免費注冊 查看新帖 |

Chinaunix

  平臺 論壇 博客 文庫
最近訪問板塊 發(fā)新帖
查看: 3002 | 回復(fù): 0
打印 上一主題 下一主題

JDK 7 中的 Fork/Join 模式 [復(fù)制鏈接]

論壇徽章:
0
跳轉(zhuǎn)到指定樓層
1 [收藏(0)] [報告]
發(fā)表于 2010-01-18 16:13 |只看該作者 |倒序瀏覽
隨著多核時代的來臨,軟件開發(fā)人員不得不開始關(guān)注并行編程領(lǐng)域。而 JDK 7 中將會加入的 Fork/Join
模式是處理并行編程的一個經(jīng)典的方法。雖然不能解決所有的問題,但是在它的適用范圍之內(nèi),能夠輕松的利用多個 CPU
提供的計算資源來協(xié)作完成一個復(fù)雜的計算任務(wù)。通過利用 Fork/Join 模式,我們能夠更加順暢的過渡到多核的時代。本文將介紹使用 JDK 7
中 Fork/Join 模式的方法和其他相關(guān)改進。閱讀本文之后,讀者將能夠獨立地在軟件開發(fā)中使用 Fork/Join 模式來改進程序的性能。

介紹



著多核芯片逐漸成為主流,大多數(shù)軟件開發(fā)人員不可避免地需要了解并行編程的知識。而同時,主流程序語言正在將越來越多的并行特性合并到標(biāo)準(zhǔn)庫或者語言本身
之中。我們可以看到,JDK 在這方面同樣走在潮流的前方。在 JDK 標(biāo)準(zhǔn)版 5 中,由 Doug Lea
提供的并行框架成為了標(biāo)準(zhǔn)庫的一部分(JSR-166)。隨后,在 JDK 6 中,一些新的并行特性,例如并行 collection
框架,合并到了標(biāo)準(zhǔn)庫中(JSR-166x)。直到今天,盡管 Java SE 7 還沒有正式發(fā)布,一些并行相關(guān)的新特性已經(jīng)出現(xiàn)在
JSR-166y 中:


  • Fork/Join 模式;
  • TransferQueue,它繼承自 BlockingQueue 并能在隊列滿時阻塞“生產(chǎn)者”;
  • ArrayTasks/ListTasks,用于并行執(zhí)行某些數(shù)組/列表相關(guān)任務(wù)的類;
  • IntTasks/LongTasks/DoubleTasks,用于并行處理數(shù)字類型數(shù)組的工具類,提供了排序、查找、求和、求最小值、求最大值等功能;



    中,對 Fork/Join 模式的支持可能是對開發(fā)并行軟件來說最通用的新特性。在 JSR-166y 中,Doug Lea 實現(xiàn)
    ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 時就大量的用到了 Fork/Join
    模式。讀者還需要注意一點,因為 JDK 7 還沒有正式發(fā)布,因此本文涉及到的功能和發(fā)布版本有可能不一樣。


    Fork/Join 模式有自己的適用范圍。如果一個應(yīng)用能被分解成多個子任務(wù),并且組合多個子任務(wù)的結(jié)果就能夠獲得最終的答案,那么這個應(yīng)用就適合用 Fork/Join 模式來解決。
    [color="#996699"]圖 1
    給出了一個 Fork/Join 模式的示意圖,位于圖上部的 Task 依賴于位于其下的 Task 的執(zhí)行,只有當(dāng)所有的子任務(wù)都完成之后,調(diào)用者才能獲得 Task 0 的返回結(jié)果。


    圖 1. Fork/Join 模式示意圖





    以說,F(xiàn)ork/Join 模式能夠解決很多種類的并行問題。通過使用 Doug Lea 提供的 Fork/Join
    框架,軟件開發(fā)人員只需要關(guān)注任務(wù)的劃分和中間結(jié)果的組合就能充分利用并行平臺的優(yōu)良性能。其他和并行相關(guān)的諸多難于處理的問題,例如負載平衡、同步等,
    都可以由框架采用統(tǒng)一的方式解決。這樣,我們就能夠輕松地獲得并行的好處而避免了并行編程的困難且容易出錯的缺點。




    使用 Fork/Join 模式


    在開始嘗試 Fork/Join 模式之前,我們需要從 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下載 JSR-166y 的源代碼,并且我們還需要安裝最新版本的 JDK 6(下載網(wǎng)址請參閱
    [color="#996699"]參考資源
    )。Fork/Join 模式的使用方式非常直觀。首先,我們需要編寫一個 ForkJoinTask 來完成子任務(wù)的分割、中間結(jié)果的合并等工作。隨后,我們將這個 ForkJoinTask 交給 ForkJoinPool 來完成應(yīng)用的執(zhí)行。



    常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask
    的不同子類來完成任務(wù)。RecursiveAction 是 ForkJoinTask 的一個子類,它代表了一類最簡單的
    ForkJoinTask:不需要返回值,當(dāng)子任務(wù)都執(zhí)行完畢之后,不需要進行中間結(jié)果的組合。如果我們從 RecursiveAction
    開始繼承,那么我們只需要重載 protected void compute() 方法。下面,我們來看看怎么為快速排序算法建立一個 ForkJoinTask 的子類:


    清單 1. ForkJoinTask 的子類


         
             
                
             
         
         
                 class SortTask extends RecursiveAction {
                 final long[] array;
                 final int lo;
                 final int hi;
                 private int THRESHOLD = 30;
                 public SortTask(long[] array) {
                 this.array = array;
                 this.lo = 0;
                 this.hi = array.length - 1;
                 }
                 public SortTask(long[] array, int lo, int hi) {
                 this.array = array;
                 this.lo = lo;
                 this.hi = hi;
                 }
                 protected void compute() {
                 if (hi - lo  
                     




    [color="#996699"]清單 1
    中,SortTask 首先通過 partition() 方法將數(shù)組分成兩個部分。隨后,兩個子任務(wù)將被生成并分別排序數(shù)組的兩個部分。當(dāng)子任務(wù)足夠小時,再將其分割為更小的任務(wù)反而引起性能的降低。因此,這里我們使用一個 THRESHOLD,限定在子任務(wù)規(guī)模較小時,使用直接排序,而不是再將其分割成為更小的任務(wù)。其中,我們用到了 RecursiveAction 提供的方法 coInvoke()。它表示:啟動所有的任務(wù),并在所有任務(wù)都正常結(jié)束后返回。如果其中一個任務(wù)出現(xiàn)異常,則其它所有的任務(wù)都取消。coInvoke() 的參數(shù)還可以是任務(wù)的數(shù)組。


    現(xiàn)在剩下的工作就是將 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默認(rèn)建立具有與 CPU 可使用線程數(shù)相等線程個數(shù)的線程池。我們在一個 JUnit 的 test 方法中將 SortTask 提交給一個新建的 ForkJoinPool:


    清單 2. 新建的 ForkJoinPool


         
             
                
             
         
         
                 @Test
                 public void testSort() throws Exception {
                 ForkJoinTask sort = new SortTask(array);
                 ForkJoinPool fjpool = new ForkJoinPool();
                 fjpool.submit(sort);
                 fjpool.shutdown();
                 fjpool.awaitTermination(30, TimeUnit.SECONDS);
                 assertTrue(checkSorted(array));
                 }
                  
                     



    在上面的代碼中,我們用到了 ForkJoinPool 提供的如下函數(shù):


  • submit():將 ForkJoinTask 類的對象提交給 ForkJoinPool,F(xiàn)orkJoinPool 將立刻開始執(zhí)行 ForkJoinTask。
  • shutdown():執(zhí)行此方法之后,F(xiàn)orkJoinPool 不再接受新的任務(wù),但是已經(jīng)提交的任務(wù)可以繼續(xù)執(zhí)行。如果希望立刻停止所有的任務(wù),可以嘗試 shutdownNow() 方法。
  • awaitTermination():阻塞當(dāng)前線程直到 ForkJoinPool 中所有的任務(wù)都執(zhí)行結(jié)束。


    并行快速排序的完整代碼如下所示:


    清單 3. 并行快速排序的完整代碼


         
             
                
             
         
         
                 package tests;
                 import static org.junit.Assert.*;
                 import java.util.Arrays;
                 import java.util.Random;
                 import java.util.concurrent.TimeUnit;
                 import jsr166y.forkjoin.ForkJoinPool;
                 import jsr166y.forkjoin.ForkJoinTask;
                 import jsr166y.forkjoin.RecursiveAction;
                 import org.junit.Before;
                 import org.junit.Test;
                 class SortTask extends RecursiveAction {
                 final long[] array;
                 final int lo;
                 final int hi;
                 private int THRESHOLD = 0; //For demo only
                 public SortTask(long[] array) {
                 this.array = array;
                 this.lo = 0;
                 this.hi = array.length - 1;
                 }
                 public SortTask(long[] array, int lo, int hi) {
                 this.array = array;
                 this.lo = lo;
                 this.hi = hi;
                 }
                 protected void compute() {
                 if (hi - lo  (a[i + 1])) {
                 return false;
                 }
                 }
                 return true;
                 }
                 }
                  
                     



    運行以上代碼,我們可以得到以下結(jié)果:


         
             
                
             
         
         
                 Initial Array: [46, -12, 74, -67, 76, -13, -91, -96]
                 pivot = 0, low = 0, high = 7
                 array[-96, -12, 74, -67, 76, -13, -91, 46]
                 pivot = 5, low = 1, high = 7
                 array[-96, -12, -67, -13, -91, 46, 76, 74]
                 pivot = 1, low = 1, high = 4
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                 pivot = 4, low = 2, high = 4
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                 pivot = 3, low = 2, high = 3
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                 pivot = 2, low = 2, high = 2
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                 pivot = 6, low = 6, high = 7
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                 pivot = 7, low = 7, high = 7
                 array[-96, -91, -67, -13, -12, 46, 74, 76]
                  
                     



         
             
                
             
         
       

       

         
             
                
             
         
       


                
                     
                         
                            
                            
                         
                     
                   

       
                     

    Fork/Join 模式高級特性


    使用 RecursiveTask


    除了 RecursiveAction,F(xiàn)ork/Join 框架還提供了其他 ForkJoinTask 子類:帶有返回值的 RecursiveTask,使用 finish() 方法顯式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 為每個任務(wù)設(shè)置不同中止條件的 CyclicAction。


    從 RecursiveTask 繼承的子類同樣需要重載 protected void compute() 方法。與 RecursiveAction 稍有不同的是,它可使用泛型指定一個返回值的類型。下面,我們來看看如何使用 RecursiveTask 的子類。


    清單 4. RecursiveTask 的子類


         
             
                
             
         
         
                 class Fibonacci extends RecursiveTask {
                 final int n;
                 Fibonacci(int n) {
                 this.n = n;
                 }
                 private int compute(int small) {
                 final int[] results = { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 };
                 return results[small];
                 }
                 public Integer compute() {
                 if (n  
                     




    [color="#996699"]清單 4
    中, Fibonacci 的返回值為 Integer 類型。其 compute() 函數(shù)首先建立兩個子任務(wù),啟動子任務(wù)執(zhí)行,阻塞以等待子任務(wù)的結(jié)果返回,相加后得到最終結(jié)果。同樣,當(dāng)子任務(wù)足夠小時,通過查表得到其結(jié)果,以減小因過多地分割任務(wù)引起的性能降低。其中,我們用到了 RecursiveTask 提供的方法 fork() 和 join()。它們分別表示:子任務(wù)的異步執(zhí)行和阻塞等待結(jié)果完成。


    現(xiàn)在剩下的工作就是將 Fibonacci 提交到 ForkJoinPool 了,我們在一個 JUnit 的 test 方法中作了如下處理:


    清單 5. 將 Fibonacci 提交到 ForkJoinPool


         
             
                
             
         
         
                 @Test
                 public void testFibonacci() throws InterruptedException, ExecutionException {
                 ForkJoinTask fjt = new Fibonacci(45);
                 ForkJoinPool fjpool = new ForkJoinPool();
                 Future result = fjpool.submit(fjt);
                 // do something
                 System.out.println(result.get());
                 }
                  
                     



    使用 CyclicAction 來處理循環(huán)任務(wù)


    CyclicAction
    的用法稍微復(fù)雜一些。如果一個復(fù)雜任務(wù)需要幾個線程協(xié)作完成,并且線程之間需要在某個點等待所有其他線程到達,那么我們就能方便的用
    CyclicAction 和 TaskBarrier 來完成。圖 2 描述了使用 CyclicAction 和 TaskBarrier
    的一個典型場景。


    圖 2. 使用 CyclicAction 和 TaskBarrier 執(zhí)行多線程任務(wù)




    繼承自 CyclicAction 的子類需要 TaskBarrier 為每個任務(wù)設(shè)置不同的中止條件。從 CyclicAction 繼承的子類需要重載 protected void compute() 方法,定義在 barrier 的每個步驟需要執(zhí)行的動作。compute() 方法將被反復(fù)執(zhí)行直到 barrier 的 isTerminated() 方法返回 True。TaskBarrier 的行為類似于 CyclicBarrier。下面,我們來看看如何使用 CyclicAction 的子類。


    清單 6. 使用 CyclicAction 的子類


         
             
                
             
         
         
                 class ConcurrentPrint extends RecursiveAction {
                 protected void compute() {
                 TaskBarrier b = new TaskBarrier() {
                 protected boolean terminate(int cycle, int registeredParties) {
                 System.out.println("Cycle is " + cycle + ";"
                 + registeredParties + " parties");
                 return cycle >= 10;
                 }
                 };
                 int n = 3;
                 CyclicAction[] actions = new CyclicAction[n];
                 for (int i = 0; i  
                     




    [color="#996699"]清單 6
    中,CyclicAction[] 數(shù)組建立了三個任務(wù),打印各自的工作次數(shù)和序號。而在 b.terminate() 方法中,我們設(shè)置的中止條件表示重復(fù) 10 次計算后中止。現(xiàn)在剩下的工作就是將 ConcurrentPrint 提交到 ForkJoinPool 了。我們可以在 ForkJoinPool 的構(gòu)造函數(shù)中指定需要的線程數(shù)目,例如 ForkJoinPool(4) 就表明線程池包含 4 個線程。我們在一個 JUnit 的 test 方法中運行 ConcurrentPrint 的這個循環(huán)任務(wù):


    清單 7. 運行 ConcurrentPrint 循環(huán)任務(wù)


         
             
                
             
         
         
                 @Test
                 public void testBarrier () throws InterruptedException, ExecutionException {
                 ForkJoinTask fjt = new ConcurrentPrint();
                 ForkJoinPool fjpool = new ForkJoinPool(4);
                 fjpool.submit(fjt);
                 fjpool.shutdown();
                 }
                  
                     



    RecursiveTask 和 CyclicAction 兩個例子的完整代碼如下所示:


    清單 8. RecursiveTask 和 CyclicAction 兩個例子的完整代碼


         
             
                
             
         
         
                 package tests;
                 import java.util.concurrent.ExecutionException;
                 import java.util.concurrent.Future;
                 import jsr166y.forkjoin.CyclicAction;
                 import jsr166y.forkjoin.ForkJoinPool;
                 import jsr166y.forkjoin.ForkJoinTask;
                 import jsr166y.forkjoin.RecursiveAction;
                 import jsr166y.forkjoin.RecursiveTask;
                 import jsr166y.forkjoin.TaskBarrier;
                 import org.junit.Test;
                 class Fibonacci extends RecursiveTask {
                 final int n;
                 Fibonacci(int n) {
                 this.n = n;
                 }
                 private int compute(int small) {
                 final int[] results = { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 };
                 return results[small];
                 }
                 public Integer compute() {
                 if (n = 10;
                 }
                 };
                 int n = 3;
                 CyclicAction[] actions = new CyclicAction[n];
                 for (int i = 0; i  fjt = new Fibonacci(num);
                 ForkJoinPool fjpool = new ForkJoinPool();
                 Future result = fjpool.submit(fjt);
                 // do something
                 System.out.println("Fibonacci(" + num + ") = " + result.get());
                 }
                 }
                  
                     



    運行以上代碼,我們可以得到以下結(jié)果:


         
             
                
             
         
         
                 testing Task Barrier ...
                 I'm working 0 2
                 I'm working 0 0
                 I'm working 0 1
                 Cycle is 0; 3 parties
                 I'm working 1 2
                 I'm working 1 0
                 I'm working 1 1
                 Cycle is 1; 3 parties
                 I'm working 2 0
                 I'm working 2 1
                 I'm working 2 2
                 Cycle is 2; 3 parties
                 I'm working 3 0
                 I'm working 3 2
                 I'm working 3 1
                 Cycle is 3; 3 parties
                 I'm working 4 2
                 I'm working 4 0
                 I'm working 4 1
                 Cycle is 4; 3 parties
                 I'm working 5 1
                 I'm working 5 0
                 I'm working 5 2
                 Cycle is 5; 3 parties
                 I'm working 6 0
                 I'm working 6 2
                 I'm working 6 1
                 Cycle is 6; 3 parties
                 I'm working 7 2
                 I'm working 7 0
                 I'm working 7 1
                 Cycle is 7; 3 parties
                 I'm working 8 1
                 I'm working 8 0
                 I'm working 8 2
                 Cycle is 8; 3 parties
                 I'm working 9 0
                 I'm working 9 2
                 testing Fibonacci ...
                 fork new thread for 13
                 fork new thread for 12
                 fork new thread for 11
                 fork new thread for 10
                 fork new thread for 12
                 fork new thread for 11
                 fork new thread for 10
                 fork new thread for 9
                 fork new thread for 10
                 fork new thread for 9
                 fork new thread for 11
                 fork new thread for 10
                 fork new thread for 10
                 fork new thread for 9
                 Fibonacci(14) = 610
                  
                     



         
             
                
             
         
       

       



    結(jié)論



    以上的例子中可以看到,通過使用 Fork/Join
    模式,軟件開發(fā)人員能夠方便地利用多核平臺的計算能力。盡管還沒有做到對軟件開發(fā)人員完全透明,F(xiàn)ork/Join
    模式已經(jīng)極大地簡化了編寫并發(fā)程序的瑣碎工作。對于符合 Fork/Join
    模式的應(yīng)用,軟件開發(fā)人員不再需要處理各種并行相關(guān)事務(wù),例如同步、通信等,以難以調(diào)試而聞名的死鎖和 data race
    等錯誤也就不會出現(xiàn),提升了思考問題的層次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer
    策略,僅僅關(guān)注如何劃分任務(wù)和組合中間結(jié)果,將剩下的事情丟給 Fork/Join 框架。


    在實際工作中利用 Fork/Join 模式,可以充分享受多核平臺為應(yīng)用帶來的免費午餐。





    本文來自ChinaUnix博客,如果查看原文請點:http://blog.chinaunix.net/u3/110548/showart_2152219.html
  • 您需要登錄后才可以回帖 登錄 | 注冊

    本版積分規(guī)則 發(fā)表回復(fù)

      

    北京盛拓優(yōu)訊信息技術(shù)有限公司. 版權(quán)所有 京ICP備16024965號-6 北京市公安局海淀分局網(wǎng)監(jiān)中心備案編號:11010802020122 niuxiaotong@pcpop.com 17352615567
    未成年舉報專區(qū)
    中國互聯(lián)網(wǎng)協(xié)會會員  聯(lián)系我們:huangweiwei@itpub.net
    感謝所有關(guān)心和支持過ChinaUnix的朋友們 轉(zhuǎn)載本站內(nèi)容請注明原作者名及出處

    清除 Cookies - ChinaUnix - Archiver - WAP - TOP