併發編程 —— 線程池

概述

在程序中,我們會用各種池化技術來緩存創建昂貴的對象,比如線程池、連接池、內存池。一般是預先創建一些對象放入池中,使用的時候直接取出使用,用完歸還以便復用,還會通過一定的策略調整池中緩存對象的數量,實現池的動態伸縮。

由於線程的創建比較昂貴,隨意、沒有控制地創建大量線程會造成性能問題,因此短平快的任務一般考慮使用線程池來處理,而不是直接創建線程。

那麼,如何正確的創建並正確的使用線程池呢,這篇文章就來細看下。

線程池

雖然在 Java 語言中創建線程看上去就像創建一個對象一樣簡單,只需要 new Thread() 就可以了,但實際上創建線程遠不是創建一個對象那麼簡單。

創建對象,僅僅是在 JVM 的堆里分配一塊內存而已;而創建一個線程,卻需要調用操作系統內核的 API,然後操作系統要為線程分配一系列的資源,這個成本就很高了。所以線程是一個重量級的對象,應該避免頻繁創建和銷毀,一般就是採用線程池來避免頻繁的創建和銷毀線程。

 

線程池原理

Java 通過用戶線程與內核線程結合的 1:1 線程模型來實現,Java 將線程的調度和管理設置在了用戶態。在 HotSpot VM 的線程模型中,Java 線程被一對一映射為內核線程。Java 在使用線程執行程序時,需要創建一個內核線程;當該 Java 線程被終止時,這個內核線程也會被回收。因此 Java 線程的創建與銷毀將會消耗一定的計算機資源,從而增加系統的性能開銷。

除此之外,大量創建線程同樣會給系統帶來性能問題,因為內存和 CPU 資源都將被線程搶佔,如果處理不當,就會發生內存溢出、CPU 使用率超負荷等問題。

為了解決上述兩類問題,Java 提供了線程池概念,對於頻繁創建線程的業務場景,線程池可以創建固定的線程數量,並且在操作系統底層,輕量級進程將會把這些線程映射到內核。

線程池可以提高線程復用,又可以固定最大線程使用量,防止無限制地創建線程。當程序提交一個任務需要一個線程時,會去線程池中查找是否有空閑的線程,若有,則直接使用線程池中的線程工作,若沒有,會去判斷當前已創建的線程數量是否超過最大線程數量,如未超過,則創建新線程,如已超過,則進行排隊等待或者直接拋出異常。

 

線程池是一種生產者 – 消費者模式

線程池的設計,普遍採用的都是生產者 – 消費者模式。線程池的使用方是生產者,線程池本身是消費者。

原理實現大致如下:

 1 package com.lyyzoo.test.concurrent.executor;  2 
 3 import java.util.ArrayList;  4 import java.util.List;  5 import java.util.concurrent.BlockingQueue;  6 import java.util.concurrent.LinkedBlockingQueue;  7 
 8 /**
 9  * @author bojiangzhou 2020/02/12 10  */
11 public class CustomThreadPool { 12 
13     public static void main(String[] args) { 14         // 使用有界阻塞隊列 創建線程池
15         CustomThreadPool pool = new CustomThreadPool(2, new LinkedBlockingQueue<>(10)); 16         pool.execute(() -> { 17             System.out.println("提交了一個任務"); 18  }); 19  } 20 
21     // 利用阻塞隊列實現生產者-消費者模式
22     final BlockingQueue<Runnable> workQueue; 23     // 保存內部工作線程
24     final List<Thread> threads = new ArrayList<>(); 25 
26     public CustomThreadPool(int coreSize, BlockingQueue<Runnable> workQueue) { 27         this.workQueue = workQueue; 28         // 創建工作線程
29         for (int i = 0; i < coreSize; i++) { 30             WorkerThread work = new WorkerThread(); 31  work.start(); 32  threads.add(work); 33  } 34  } 35 
36     // 生產者 提交任務
37     public void execute(Runnable command) { 38         try { 39             // 隊列已滿,put 會一直等待
40  workQueue.put(command); 41         } catch (InterruptedException e) { 42  e.printStackTrace(); 43  } 44  } 45 
46     /**
47  * 工作線程負責消費任務,並執行任務 48      */
49     class WorkerThread extends Thread { 50  @Override 51         public void run() { 52             // 循環取任務並執行,take 取不到任務會一直等待
53             while (true) { 54                 try { 55                     Runnable runnable = workQueue.take(); 56  runnable.run(); 57                 } catch (InterruptedException e) { 58  e.printStackTrace(); 59  } 60  } 61  } 62  } 63 }

ThreadPoolExecutor

線程池參數說明

Java 提供的線程池相關的工具類中,最核心的是 ThreadPoolExecutor,通過名字也能看出來,它強調的是 Executor,而不是一般意義上的池化資源。

ThreadPoolExecutor 的構造函數非常複雜,這個最完備的構造函數有 7 個參數:

 

各個參數的含義如下:

  • corePoolSize:表示線程池保有的最小線程數。
  • maximumPoolSize:表示線程池創建的最大線程數。
  • keepAliveTime & unit:如果一個線程空閑了 keepAliveTime & unit 這麼久,而且線程池的線程數大於 corePoolSize ,那麼這個空閑的線程就要被回收了。
  • workQueue:工作隊列,一般定義有界阻塞隊列。
  • threadFactory:通過這個參數你可以自定義如何創建線程,例如你可以給線程指定一個有意義的名字。
  • handler:通過這個參數可以自定義任務的拒絕策略。如果線程池中所有的線程都在忙碌,並且工作隊列也滿了(前提是工作隊列是有界隊列),那麼此時提交任務,線程池就會拒絕接收。ThreadPoolExecutor 已經提供了以下 4 種拒絕策略。
    •   CallerRunsPolicy:提交任務的線程自己去執行該任務。
    •   AbortPolicy:默認的拒絕策略,會 throws RejectedExecutionException。
    •   DiscardPolicy:直接丟棄任務,沒有任何異常拋出。
    •   DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作隊列的任務丟棄,然後把新任務加入到工作隊列。

 

ThreadPoolExecutor 構造完成后,還可以通過如下方法定製默認行為:

  • executor.allowCoreThreadTimeOut(true):將包括“核心線程”在內的,沒有任務分配的所有線程,在等待 keepAliveTime 時間后回收掉。
  • executor.prestartAllCoreThreads():創建線程池后,立即創建核心數個工作線程;線程池默認是在任務來時才創建工作線程。

 

創建線程池示例:

 1 public void test() throws InterruptedException {  2     ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(  3             // 核心線程數
 4             2,  5             // 最大線程數
 6             16,  7             // 線程空閑時間
 8             60, TimeUnit.SECONDS,  9             // 使用有界阻塞隊列
10             new LinkedBlockingQueue<>(1024), 11             // 定義線程創建方式,可自定線程名稱
12             new ThreadFactoryBuilder().setNameFormat("executor-%d").build(), 13             // 自定義拒絕策略,一般和降級策略配合使用
14             (r, executor) -> { 15                 // 隊列已滿,拒絕執行
16                 throw new RejectedExecutionException("Task " + r.toString() +
17                         " rejected from " + executor.toString()); 18  } 19  ); 20 
21     poolExecutor.submit(() -> { 22         LOGGER.info("submit task"); 23  }); 24 }

 

線程池的線程分配流程

任務提交后的大致流程如下圖所示。提交任務后,如果線程數小於 corePoolSize,則創建新線程執行任務,無論當前線程池的線程是否空閑都會創建新的線程。

當創建的線程數等於 corePoolSize 時,提交的任務會被加入到設置的阻塞隊列中。

當隊列滿了,則會創建非核心線程執行任務,直到線程池中的線程數量等於 maximumPoolSize。

