【Python例項第11講】文字的核外分類
機器學習訓練營——機器學習愛好者的自由交流空間(qq 群號:696721295)
這個例子顯示scikit-learn怎樣進行OOC
(out-of-core)分類。所謂核外方法(OOC approach), 指的是從無法一次全部載入主記憶體的資料中學習。在這裡,我們利用一個支援partial_fit
方法的線上分類器學習。為了確保特徵空間在不同的時刻仍是相同的,我們利用HashingVectorizer
, 它把每個例子投射到相同的特徵空間。這在文字分類,當新特徵出現在每一批次的例子裡時,是特別有用的。本例使用的資料集Reuters-21578
來自UCI機器學習資料庫(UCI ML repository).
圖結果反映了分類器的學習曲線:分類準確率隨著小批次(mini-batch)逐批訓練的變化情況。這裡的分類準確率是在前1000個樣本組成的驗證集上測量的。為了限制記憶體消耗,在把例子送進學習器之前,我們先把它們排隊累積到固定的數量。
資料集介紹
Reuters-21578文字類別資料集,收集自1987年的路透社(英國最大的通訊社)文件,這些文件被按類別編目。該資料集整理後包括21,578個例項,5個屬性。這5個屬性分別表示5份檔案,列出了每個文件裡所有合法目錄的名字。
例項詳解
首先,載入必需的庫。
# Authors: Eustache Diemert <[email protected]>
#          @FedericoV <https://github.com/FedericoV/>
# License: BSD 3 clause
from __future__ import print_function

from glob import glob
import itertools
import os.path
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

try:
    # Older scikit-learn releases bundle `six`.
    from sklearn.externals.six.moves import html_parser
    from sklearn.externals.six.moves.urllib.request import urlretrieve
except ImportError:
    # `sklearn.externals.six` is gone from newer releases; on Python 3 the
    # standard library provides the same two names.
    from html import parser as html_parser
    from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


def _not_in_sphinx():
    """Return True when the module-level name ``__file__`` is defined."""
    # Hack to detect whether we are running by the sphinx builder
    return '__file__' in globals()
