Statistical Learning Methods (統計學習方法): Hidden Markov Model (HMM) Implementation

The dataset used here is a triangle wave. The HMM is trained for 100 iterations on a sequence of length 20 and then used to generate a sequence of length 100. Initialization matters a great deal for HMMs; random initialization is used here.
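For reference, the code below implements the standard forward/backward variables and the Baum-Welch re-estimation formulas; the equation numbers in the comments refer to the book:

$$\alpha_1(i) = \pi_i\, b_i(o_1), \qquad \alpha_{t+1}(i) = \Big[\sum_{j=1}^{N} \alpha_t(j)\, a_{ji}\Big] b_i(o_{t+1})$$

$$\beta_T(i) = 1, \qquad \beta_t(i) = \sum_{j=1}^{N} a_{ij}\, b_j(o_{t+1})\, \beta_{t+1}(j)$$

$$\gamma_t(i) = \frac{\alpha_t(i)\,\beta_t(i)}{\sum_{j} \alpha_t(j)\,\beta_t(j)}, \qquad \xi_t(i,j) = \frac{\alpha_t(i)\, a_{ij}\, b_j(o_{t+1})\, \beta_{t+1}(j)}{\sum_{u}\sum_{v} \alpha_t(u)\, a_{uv}\, b_v(o_{t+1})\, \beta_{t+1}(v)}$$

$$\hat{a}_{ij} = \frac{\sum_{t=1}^{T-1} \xi_t(i,j)}{\sum_{t=1}^{T-1} \gamma_t(i)}, \qquad \hat{b}_j(k) = \frac{\sum_{t:\, o_t = v_k} \gamma_t(j)}{\sum_{t=1}^{T} \gamma_t(j)}, \qquad \hat{\pi}_i = \gamma_1(i)$$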

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import csv
import random

import numpy as np
import matplotlib.pyplot as plt

class HMM(object):
	def __init__(self, N, M):
		self.A = np.zeros((N, N))  # state transition probability matrix
		self.B = np.zeros((N, M))  # observation probability matrix
		self.Pi = np.array([1.0 / N] * N)  # initial state probability vector

		self.N = N  # number of possible states
		self.M = M  # number of possible observations

	def init(self):
		'''
			Randomly initialize A and B (Pi keeps its uniform initialization),
			making sure every row sums to 1.
		'''
		for i in range(self.N):
			# 100 is an arbitrary upper bound; the draws are normalized into probabilities
			randomlist = [random.randint(0, 100) for t in range(self.N)]
			Sum = sum(randomlist)
			for j in range(self.N):
				self.A[i][j] = randomlist[j] / Sum

		for i in range(self.N):
			randomlist = [random.randint(0, 100) for t in range(self.M)]
			Sum = sum(randomlist)
			for j in range(self.M):
				self.B[i][j] = randomlist[j] / Sum
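		# (Note: np.random.dirichlet(np.ones(k)) would draw a length-k stochastic
		#  row in a single call, as an alternative to the manual normalization above.)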

	def forward(self):
		'''
			Forward algorithm, following the definition of alpha.
			Time runs from 0 to T-1 here, whereas the book uses 1 to T.
		'''
		self.alpha = np.zeros((self.T, self.N))

		# Formula (10.15): initial forward variable
		for i in range(self.N):
			self.alpha[0][i] = self.Pi[i] * self.B[i][self.O[0]]

		# Formula (10.16): recursion over t = 1, ..., T-1
		for t in range(1, self.T):
			for i in range(self.N):
				total = 0
				for j in range(self.N):
					total += self.alpha[t - 1][j] * self.A[j][i]
				self.alpha[t][i] = total * self.B[i][self.O[t]]
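		# Note: alpha shrinks geometrically with t, so for long observation
		# sequences a scaled forward/backward pass would be needed to avoid
		# underflow (not an issue for the short sequences used here).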

	def backward(self):
		'''
			Backward algorithm, following the definition of beta.
		'''
		self.beta = np.zeros((self.T, self.N))

		# Formula (10.19): beta at the final time step is defined as 1 for every state
		for i in range(self.N):
			self.beta[self.T - 1][i] = 1

		# Formula (10.20): recursion backwards from t = T-2 down to 0
		for t in range(self.T - 2, -1, -1):
			for i in range(self.N):
				for j in range(self.N):  # iterate over successor states q_j
					self.beta[t][i] += self.A[i][j] * self.B[j][self.O[t + 1]] * self.beta[t + 1][j]

	def cal_ksi(self, i, j, t):
		'''
			Formula (10.26): probability of being in state q_i at time t and
			state q_j at time t+1, given the model and the observation sequence.
		'''
		numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][self.O[t + 1]] * self.beta[t + 1][j]
		denominator = 0

		for u in range(self.N):  # sum over all state pairs for the denominator
			for v in range(self.N):
				denominator += self.alpha[t][u] * self.A[u][v] * self.B[v][self.O[t + 1]] * self.beta[t + 1][v]

		return numerator / denominator

	def cal_gama(self, i, t):
		'''
			Formula (10.24): probability of being in state q_i at time t,
			given the model and the observation sequence.
		'''
		numerator = self.alpha[t][i] * self.beta[t][i]
		denominator = 0

		for j in range(self.N):
			denominator += self.alpha[t][j] * self.beta[t][j]

		return numerator / denominator
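	# Note: cal_ksi and cal_gama recompute their denominators on every call;
	# caching gamma and xi once per Baum-Welch iteration would make training
	# noticeably faster without changing the result.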

	def train(self, O, MaxSteps=100):
		'''
			Estimate the HMM parameters with the Baum-Welch algorithm.
			The hidden states themselves never need to be observed;
			only the number of hidden states has to be known.
		'''
		self.T = len(O)  # T: number of time steps, i.e. the length of the observation sequence
		self.O = O  # O: the observation sequence

		self.init()  # random initialization

		step = 0
		while step < MaxSteps:  # re-estimation iterations
			step += 1
			# print('step = ', step)
			tmp_A = np.zeros((self.N, self.N))
			tmp_B = np.zeros((self.N, self.M))
			tmp_pi = np.array([0.0] * self.N)

			self.forward()  # forward pass, computes alpha
			self.backward()  # backward pass, computes beta

			# Estimate a_ij, formula (10.39)
			for i in range(self.N):
				for j in range(self.N):
					numerator = 0.0
					denominator = 0.0
					for t in range(self.T - 1):
						numerator += self.cal_ksi(i, j, t)
						denominator += self.cal_gama(i, t)
					tmp_A[i][j] = numerator / denominator

			# Estimate b_j(k), formula (10.40)
			for j in range(self.N):
				for k in range(self.M):
					numerator = 0.0
					denominator = 0.0
					for t in range(self.T):
						if k == self.O[t]:
							numerator += self.cal_gama(j, t)
						denominator += self.cal_gama(j, t)
					tmp_B[j][k] = numerator / denominator

			# Estimate pi_i, formula (10.41)
			for i in range(self.N):
				tmp_pi[i] = self.cal_gama(i, 0)

			self.A = tmp_A
			self.B = tmp_B
			self.Pi = tmp_pi

	def generate(self, length):
		I = []  # generated state sequence

		# Draw the initial state: walk through Pi, subtracting probabilities from
		# a uniform random number; entries below 1e-4 are skipped.
		ran = random.randint(0, 1000) / 1000.0
		i = 0
		while self.Pi[i] < ran or self.Pi[i] < 0.0001:
			ran -= self.Pi[i]
			i += 1
		I.append(i)

		# Generate the rest of the state sequence from the transition matrix A
		for i in range(1, length):
			last = I[-1]
			ran = random.randint(0, 1000) / 1000.0
			j = 0
			while self.A[last][j] < ran or self.A[last][j] < 0.0001:
				ran -= self.A[last][j]
				j += 1
			I.append(j)
		print(I)

		Y = []  # generated observation sequence
		# For each state, draw an observation from the corresponding row of B
		for i in range(length):
			k = 0
			ran = random.randint(0, 1000) / 1000.0
			while self.B[I[i]][k] < ran or self.B[I[i]][k] < 0.0001:
				ran -= self.B[I[i]][k]
				k += 1
			Y.append(k)

		return Y

def triangle(length):
	'''
		Triangle wave with period 6: values rise 0..3 and then fall back,
		so every value lies in {0, 1, 2, 3} (matching the M = 4 observations below).
	'''
	X = [i for i in range(length)]
	Y = []

	for x in X:
		x = x % 6
		if x <= 3:
			Y.append(x)
		else:
			Y.append(6 - x)
	return X, Y

def show_data(x, y):
	plt.plot(x, y, 'g')
	plt.show()

if __name__ == '__main__':
	hmm = HMM(10, 4)  # 10 hidden states, 4 possible observations
	tri_x, tri_y = triangle(20)
	print(tri_x)
	print(tri_y)

	# The hidden state sequence corresponds to the x coordinates,
	# the observation sequence to the y coordinates.
	hmm.train(tri_y)
	y = hmm.generate(100)  # generate 100 observations with the trained HMM
	print(y)
	x = [i for i in range(100)]
	show_data(x, y)
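As a quick sanity check, the likelihood of the training observations under the learned parameters is P(O|λ) = Σ_i α_T(i), which can be read straight off the forward variables. A minimal sketch that could be appended to the end of the main block above (it reuses the attributes that train already set on hmm):

	# likelihood of the training sequence under the learned model
	hmm.forward()  # recompute alpha with the final A, B and Pi
	print('P(O|lambda) =', sum(hmm.alpha[hmm.T - 1]))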