當線程數量已經等於 maximumPoolSize 時, 新提交的任務無法加入到等待隊列,也無法創建非核心線程直接執行,如果沒有為線程池設置拒絕策略,這時線程池就會拋出 RejectedExecutionException 異常,即默認拒絕接受任務。

 

線程池默認的拒絕策略就是丟棄任務,所以我們在設置有界隊列時,需要考慮設置合理的拒絕策略,要考慮到高峰時期任務的數量,避免任務被丟棄而影響業務流程。

 

強烈建議使用有界隊列

創建 ThreadPoolExecutor 時強烈建議使用有界隊列。如果設置為無界隊列,那麼一般最大線程數的設置是不起作用的,而且遇到任務高峰時,如果一直往隊列添加任務,容易出現OOM,拋出如下異常。

Exception in thread "http-nio-45678-ClientPoller" java.lang.OutOfMemoryError: GC overhead limit exceeded

 

使用有界隊列時,需要注意,當任務過多時,線程池會觸發執行拒絕策略,線程池默認的拒絕策略會拋出 RejectedExecutionException,這是個運行時異常,對於運行時異常編譯器並不強制 catch 它,所以開發人員很容易忽略,因此默認拒絕策略要慎重使用。如果線程池處理的任務非常重要,建議自定義自己的拒絕策略;並且在實際工作中,自定義的拒絕策略往往和降級策略配合使用。

 

監控線程池的狀態

建議用一些監控手段來觀察線程池的狀態。線程池這個組件往往會表現得任勞任怨、默默無聞,除非是出現了拒絕策略,否則壓力再大都不會拋出一個異常。如果我們能提前觀察到線程池隊列的積壓,或者線程數量的快速膨脹,往往可以提早發現並解決問題。

 1 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName, long period, TimeUnit unit) {
 2     Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
 3         LOGGER.info("[>>ExecutorStatus<<] ThreadPool Name: [{}], Pool Status: [shutdown={}, Terminated={}], Pool Thread Size: {}, Active Thread Count: {}, Task Count: {}, Tasks Completed: {}, Tasks in Queue: {}",
 4                 threadPoolName,
 5                 threadPool.isShutdown(), threadPool.isTerminated(), // 線程是否被終止
 6                 threadPool.getPoolSize(), // 線程池線程數量
 7                 threadPool.getActiveCount(), // 工作線程數
 8                 threadPool.getTaskCount(), // 總任務數
 9                 threadPool.getCompletedTaskCount(), // 已完成的任務數
10                 threadPool.getQueue().size()); // 線程池中線程的數量
11     }, 0, period, unit);
12 }

線程池任務提交方式

提交任務可以通過 execute 和 submit 方法提交任務,下面就來看下它們的區別。

submit 方法簽名:

execute 方法簽名:

 

使用 execute 提交任務

使用 execute 提交任務,線程池內拋出異常會導致線程退出,線程池只能重新創建一個線程。如果每個異步任務都以異常結束,那麼線程池可能完全起不到線程重用的作用。

而且主線程無法捕獲(catch)到線程池內拋出的異常。因為沒有手動捕獲異常進行處理,ThreadGroup 幫我們進行了未捕獲異常的默認處理,向標準錯誤輸出打印了出現異常的線程名稱和異常信息。顯然,這種沒有以統一的錯誤日誌格式記錄錯誤信息打印出來的形式,對生產級代碼是不合適的。

 

如下,execute 提交任務,拋出異常后,從線程名稱可以看出,老線程退出,創建了新的線程。

ThreadGroup 處理未捕獲異常:直接輸出到 System.err

 

解決方式:

  • 以 execute 方法提交到線程池的異步任務,最好在任務內部做好異常處理;
  • 設置自定義的異常處理程序作為保底,比如在聲明線程池時自定義線程池的未捕獲異常處理程序。或者設置全局的默認未捕獲異常處理程序。
 1 // 自定義線程池的未捕獲異常處理程序
 2 ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,  3         30, TimeUnit.MINUTES,  4         new LinkedBlockingQueue<>(),  5         new ThreadFactoryBuilder()  6                 .setNameFormat("pool-%d")  7                 .setUncaughtExceptionHandler((Thread t, Throwable e) -> {  8                     log.error("pool happen exception, thread is {}", t, e);  9  }) 10  .build()); 11                 
12 // 設置全局的默認未捕獲異常處理程序
13 static { 14     Thread.setDefaultUncaughtExceptionHandler((thread, throwable)-> { 15         log.error("Thread {} got exception", thread, throwable) 16  }); 17 }  

定義的異常處理程序將未捕獲的異常信息打印到標準日誌中了,老線程同樣會退出。如果要避免這個問題,就需要使用 submit 方法提交任務。

 

使用 submit 提交任務

使用 submit,線程不會退出,但是異常不會記錄,會被生吞掉。查看 FutureTask 源碼可以發現,在執行任務出現異常之後,異常存到了一個 outcome 字段中,只有在調用 get 方法獲取 FutureTask 結果的時候,才會以 ExecutionException 的形式重新拋出異常。所以我們可以通過捕獲 get 方法拋出的異常來判斷線程的任務是否拋出了異常。

 

submit 提交任務,可以通過 Future 獲取返回結果,如果拋出異常,可以捕獲 ExecutionException 得到異常棧信息。通過線程名稱可以看出,老線程也沒有退出。

需要注意的是,使用 submit 時,setUncaughtExceptionHandler 設置的異常處理器不會生效。

 

submit 與 execute 的區別

execute提交的是Runnable類型的任務,而submit提交的是Callable或者Runnable類型的任務;

execute的提交沒有返回值,而submit的提交會返回一個Future類型的對象;

execute提交的時候,如果有異常,就會直接拋出異常,而submit在遇到異常的時候,通常不會立馬拋出異常,而是會將異常暫時存儲起來,等待你調用Future.get()方法的時候,才會拋出異常;

execute 提交的任務拋出異常,老線程會退出,線程池會立即創建一個新的線程。submit 提交的任務拋出異常,老線程不會退出;

線程池設置的 UncaughtExceptionHandler 對 execute 提交的任務生效,對 submit 提交的任務不生效。

線程數設置多少合適

創建多少線程合適,要看多線程具體的應用場景。我們的程序一般都是 CPU 計算和 I/O 操作交叉執行的,由於 I/O 設備的速度相對於 CPU 來說都很慢,所以大部分情況下,I/O 操作執行的時間相對於 CPU 計算來說都非常長,這種場景我們一般都稱為 I/O 密集型計算;和 I/O 密集型計算相對的就是 CPU 密集型計算了,CPU 密集型計算大部分場景下都是純 CPU 計算。I/O 密集型程序和 CPU 密集型程序,計算最佳線程數的方法是不同的。

 

CPU 密集型計算

多線程本質上是提升多核 CPU 的利用率,所以對於一個 4 核的 CPU,每個核一個線程,理論上創建 4 個線程就可以了,再多創建線程也只是增加線程切換的成本。所以,對於 CPU 密集型的計算場景,理論上“線程的數量 = CPU 核數”就是最合適的。不過在工程上,線程的數量一般會設置為“CPU 核數 +1”,這樣的話,當線程因為偶爾的內存頁失效或其他原因導致阻塞時,這個額外的線程可以頂上,從而保證 CPU 的利用率。

 

I/O 密集型的計算場景

如果 CPU 計算和 I/O 操作的耗時是 1:1,那麼 2 個線程是最合適的。如果 CPU 計算和 I/O 操作的耗時是 1:2,那設置 3 個線程是合適的,如下圖所示:CPU 在 A、B、C 三個線程之間切換,對於線程 A,當 CPU 從 B、C 切換回來時,線程 A 正好執行完 I/O 操作。這樣 CPU 和 I/O 設備的利用率都達到了 100%。