Reuters資料集有關程式
類 ReutersParser
定義一個ReutersParser
類,它是一個工具類,作用是從語法上分割SGML(標準通用標記語言,定義獨立於平臺和應用的文字文件格式)檔案,一次產生一個文件。
class ReutersParser(html_parser.HTMLParser):
    """Utility class to parse a SGML file and yield documents one at a time.

    Tag events are dispatched by name: an opening ``<foo>`` calls
    ``start_foo(attrs)`` and a closing ``</foo>`` calls ``end_foo()`` when
    such a method exists; tags without a handler are ignored.

    Parameters
    ----------
    encoding : str, default 'latin-1'
        Codec used to decode each chunk of bytes handed to `parse`.
    """

    def __init__(self, encoding='latin-1'):
        html_parser.HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        """Dispatch an opening tag to ``start_<tag>`` if it is defined."""
        method = 'start_' + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        """Dispatch a closing tag to ``end_<tag>`` if it is defined."""
        method = 'end_' + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        """Clear the per-document state (flags and accumulated text)."""
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        """Feed `fd` chunk by chunk and yield each completed document.

        Parameters
        ----------
        fd : iterable of bytes
            Typically a file opened in binary mode; every item is decoded
            with ``self.encoding`` before being fed to the parser.

        Yields
        ------
        dict
            A document with 'title', 'body' and 'topics' keys.
        """
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            # Hand over the documents completed by this chunk, then drop
            # them so only one chunk's worth is ever buffered.
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        """Append text to the body, title or current ``<D>`` entry."""
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        """Finish the current document and queue it for `parse`."""
        # Collapse every run of whitespace in the body to a single space.
        self.body = re.sub(r'\s+', r' ', self.body)
        self.docs.append({'title': self.title,
                          'body': self.body,
                          'topics': self.topics})
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        # NOTE(review): `in_topics` is never consulted, so the text of every
        # <D> element ends up in `topics`, including <D> elements that sit
        # outside <TOPICS>. Kept as-is because downstream filtering relies
        # on the current contents of 'topics'.
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""
函式 stream_reuters_documents
函式stream_reuters_documents
迭代Reuters資料集文件。它有一個引數data_path
, 表示資料集的本地位置路徑,預設值是None
. 如果該引數取預設值,那麼將自動從UCI資料庫下載並解壓縮。文件用字典型物件表示,有三個鍵:‘body’, ‘title’ and ‘topics’.
def stream_reuters_documents(data_path=None):
    """Iterate over documents of the Reuters dataset.

    The Reuters archive will automatically be downloaded and uncompressed if
    the `data_path` directory does not exist.

    Documents are represented as dictionaries with 'body' (str),
    'title' (str), 'topics' (list(str)) keys.

    Parameters
    ----------
    data_path : str or None, default None
        Directory containing the ``*.sgm`` files. When None, the "reuters"
        sub-directory of ``get_data_home()`` is used.

    Yields
    ------
    dict
        One parsed document at a time.
    """
    DOWNLOAD_URL = ('http://archive.ics.uci.edu/ml/machine-learning-databases/'
                    'reuters21578-mld/reuters21578.tar.gz')
    ARCHIVE_FILENAME = 'reuters21578.tar.gz'

    if data_path is None:
        data_path = os.path.join(get_data_home(), "reuters")
    if not os.path.exists(data_path):
        # Download the dataset.
        print("downloading dataset (once and for all) into %s" %
              data_path)
        # makedirs also creates missing parents of a user-supplied path.
        os.makedirs(data_path)

        def progress(blocknum, bs, size):
            # Report hook for urlretrieve: block count, block size, total.
            total_sz_mb = '%.2f MB' % (size / 1e6)
            current_sz_mb = '%.2f MB' % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write(
                    '\rdownloaded %s / %s' % (current_sz_mb, total_sz_mb))

        archive_path = os.path.join(data_path, ARCHIVE_FILENAME)
        urlretrieve(DOWNLOAD_URL, filename=archive_path,
                    reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write('\r')
        print("untarring Reuters dataset...")
        # NOTE(review): extractall trusts the member paths of the downloaded
        # archive; only point DOWNLOAD_URL at a source you trust.
        with tarfile.open(archive_path, 'r:gz') as archive:
            archive.extractall(data_path)
        print("done.")

    parser = ReutersParser()
    # glob order is unspecified; sorting keeps the document order (and so
    # the held-out test set) identical from run to run.
    for filename in sorted(glob(os.path.join(data_path, "*.sgm"))):
        # Close each file once its documents have been consumed.
        with open(filename, 'rb') as fd:
            for doc in parser.parse(fd):
                yield doc
主程式
主程式產生向量物件,限制特徵數的上限到一個合理的值。
vectorizer 物件
# Hashing vectorizer; n_features caps the feature space at 2 ** 18 columns.
vectorizer = HashingVectorizer(
    decode_error='ignore',
    n_features=2 ** 18,
    alternate_sign=False,
)

# Iterator over parsed Reuters SGML files.
data_stream = stream_reuters_documents()

# Binary task: documents tagged "acq" versus every other document. "acq" was
# picked because it is spread fairly evenly across the Reuters files. With
# another dataset, make sure the test set holds a realistic share of
# positive instances.
all_classes = np.array([0, 1])
positive_class = 'acq'

# Classifiers that support the `partial_fit` method, keyed by display name.
partial_fit_classifiers = {}
partial_fit_classifiers['SGD'] = SGDClassifier(max_iter=5)
partial_fit_classifiers['Perceptron'] = Perceptron(tol=1e-3)
partial_fit_classifiers['NB Multinomial'] = MultinomialNB(alpha=0.01)
partial_fit_classifiers['Passive-Aggressive'] = PassiveAggressiveClassifier(
    tol=1e-3)
函式 get_minibatch
函式get_minibatch
提取一個小批次(minibatch)的例項,返回一個元組 (X_text, y)。
def get_minibatch(doc_iter, size, pos_class=positive_class):
    """Extract a minibatch of examples, return a tuple X_text, y.

    Note: size is before excluding invalid docs with no topics assigned.

    Parameters
    ----------
    doc_iter : iterator of dict
        Documents with 'title', 'body' and 'topics' keys; at most `size`
        of them are consumed.
    size : int
        Number of documents to pull from `doc_iter`.
    pos_class : str
        Topic whose presence in a document's 'topics' makes the label 1.

    Returns
    -------
    X_text : tuple of str
        "title, blank line, body" text of every kept document. An empty
        integer array is returned instead when no document was kept.
    y : numpy.ndarray of int
        1 where `pos_class` is among the document's topics, else 0.
    """
    data = [(u'{title}\n\n{body}'.format(**doc), pos_class in doc['topics'])
            for doc in itertools.islice(doc_iter, size)
            if doc['topics']]
    if not data:
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)
函式 iter_minibatches
函式iter_minibatches
是一個小批次(minibatch)生成器。
def iter_minibatches(doc_iter, minibatch_size):
    """Generator of minibatches.

    Parameters
    ----------
    doc_iter : iterable of dict
        Stream of documents as accepted by `get_minibatch`.
    minibatch_size : int
        Number of documents pulled from the stream per minibatch, counted
        before documents without topics are dropped.

    Yields
    ------
    (X_text, y)
        The non-empty results of `get_minibatch`, in stream order.
    """
    doc_iter = iter(doc_iter)
    while True:
        # Pull the raw window first so that "stream exhausted" can be told
        # apart from "window held no labelled document". Stopping on the
        # first empty minibatch would silently drop the rest of the stream.
        window = list(itertools.islice(doc_iter, minibatch_size))
        if not window:
            return
        X_text, y = get_minibatch(iter(window), minibatch_size)
        if len(X_text):
            yield X_text, y
