Building a Simple Phrase Sentiment Analysis Model with PyTorch




Everything used in this post is available on my GitHub:
https://github.com/p208p2002/JOSPON-with-Pytorch

The dataset and pretrained models are in the releases:
https://github.com/p208p2002/JOSPON-with-Pytorch/releases

Some of the word2vec material here draws on the following article; I recommend reading it first:
http://zake7749.github.io/2016/08/28/word2vec-with-gensim/
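
If you would rather train your own word2vec model than download the pretrained one from the releases, a minimal gensim sketch might look like the following (the corpus file name is hypothetical; size=200 matches the vector dimension used throughout this post):

from gensim.models import word2vec

# each line of the corpus file is one pre-segmented sentence
sentences = word2vec.LineSentence('corpus_segmented.txt')  # hypothetical corpus
# note: in gensim >= 4.0 the "size" keyword is renamed to "vector_size"
model = word2vec.Word2Vec(sentences, size=200, window=5, min_count=5, workers=4)
model.save('w2vmodel/word2vec.model')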

The goal this time is to build a model that judges whether an input sentence expresses positive or negative sentiment.

Obtaining Text Vectors

To do machine learning on text, we first have to transform or encode it in some way; in short, turn it into numbers.

Here I first use jieba to segment the input, then use word2vec to get a vector for each token, and finally average the word vectors to form the sentence vector.

"""
W2V_SV.py
使用預訓練的w2v模型取得句子向量
default size : 200
"""

from gensim import models
import jieba
import numpy as np
class W2VS():
    def __init__(self):
        """
        初始化加載
        """
        jieba.set_dictionary('dict/dict.txt.big')
        jieba.load_userdict('dict/my_dict')
        jieba.initialize()
        self.model = models.Word2Vec.load('w2vmodel/word2vec.model')
    
    def getSenVec(self, sentence):
        """
        Look up each word vector,
        average the word vectors,
        and return the result as the sentence vector
        """
        senCut = list(jieba.cut(sentence))
        lenOfCut = len(senCut)
        vecSum = np.zeros(200)
        for word in senCut:
            try:
                vec = self.model.wv[word]
                vecSum = np.add(vecSum, vec)
            except KeyError:
                # skip tokens that are not in the word2vec vocabulary
                lenOfCut -= 1
                continue
        if lenOfCut == 0:
            return np.zeros(200)
        return vecSum / lenOfCut
        
if __name__ == "__main__":
    w2vs = W2VS()
    print(w2vs.getSenVec("今天天氣很好"))  # "The weather is nice today"

Preparing the Training Data

The data used this time is a set of food-delivery (waimai) reviews.

Source: https://github.com/SophonPlus/ChineseNlpCorpus

Use the W2V_SV.py module from above to read the data in, convert each review into a sentence vector, and save the result as a .pkl file for later use.

# -*- coding: UTF-8 -*-
"""
make_w2v_set.py
轉換資料集到句子向量
save as pickle
"""
from W2V_SV import W2VS
import pickle
import csv

if __name__ == "__main__":
    w2vs = W2VS()
    sentencesDict = {}
    with open('dataset/waimai_10k_tw.csv',newline='') as f:
        rows = csv.reader(f)
        for i, row in enumerate(rows):
            if row[0] == 'label':  # skip the CSV header row
                continue
            line = row[1].strip('\n')
            sVec = w2vs.getSenVec(line)
            # key: running index as a string; value: (sentence vector, label)
            sentencesDict[str(i-1)] = (sVec, row[0])
    with open('dataset/waimai_10k_tw.pkl','wb') as f:
        pickle.dump(sentencesDict,f)
    print("finish")

Defining the Neural Network Model

Next, we use PyTorch to build our own neural network model.

# -*- coding: UTF-8 -*-
"""
JWP.py
PyTorch類神經網路模型
"""

import torch
import torch.nn as nn
import torch.nn.functional as F

class JWP(nn.Module):
    def __init__(self, n_feature, n_hidden,n_hidden2, n_output):
        super(JWP, self).__init__()
        self.hidden = nn.Linear(n_feature, n_hidden)
        self.hidden2 = nn.Linear(n_hidden, n_hidden2)
        self.out = nn.Linear(n_hidden2, n_output)
        
    def forward(self, x, apply_sigmoid=False):
        x = F.relu(self.hidden(x).squeeze())
        x = F.relu(self.hidden2(x).squeeze())
        # return raw logits during training; apply the sigmoid only at inference
        if apply_sigmoid:
            x = torch.sigmoid(self.out(x))
        else:
            x = self.out(x)

        return x

This model has two hidden layers, both using ReLU as the activation function.

The apply_sigmoid parameter is only for inference once training is done; during training you should not apply a sigmoid at the output, because BCEWithLogitsLoss already applies it internally.
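
To see why, note that BCEWithLogitsLoss combines the sigmoid and binary cross entropy in a single, numerically stable call; a quick check with illustrative values:

import torch
import torch.nn as nn

logits = torch.tensor([0.8, -1.2, 2.5])
target = torch.tensor([1.0, 0.0, 1.0])

loss_a = nn.BCEWithLogitsLoss()(logits, target)
loss_b = nn.BCELoss()(torch.sigmoid(logits), target)
print(loss_a.item(), loss_b.item())  # the two values match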

Training the Model

Split the data into:
1. 3,000 positive and 3,000 negative reviews for training
2. 1,000 positive and 1,000 negative reviews for testing

"""
jwp_train_bce.py
"""
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
from argparse import Namespace
import numpy as np
import pickle
from JWP import JWP

args = Namespace(
    dataset_file = 'dataset/waimai_10k_tw.pkl',
    model_save_path='torchmodel/pytorch_bce.model',
    # training hyperparameters
    batch_size = 200,
    learning_rate = 0.009,
    min_learning_rate = 0.001,
    num_epochs=50,
)

with open(args.dataset_file,'rb') as f:
    waimai10k = pickle.load(f)

"""
訓練資料3K
"""
POSTIVE_COMMENT_STRAT = 0
NEGATIVE_COMMENT_START = 4000
postiveAns = torch.ones([3000,1],dtype=torch.float)
negativeAns = torch.zeros([3000,1],dtype=torch.float)
postiveComments = []
negativeComments = []

for i in range(POSTIVE_COMMENT_STRAT, POSTIVE_COMMENT_STRAT + 3000):
    vec,ans = waimai10k[str(i)]
    postiveComments.append(vec)
postiveComments = torch.FloatTensor(postiveComments)

for i in range(NEGATIVE_COMMENT_START, NEGATIVE_COMMENT_START + 3000):
    vec,ans = waimai10k[str(i)]
    negativeComments.append(vec)
negativeComments = torch.FloatTensor(negativeComments)

trainData = torch.cat((postiveComments,negativeComments))
trainDataAns = torch.cat((postiveAns,negativeAns))
trainDataSet = Data.TensorDataset(trainData, trainDataAns)

trainDataLoader = Data.DataLoader(
    dataset = trainDataSet,
    batch_size = args.batch_size,
    shuffle = True,
    num_workers = 4
)

"""
測試資料 1K
"""
T_POSTIVE_COMMENT_STRAT = 3000
T_NEGATIVE_COMMENT_START = 7000
t_postiveAns = torch.ones([1000,1],dtype=torch.float)
t_negativeAns = torch.zeros([1000,1],dtype=torch.float)
t_postiveComments = []
t_negativeComments = []

for i in range(T_POSTIVE_COMMENT_STRAT, T_POSTIVE_COMMENT_STRAT + 1000):
    vec,ans = waimai10k[str(i)]
    t_postiveComments.append(vec)
t_postiveComments = torch.FloatTensor(t_postiveComments)

for i in range(T_NEGATIVE_COMMENT_START, T_NEGATIVE_COMMENT_START + 1000):
    vec,ans = waimai10k[str(i)]
    t_negativeComments.append(vec)
t_negativeComments = torch.FloatTensor(t_negativeComments)

testData = torch.cat((t_postiveComments,t_negativeComments))
testDataAns = torch.cat((t_postiveAns,t_negativeAns))
testDataSet = Data.TensorDataset(testData, testDataAns)

testDataLoader = Data.DataLoader(
    dataset = testDataSet,
    batch_size = args.batch_size,
    shuffle = True,
    num_workers = 4
)

lr = args.learning_rate
min_lr = args.min_learning_rate
def adjust_learning_rate(optimizer, epoch):
    """
    Decay the learning rate by 35% every 10 epochs,
    but never let it fall below min_lr
    """
    global lr
    if epoch % 10 == 0 and epoch != 0:
        lr = lr * 0.65
        if lr < min_lr:
            lr = min_lr
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

def compute_accuracy(y_pred, y_target):
    """
    Compute accuracy (%): apply the sigmoid to the logits,
    threshold at 0.5, and compare against the targets
    """
    y_target = y_target.cpu()
    y_pred_indices = (torch.sigmoid(y_pred) > 0.5).cpu().long()
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100
    

if __name__ == "__main__":
    EPOCH = args.num_epochs
    net = JWP(200,150,100,1)
    print(net)

    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss_func = torch.nn.BCEWithLogitsLoss()

    for t in range(EPOCH):
        """
        Dynamically adjust the learning rate
        """
        adjust_learning_rate(optimizer,t)

        """
        Train phase
        """
        net.train() # switch to training mode
        TrainAcc = 0.0
        TrainLoss = 0.0
        # train batch by batch
        for step,(batchData, batchTarget) in enumerate(trainDataLoader):
            optimizer.zero_grad() # reset gradients
            out = net(batchData)
            trainAcc = compute_accuracy(out,batchTarget.long()) # batch accuracy
            TrainAcc = TrainAcc + trainAcc
            loss = loss_func(out,batchTarget) # compute the loss
            TrainLoss = TrainLoss + loss.item() # accumulate as a float so the graph is freed
            loss.backward() # backpropagation
            optimizer.step() # update the weights
        TrainLoss = TrainLoss / (step+1) # epoch loss
        TrainAcc = TrainAcc / (step+1) # epoch accuracy

        """
        Eval phase
        """
        net.eval()
        TestAcc = 0.0
        TestLoss = 0.0
        # Eval batch
        for step,(t_batchData, t_batchTarget) in enumerate(trainDataLoader):            
            t_out = net(t_batchData)
            testAcc = compute_accuracy(t_out,t_batchTarget.long())
            TestAcc = TestAcc + testAcc
            t_loss = loss_func(t_out,t_batchTarget)
            TestLoss = TestLoss + t_loss
        TestLoss = TestLoss / (step+1)
        TestAcc = TestAcc / (step+1)
        
        """
        Result
        """
        print(
            "epoch:",t+1 ,
            "train_loss:",round(TrainLoss,3),
            "train_acc:",round(TrainAcc,3),
            "test_loss:",round(TestLoss,3),
            "test_acc:",round(TestAcc,3),
            "LR:",lr
        )
    
    torch.save(net, args.model_save_path)
    print('model saved')

This script is a bit more involved, but everything before
if __name__ == "__main__":
is just data loading, so you can start reading from there.

net = JWP(200,150,100,1)
This instantiates the neural network we defined earlier.
The arguments mean:
1. 200: the input size; the w2v size was set to 200, so every word is represented in a 200-dimensional space
2. 150: the number of units in hidden layer 1
3. 100: the number of units in hidden layer 2
4. 1: a single output value, where 0 means negative and 1 means positive
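
As a quick sanity check of the shapes (illustrative):

import torch
from JWP import JWP

net = JWP(200, 150, 100, 1)
x = torch.randn(8, 200)  # a batch of 8 sentence vectors
print(net(x).shape)      # torch.Size([8, 1]): one logit per sentence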

loss_func = torch.nn.BCEWithLogitsLoss()
Since there is only a single output, we use binary cross entropy;
if you set up multiple outputs, use categorical cross entropy instead, as sketched below.
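
For reference, a hedged sketch of the multi-class variant, assuming the final layer were widened to one logit per class (nn.CrossEntropyLoss is PyTorch's categorical cross entropy and expects raw logits plus integer class labels):

import torch
import torch.nn as nn

# hypothetical 3-class setup: the output layer would emit 3 logits
logits = torch.randn(8, 3)           # batch of 8, one logit per class
targets = torch.randint(0, 3, (8,))  # integer class labels in 0..2
loss = nn.CrossEntropyLoss()(logits, targets)
print(loss.item())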

That's it; we can start training!

Testing

Let's write a small demo script to try the model out.

"""
demo.py
評價句子正反面情緒
"""

import torch
from JWP import JWP
from W2V_SV import W2VS

print("init...")
w2vs = W2VS()
net = torch.load('torchmodel/pytorch_bce.model')
net.eval()
# test_data
while True:
    ts = input("輸入評價:")
    v1 = w2vs.getSenVec(ts)
    res = net(torch.FloatTensor(v1), apply_sigmoid = True)
    out = res
    res = res.clone().detach().numpy()[0]
    print(round(res,3))

    if(res>0.5):
        print("正面")
    else:
        print("反面")
Enter a review: 東西好吃 ("the food is tasty")
1.0
Positive

Enter a review: 東西難吃 ("the food is awful")
0.0
Negative

Enter a review: 速度很快,還不錯吃 ("fast delivery, quite tasty")
1.0
Positive

Enter a review: 動作慢,不太好吃 ("slow, and not very good")
0.389
Negative