會發現,對於 I/O 密集型計算場景,最佳的線程數是與程序中 CPU 計算和 I/O 操作的耗時比相關的,可以總結出這樣一個公式:最佳線程數 =1 +(I/O 耗時 / CPU 耗時)

對於多核 CPU,需要等比擴大,計算公式如下:最佳線程數 =CPU 核數 * [ 1 +(I/O 耗時 / CPU 耗時)]

 

線程池線程數設置 

可通過如下方式獲取CPU核數:

1 /**
2  * 獲取返回CPU核數 3  * 4  * @return 返回CPU核數,默認為8 5  */
6 public static int getCpuProcessors() { 7     return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ?
8             Runtime.getRuntime().availableProcessors() : 8; 9 }

 

在一些非核心業務中,我們可以將核心線程數設置小一些,最大線程數量設置為CPU核心數量,阻塞隊列大小根據具體場景設置;不要過大,防止大量任務進入等待隊列而超時,應儘快創建非核心線程執行任務;也不要過小,避免隊列滿了任務被拒絕丟棄。

 1 public ThreadPoolExecutor executor() {  2     int coreSize = getCpuProcessors();  3     ThreadPoolExecutor executor = new ThreadPoolExecutor(  4             2, coreSize,  5             10, TimeUnit.MINUTES,  6             new LinkedBlockingQueue<>(512),  7             new ThreadFactoryBuilder().setNameFormat("executor-%d").build(), 10             new ThreadPoolExecutor.AbortPolicy() 11  );14 
15     return executor; 16 }

 

在一些核心業務中,核心線程數設置為CPU核心數,最大線程數可根據公式 最佳線程數 =CPU 核數 * [ 1 +(I/O 耗時 / CPU 耗時)] 來計算。阻塞隊列可以根據具體業務場景設置,如果線程處理業務非常迅速,我們可以考慮將阻塞隊列設置大一些,處理的請求吞吐量會大些;如果線程處理業務非常耗時,阻塞隊列設置小些,防止請求在阻塞隊列中等待過長時間而導致請求已超時。

public ThreadPoolExecutor executor() { int coreSize = getCpuProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor( coreSize, coreSize * 8, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("executor-%d").build(), new ThreadPoolExecutor.AbortPolicy() );return executor; }

 

注意:一般不要將 corePoolSize 設置為 0,例如下面的線程池,使用了無界隊列,雖 maximumPoolSize > 0,但實際上只會有一個工作線程,因為其它任務都加入等待隊列了。

1 ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 30, TimeUnit.SECONDS, 3         new LinkedBlockingQueue<>(), 4         new ThreadFactoryBuilder().setNameFormat("test-%d").build() 5 );

 

線程池如何優先啟用非核心線程

如果想讓線程池激進一點,優先開啟更多的線程,而把隊列當成一個後備方案,可以自定義隊列,重寫 offer 方法,因為線程池是通過 offer 方法將任務放入隊列。

 

通過重寫隊列的 offer 方法,直接返回 false,造成這個隊列已滿的假象,線程池在工作隊列滿了無法入隊的情況下會擴容線程池。直到線程數達到最大線程數,就會觸發拒絕策略,此時再通過自定義的拒絕策略將任務通過隊列的 put 方法放入隊列中。這樣就可以優先開啟更多線程,而不是進入隊列了。

 1 public static void main(String[] args) {  2     // ThreadPoolExecutor 通過 offer 將元素放入隊列,重載隊列的 offer 方法,直接返回 false,造成隊列已滿的假象  3     // 隊列滿時,會創建新的線程直到達到 maximumPoolSize,之後會觸發執行拒絕策略
 4     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {  5         private static final long serialVersionUID = 8303142475890427046L;  6 
 7  @Override  8         public boolean offer(Runnable e) {  9             return false; 10  } 11  }; 12 
13     // 當線程達到 maximumPoolSize 時會觸發拒絕策略,此時將任務 put 到隊列中
14     RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { 15  @Override 16         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 17             try { 18                 // 任務拒絕時,通過 put 放入隊列
19  queue.put(r); 20             } catch (InterruptedException e) { 21  Thread.currentThread().interrupt(); 22  } 23  } 24  }; 25 
26     // 構造線程池
27     ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 28             600, TimeUnit.SECONDS, 29  queue, 30             new ThreadFactoryBuilder().setNameFormat("demo-%d").build(), 31  rejectedExecutionHandler); 32 
33     IntStream.rangeClosed(1, 50).forEach(i -> { 34         executor.submit(() -> { 35             log.info("start..."); 36             sleep(9000); 37  }); 38  }); 39 }

優雅的終止線程和線程池

優雅地終止線程

在程序中,我們不能隨便中斷一個線程,因為這是極其不安全的操作,我們無法知道這個線程正運行在什麼狀態,它可能持有某把鎖,強行中斷可能導致鎖不能釋放的問題;或者線程可能在操作數據庫,強行中斷導致數據不一致混亂的問題。正因此,JAVA里將Thread的stop方法設置為過時,以禁止大家使用。

優雅地終止線程,不是自己終止自己,而是在一個線程 T1 中,終止線程 T2;這裏所謂的“優雅”,指的是給 T2 一個機會料理後事,而不是被一劍封喉。兩階段終止模式,就是將終止過程分成兩個階段,其中第一個階段主要是線程 T1 向線程 T2發送終止指令,而第二階段則是線程 T2響應終止指令。

Java 線程進入終止狀態的前提是線程進入 RUNNABLE 狀態,而實際上線程也可能處在休眠狀態,也就是說,我們要想終止一個線程,首先要把線程的狀態從休眠狀態轉換到 RUNNABLE 狀態。如何做到呢?這個要靠 Java Thread 類提供的 interrupt() 方法,它可以將休眠狀態的線程轉換到 RUNNABLE 狀態。

線程轉換到 RUNNABLE 狀態之後,我們如何再將其終止呢?RUNNABLE 狀態轉換到終止狀態,優雅的方式是讓 Java 線程自己執行完 run() 方法,所以一般我們採用的方法是設置一個標誌位,然後線程會在合適的時機檢查這個標誌位,如果發現符合終止條件,則自動退出 run() 方法。這個過程其實就是第二階段:響應終止指令。終止指令,其實包括兩方面內容:interrupt() 方法和線程終止的標誌位。

如果我們在線程內捕獲中斷異常(如Thread.sleep()拋出了中斷一次)之後,需通過 Thread.currentThread().interrupt() 重新設置線程的中斷狀態,因為 JVM 的異常處理會清除線程的中斷狀態。

 

建議自己設置線程終止標誌位,避免線程內調用第三方類庫的方法未處理線程中斷狀態,如下所示。

 1 public class InterruptDemo {  2 
 3     /**
 4  * 輸出:調用 interrupt() 時,只是設置了線程中斷標識,線程依舊會繼續執行當前方法,執行完之後再退出線程。  5  * do something...  6  * continue do something...  7  * do something...  8  * continue do something...  9  * do something... 10  * 線程被中斷... 11  * continue do something... 12      */
13     public static void main(String[] args) throws InterruptedException { 14         Proxy proxy = new Proxy(); 15  proxy.start(); 16 
17         Thread.sleep(6000); 18  proxy.stop(); 19  } 20 
21     static class Proxy { 22         // 自定義線程終止標誌位
23         private volatile boolean terminated = false; 24 
25         private boolean started = false; 26 
27  Thread t; 28 
29         public synchronized void start() { 30             if (started) { 31                 return; 32  } 33             started = true; 34             terminated = false; 35 
36             t = new Thread(() -> { 37                 while (!terminated) { // 取代 while (true)
38                     System.out.println("do something..."); 39                     try { 40                         Thread.sleep(2000); 41                     } catch (InterruptedException e) { 42                         // 如果其它線程中斷此線程,拋出異常時,需重新設置線程中斷狀態,因為 JVM 的異常處理會清除線程的中斷狀態。
43                         System.out.println("線程被中斷..."); 44  Thread.currentThread().interrupt(); 45  } 46                     System.out.println("continue do something..."); 47  } 48                 started = false; 49  }); 50  t.start(); 51  } 52 
53         public synchronized void stop() { 54             // 設置中斷標誌
55             terminated = true; 56  t.interrupt(); 57  } 58  } 59 
60 }

 

