2013年5月7日 星期二

[ Java 文章收集 ] Java Concurrency - Semahores

來源自 這裡 
Preface: 
有關於 Semaphore 的說明可以參考 wiki 上面的說明
In computer science, a semaphore is a variable or abstract data type that provides a simple but useful abstraction for controlling access by multiple processes to a common resource in a parallel programming or multi user environment.

簡單說就是在 Multithread 的環境下, 你會定義一個 Critical section 並限制該部分的 code 一次只能有一個 thread 進去以避免 Race condition. 而 Semaphore 你可以把它當作一個 lock, 在要進去 critical section 時去 acquire lock, 此時如果 lock 是 available, 就可以進入 critical section , 否則就 wait 直到在 critical section 的 thread 離開後釋放 lock 後, 其它在等待 lock 的 thread 便有一隻會爭搶到 lock 而得以進入 critical section. 在 Java 5 以後有提供這樣的 implementation 在 java.util.concurrent.Semaphore. 而 Semaphore 又分為 binary Semaphore 與 counting Semaphore, 主要是那個 lock 能夠同時被幾個 thread 擁有, 也就是同時有幾個 thread 能進入到 critical section. 在 Java 提供的 Semaphore 的建構子: 
Semaphore(int permits) 
Creates a Semaphore with the given number of permits and nonfair fairness setting.

當你給 permit=1 就是 binary Semaphore; 當 permit>1 就是所謂的 counting Semaphore. 

接下底下會實做一個簡單版本的 Semaphore, 並透過幾個範例來了解如何使用 Semaphore. 

Simple/Binary Semaphore: 
底下是接下來範例會遇到的 "陽春版" Semaphore 實作: 
  1. package demo;  
  2.   
  3. public class Semaphore {  
  4.     private boolean signal = false;  
  5.   
  6.     public synchronized void take() {  
  7.         this.signal = true;  
  8.         this.notify();  
  9.     }  
  10.   
  11.     public synchronized void release() throws InterruptedException {  
  12.         while (!this.signal)  
  13.             wait();  
  14.         this.signal = false;  
  15.     }  
  16. }  
這邊函數的取名 take() 與 release() 其實沒有多大意義, 你可以想成是兩隻 thread 在溝通的方式. 當有 thread 呼叫 Semaphore 上的 release() 前沒有其他 thread 呼叫 Semaphore 上的take() 時, 它將會停止執行直到有其他 thread 呼叫 Semaphore 上面的 take() 叫醒它. 

Using Semaphores for Signaling: 
底下是一個 Thread 利用 Semaphore 當作 signal 的範例: 
- SendingThread : Sending signal to wake up RecevingThread. 
  1. package demo;  
  2.   
  3. import java.util.Random;  
  4.   
  5. public class SendingThread extends Thread{  
  6.     Semaphore semaphore = null;  
  7.   
  8.     public SendingThread(Semaphore semaphore) {  
  9.         this.semaphore = semaphore;  
  10.     }  
  11.   
  12.     @Override  
  13.     public void run() {  
  14.         Random rdm = new Random();  
  15.         int so = 0;  
  16.         while (true) {  
  17.             System.out.printf("\t[ST] Prepare stuff...\n");  
  18.             try{Thread.sleep(rdm.nextInt(500)+500);}catch(Exception e){}  
  19.             System.out.printf("\t[ST] Signaling(%d)!\n", so);  
  20.             this.semaphore.take();  
  21.             so++;  
  22.         }  
  23.     }  
  24. }  
- RecevingThread: Waiting signal to do thing 
  1. package demo;  
  2.   
  3. public class RecevingThread extends Thread{  
  4.     Semaphore semaphore = null;  
  5.   
  6.     public RecevingThread(Semaphore semaphore) {  
  7.         this.semaphore = semaphore;  
  8.     }  
  9.   
  10.     @Override  
  11.     public void run() {  
  12.         int so = 0;  
  13.         while (true) {  
  14.             try  
  15.             {  
  16.                 System.out.printf("\t[RT] Wait signal!\n");  
  17.                 this.semaphore.release();  
  18.                 System.out.printf("\t[RT] Receiving signal(%d)!\n", so);  
  19.                 so++;  
  20.             }  
  21.             catch(Exception e){e.printStackTrace();}  
  22.         }  
  23.     }  
  24. }  
- Demo1: 這個範例要求 RecevingThread 必須等到 signal 才能做事. 通常 SendingThread 會比較慢, 故 RecevingThread 需要等待. 
  1. package demo;  
  2.   
  3. public class Demo1 {  
  4.     public static void main(String[] args) {  
  5.         Semaphore semaphore = new Semaphore();  
  6.   
  7.         SendingThread sender = new SendingThread(semaphore);  
  8.   
  9.         RecevingThread receiver = new RecevingThread(semaphore);  
  10.   
  11.         receiver.start();  
  12.         sender.start();  
  13.     }  
  14. }  
執行的 Log 如下: 
[RT] Wait signal!
[ST] Prepare stuff...
[ST] Signaling(0)!
[ST] Prepare stuff...
[RT] Receiving signal(0)!
[RT] Wait signal!
[ST] Signaling(1)!
[RT] Receiving signal(1)!
[RT] Wait signal!
[ST] Prepare stuff...
[ST] Signaling(2)!
[RT] Receiving signal(2)!
...

