Computing Text Similarity with ESIM
阿新 • Published: 2021-02-09
# 3-fold CV version; the concatenation layers were changed: the points where x1, x2, x3, x4 are joined were modified, with one final concatenation at the end
import datetime
starttime = datetime.datetime.now()
import logging
import numpy as np
import pandas as pd
import jieba
from keras.layers import *
from keras.models import Model
from keras import backend as K
from keras.activations import softmax
from keras.preprocessing import sequence
from keras.preprocessing.text import Tokenizer  # word-level tokenizer
from gensim.models import word2vec
from imblearn.over_sampling import RandomOverSampler
jieba.load_userdict(r'D:\研究生期間\python程式碼\文字處理\外部資料集/dict_all.txt')
def load_data(data_file, max_len):
    tokenizer = Tokenizer(num_words=None)
    datas = pd.read_csv(data_file, names=["index", "s1", "s2", "label"], encoding="utf-8",
                        header=None, sep="\t")
    texts = []
    words1_seg_list = []
    words2_seg_list = []
    label_list = datas['label'].tolist()
    label_list_float = [float(l) for l in label_list]
    s1_list = datas['s1'].tolist()
    s2_list = datas['s2'].tolist()
    # segment both sentence columns with jieba
    for s in s1_list:
        words = jieba.lcut(s, cut_all=False)
        words1_seg_list.append(words)
    for s in s2_list:
        words = jieba.lcut(s, cut_all=False)
        words2_seg_list.append(words)
    texts.extend(words1_seg_list)
    texts.extend(words2_seg_list)
    tokenizer.fit_on_texts(texts)
    # convert words to ids
    words1_list_ids = tokenizer.texts_to_sequences(words1_seg_list)
    words2_list_ids = tokenizer.texts_to_sequences(words2_seg_list)
    # padding
    words1_list_ids_pad = sequence.pad_sequences(words1_list_ids, maxlen=max_len, padding='post')
    words2_list_ids_pad = sequence.pad_sequences(words2_list_ids, maxlen=max_len, padding='post')
    # vocabulary dict {word: id}
    num_words_dict = tokenizer.word_index
    print(len(num_words_dict))
    return np.array(words1_list_ids_pad), np.array(words2_list_ids_pad), np.array(label_list_float), num_words_dict
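# A quick sanity check of what load_data returns (toy.csv is a hypothetical
# file in the same tab-separated index/s1/s2/label format as the real data):
#   a_ids, b_ids, y, vocab = load_data("toy.csv", 30)
#   a_ids.shape -> (num_pairs, 30), post-padded id sequences
#   y -> float array of 0/1 labels; vocab maps word -> id (ids start at 1)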
s0, s1, labels, num_words_dict = load_data(r"D:\研究生期間\python程式碼\文字處理\螞蟻金服文字相似度計算\all_data.csv", 30)
word2vec_data = pd.read_csv(r"D:\研究生期間\python程式碼\文字處理\螞蟻金服文字相似度計算\all_data.csv", encoding="utf-8", header=None, sep="\t")
#s0, s1, labels, num_words_dict = load_data(r"D:\研究生期間\python程式碼\文字處理\螞蟻金服文字相似度計算\test.csv", 30)
#word2vec_data = pd.read_csv(r"D:\研究生期間\python程式碼\文字處理\螞蟻金服文字相似度計算\test.csv", encoding="utf-8", header=None, sep="\t")
sentence = word2vec_data[1] + ' ' + word2vec_data[2]
sentences = list(sentence)
## segment each sentence into words
def segment_sen(sen):
    sen_list = []
    try:
        sen_list = jieba.lcut(sen)
    except Exception:
        pass
    return sen_list
# convert the data into the list-of-token-lists format gensim's word2vec expects
sens_list = [segment_sen(i) for i in sentences]
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
# gensim < 4.0 API; gensim >= 4.0 renames size/iter to vector_size/epochs
model = word2vec.Word2Vec(sens_list, sg=1, min_count=5, iter=5, size=300)
embeddings_index1 = {}
for word in model.wv.vocab.keys():
    embeddings_index1[word] = np.asarray(model.wv[word], dtype='float32')
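# Optional check that the embeddings look sensible (gensim < 4 API; the query
# word is a placeholder — substitute any token that survives min_count=5):
#   print(model.wv.most_similar('借款', topn=5))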
nb_words = min(20000, len(num_words_dict))
embedding_matrix = np.zeros((nb_words + 1, 300))
for word, i in num_words_dict.items():  # word is the token, i is its id
    if i > nb_words:  # ids beyond the capped vocabulary would overflow the matrix
        continue
    embedding_vector = embeddings_index1.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector
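# Words pruned by word2vec's min_count=5 keep all-zero rows above. A common
# alternative, not in the original post, is a small random initialisation so
# that rare words remain distinguishable from padding:
#   for word, i in num_words_dict.items():
#       if i <= nb_words and word not in embeddings_index1:
#           embedding_matrix[i] = np.random.normal(0, 0.1, 300)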
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import accuracy_score
kf = KFold(n_splits=3, shuffle=True, random_state=1024)
F1 = {}
ACC = {}
RECALL = {}
PRE = {}
#----------------- helper functions -----------------------
def create_pretrained_embedding(pretrained_weights_path, trainable=False, **kwargs):
    "Create embedding layer from a pretrained weights array"
    # defined for reuse but not called in this script
    pretrained_weights = np.load(pretrained_weights_path)
    in_dim, out_dim = pretrained_weights.shape
    embedding = Embedding(in_dim, out_dim, weights=[pretrained_weights], trainable=trainable, **kwargs)
    return embedding
def soft_attention_alignment(input_1, input_2):
    "Align text representation with neural soft attention"
    attention = Dot(axes=-1)([input_1, input_2])
    w_att_1 = Lambda(lambda x: softmax(x, axis=1))(attention)
    w_att_2 = Permute((2, 1))(Lambda(lambda x: softmax(x, axis=2))(attention))
    in1_aligned = Dot(axes=1)([w_att_1, input_1])
    in2_aligned = Dot(axes=1)([w_att_2, input_2])
    return in1_aligned, in2_aligned
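# This is ESIM's local inference step (Chen et al., 2017). With encoded
# sequences a (input_1) and b (input_2), the code above computes:
#   e_ij = a_i · b_j                                (the Dot attention)
#   in1_aligned_j = sum_i softmax_i(e_ij) * a_i     (one row per b time step)
#   in2_aligned_i = sum_j softmax_j(e_ij) * b_j     (one row per a time step)
# Each aligned output lines up with the *other* sentence's time steps, which
# is why encode_sequences_1 is paired with alignd_sequences_2 further below.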
def apply_multiple(input_, layers):
    "Apply layers to input then concatenate result"
    if not len(layers) > 1:
        raise ValueError('Layers list should contain more than 1 layer')
    agg_ = []
    for layer in layers:
        agg_.append(layer(input_))
    out_ = Concatenate()(agg_)
    return out_
def subtract(input_1, input_2):
    "Subtract element-wise"
    neg_input_2 = Lambda(lambda x: -x)(input_2)
    out_ = Add()([input_1, neg_input_2])
    return out_
def submult(input_1, input_2):
    "Get multiplication and subtraction then concatenate results"
    mult = Multiply()([input_1, input_2])
    sub = subtract(input_1, input_2)
    out_ = Concatenate()([sub, mult])
    return out_
#--------------------------------------------------
for train_id, test_id in kf.split(s0):
    xtrain_A, xtest_A = s0[train_id], s0[test_id]
    xtrain_B, xtest_B = s1[train_id], s1[test_id]
    ytrain, ytest = labels[train_id], labels[test_id]
    # ------------------ oversampling ------------------------------------
    # RandomOverSampler expects a single 2-D X, so the two padded sequences
    # are stacked side by side, resampled, then split back apart
    ros = RandomOverSampler(random_state=0)
    Xtrain = np.hstack((xtrain_A, xtrain_B))
    X_resample, y_resample = ros.fit_resample(Xtrain, ytrain)
    Xtrain = np.hsplit(X_resample, 2)
    xtrain_A = Xtrain[0]
    xtrain_B = Xtrain[1]
    ytrain = y_resample
    # ------------------- build the network -----------------------------------------
    K.clear_session()
    tweet_a = Input(shape=(30,))
    tweet_b = Input(shape=(30,))
    # ----------------- embedding --------------------
    embedding_layer = Embedding(nb_words + 1, 300, input_length=30,
                                weights=[embedding_matrix], trainable=False)
    bn = BatchNormalization(axis=2)
    embedded_sequences_1 = bn(embedding_layer(tweet_a))
    embedded_sequences_2 = bn(embedding_layer(tweet_b))
    # ----------------- encode -------------------------
    encode = Bidirectional(LSTM(100, return_sequences=True))
    encode_sequences_1 = encode(embedded_sequences_1)
    encode_sequences_2 = encode(embedded_sequences_2)
    # ------------------ attention ----------------------
    alignd_sequences_1, alignd_sequences_2 = soft_attention_alignment(encode_sequences_1, encode_sequences_2)
    # ---------------- compose --------------------------
    combined_sequences_1 = Concatenate()(
        [encode_sequences_1, alignd_sequences_2, submult(encode_sequences_1, alignd_sequences_2)])
    combined_sequences_2 = Concatenate()(
        [encode_sequences_2, alignd_sequences_1, submult(encode_sequences_2, alignd_sequences_1)])
    compose = Bidirectional(LSTM(100, return_sequences=True))
    compare_sequences_1 = compose(combined_sequences_1)
    compare_sequences_2 = compose(combined_sequences_2)
    # ------------------ aggregate ------------------------
    # average- and max-pool over time: each sentence becomes a 400-d vector
    rep_sequences_1 = apply_multiple(compare_sequences_1, [GlobalAvgPool1D(), GlobalMaxPool1D()])
    rep_sequences_2 = apply_multiple(compare_sequences_2, [GlobalAvgPool1D(), GlobalMaxPool1D()])
    # ------------------- classifier -----------------------
    merged = Concatenate()([rep_sequences_1, rep_sequences_2])
    dense = BatchNormalization()(merged)
    dense = Dense(100, activation='elu')(dense)
    dense = BatchNormalization()(dense)
    dense = Dropout(0.5)(dense)
    dense = Dense(50, activation='elu')(dense)
    dense = BatchNormalization()(dense)
    dense = Dropout(0.5)(dense)
    predictions = Dense(1, activation='sigmoid')(dense)
    model = Model(inputs=[tweet_a, tweet_b], outputs=predictions)
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])  # compile the model
    # model.summary()
    # ------------------------ train + predict ------------------------------------
    model.fit([xtrain_A, xtrain_B], ytrain, batch_size=128, epochs=3)
    y_pred = model.predict([xtest_A, xtest_B])
    # print('predictions:', y_pred)
    print('max(y_pred):', max(y_pred))
    # sweep a grid of decision thresholds and record the metrics for this fold
    sim = [0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]
    for j in sim:
        y_pred1 = []
        for i in y_pred:
            if i > j:
                y_pred1.append(1)
            else:
                y_pred1.append(0)
        F1.setdefault(j, []).append(f1_score(ytest, y_pred1))
        ACC.setdefault(j, []).append(accuracy_score(ytest, y_pred1))
        RECALL.setdefault(j, []).append(recall_score(ytest, y_pred1))
        PRE.setdefault(j, []).append(precision_score(ytest, y_pred1))
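    # Rather than sweeping a fixed grid, sklearn can find the F1-optimal
    # cut-off per fold directly (a sketch, not part of the original script):
    #   from sklearn.metrics import precision_recall_curve
    #   p, r, t = precision_recall_curve(ytest, y_pred)
    #   f1s = 2 * p[:-1] * r[:-1] / (p[:-1] + r[:-1] + 1e-9)
    #   best_threshold = t[np.argmax(f1s)]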
#------------------ aggregate the metrics over the folds -------------------------------------
def get_result(matrix):
    result = {}
    for key, value in matrix.items():
        result[key] = round(np.mean(value), 4)
    return result
print('F1:\n', get_result(F1))
print('Accuracy:\n', get_result(ACC))
print('Recall:\n', get_result(RECALL))
print('Precision:\n', get_result(PRE))
endtime = datetime.datetime.now()
print('Running time:', (endtime - starttime).seconds)
Result: F1 = 0.5327