Preface:
有關於 Semaphore 的說明可以參考 wiki 上面的說明:
簡單說就是在 Multithread 的環境下, 你會定義一個 Critical section 並限制該部分的 code 一次只能有一個 thread 進去以避免 Race condition. 而 Semaphore 你可以把它當作一個 lock, 在要進去 critical section 時去 acquire lock, 此時如果 lock 是 available, 就可以進入 critical section , 否則就 wait 直到在 critical section 的 thread 離開後釋放 lock 後, 其它在等待 lock 的 thread 便有一隻會爭搶到 lock 而得以進入 critical section. 在 Java 5 以後有提供這樣的 implementation 在 java.util.concurrent.Semaphore. 而 Semaphore 又分為 binary Semaphore 與 counting Semaphore, 主要是那個 lock 能夠同時被幾個 thread 擁有, 也就是同時有幾個 thread 能進入到 critical section. 在 Java 提供的 Semaphore 的建構子:
- Semaphore(int permits)
當你給 permit=1 就是 binary Semaphore; 當 permit>1 就是所謂的 counting Semaphore.
接下底下會實做一個簡單版本的 Semaphore, 並透過幾個範例來了解如何使用 Semaphore.
Simple/Binary Semaphore:
底下是接下來範例會遇到的 "陽春版" Semaphore 實作:
- package demo;
- public class Semaphore {
- private boolean signal = false;
- public synchronized void take() {
- this.signal = true;
- this.notify();
- }
- public synchronized void release() throws InterruptedException {
- while (!this.signal)
- wait();
- this.signal = false;
- }
- }
Using Semaphores for Signaling:
底下是一個 Thread 利用 Semaphore 當作 signal 的範例:
- SendingThread : Sending signal to wake up RecevingThread.
- package demo;
- import java.util.Random;
- public class SendingThread extends Thread{
- Semaphore semaphore = null;
- public SendingThread(Semaphore semaphore) {
- this.semaphore = semaphore;
- }
- @Override
- public void run() {
- Random rdm = new Random();
- int so = 0;
- while (true) {
- System.out.printf("\t[ST] Prepare stuff...\n");
- try{Thread.sleep(rdm.nextInt(500)+500);}catch(Exception e){}
- System.out.printf("\t[ST] Signaling(%d)!\n", so);
- this.semaphore.take();
- so++;
- }
- }
- }
- package demo;
- public class RecevingThread extends Thread{
- Semaphore semaphore = null;
- public RecevingThread(Semaphore semaphore) {
- this.semaphore = semaphore;
- }
- @Override
- public void run() {
- int so = 0;
- while (true) {
- try
- {
- System.out.printf("\t[RT] Wait signal!\n");
- this.semaphore.release();
- System.out.printf("\t[RT] Receiving signal(%d)!\n", so);
- so++;
- }
- catch(Exception e){e.printStackTrace();}
- }
- }
- }
- package demo;
- public class Demo1 {
- public static void main(String[] args) {
- Semaphore semaphore = new Semaphore();
- SendingThread sender = new SendingThread(semaphore);
- RecevingThread receiver = new RecevingThread(semaphore);
- receiver.start();
- sender.start();
- }
- }
可以發現 Receiving signal 一定都發生在 Signaling 之後 (signal 的數字要一樣). 如此我們確保了接收 signal 的 thread 一定比發 signal 的 thread 晚執行!
Counting Semaphore:
上面陽春版的 Semaphore 只能用在只有兩個 Thread 的場合, 當你有多個(>2) Thread 在交互作用時, 此時便需要 counting Semaphore. 下面為範例使用的 counting Semaphore 實作:
- package demo;
- public class CountingSemaphore {
- private int signals = 0;
- public synchronized void take() {
- this.signals++;
- this.notify();
- }
- public synchronized void release() throws InterruptedException {
- while (this.signals == 0)
- wait();
- this.signals--;
- }
- }
但如果你希望 take() 不是可以無限的呼叫時, 也就是你希望 lock 可以使用的 thread 數是有限的, 此時可以考慮 bounded counting Semaphore. 簡單實作如下:
- package demo;
- public class BoundedSemaphore {
- private int signals = 0;
- private int bound = 0;
- public BoundedSemaphore(int upperBound) {
- this.bound = upperBound;
- }
- public synchronized void take() throws InterruptedException {
- while (this.signals == bound)
- wait();
- this.signals++;
- this.notify();
- }
- public synchronized void release() throws InterruptedException {
- while (this.signals == 0)
- wait();
- this.signals--;
- this.notify();
- }
- }
bounded counting Semaphore 的其中一個使用範例是 Producer-Consumer problem. 這邊把 bounded counting Semaphore 當作 buffer 給 Producer 存放產出; 給 Consumer 取出產出. 當 buffer 滿了時, Producer 要放入產出時會 block; 當 buffer 空了時, Consumer 要取出產出時會 block 出.
- Producer
- package demo;
- import java.util.Random;
- public class ProducerThd extends Thread{
- public BoundedSemaphore bsem=null;
- public static int loop=0;
- public ProducerThd(int l, BoundedSemaphore bs)
- {
- this.bsem = bs;
- this.loop=l;
- }
- @Override
- public void run()
- {
- Random rdm = new Random();
- try
- {
- for(int i=0; i
- {
- synchronized(this)
- {
- Thread.sleep(rdm.nextInt(200)+100);
- bsem.take();
- System.out.printf("\t[PD] Producing...%d\n", i+1);
- }
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- package demo;
- import java.util.Random;
- public class ConsumerThd extends Thread{
- public BoundedSemaphore bs = null;
- public static int cc = 0;
- public ConsumerThd(int i, BoundedSemaphore bs){this.setName(String.format("C%d", i)); this.bs=bs;}
- @Override
- public void run()
- {
- try
- {
- Random rdm = new Random();
- while(true)
- {
- synchronized(ConsumerThd.class)
- {
- if(cc==ProducerThd.loop) break;
- bs.release();
- cc++;
- System.out.printf("\t[%s] Consuming %d...\n", getName(), cc);
- Thread.sleep(rdm.nextInt(300)+300);
- }
- }
- }
- catch(Exception e){e.printStackTrace();}
- System.out.printf("\t[%s] Byebye!\n", getName());
- }
- }
- package demo;
- public class Demo2 {
- public static void main(String args[])
- {
- BoundedSemaphore buffer = new BoundedSemaphore(20);
- ProducerThd ph = new ProducerThd(10, buffer);
- ConsumerThd c1 = new ConsumerThd(1, buffer);
- ConsumerThd c2 = new ConsumerThd(2, buffer);
- ConsumerThd c3 = new ConsumerThd(3, buffer);
- ph.start();
- c1.start(); c2.start(); c3.start();
- }
- }
* 多線程設計模式 : Producer-Consumer Pattern (我來做, 你來用)
沒有留言:
張貼留言