檢驗統計量
先從資料流中留出1000個文件作為測試集,用來估計準確率,並記錄測試集的統計量。
# test data statistics
test_stats = {'n_test': 0, 'n_test_pos': 0}

# First we hold out a number of examples to estimate accuracy.
# n_test_documents is the number of documents pulled from the stream; the
# test set can be smaller because documents without topics are dropped.
n_test_documents = 1000
tick = time.time()
# Use the named constant rather than a second hard-coded 1000, so the
# held-out size and the value reported later cannot drift apart.
X_test_text, y_test = get_minibatch(data_stream, n_test_documents)
parsing_time = time.time() - tick
tick = time.time()
X_test = vectorizer.transform(X_test_text)
vectorizing_time = time.time() - tick
test_stats['n_test'] += len(y_test)
test_stats['n_test_pos'] += sum(y_test)
print("Test set is %d documents (%d positive)" % (len(y_test), sum(y_test)))
函式 progress
函式progress
報告檢驗的過程資訊,返回一個字串。
def progress(cls_name, stats):
    """Report progress information, return a string.

    Parameters
    ----------
    cls_name : str
        Display name of the classifier.
    stats : dict
        Must provide 't0', 'n_train', 'n_train_pos' and 'accuracy'.

    Returns
    -------
    str
        One line with the training counts, the test-set counts read from
        the module-level `test_stats`, the accuracy, and the elapsed time
        and throughput since ``stats['t0']``.
    """
    duration = time.time() - stats['t0']
    s = "%20s classifier : \t" % cls_name
    s += "%(n_train)6d train docs (%(n_train_pos)6d positive) " % stats
    s += "%(n_test)6d test docs (%(n_test_pos)6d positive) " % test_stats
    s += "accuracy: %(accuracy).3f " % stats
    s += "in %.2fs (%5d docs/s)" % (duration, stats['n_train'] / duration)
    return s
# Per-classifier running statistics, keyed by classifier display name.
cls_stats = {}

for cls_name in partial_fit_classifiers:
    # The history lists start at (0, 0) so every curve begins at the origin.
    stats = {'n_train': 0, 'n_train_pos': 0,
             'accuracy': 0.0, 'accuracy_history': [(0, 0)], 't0': time.time(),
             'runtime_history': [(0, 0)], 'total_fit_time': 0.0}
    cls_stats[cls_name] = stats

# Discard test set
# NOTE(review): if data_stream is the same generator the test set was drawn
# from, it has already moved past those documents, so this call skips a
# further n_test_documents documents that are never trained on. Confirm
# whether that is intended.
get_minibatch(data_stream, n_test_documents)
丟棄檢驗集
我們將1000個文件的小批次(mini-batch)送進分類器學習,這意味著,在任何時候記憶體至多有1000個文件。文件批次規模越小,partial_fit 方法的相對額外開銷(overhead)就越大。
# Running total of the time spent vectorizing training mini-batches.
total_vect_time = 0.0

# The classifiers are fed mini-batches of 1000 documents, so no more than
# 1000 docs sit in memory at once. Smaller batches make the relative
# overhead of the partial fit methods larger.
minibatch_size = 1000

# Generator of training mini-batches drawn from the document stream.
minibatch_iterators = iter_minibatches(doc_iter=data_stream,
                                       minibatch_size=minibatch_size)