優雅的終止線程池

線程池提供了兩個方法來中斷線程池:shutdown() 和 shutdownNow()。

shutdown():是一種很保守的關閉線程池的方法。線程池執行 shutdown() 后,就會拒絕接收新的任務,但是會等待線程池中正在執行的任務和已經進入阻塞隊列的任務都執行完之後才最終關閉線程池。

shutdownNow():相對激進一些,線程池執行 shutdownNow() 后,會拒絕接收新的任務,同時還會中斷線程池中正在執行的任務,已經進入阻塞隊列的任務也被剝奪了執行的機會,不過這些被剝奪執行機會的任務會作為 shutdownNow() 方法的返回值返回。因為 shutdownNow() 方法會中斷正在執行的線程,所以提交到線程池的任務,如果需要優雅地結束,就需要正確地處理線程中斷。如果提交到線程池的任務不允許取消,那就不能使用 shutdownNow() 方法終止線程池。

 

如果想在jvm關閉的時候進行內存清理、對象銷毀等操作,或者僅僅想起個線程然後這個線程不會退出,可以使用Runtime.addShutdownHook。

這個方法的作用就是在JVM中增加一個關閉的鈎子。當程序正常退出、系統調用 System.exit 方法或者虛擬機被關閉時才會執行系統中已經設置的所有鈎子,當系統執行完這些鈎子后,JVM才會關閉。