可以發現 Receiving signal 一定都發生在 Signaling 之後 (signal 的數字要一樣). 如此我們確保了接收 signal 的 thread 一定比發 signal 的 thread 晚執行! 

Counting Semaphore: 
上面陽春版的 Semaphore 只能用在只有兩個 Thread 的場合, 當你有多個(>2) Thread 在交互作用時, 此時便需要 counting Semaphore. 下面為範例使用的 counting Semaphore 實作: 
  1. package demo;  
  2.   
  3. public class CountingSemaphore {  
  4.     private int signals = 0;  
  5.   
  6.     public synchronized void take() {  
  7.         this.signals++;  
  8.         this.notify();  
  9.     }  
  10.   
  11.     public synchronized void release() throws InterruptedException {  
  12.         while (this.signals == 0)  
  13.             wait();  
  14.         this.signals--;  
  15.     }  
  16. }  
Bounded Semaphore: 
但如果你希望 take() 不是可以無限的呼叫時, 也就是你希望 lock 可以使用的 thread 數是有限的, 此時可以考慮 bounded counting Semaphore. 簡單實作如下: 
  1. package demo;  
  2.   
  3. public class BoundedSemaphore {  
  4.     private int signals = 0;  
  5.     private int bound = 0;  
  6.   
  7.     public BoundedSemaphore(int upperBound) {  
  8.         this.bound = upperBound;  
  9.     }  
  10.   
  11.     public synchronized void take() throws InterruptedException {  
  12.         while (this.signals == bound)  
  13.             wait();  
  14.         this.signals++;  
  15.         this.notify();  
  16.     }  
  17.   
  18.     public synchronized void release() throws InterruptedException {  
  19.         while (this.signals == 0)  
  20.             wait();  
  21.         this.signals--;  
  22.         this.notify();  
  23.     }  
  24. }  
Producer-Consumer Problem: 
bounded counting Semaphore 的其中一個使用範例是 Producer-Consumer problem. 這邊把 bounded counting Semaphore 當作 buffer 給 Producer 存放產出; 給 Consumer 取出產出. 當 buffer 滿了時, Producer 要放入產出時會 block; 當 buffer 空了時, Consumer 要取出產出時會 block 出. 
- Producer 
  1. package demo;  
  2.   
  3. import java.util.Random;  
  4.   
  5. public class ProducerThd extends Thread{      
  6.     public BoundedSemaphore bsem=null;  
  7.     public static int loop=0;  
  8.       
  9.       
  10.     public ProducerThd(int l, BoundedSemaphore bs)  
  11.     {         
  12.         this.bsem = bs;  
  13.         this.loop=l;  
  14.     }  
  15.       
  16.     @Override  
  17.     public void run()  
  18.     {  
  19.         Random rdm = new Random();  
  20.         try  
  21.         {  
  22.             for(int i=0; i
  23.             {                 
  24.                 synchronized(this)  
  25.                 {  
  26.                     Thread.sleep(rdm.nextInt(200)+100);  
  27.                     bsem.take();                  
  28.                     System.out.printf("\t[PD] Producing...%d\n", i+1);  
  29.                 }  
  30.             }  
  31.         }  
  32.         catch(Exception e)  
  33.         {  
  34.             e.printStackTrace();  
  35.         }  
  36.     }  
  37. }  
- Consumer 
  1. package demo;  
  2.   
  3. import java.util.Random;  
  4.   
  5. public class ConsumerThd extends Thread{  
  6.     public BoundedSemaphore bs = null;  
  7.     public static int cc = 0;  
  8.       
  9.     public ConsumerThd(int i, BoundedSemaphore bs){this.setName(String.format("C%d", i)); this.bs=bs;}  
  10.   
  11.     @Override  
  12.     public void run()  
  13.     {  
  14.         try  
  15.         {  
  16.             Random rdm = new Random();  
  17.             while(true)  
  18.             {                 
  19.                 synchronized(ConsumerThd.class)  
  20.                 {  
  21.                     if(cc==ProducerThd.loop) break;  
  22.                     bs.release();  
  23.                     cc++;                 
  24.                     System.out.printf("\t[%s] Consuming %d...\n", getName(), cc);  
  25.                     Thread.sleep(rdm.nextInt(300)+300);  
  26.                 }  
  27.             }  
  28.         }  
  29.         catch(Exception e){e.printStackTrace();}  
  30.         System.out.printf("\t[%s] Byebye!\n", getName());  
  31.     }  
  32. }  
- Demo2 
  1. package demo;  
  2.   
  3. public class Demo2 {  
  4.     public static void main(String args[])  
  5.     {  
  6.         BoundedSemaphore buffer = new BoundedSemaphore(20);  
  7.         ProducerThd ph = new ProducerThd(10, buffer);  
  8.         ConsumerThd c1 = new ConsumerThd(1, buffer);  
  9.         ConsumerThd c2 = new ConsumerThd(2, buffer);  
  10.         ConsumerThd c3 = new ConsumerThd(3, buffer);  
  11.         ph.start();  
  12.         c1.start(); c2.start(); c3.start();  
  13.     }  
  14. }  
Supplement: 
多線程設計模式 : Producer-Consumer Pattern (我來做, 你來用)

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...