主迴圈
在主迴圈裡,迭代分類一個最小批次的例子。
# Main loop : iterate on mini-batches of examples
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    # Vectorize once per mini-batch; every classifier shares the result.
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        stats = cls_stats[cls_name]

        tick = time.time()
        # update estimator with examples in the current mini-batch
        cls.partial_fit(X_train, y_train, classes=all_classes)

        # accumulate test accuracy stats
        stats['total_fit_time'] += time.time() - tick
        stats['n_train'] += X_train.shape[0]
        stats['n_train_pos'] += sum(y_train)
        tick = time.time()
        stats['accuracy'] = cls.score(X_test, y_test)
        stats['prediction_time'] = time.time() - tick

        # History entries are (accuracy, x) pairs, unpacked by the plots.
        acc_history = (stats['accuracy'], stats['n_train'])
        stats['accuracy_history'].append(acc_history)
        run_history = (stats['accuracy'],
                       total_vect_time + stats['total_fit_time'])
        stats['runtime_history'].append(run_history)

        # Only report every third mini-batch to keep the output short.
        if i % 3 == 0:
            print(progress(cls_name, stats))
    if i % 3 == 0:
        print('\n')
程式部分輸出:
視覺化結果
def plot_accuracy(x, y, x_legend):
    """Plot accuracy as a function of x.

    Draws on the current matplotlib figure.

    Parameters
    ----------
    x : sequence of numbers
        Values for the horizontal axis.
    y : sequence of numbers
        Accuracy values matching `x`.
    x_legend : str
        Text used for the x-axis label and inside the plot title.
    """
    x = np.array(x)
    y = np.array(y)
    plt.title('Classification accuracy as a function of %s' % x_legend)
    plt.xlabel('%s' % x_legend)
    plt.ylabel('Accuracy')
    plt.grid(True)
    plt.plot(x, y)
rcParams['legend.fontsize'] = 10
# Sorted names match the sorted iteration over cls_stats below, so legend
# entries line up with the curves.
cls_names = list(sorted(cls_stats.keys()))

# Plot accuracy evolution
plt.figure()
for _, stats in sorted(cls_stats.items()):
    # Plot accuracy evolution with #examples
    accuracy, n_examples = zip(*stats['accuracy_history'])
    plot_accuracy(n_examples, accuracy, "training examples (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc='best')

plt.figure()
for _, stats in sorted(cls_stats.items()):
    # Plot accuracy evolution with runtime
    accuracy, runtime = zip(*stats['runtime_history'])
    plot_accuracy(runtime, accuracy, 'runtime (s)')
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc='best')
# Plot fitting times
plt.figure()
fig = plt.gcf()
cls_runtime = []
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats['total_fit_time'])

# Vectorization time is shown as one extra bar next to the classifiers.
cls_runtime.append(total_vect_time)
cls_names.append('Vectorization')
bar_colors = ['b', 'g', 'r', 'c', 'm', 'y']

ax = plt.subplot(111)
# Bars are centred explicitly on 0, 1, 2, ... and the ticks are placed at
# the same positions, so labels sit under the bars whatever the installed
# matplotlib uses as its default bar alignment.
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5,
                     color=bar_colors, align='center')

ax.set_xticks(np.arange(len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel('runtime (s)')
ax.set_title('Training Times')
def autolabel(rectangles):
    """Attach a text label above each rectangle showing its height.

    The labels are drawn on the module-level axes `ax`.

    Parameters
    ----------
    rectangles : iterable
        Bar patches, e.g. the container returned by ``plt.bar``.
    """
    for rect in rectangles:
        height = rect.get_height()
        # Centre the label horizontally, slightly above the top of the bar.
        ax.text(rect.get_x() + rect.get_width() / 2.,
                1.05 * height, '%.4f' % height,
                ha='center', va='bottom')
# Label and display the bar chart currently held in `rectangles` / `ax`.
autolabel(rectangles)
plt.show()

# Plot prediction times
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats['prediction_time'])
# Two extra bars: the time spent reading/parsing the test documents and the
# time spent vectorizing them.
cls_runtime.append(parsing_time)
cls_names.append('Read/Parse\n+Feat.Extr.')
cls_runtime.append(vectorizing_time)
cls_names.append('Hashing\n+Vect.')

ax = plt.subplot(111)
# Bars are centred explicitly on 0, 1, 2, ... and the ticks are placed at
# the same positions, so labels sit under the bars whatever the installed
# matplotlib uses as its default bar alignment.
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5,
                     color=bar_colors, align='center')

ax.set_xticks(np.arange(len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel('runtime (s)')
ax.set_title('Prediction Times (%d instances)' % n_test_documents)
autolabel(rectangles)
plt.show()
閱讀更多精彩內容,請關注微信公眾號:統計學習與大資料