利用這個性質,就可以在這個最後執行的線程中把線程池優雅的關閉掉。雖然jvm關閉了,但優雅關閉線程池總是好的,特別是涉及到服務端的 tcp 連接。

 1 /**
 2  * 添加Hook在Jvm關閉時優雅的關閉線程池  3  *  4  * @param threadPool 線程池  5  * @param threadPoolName 線程池名稱  6  */
 7 public static void hookShutdownThreadPool(ExecutorService threadPool, String threadPoolName) {  8     Runtime.getRuntime().addShutdownHook(new Thread(() -> {  9         LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]", threadPoolName); 10         // 使新任務無法提交
11  threadPool.shutdown(); 12         try { 13             // 等待未完成任務結束
14             if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 15                 threadPool.shutdownNow(); // 取消當前執行的任務
16                 LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker, which may cause some task inconsistent. Please check the biz logs."); 17 
18                 // 等待任務取消的響應
19                 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 20                     LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs."); 21  } 22  } 23         } catch (InterruptedException ie) { 24             // 重新取消當前線程進行中斷
25  threadPool.shutdownNow(); 26             LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs."); 27 
28             // 保留中斷狀態
29  Thread.currentThread().interrupt(); 30  } 31 
32         LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]", threadPoolName); 33  })); 34 }

Executors

考慮到 ThreadPoolExecutor 的構造函數實在是有些複雜,所以 Java 併發包里提供了一個線程池的靜態工廠類 Executors,利用 Executors 你可以快速創建線程池。

但《阿里巴巴 Java 開發手冊》中提到,禁止使用這些方法來創建線程池,而應該手動 new ThreadPoolExecutor 來創建線程池。最重要的原因是:Executors 提供的很多方法默認使用的都是無界的 LinkedBlockingQueue,高負載情境下,無界隊列很容易導致 OOM,而 OOM 會導致所有請求都無法處理,這是致命問題。最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因為資源耗盡導致 OOM 問題。

 

newCachedThreadPool

具有緩存性質的線程池,線程最大空閑時間60s,線程可重複利用,沒有最大線程數限制。使用的是 SynchronousQueue 無容量阻塞隊列,沒有最大線程數限制。這意味着,只要有請求到來,就必須找到一條工作線程來處理,如果當前沒有空閑的線程就再創建一條新的。

高併發情況下,大量的任務進來後會創建大量的線程,導致OOM(無法創建本地線程):

1 [11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; 2     nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause 3 java.lang.OutOfMemoryError: unable to create new native thread 

 

newFixedThreadPool

具有固定數量的線程池,核心線程數等於最大線程數,超出最大線程數進行等待。使用的是 LinkedBlockingQueue 無界阻塞隊列。雖然使用 newFixedThreadPool 可以把工作線程控制在固定的數量上,但任務隊列是無界的。如果任務較多並且執行較慢的話,隊列可能會快速積壓,撐爆內存導致 OOM。

如果一直往這個無界隊列中添加任務,不久就會出現OOM異常(內存佔滿):

1 Exception in thread "http-nio-45678-ClientPoller" 
2     java.lang.OutOfMemoryError: GC overhead limit exceeded

 

newSingleThreadExecutor

核心線程數與最大線程數均為1,可用於當鎖控制同步。使用的是 LinkedBlockingQueue 無界阻塞隊列。

 

newScheduledThreadPool

具有時間調度性的線程池,必須初始化核心線程數。

沒有最大線程數限制,線程最大空閑時間為0,空閑線程執行完即銷毀。底層使用 DelayedWorkQueue 實現延遲特性。

線程池創建正確姿勢

最後,總結一下,從如下的一些方面考慮如何正確地創建線程池。

線程池配置

我們需要根據自己的場景、併發情況來評估線程池的幾個核心參數,包括核心線程數、最大線程數、線程回收策略、工作隊列的類型,以及拒絕策略,確保線程池的工作行為符合需求,一般都需要設置有界的工作隊列和可控的線程數。

要根據任務的“輕重緩急”來指定線程池的核心參數,包括線程數、回收策略和任務隊列:

  • 對於執行比較慢、數量不大的 IO 任務,要考慮更多的線程數,而不需要太大的隊列。
  • 對於吞吐量較大的計算型任務,線程數量不宜過多,可以是 CPU 核數或核數 *2(理由是,線程一定調度到某個 CPU 進行執行,如果任務本身是 CPU 綁定的任務,那麼過多的線程只會增加線程切換的開銷,並不能提升吞吐量),但可能需要較長的隊列來做緩衝。

 

任何時候,都應該為自定義線程池指定有意義的名稱,以方便排查問題。當出現線程數量暴增、線程死鎖、線程佔用大量 CPU、線程執行出現異常等問題時,我們往往會抓取線程棧。此時,有意義的線程名稱,就可以方便我們定位問題。

除了建議手動聲明線程池以外,還建議用一些監控手段來觀察線程池的狀態。如果我們能提前觀察到線程池隊列的積壓,或者線程數量的快速膨脹,往往可以提早發現並解決問題。

 

確認線程池本身是不是復用的

既然使用了線程池就需要確保線程池是在復用的,每次 new 一個線程池出來可能比不用線程池還糟糕。如果你沒有直接聲明線程池而是使用其他同學提供的類庫來獲得一個線程池,請務必查看源碼,以確認線程池的實例化方式和配置是符合預期的。

 

斟酌線程池的混用策略

不要盲目復用線程池,別人定義的線程池屬性不一定適合你的任務,而且混用會相互干擾。

另外,Java 8 的 parallel stream 背後是共享同一個 ForkJoinPool,默認并行度是 CPU 核數 -1。對於 CPU 綁定的任務來說,使用這樣的配置比較合適,但如果集合操作涉及同步 IO 操作的話(比如數據庫操作、外部服務調用等),建議自定義一個 ForkJoinPool(或普通線程池)。因此在使用 Java8 的并行流時,建議只用在計算密集型的任務,IO密集型的任務建議自定義線程池來提交任務,避免影響其它業務。

 

CommonExecutor

如下是我自己封裝的一個線程池工具類,還提供了執行批量任務的方法,關於批量任務後面再單獨寫篇文章來介紹。

 1 package org.hzero.core.util;  2 
 3 import java.util.ArrayList;  4 import java.util.Collections;  5 import java.util.List;  6 import java.util.concurrent.*;  7 import java.util.stream.Collectors;  8 import javax.annotation.Nonnull;  9 
 10 import com.google.common.util.concurrent.ThreadFactoryBuilder;  11 import org.apache.commons.collections4.CollectionUtils;  12 import org.apache.commons.lang3.RandomUtils;  13 import org.slf4j.Logger;  14 import org.slf4j.LoggerFactory;  15 import org.springframework.dao.DuplicateKeyException;  16 
 17 import io.choerodon.core.exception.CommonException;  18 
 19 import org.hzero.core.base.BaseConstants;  20 
 21 /**
 22  * @author bojiangzhou 2020/02/24  23  */
 24 public class CommonExecutor {  25 
 26     private static final Logger LOGGER = LoggerFactory.getLogger(CommonExecutor.class);  27 
 28     private static final ThreadPoolExecutor BASE_EXECUTOR;  29 
 30     static {  31         BASE_EXECUTOR = buildThreadFirstExecutor("BaseExecutor");  32  }  33 
 34     /**
 35  * 構建線程優先的線程池  36  * <p>  37  * 線程池默認是當核心線程數滿了后,將任務添加到工作隊列中,當工作隊列滿了之後,再創建線程直到達到最大線程數。  38  *  39  * <p>  40  * 線程優先的線程池,就是在核心線程滿了之後,繼續創建線程,直到達到最大線程數之後,再把任務添加到工作隊列中。  41  *  42  * <p>  43  * 此方法默認設置核心線程數為 CPU 核數,最大線程數為 8倍 CPU 核數,空閑線程超過 5 分鐘銷毀,工作隊列大小為 65536。  44  *  45  * @param poolName 線程池名稱  46  * @return ThreadPoolExecutor  47      */
 48     public static ThreadPoolExecutor buildThreadFirstExecutor(String poolName) {  49         int coreSize = CommonExecutor.getCpuProcessors();  50         int maxSize = coreSize * 8;  51         return buildThreadFirstExecutor(coreSize, maxSize, 5, TimeUnit.MINUTES, 1 << 16, poolName);  52  }  53 
 54     /**
 55  * 構建線程優先的線程池  56  * <p>  57  * 線程池默認是當核心線程數滿了后,將任務添加到工作隊列中,當工作隊列滿了之後,再創建線程直到達到最大線程數。  58  *  59  * <p>  60  * 線程優先的線程池,就是在核心線程滿了之後,繼續創建線程,直到達到最大線程數之後,再把任務添加到工作隊列中。  61  *  62  * @param corePoolSize 核心線程數  63  * @param maximumPoolSize 最大線程數  64  * @param keepAliveTime 空閑線程的空閑時間  65  * @param unit 時間單位  66  * @param workQueueSize 工作隊列容量大小  67  * @param poolName 線程池名稱  68  * @return ThreadPoolExecutor  69      */
 70     public static ThreadPoolExecutor buildThreadFirstExecutor(int corePoolSize,  71                                                               int maximumPoolSize,  72                                                               long keepAliveTime,  73  TimeUnit unit,  74                                                               int workQueueSize,  75  String poolName) {  76         // 自定義隊列,優先開啟更多線程,而不是放入隊列
 77         LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(workQueueSize) {  78             private static final long serialVersionUID = 5075561696269543041L;  79 
 80  @Override  81             public boolean offer(@Nonnull Runnable o) {  82                 return false; // 造成隊列已滿的假象
 83  }  84  };  85 
 86         // 當線程達到 maximumPoolSize 時會觸發拒絕策略,此時將任務 put 到隊列中
 87         RejectedExecutionHandler rejectedExecutionHandler = (runnable, executor) -> {  88             try {  89                 // 任務拒絕時,通過 offer 放入隊列
 90  queue.put(runnable);  91             } catch (InterruptedException e) {  92                 LOGGER.warn("{} Queue offer interrupted. ", poolName, e);  93  Thread.currentThread().interrupt();  94  }  95  };  96 
 97         ThreadPoolExecutor executor = new ThreadPoolExecutor(  98  corePoolSize, maximumPoolSize,  99  keepAliveTime, unit, 100  queue, 101                 new ThreadFactoryBuilder() 102                         .setNameFormat(poolName + "-%d") 103                         .setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> { 104                             LOGGER.error("{} catching the uncaught exception, ThreadName: [{}]", poolName, thread.toString(), throwable); 105  }) 106  .build(), 107  rejectedExecutionHandler 108  ); 109 
110  CommonExecutor.displayThreadPoolStatus(executor, poolName); 111  CommonExecutor.hookShutdownThreadPool(executor, poolName); 112         return executor; 113  } 114 
115     /**
116  * 批量提交異步任務,使用默認的線程池
117 * 118 * @param tasks 將任務轉化為 AsyncTask 批量提交 119 */ 120 public static <T> List<T> batchExecuteAsync(List<AsyncTask<T>> tasks, @Nonnull String taskName) { 121 return batchExecuteAsync(tasks, BASE_EXECUTOR, taskName); 122 } 123 124 /** 125 * 批量提交異步任務,執行失敗可拋出異常或返回異常編碼即可 <br> 126 * <p> 127 * 需注意提交的異步任務無法控制事務,一般需容忍產生一些垃圾數據的情況下才能使用異步任務,異步任務執行失敗將拋出異常,主線程可回滾事務. 128 * <p> 129 * 異步任務失敗后,將取消剩餘的任務執行. 130 * 131 * @param tasks 將任務轉化為 AsyncTask 批量提交 132 * @param executor 線程池,需自行根據業務場景創建相應的線程池 133 * @return 返回執行結果 134 */ 135 public static <T> List<T> batchExecuteAsync(@Nonnull List<AsyncTask<T>> tasks, @Nonnull ThreadPoolExecutor executor, @Nonnull String taskName) { 136 if (CollectionUtils.isEmpty(tasks)) { 137 return Collections.emptyList(); 138 } 139 140 int size = tasks.size(); 141 142 List<Callable<T>> callables = tasks.stream().map(t -> (Callable<T>) () -> { 143 try { 144 T r = t.doExecute(); 145 146 LOGGER.debug("[>>Executor<<] Async task execute success. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}]", 147 Thread.currentThread().getName(), taskName, t.taskName()); 148 return r; 149 } catch (Throwable e) { 150 LOGGER.warn("[>>Executor<<] Async task execute error. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}], exception: {}", 151 Thread.currentThread().getName(), taskName, t.taskName(), e.getMessage()); 152 throw e; 153 } 154 }).collect(Collectors.toList()); 155 156 CompletionService<T> cs = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(size)); 157 List<Future<T>> futures = new ArrayList<>(size); 158 LOGGER.info("[>>Executor<<] Start async tasks, BatchTaskName: [{}], TaskSize: [{}]", taskName, size); 159 160 for (Callable<T> task : callables) { 161 futures.add(cs.submit(task)); 162 } 163 164 List<T> resultList = new ArrayList<>(size); 165 for (int i = 0; i < size; i++) { 166 try { 167 Future<T> future = cs.poll(6, TimeUnit.MINUTES); 168 if (future != null) { 169 T result = future.get(); 170 resultList.add(result); 171 LOGGER.debug("[>>Executor<<] Async task [{}] - [{}] execute success, result: {}", taskName, i, result); 172 } else { 173 cancelTask(futures); 174 LOGGER.error("[>>Executor<<] Async task [{}] - [{}] execute timeout, then cancel other tasks.", taskName, i); 175 throw new CommonException(BaseConstants.ErrorCode.TIMEOUT); 176 } 177 } catch (ExecutionException e) { 178 LOGGER.warn("[>>Executor<<] Async task [{}] - [{}] execute error, then cancel other tasks.", taskName, i, e); 179 cancelTask(futures); 180 Throwable throwable = e.getCause(); 181 if (throwable instanceof CommonException) { 182 throw (CommonException) throwable; 183 } else if (throwable instanceof DuplicateKeyException) { 184 throw (DuplicateKeyException) throwable; 185 } else { 186 throw new CommonException("error.executorError", e.getCause().getMessage()); 187 } 188 } catch (InterruptedException e) { 189 cancelTask(futures); 190 Thread.currentThread().interrupt(); // 重置中斷標識 191 LOGGER.error("[>>Executor<<] Async task [{}] - [{}] were interrupted.", taskName, i); 192 throw new CommonException(BaseConstants.ErrorCode.ERROR); 193 } 194 } 195 LOGGER.info("[>>Executor<<] Finish async tasks , BatchTaskName: [{}], TaskSize: [{}]", taskName, size); 196 return resultList; 197 } 198 199 /** 200 * 根據一定周期輸出線程池的狀態 201 * 202 * @param threadPool 線程池 203 * @param threadPoolName 線程池名稱 204 */ 205 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName) { 206 displayThreadPoolStatus(threadPool, threadPoolName, RandomUtils.nextInt(60, 600), TimeUnit.SECONDS); 207 } 208 209 /** 210 * 根據一定周期輸出線程池的狀態 211 * 212 * @param threadPool 線程池 213 * @param threadPoolName 線程池名稱 214 * @param period 周期 215 * @param unit 時間單位 216 */ 217 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName, long period, TimeUnit unit) { 218 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { 219 LOGGER.info("[>>ExecutorStatus<<] ThreadPool Name: [{}], Pool Status: [shutdown={}, Terminated={}], Pool Thread Size: {}, Active Thread Count: {}, Task Count: {}, Tasks Completed: {}, Tasks in Queue: {}", 220 threadPoolName, 221 threadPool.isShutdown(), threadPool.isTerminated(), // 線程是否被終止 222 threadPool.getPoolSize(), // 線程池線程數量 223 threadPool.getActiveCount(), // 工作線程數 224 threadPool.getTaskCount(), // 總任務數 225 threadPool.getCompletedTaskCount(), // 已完成的任務數 226 threadPool.getQueue().size()); // 線程池中線程的數量 227 }, 0, period, unit); 228 } 229 230 /** 231 * 添加Hook在Jvm關閉時優雅的關閉線程池 232 * 233 * @param threadPool 線程池 234 * @param threadPoolName 線程池名稱 235 */ 236 public static void hookShutdownThreadPool(ExecutorService threadPool, String threadPoolName) { 237 Runtime.getRuntime().addShutdownHook(new Thread(() -> { 238 LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]", threadPoolName); 239 // 使新任務無法提交 240 threadPool.shutdown(); 241 try { 242 // 等待未完成任務結束 243 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 244 threadPool.shutdownNow(); // 取消當前執行的任務 245 LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker, which may cause some task inconsistent. Please check the biz logs."); 246 247 // 等待任務取消的響應 248 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 249 LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs."); 250 } 251 } 252 } catch (InterruptedException ie) { 253 // 重新取消當前線程進行中斷 254 threadPool.shutdownNow(); 255 LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs."); 256 257 // 保留中斷狀態 258 Thread.currentThread().interrupt(); 259 } 260 261 LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]", threadPoolName); 262 })); 263 } 264 265 /** 266 * 獲取返回CPU核數 267 * 268 * @return 返回CPU核數,默認為8 269 */ 270 public static int getCpuProcessors() { 271 return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ? 272 Runtime.getRuntime().availableProcessors() : 8; 273 } 274 275 private static <T> void cancelTask(List<Future<T>> futures) { 276 for (Future<T> future : futures) { 277 if (!future.isDone()) { 278 future.cancel(true); 279 } 280 } 281 } 282 283 }

AsyncTask:

 1 package org.hzero.core.util;  2 
 3 import java.util.UUID;  4 
 5 public interface AsyncTask<T> {  6 
 7     default String taskName() {  8         return UUID.randomUUID().toString();  9  } 10 
11  T doExecute(); 12 }

 

————————————————————————————————————–

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

緬甸小象成功獲救 凸顯盜獵危機仍在

摘錄自2019年10月2日自由時報報導

據《路透》報導,緬甸Wingabaw大象庇護營上個月在該國西南部森林,拯救了一隻誤入盜獵者陷阱的4個月大幼象,她的左前腿遭陷阱夾傷,父母推測也慘遭盜獵者殺害。

現在這隻被命名為艾雅‧賽恩(Ayeyar Sein),雖然腳上還戴著以竹子和繃帶做成的夾板,但在定期清理傷口之下,賽恩的恢復狀況良好,食慾也逐漸恢復正常。

據專家2018年統計報告指出,目前緬甸僅存1400至2000頭野生象,並有5000頭大象遭囚禁,凸顯出緬甸所面臨的盜獵危機。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

氣候變遷衝擊農民生計 菲青年下鄉助行銷轉型

摘錄自2019年10月2日中央社報導

菲律賓存在乾季和雨季,但氣候變遷改變降雨季節,衝擊農民生計。為此,25名菲律賓青年創業家組成的「根源聯合會」(Roots Collective)與「和平與公平基金會」(Peace and Equity Foundation)合作,即日起到6日在位於大馬尼拉「波尼法西奧堡環球城」(BGC)的商場展售在地農民、手工藝工作者與社會企業合作開發的織品、包包、鞋子、食品及日用品。

青年創業家佛蘭查(John Francia)與夥伴創立「編織工藝」(Woven Crafts),行銷販售來自菲律賓東部薩馬省(Samar)巴席鎮(Basey)的織品,並開發出筆電包、手拿包、化妝包等新產品。佛蘭查說,他們正把觸角延伸到民答那峨島和2017年受叛亂襲擊的馬拉韋市(Marawi)等地,希望開發新的工藝素材,幫助更多菲律賓貧困社區提高收入、脫離貧窮。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

布袋蓮洗刷惡名 成為肯亞再生能源金礦

摘錄自2019年9月30日聯合報報導

布袋蓮為原產於南美洲的水生植物,20世紀初首次出現於非洲。自從布袋蓮開始在1900年從南非開普敦開始向外蔓延,並開始堵塞主要水壩和河流後,科學家稱它為「世界最嚴重的水生雜草」。布袋蓮常因阻塞重要水道且難以根除而遭到敵視,但它在非洲肯亞卻有助提供更乾淨的能源,進而保護居民健康。

英國衛報報導,在肯亞基蘇木郡(Kisumu)維多利亞湖的Winam灣岸邊附近,大量布袋蓮腐爛在湖邊,但那些逐漸腐壞的蠟質葉片卻是可再生能源的金礦。事實證明,布袋蓮不僅繁殖能力出眾,它的葉子還含有高比例的碳和氮。早期研究預測,僅約4公斤的乾燥布袋蓮,就能滿足一個大家庭的日常能源需求。

肯亞學者證實,布袋蓮能將有機物轉化為甲烷(沼氣的主要成分)、二氧化碳及水。因為沼氣容易燃燒,一般天然氣能做的事情它都能辦到,像是煮飯、加熱或作為動力來源。Dunga村2018年獲贈兩個沼氣池,能為村莊60%人口提供服務,現在肯亞約有50個沼氣池。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

航向公海 海洋吸塵器首次成功收集垃圾

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

萬安灘因探勘石油與中對峙 越南學者籲提交聯合國安理會

摘錄自2019年10月7日中央社報導

中國7月起派遣探勘船「海洋地質8號」等船隻進入南沙群島最西側淺灘萬安灘(Vanguard Bank)探勘石油,越南將萬安灘稱為思政灘(Tu Chinh),聲稱依據聯合國海洋法公約,萬安灘位於越南200海里專屬經濟區內,出動相關船隻阻擾,與中方船隻發生對峙,越方並多次向中方抗議。兩國對峙數月,越南法律政策與發展研究所6日舉行座談會,與會學者建議越南將中國近期在萬安灘海域所採取威脅和武力行為提交聯合國安全理事會。

學者表示,中國近年來企圖將整個南海變成「家裡水池」,對東沙、西沙、中沙和南沙四大群島提出主權聲索,其中主張在與越南有爭議的西沙和南沙兩個群島設立專屬經濟區和大陸棚,目的是將沒有爭議海域變成有爭議海域,從而提出「共同資源開採」的聲索。

越南法律政策與發展研究所所長黃玉交(HoangNgoc Giao)表示,中國近年來在南海採取的行為,尤其是近期在歸屬越南專屬經濟區的萬安灘海域的既威脅又使用武力的行為,已違反聯合國憲章;國際社會反應也顯示,這是威脅區域和平、安全以及違反國際法基本原則的行為,中國不僅侵犯越南海域,還侵犯馬來西亞和菲律賓海域。越南應趁著擔任聯合國安理會2020年至2021年非常任理事國之際,將中國在南海威脅和武力行為向聯合國安理會提交。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

日本岩手、宮城水產品可望出口 歐盟:最快2019年內解禁

摘錄自2019年10月05日東森新聞日本報導

日本2011年4月發生9.0地震,福島第一核電站因此放射性物質外洩,成為嚴重的核災事故,讓各國紛紛對日本核災地區設下禁令,不願讓汙染物入國。如今,日本政府表示,歐盟在今年內將會放寬對日本食品的進口限制。

綜合日媒報導,日本執政黨相關人士5日指出,歐盟委員會主席容克(Jean-Claude Juncker)於之前的布魯塞爾會談時向日本首相安倍晉三透露,歐盟很快就會放寬對日本食品的進口限制,尤其是取消岩手、宮城縣的水產品進口禁令。

事實上,歐盟早在2017年就取消對福島縣大米的禁令,不過日本仍然致力於說服其他國家,包括歐盟、中國、韓國及美國等,希望解除福島食品的禁令,也強調日本出口食品時都會通過嚴格檢驗,出口品絕對安全。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

Jmeter系列(30)- 詳解 JDBC Request

如果你想從頭學習Jmeter,可以看看這個系列的文章哦

https://www.cnblogs.com/poloyy/category/1746599.html

 

前言

  • JDBC Request 主要是向數據庫發送一個 JDBC 請求(sql 語句),並獲取返回的數據集
  • 它需要和數據庫連接池配置(JDBC Connection Configuration)一起使用,可參考此篇博文:https://www.cnblogs.com/poloyy/p/13182706.html

 

JDBC Request

 

JDBC Request 界面介紹

 

字段含義

字段 含義

Variable Name Bound to Pool

數據庫連接池配置的名稱

Query Type

sql 語句的類型

SQL Query

  • sql 語句
  • 語句結尾不需要添加 ; 
  • 變量用 ? 佔位

Parameter values

需要傳遞的變量值,多個變量用 , 分隔

Parameter types

變量類型

Variable Names

  • 保存sql語句返回的數據和返回數據的總行數
  • 用 , 分隔
  • 跳過列用空

Result Variable Name

一個 Object 變量存儲所有返回值

Query timeout(s)

超時時間;默認0,代表無限時間

Limit ResultSet

和 limit 類似作用,限制 sql 語句返回結果集的行數

Handle ResultSet

如何定義 callable statements 返回的結果集;默認是存儲為字符串

後續通過各種栗子來深入理解常用字段的含義

 

舉栗子的前提

需要自己找一個有數據庫的數據來練手哦!這裏拿的表數據如下哈

 

只有 sql 語句的栗子

JDBC Request

沒啥特別的,平時 sql 怎麼寫,這裏就怎麼寫

 

運行結果

 

參數化的栗子

JDBC Request

 

運行結果

 

知識點

  • 有幾個問號,Parameter value、Parameter type 填寫值的數量要保持一致,用,分隔
  • 問號其實是佔位符,如果學過編程的童鞋應該也知道這種寫法,可以避免 SQL 注入的問題
  • sql 中使用佔位符時,Query Type 必須選擇 Prepared Select Statement 或者 Prepared Update Statement 
  • 我們可以用 Jmeter 變量去賦值,看下面栗子

 

參數化+變量的栗子

JDBC Request

 

運行結果

 

知識點

  • 如果在 sql 語句中使用變量,且是字符串類型,需要加上引號(前提是變量值沒有加引號),如 ${name} 
  • 如果在 Parameter values 中使用變量,且是字符串類型,不需要加上引號,只需要在 Parameter types 里寫明為 varchar 即可

 

使用 Variable Names 的栗子

結構樹

 

JDBC Request

添加一個 Debug Sampler 就知道這個字段有什麼作用了

 

JDBC Request 運行結果

 

調試取樣器運行結果

 

知識點

  • mysql:數據庫連接池對象
  • a_#、b_#、c_#、d_#:代錶行數
  • a_1:第 1 行、第 1 列
  • b_2:第 2 行、第 2 列
  • c_3:第 3 行、第 3 列
  • d_3:第 3 行、第 4 列
  • 以此類推….
  • 一般如果 HTTP 請求需要用到 sql 查出來的數據的話,就會用到 Variable names 這個字段

 

使用 Result variable name 的栗子

JDBC Request

 

Debug Sampler  運行結果

 

知識點

該變量是個數組,每一個元素代表一條記錄

 

重點

關於通過 Variable names、Result variable name 獲取到的值如何提取,我們將在下一篇文章中詳細講解

 

使用 Limit ResultSet 的栗子

JDBC Request

 

 

運行結果

 

知識點

  • Limit ResultSet 是對 sql 語句返回的結果集限制行數
  •  limit 10 限制只返回了 10 條數據,然後 Limit ResultSet = 6 限制結果集最終只返回 6 條數據

 

  本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

Halcon斑點分析BlobAnalysis解析

斑點分析的算法非常簡單:在圖像中,相關對象的像素(也稱為前景)通過其灰度值來識別。例如,圖中示例显示了液體中的組織顆粒。這些粒子是明亮的,液體(背景)是暗的。通過選擇明亮的像素(閾值),可以很容易檢測到顆粒。在許多應用中,暗像素和亮像素的簡單條件不再成立,但結果相同可以通過額外的預處理或像素選擇/分組的替代方法來實現。

在這種情況下,斑點分析的優點是HALCON提供了大量算子使其具有極大的靈活性。此外,這些方法通常具有很高的性能。斑點分析也可以與許多其他視覺任務相結合,例如作為預處理步驟,靈活地生成交互區域。

基本概念

斑點分析主要包括三個部分:

  1. 獲取圖像

  2. 分割圖像

    採集圖像后,接下來的任務是選擇前景像素。這也稱為分割。結果
    在HALCON中通常將此過程為Blob(二進制大對象),數據類型為區域(a region)。

  3. 提取目標特徵

    在最後一步中,將計算出諸如面積(像素數),重心或方向之類的特徵

該基本概念的一個示例是以下程序,該程序屬於上述示例。在此,從文件中獲取圖像。使用閾值(threshold)選擇大於120的所有像素。然後,引入了一個不太明顯的步驟:算子連接(connection)將所有亮像素的集合分離為所謂的連接組件。此步驟的效果是我們將劃分出多個區域,而不是閾值(threshold)返回的單個區域。該程序的最後一步是一些功能的計算。在此,算子area_center確定了大小(像素數)和重心。請注意area_center返回了三個值(每個參數有一個值)。

read_image (Image, 'particle')
threshold (Image, BrightPixels, 120, 255)
connection (BrightPixels, Particles)
area_center (Particles, Area, Row, Column)

擴展概念

在許多情況下,斑點分析將比上述示例更高級。原因是混亂或不均勻的照明。此外,經常需要進行后處理,例如將元素特徵轉換為真實世界單位或結果可視化。

使用RIO(Region Of Interest)

可以通過使用感興趣區域來加快斑點分析。搜索的斑點區域被限制越多。搜索將更快更強大。

對齊RIO或圖像

在某些應用中,關注區域必須相對於另一個對象對齊。或者圖像本身可以對齊,例如通過旋轉或裁剪。

校正圖像

與對齊類似,可能需要校正圖像,例如消除鏡頭畸變或轉換圖像的參考點。

預處理圖像(過濾)

下一個重要部分是圖像的預處理。在這裏,像mean_image或gauss_filter這樣的運算符可用於消除噪音。一個快速但不太完美的替代方案是binomial_filter。運算符middle_image對於抑制小斑點或細線很有用。算子anisotropic_diffusion(各向異性擴散)對保留邊緣的平滑很有用,最後使用fill_interlace消除由隔行交錯相機(攝像機視頻流圖像)引起的缺陷

提取分割參數

代替使用固定的閾值,可以為每個圖像動態提取它們。例如具有多個峰值的灰度值直方圖,每個對象類別一個。在這裏,您可以使用算子gray_histo_abs和histo_to_thresh。作為高級替代方案,可以將算子intensity與參考圖像結合使用,僅適用於背景:在設置過程中,將確定背景區域的平均灰度值。如果平均灰度值已更改,則可以相應調整閾值。

分割圖像

對於分割,可以使用各種方法。最簡單的方法是threshold(閾值),指定一個屬於前景對象的值範圍。另一個非常常見的方法是dyn_threshold。在此,第二張圖像將作為參考圖像。通過這種方法,使用局部閾值而不是全局閾值。這些局部閾值存儲在參考圖像中。可以通過拍攝空背景圖片將其設為靜態作為參考圖像,也可以使用平滑濾鏡(例如mean_image)

處理區域

一旦斑點區域被分割。通常需要對其進行修改,例如,通過抑制小區域,給定方向或接近其他區域的區域。在這種情況下,形態算子open_circle和opening_rectangle通常可用於抑制噪聲,closeing_circle和closing-rectanglel填補空白。可以使用select_shape,select_shape_std和select-proto-proto選擇具有特定功能的斑點。

特徵提取

最終處理時,將提取斑點的特徵,所需功能的類型取決於應用程序。類型列表可以在參考手冊的“Regions/Features”和”Image/Features”中找到。

將結果轉換為世界坐標

諸如面積或重心之類的要素通常必須轉換為世界坐標。這可以通過HALCON相機
校準實現。

可視化結果

最後,你可能要显示圖像的斑點(區域)和特徵。

靈感來源於Halcon官方文檔

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

LeetCode 80,不使用外部空間的情況下對有序數組去重

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是LeetCode專題的第49篇文章,我們一起來看LeetCode的第80題,有序數組去重II(Remove Duplicates from Sorted Array II)。

這題的官方難度是Medium,通過率是43.3%,點贊1104,反對690。這題的通過率有一點點高,然後點贊比也不是很高。說明這題偏容易,並且大家的評價偏低。也的確如此,我個人覺得,大家評價不好的主要原因還是這題偏容易了一些。

題面

其實從題目的標題當中我們已經可以得到很多信息了,實際上也的確如此,這題的題面和標題八九不離十,需要我們對一個有序的數組進行去重。不過去重的條件是最多允許一個元素出現兩次,也就是要將多餘的元素去掉。並且題目還限制了需要我們在原數組進行操作,對於空間複雜度的要求是。由於我們去除了元素之後會帶來數組長度的變化,所以我們最後需要返回完成之後數組的長度。

這是一種常規的做法,在C++以及一些古老的語言當中數組是不能變更長度的。我們想要在原數組上刪除數據,只能將要刪除的數據移動到數組末尾,然後返回變更之後的數組長度。這樣下游就通過返回的數組長度得知變更之後的數量變化。由於新晉的一些語言,比如Java、Python都支持數組長度變動,所以很少在這些語言的代碼當中看到這樣的用法了。

樣例

Given nums = [0,0,1,1,1,1,2,3,3],

Your function should return length = 7, with the first seven elements of nums being modified to 0, 0, 1, 1, 2, 3 and 3 respectively.

It doesn't matter what values are set beyond the returned length. 

在這個樣例當中,由於1出現了4次,所以我們需要刪除掉2個1,那麼刪除之後的數組長度也會減少2,所以我們需要返回7,表示刪除之後的新的數組的有效長度是7。並且保證原數組當中前5個元素是[0, 0, 1, 1, 2, 3]

題解

刪除重複的元素本身並不複雜,唯一麻煩的是我們怎麼在不引入額外存儲的情況下完成這一點。如果你能抓住數組是有序的這一點,應該很容易想通:既然數組是有序的,那麼相同的元素必然排在一起。

既然相同的元素排在一起,那麼我們可以利用一個變量存儲當前元素出現的次數。如果遇到不同的元素,則將次數置為1。這樣我們就可以判斷出究竟哪些元素需要刪除,哪些元素需要保留了。

但是這就又引入了另外一個問題,我們怎麼來刪除這些重複的元素呢?因為我們不能引入額外的數組,需要在當前數組上完成。我們可以先假設沒有這個限制,我們會怎麼做?

new_nums = []
cur = None
for i in range(n):
    if cur == nums[i]:
        count += 1
 else:
        count = 1
        cur = nums[i]
    if count > 2:
        continue
    new_nums.append(nums[i])

由於有這個限制,所以我們要做的就是把new_nums這個數組去掉,其實去掉是很簡單的,因為我們可以讓nums這個數組自己覆蓋自己。因為產出的數據的數量一定是小於等於數組長度的,所以不會出現數組越界的問題。我們只需要維護一個下標記錄nums數組當中允許覆蓋的位置即可。

這個也是非常常見的做法,我們在之前的題目當中也曾經見到過。

class Solution:
    def removeDuplicates(self, nums: List[int]) -> int:
        # start是起始覆蓋指針,指向第一個可以覆蓋的位置
        start, cur, cnt = 0, None, 0
        n = len(nums)
        if n == 0:
            return 0
        for i in range(n):
            if cur == nums[i]:
                cnt += 1
            else:
                cnt = 1
                cur = nums[i]
            # 如果數量超過2,說明當前元素應該捨棄,則continue
            if cnt > 2:
                continue
            # 否則用當前元素覆蓋start位置,並且start移動一位
            else:
                nums[start] = nums[i]
                start += 1
        return start

關於這段代碼,還有一個簡化版本,我們可以把cnt變量也省略掉。因為元素是有序的,我們可以直接用nums[i]和nums[i-2]進行判斷,如果相等,那麼說明重複的元素一定超過了兩個,當前元素需要跳過。

簡化之後的代碼如下:

class Solution(object):
    def removeDuplicates(self, nums):
        """  :type nums: List[int]  :rtype: int  """
        i = 0
        for n in nums:
            if i < 2 or n != nums[i - 2]:
                nums[i] = n
                i += 1
        return i

總結

今天的題目不難,總體來說算是Medium偏低難度,主要有兩點值得稱道。第一點是C++風格inplace變更數組的做法,第二點就是數組自我覆蓋的方法。除此之外,題目幾乎沒什麼難度,我想大家應該都能想出解法來。

如果喜歡本文,可以的話,請點個關注,給我一點鼓勵,也方便獲取更多文章。

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準