參考自 這裡
前言 :
在上 NLP 時剛好學到了這個 Algorithm, 因為有點複雜, 所以記錄在這方便以後參考. The Viterbi algorithm 使用 dynamic programming algorithm 來找出有最大可能性的 hidden states 的 sequence (或叫 Viterbi path). 其計算結果就是一個 observed events 的路徑或順序, 在 HMM (hidden Markov models - 隱藏馬克夫模型) 被使用來當作 decoder. 在使用這個 Algorithm 時是有一些假設 (Assumptions) :
而這樣的假設在 first-order HMM 是被滿足的. 最後底下是這個 Algorithm 的歷史典故 :
概念說明 :
在開始談 Algorithm/coding 之前, 我們先來了解一下 Viterbi algorithm 是要解什麼問題, 還有他的運作環境與條件是什麼? 首先考慮我們有一個 State machine 如下 :
從上面的 State machine 主要是由 3 個 States 組成 Start, Rainy 與 Sunny. 而 State 間的轉換滿除下面機率模型 (又稱 Table A) :
問題是我們是想從未來的時間來推斷天氣是 Sunny/Rainny (未知), 雖然天氣是未知但是我們有觀察到一些行為與天氣相關聯 (已知) , 而其機率模型為 (又稱 Table B) :
其中 P(Walk:Shop:Clean|Rainy) 可以想成當天氣下雨時, 你會散步的機率是 0.1; 逛街的機率是 0.4; 打掃家裡的機率是 0.5. 因此我們打算透過觀察的行為順序 (已知) 來推斷 "最有可能" 的天氣的發生順序 (未知). 而這便是 Viterbi algorithm 要解的問題!
演算法說明 :
知道了這個演算法想解決的問題, 接著我們來看看這個演算法運算的原理是什麼. 我們以上面說明的 State machine 當作範例進行說明, 首先任何的 State machine 都會有一個起始狀態 (Start). 我們會在每一步產生由當前這一步到前面一步最有可能的狀態, 並將之紀錄起來方便結算時我們可以從最後一步推算到第一步. 接著假設我們有觀察到某五天的行為依序是 Walk->Walk->Shop->Clean. 接著我們要透過之前兩個機率模型來推斷這四天最有可能的天氣分布會是什麼. 首先第一天推算結果如下 :
第二天則基於第一天結果往下推算, 從下面說明可以知道第二天推算結果 :
基於第二天運算結果, 從下面說明可以知道第三天運算結果 :
基於第三天結果, 從下面說明可以知道最後一天運算結果並推算最後一天 "最有可能" 是 Rainy :
接著我們開始透過剛剛紀錄每一步求出前一步最有可能的天氣, 並將結果串連起來便得到我們想預測的天氣順序 : Sunny > Sunny > Rainy > Rainy
Coding 實作 :
基於上述說明, 底下我們用 Java 實現這樣的 Algorithm 並套用上面 State machine 的範例運算 :
執行結果 :
實用工具:
上面代碼有點針對範例而寫, 不容易套到通用問題. 因此下面的 ViterbiAlg.groovy 算是一個工具讓你可以在決定機率矩陣 A, B 與 π 後, 可以輕鬆使用此工具進行 HMM Decoding 的工作:
- ViterbiAlg.groovy
一個使用範例如下:
執行結果:
前言 :
在上 NLP 時剛好學到了這個 Algorithm, 因為有點複雜, 所以記錄在這方便以後參考. The Viterbi algorithm 使用 dynamic programming algorithm 來找出有最大可能性的 hidden states 的 sequence (或叫 Viterbi path). 其計算結果就是一個 observed events 的路徑或順序, 在 HMM (hidden Markov models - 隱藏馬克夫模型) 被使用來當作 decoder. 在使用這個 Algorithm 時是有一些假設 (Assumptions) :
而這樣的假設在 first-order HMM 是被滿足的. 最後底下是這個 Algorithm 的歷史典故 :
概念說明 :
在開始談 Algorithm/coding 之前, 我們先來了解一下 Viterbi algorithm 是要解什麼問題, 還有他的運作環境與條件是什麼? 首先考慮我們有一個 State machine 如下 :
從上面的 State machine 主要是由 3 個 States 組成 Start, Rainy 與 Sunny. 而 State 間的轉換滿除下面機率模型 (又稱 Table A) :
問題是我們是想從未來的時間來推斷天氣是 Sunny/Rainny (未知), 雖然天氣是未知但是我們有觀察到一些行為與天氣相關聯 (已知) , 而其機率模型為 (又稱 Table B) :
其中 P(Walk:Shop:Clean|Rainy) 可以想成當天氣下雨時, 你會散步的機率是 0.1; 逛街的機率是 0.4; 打掃家裡的機率是 0.5. 因此我們打算透過觀察的行為順序 (已知) 來推斷 "最有可能" 的天氣的發生順序 (未知). 而這便是 Viterbi algorithm 要解的問題!
演算法說明 :
知道了這個演算法想解決的問題, 接著我們來看看這個演算法運算的原理是什麼. 我們以上面說明的 State machine 當作範例進行說明, 首先任何的 State machine 都會有一個起始狀態 (Start). 我們會在每一步產生由當前這一步到前面一步最有可能的狀態, 並將之紀錄起來方便結算時我們可以從最後一步推算到第一步. 接著假設我們有觀察到某五天的行為依序是 Walk->Walk->Shop->Clean. 接著我們要透過之前兩個機率模型來推斷這四天最有可能的天氣分布會是什麼. 首先第一天推算結果如下 :
第二天則基於第一天結果往下推算, 從下面說明可以知道第二天推算結果 :
基於第二天運算結果, 從下面說明可以知道第三天運算結果 :
基於第三天結果, 從下面說明可以知道最後一天運算結果並推算最後一天 "最有可能" 是 Rainy :
接著我們開始透過剛剛紀錄每一步求出前一步最有可能的天氣, 並將結果串連起來便得到我們想預測的天氣順序 : Sunny > Sunny > Rainy > Rainy
Coding 實作 :
基於上述說明, 底下我們用 Java 實現這樣的 Algorithm 並套用上面 State machine 的範例運算 :
執行結果 :
實用工具:
上面代碼有點針對範例而寫, 不容易套到通用問題. 因此下面的 ViterbiAlg.groovy 算是一個工具讓你可以在決定機率矩陣 A, B 與 π 後, 可以輕鬆使用此工具進行 HMM Decoding 的工作:
- ViterbiAlg.groovy
- class ViterbiAlg {
- def LZERO = -1*Math.pow(10, 10)
- def LSMALL = -0.5*Math.pow(10, 10)
- def minLogExp = -Math.log(-LZERO)
- def isDebug = false
- def shift=0
- def A = [] // State MM
- def B = [] // Observer Matrix
- def π = [] // Default/Initial Prob.
- def SS = 0 // State Size
- def OS = 0 // Observation Size
- void dprintf(String fmt, ...Os)
- {
- if(isDebug) printf(fmt, Os)
- }
- /**
- * Input: x = log(x'); y = log(y')
- * Return: log(x'+y')
- * @param x
- * @param y
- * @return
- */
- double logAdd(double x, double y)
- {
- double temp, diff, z;
- if (x < y)
- {
- temp = x; x = y; y = temp;
- }
- diff = y-x; // notice that diff <= 0
- if (diff < minLogExp) // if y' is far smaller than x'
- return (x < LSMALL) ? LZERO : x;
- else
- {
- z = Math.exp(diff);
- return x + Math.log(1.0 + z);
- }
- }
- /**
- * Input: d1, d2, ... dn
- * Return: Log(d1)+Log(d2)+...+Log(dn)
- * @param ds
- * @return
- */
- double logSum(Double ...ds)
- {
- double lSum=0
- ds.each {
- lSum+=Math.log(it)
- //lSum*=it
- }
- return lSum
- }
- void setPM(def A, def B, def π){setA(A); setB(B); this.π = π;}
- void setA(def A)
- {
- this.A = A
- SS = A.size()
- }
- void setB(def B)
- {
- this.B = B
- OS = B[0].size()
- }
- double backward(List
olist) - {
- def β = []
- def tp = []
- if(olist.size()==0) return 0
- else if(olist.size()==1)
- {
- def o = olist[0]
- return π[o]*B[o][olist[0]]
- }
- // Initialize
- SS.times{tp << 1}
- β[olist.size()-1] = tp
- if(olist.size()>1)
- {
- for(int i=olist.size()-2; i>=0; i--)
- {
- tp = []
- SS.times{ ps->
- double p = 0
- SS.times{ cs->
- p += (double)A[ps][cs] * B[cs][olist[i+1]] * β[i+1][cs]
- }
- tp << p
- }
- β[i]=tp
- }
- }
- double p = 0;
- SS.times{
- p += (double)π[it] * B[it][olist[0]] * β[0][it];
- }
- return p;
- }
- double forward(List
olist) - {
- def α = []
- def tp = []
- if(olist.size()==0) return 0
- else if(olist.size()==1)
- {
- def o = olist[0]
- return π[o]*B[o][olist[0]]
- }
- // Initialize
- //printf "\t[Test] o=%d\n", olist[0]
- SS.times{it->
- tp << (double)π[it]*B[it][olist[0]]
- }
- α << tp
- if(olist.size()>1)
- {
- def tolist = olist[1..-1]
- tolist.each{ o->
- tp = []
- SS.times{ps->
- double prob=0
- SS.times{ cs->
- prob+=(double)α[-1][ps]*A[ps][cs]*B[cs][o]
- }
- tp << prob
- }
- α << tp
- }
- }
- double prob=0
- def lo = olist[-1]
- SS.times{ps->
- /*SS.times{cs->
- prob+=(double)α[-1][ps]*A[ps][cs]*B[cs][lo]
- }*/
- prob+=(double)α[-1][ps]
- }
- return prob
- }
- List
run(List O) - {
- def S = [] // Best State Transition
- def PV = [] // Previous Viter Result
- def PVP = [] // Previous Viter Result Path
- def t=1
- double mv=Integer.MIN_VALUE // max viter value
- def ms=0
- O.each{ o->
- if(shift>0) o-=shift
- double cp
- mv=Integer.MIN_VALUE
- ms=0
- def V = [] // Current Viter Result
- def VS = [:] // Current Viter/State map
- SS.times { s->
- dprintf("\tδ%d(%d)=max P(q%d=%d|o%d=%d)\n", t, s+shift, t, s+shift, t, o+shift)
- if(t>1)
- {
- dprintf "\t\t=max δ%d(i)*ai%d*b%d(%d)\n", t-1, s+shift, s+shift, o+shift
- dprintf "\t\t=max ("
- double tmv=Integer.MIN_VALUE
- def tms=0
- SS.times {
- def tv = PV[it]+logSum(A[it][s],B[s][o])
- //def tv = PV[it]*A[it][s]*B[s][o]
- dprintf "δ%d(%d)*a%d%d*b%d(%d)=%.04f ", t-1, it+shift, it+shift, s+shift, s+shift, o+shift, tv
- if(tv>tmv)
- {
- tmv = tv
- tms = it
- }
- }
- VS[s]=tms
- cp = tmv
- dprintf ")=%.03f (%d)\n", cp, tms
- }
- else
- {
- cp = logSum(π[s],B[s][o])
- //cp = π[s]*B[s][o]
- dprintf "\t\t=π%d b%d(%d)=%.03f\n", s+shift, s+shift, o+shift, cp
- }
- V[s] = cp
- if(cp>mv)
- {
- mv = cp
- ms = s
- }
- }
- dprintf "\tPickup S=%d at t(%d)!\n", ms+shift, t
- PVP << VS
- PV=V
- t++
- }
- S << ms+shift
- for(int i=(PVP.size()-1); i>=1; i--)
- {
- ms = PVP[i][ms]
- S << ms+shift
- }
- S = S.reverse()
- dprintf "\tFinal State Sequence: %s (Prop.=%.09f)\n", S.join("->"), Math.exp(mv)
- return S
- }
- List
run2(List O) - {
- def S = [] // Best State Transition
- def PV = [] // Previous Viter Result
- def PVP = [] // Previous Viter Result Path
- def t=1
- double mv=Integer.MIN_VALUE // max viter value
- def ms=0
- O.each{ o->
- if(shift>0) o-=shift
- double cp
- mv=Integer.MIN_VALUE
- ms=0
- def V = [] // Current Viter Result
- def VS = [:] // Current Viter/State map
- SS.times { s->
- dprintf("\tδ%d(%d)=max P(q%d=%d|o%d=%d)\n", t, s+shift, t, s+shift, t, o+shift)
- if(t>1)
- {
- dprintf "\t\t=max δ%d(i)*ai%d*b%d(%d)\n", t-1, s+shift, s+shift, o+shift
- dprintf "\t\t=max ("
- double tmv=0
- def tms=0
- SS.times {
- def tv = PV[it]*A[it][s]*B[s][o]
- dprintf "δ%d(%d)*a%d%d*b%d(%d)=%.04f ", t-1, it+shift, it+shift, s+shift, s+shift, o+shift, tv
- if(tv>tmv)
- {
- tmv = tv
- tms = it
- }
- }
- VS[s]=tms
- cp = tmv
- dprintf ")=%.03f (%d)\n", cp, tms+shift
- }
- else
- {
- cp = π[s]*B[s][o]
- dprintf "\t\t=π%d b%d(%d)=%.03f\n", s+shift, s+shift, o+shift, cp
- }
- V[s] = cp
- if(cp>mv)
- {
- mv = cp
- ms = s
- }
- }
- dprintf "\tPickup S=%d at t(%d)!\n", ms+shift, t
- PVP << VS
- PV=V
- t++
- }
- //printf "\t[Test] %d\n", ms
- S << ms+shift
- for(int i=(PVP.size()-1); i>=1; i--)
- {
- ms = PVP[i][ms]
- //printf "\t[Test] %d (%d)\n", ms, i
- S << ms+shift
- }
- S = S.reverse()
- dprintf "\tFinal State Sequence: %s (Prop.=%.06f)\n", S.join("->"), mv
- return S
- }
- }
- def A = []
- def B = []
- def π = []
- def SM = [:]
- def OM = [:]
- SM[0]='Sunny'
- SM[1]='Rainy'
- OM[0]='Walk'
- OM[1]='Shop'
- OM[2]='Clean'
- π[0] = 0.4; π[1] = 0.6
- A[0] = [0.6, 0.4]
- A[1] = [0.3, 0.7]
- B[0] = [0.6, 0.3, 0.1]
- B[1] = [0.1, 0.4, 0.5]
- ViterbiAlg va = new ViterbiAlg()
- va.setPM(A, B, π)
- def O = [0, 0, 1, 2] // Walk > Walk > Shop > Clean
- def S = va.run(O)
- printf "%s\n", S.collect { SM[it]}.join(" > ")
This message was edited 35 times. Last update was at 12/05/2014 15:42:17
你的transition matrix寫錯了
回覆刪除