首页 > 技术文章 > Bilstm中文微博多情感分析

xiaofengzai 2021-12-15 23:21 原文

Bilstm中文微博多情感分析

数据

我的数据是来自github的一个项目:ChineseNlpCorpus 里面收集了蛮多用于自然语言处理的中文数据集/语料。

下载地址: 百度网盘
数据概览: 36 万多条带情感标注的新浪微博,包含 4 种情感,其中喜悦约 20 万条,愤怒、厌恶、低落各约 5 万条
数据来源: 新浪微博
原数据集: 微博情感分析数据集,网上搜集,具体作者、来源不详

预处理

划分训练集和测试集

将表中的各类情感的数据各抽取前10000条做测试集,剩余的用作训练集

import pandas as pd
import openpyxl
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE


def _write_split(path, rows):
    """Write ``(label, review)`` pairs to *path* below a label/review header row."""
    import openpyxl

    book = openpyxl.Workbook()
    # A new sheet is inserted at index 0 so it is the first sheet of the file.
    sheet = book.create_sheet(index=0)
    sheet.cell(1, 1, "label")
    sheet.cell(1, 2, "review")
    for row_no, (lab, text) in enumerate(rows, start=2):
        sheet.cell(row_no, 1, lab)
        sheet.cell(row_no, 2, text)
    book.save(path)


def test(test_per_class=10000):
    """Split the 4-mood Weibo corpus into a training and a test spreadsheet.

    Reads ``simplifyweibo_4_moods.csv`` (columns ``label`` and ``review``) and
    removes the characters matched by openpyxl's ``ILLEGAL_CHARACTERS_RE``
    from every review.  For each label 0-3 the first ``test_per_class`` rows
    go to the test split and all later rows go to the training split; rows
    carrying any other label are dropped.

    The original version collected the test rows but only saved
    ``train.xlsx``; both ``train.xlsx`` and ``test.xlsx`` are written now.

    :param test_per_class: leading rows per label reserved for the test split
    :return: None
    """
    source_file = 'simplifyweibo_4_moods.csv'
    data = pd.read_csv(source_file)
    labels = data.get('label')
    reviews = data.get('review')

    train_rows = []
    test_rows = []
    taken = {0: 0, 1: 0, 2: 0, 3: 0}  # test rows collected so far, per label
    for raw_label, raw_review in zip(labels, reviews):
        lab = int(raw_label)
        text = ILLEGAL_CHARACTERS_RE.sub(r'', str(raw_review))
        if lab not in taken:
            continue
        if taken[lab] < test_per_class:
            taken[lab] += 1
            test_rows.append((lab, text))
        else:
            train_rows.append((lab, text))

    _write_split("train.xlsx", train_rows)
    _write_split("test.xlsx", test_rows)


# Run the dataset split when this file is executed as a script.
if __name__ == '__main__':
    test()

得到对应的文件

 

 

生成字典

数据中的一些标点符号、特殊符号、英文字母、数字等对于我们的实验都是没有用处的,所以我们需要将它们过滤掉。

去除停用词

# Characters deleted before segmentation: ASCII punctuation, tab, newline and
# the code points \x96 / \x97.
# NOTE(review): the original filter list ended with two empty strings, which
# removed nothing; presumably two characters were lost in copy/paste - confirm.
_FILTERED_CHARS = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n\x97\x96'
_FILTER_TABLE = str.maketrans('', '', _FILTERED_CHARS)


def tokenlize(sentence):
    """Segment a sentence into words and drop stop words.

    The characters in ``_FILTERED_CHARS`` are deleted, the remainder is
    segmented with ``jieba.cut`` in accurate mode, empty and space-only
    pieces are discarded, and the tokens are passed through
    ``movestopwords``.

    The original built one regex by joining partly unescaped fragments with
    ``|``: ``$`` and ``^`` acted as anchors and the lone backslash fused with
    the following ``|``, so ``$``, ``^``, the backslash and a lone ``]`` were
    never removed.  A translation table deletes every listed character
    literally.

    :param sentence: str
    :return: [str, str, str]
    """
    cleaned = sentence.translate(_FILTER_TABLE)
    segmented = ' '.join(jieba.cut(cleaned, cut_all=False))
    # Joining and re-splitting on spaces also discards tokens that are spaces.
    tokens = [tok for tok in segmented.split(" ") if len(tok) > 0]
    return movestopwords(tokens)

def stopwordslist(filepath):
    """Load a stop-word list from a UTF-8 text file, one entry per line.

    :param filepath: path of the stop-word file
    :return: list of the file's lines with surrounding whitespace stripped,
        in file order (blank lines become empty strings)
    """
    # The context manager closes the handle; the original never closed it.
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [line.strip() for line in handle]


# Remove stop words from a tokenised sentence.
def movestopwords(sentence, stopwords=None):
    """Return the tokens of *sentence* that are not stop words.

    Tab and newline tokens are dropped as well.  The original condition
    ``word != TAB and NEWLINE`` only compared against the tab, because the
    second operand is a non-empty string and therefore always true.

    :param sentence: iterable of str tokens
    :param stopwords: optional collection of words to remove; when omitted,
        ``data/stopwords.txt`` is loaded once via ``stopwordslist`` and cached
        on the function, instead of being re-read on every call
    :return: list of kept tokens, in their original order
    """
    if stopwords is None:
        cached = getattr(movestopwords, "_default_stopwords", None)
        if cached is None:
            cached = frozenset(stopwordslist('data/stopwords.txt'))  # default stop-word file
            movestopwords._default_stopwords = cached
        stopwords = cached
    elif not isinstance(stopwords, (set, frozenset)):
        # Set membership is O(1); a list would be scanned for every token.
        stopwords = frozenset(stopwords)
    return [word for word in sentence
            if word not in stopwords and word not in ('\t', '\n')]

设计字典类

"""
文本序列化
"""


class Vocab:
    """Mapping between word tokens and integer ids.

    Usage: call ``fit`` on every tokenised sentence, then ``build_vocab``
    once, then ``transform`` / ``inverse_transform``.  Ids 0 and 1 are
    reserved for the padding and unknown tags.
    """
    UNK_TAG = "<UNK>"  # stands in for out-of-vocabulary tokens
    PAD_TAG = "<PAD>"  # padding token
    PAD = 0
    UNK = 1

    def __init__(self):
        self.dict = {  # token -> id
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}  # token -> frequency
        # Available before build_vocab so inverse_transform never hits a
        # missing attribute.
        self.inverse_dict = {self.UNK: self.UNK_TAG, self.PAD: self.PAD_TAG}

    def fit(self, sentence):
        """
        Accumulate token frequencies from one sentence.
        :param sentence: [str, str, str]
        :return: None
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """
        Build the token -> id dictionary from the collected frequencies.
        :param min_count: drop tokens seen fewer times than this (None: no limit)
        :param max_count: drop tokens seen more times than this (None: no limit)
        :param max_features: keep only this many most frequent tokens (None: all)
        :return: None
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
        if max_features is not None:
            # sorted() takes its key function by keyword only; passing the
            # lambda positionally raised TypeError on Python 3.
            ranked = sorted(self.count.items(), key=lambda item: item[-1], reverse=True)
            self.count = dict(ranked[:max_features])

        for word in self.count:
            # Skipping known tokens keeps ids unique when build_vocab runs
            # twice or when the corpus itself contains one of the tags.
            if word not in self.dict:
                self.dict[word] = len(self.dict)

        # Reverse mapping: id -> token.
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        Convert a tokenised sentence to a list of ids.
        :param sentence: [str, str, str]
        :param max_len: truncate or right-pad with PAD to exactly this length;
            None leaves the length unchanged
        :return: [int, int, int]
        """
        sentence = list(sentence)
        if max_len is not None:
            if len(sentence) > max_len:
                sentence = sentence[:max_len]
            else:
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))

        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, incides):
        """
        Convert a list of ids back to tokens.
        :param incides: [int, int, int]
        :return: [str, str, str]; unknown ids map to the UNK tag
        """
        return [self.inverse_dict.get(i, self.UNK_TAG) for i in incides]

    def __len__(self):
        return len(self.dict)

# # 以下是调试代码
# if __name__ == '__main__':
#     sentences = [["今天", "天气", "很", "好"],
#                  ["今天", "去", "吃", "什么"]]
#     ws = Vocab()
#     for sentence in sentences:
#         # 统计词频
#         ws.fit(sentence)
#     # 构造词典
#     ws.build_vocab(min_count=1)
#     print(ws.dict)
#     # 把句子转换成数字序列
#     ret = ws.transform(["好", "好", "好", "好", "好", "好", "好", "热", "呀"], max_len=13)
#     print(ret)
#     # 把数字序列转换成句子
#     ret = ws.inverse_transform(ret)
#     print(ret)
#     pass

dataset

# -*-coding:utf-8-*-
import os
import pickle
import re
import zipfile
import jieba

from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import pandas as pd

class ImdbDataset(Dataset):
    """Review dataset backed by the train/test Excel sheets.

    Each item is ``(tokens, label)``: the review text tokenised with
    ``tokenlize`` and its label converted to ``int``.
    """

    def __init__(self, train=True):
        """
        :param train: truthy reads ``data/train.xlsx``, otherwise ``data/test.xlsx``
        """
        url = 'data/train.xlsx' if train else "data/test.xlsx"
        data = pd.read_excel(url)
        # Column access by [] fails immediately with KeyError when a column
        # is missing; DataFrame.get would hand back None and fail later.
        # Plain lists give positional indexing regardless of the frame index.
        self.sentence_list = data['review'].tolist()
        self.label_list = data['label'].tolist()

    def __getitem__(self, idx):
        # str() guards against cells that pandas parsed as non-strings.
        review = tokenlize(str(self.sentence_list[idx]))
        label = int(self.label_list[idx])
        return review, label

    def __len__(self):
        return len(self.sentence_list)


# Characters deleted before segmentation: ASCII punctuation, tab, newline and
# the code points \x96 / \x97.
# NOTE(review): the original filter list ended with two empty strings, which
# removed nothing; presumably two characters were lost in copy/paste - confirm.
_FILTERED_CHARS = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n\x97\x96'
_FILTER_TABLE = str.maketrans('', '', _FILTERED_CHARS)


def tokenlize(sentence):
    """Segment a sentence into words and drop stop words.

    The characters in ``_FILTERED_CHARS`` are deleted, the remainder is
    segmented with ``jieba.cut`` in accurate mode, empty and space-only
    pieces are discarded, and the tokens are passed through
    ``movestopwords``.

    The original built one regex by joining partly unescaped fragments with
    ``|``: ``$`` and ``^`` acted as anchors and the lone backslash fused with
    the following ``|``, so ``$``, ``^``, the backslash and a lone ``]`` were
    never removed.  A translation table deletes every listed character
    literally.

    :param sentence: str
    :return: [str, str, str]
    """
    cleaned = sentence.translate(_FILTER_TABLE)
    segmented = ' '.join(jieba.cut(cleaned, cut_all=False))
    # Joining and re-splitting on spaces also discards tokens that are spaces.
    tokens = [tok for tok in segmented.split(" ") if len(tok) > 0]
    return movestopwords(tokens)

def stopwordslist(filepath):
    """Load a stop-word list from a UTF-8 text file, one entry per line.

    :param filepath: path of the stop-word file
    :return: list of the file's lines with surrounding whitespace stripped,
        in file order (blank lines become empty strings)
    """
    # The context manager closes the handle; the original never closed it.
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [line.strip() for line in handle]


# Remove stop words from a tokenised sentence.
def movestopwords(sentence, stopwords=None):
    """Return the tokens of *sentence* that are not stop words.

    Tab and newline tokens are dropped as well.  The original condition
    ``word != TAB and NEWLINE`` only compared against the tab, because the
    second operand is a non-empty string and therefore always true.

    :param sentence: iterable of str tokens
    :param stopwords: optional collection of words to remove; when omitted,
        ``data/stopwords.txt`` is loaded once via ``stopwordslist`` and cached
        on the function, instead of being re-read on every call
    :return: list of kept tokens, in their original order
    """
    if stopwords is None:
        cached = getattr(movestopwords, "_default_stopwords", None)
        if cached is None:
            cached = frozenset(stopwordslist('data/stopwords.txt'))  # default stop-word file
            movestopwords._default_stopwords = cached
        stopwords = cached
    elif not isinstance(stopwords, (set, frozenset)):
        # Set membership is O(1); a list would be scanned for every token.
        stopwords = frozenset(stopwords)
    return [word for word in sentence
            if word not in stopwords and word not in ('\t', '\n')]


# 以下为调试代码
def collate_fn(batch):
    """
    Regroup a batch of ``(review, label)`` items into two parallel tuples.
    :param batch: list of ``__getitem__`` results
    :return: ``(reviews, labels)``, each a tuple with one entry per item
    """
    columns = zip(*batch)
    reviews, labels = columns
    return (reviews, labels)



if __name__ == "__main__":
    # Debug run: print the first review of one shuffled training batch and
    # its id sequence padded/truncated to 30 tokens.
    # NOTE(review): pickle resolves classes by module path; if vocab.pkl was
    # written by a script run as __main__, Vocab has to be importable from
    # this script's namespace - presumably why it is imported here.
    from 情感分析.imdb_sentiment.vocab import Vocab
    imdb_dataset = ImdbDataset(True)
    my_dataloader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    # Load the vocabulary once and close the file; the original opened it
    # inside the loop and never closed it.
    with open("./models/vocab.pkl", "rb") as vocab_file:
        vocab_model = pickle.load(vocab_file)
    for review, label in my_dataloader:
        print(review[0])
        result = vocab_model.transform(review[0], 30)
        print(result)
        break

构建字典

# -*-coding:utf-8-*-
import pickle

from tqdm import tqdm

from 情感分析.weibo_many_emotion import dataset
# from 情感分析.imdb_sentiment.vocab import Vocab
from torch.utils.data import DataLoader

class Vocab:
    """Mapping between word tokens and integer ids.

    Usage: call ``fit`` on every tokenised sentence, then ``build_vocab``
    once, then ``transform`` / ``inverse_transform``.  Ids 0 and 1 are
    reserved for the padding and unknown tags.
    """
    UNK_TAG = "<UNK>"  # stands in for out-of-vocabulary tokens
    PAD_TAG = "<PAD>"  # padding token
    PAD = 0
    UNK = 1

    def __init__(self):
        self.dict = {  # token -> id
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}  # token -> frequency
        # Available before build_vocab so inverse_transform never hits a
        # missing attribute.
        self.inverse_dict = {self.UNK: self.UNK_TAG, self.PAD: self.PAD_TAG}

    def fit(self, sentence):
        """
        Accumulate token frequencies from one sentence.
        :param sentence: [str, str, str]
        :return: None
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """
        Build the token -> id dictionary from the collected frequencies.
        :param min_count: drop tokens seen fewer times than this (None: no limit)
        :param max_count: drop tokens seen more times than this (None: no limit)
        :param max_features: keep only this many most frequent tokens (None: all)
        :return: None
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
        if max_features is not None:
            # sorted() takes its key function by keyword only; passing the
            # lambda positionally raised TypeError on Python 3.
            ranked = sorted(self.count.items(), key=lambda item: item[-1], reverse=True)
            self.count = dict(ranked[:max_features])

        for word in self.count:
            # Skipping known tokens keeps ids unique when build_vocab runs
            # twice or when the corpus itself contains one of the tags.
            if word not in self.dict:
                self.dict[word] = len(self.dict)

        # Reverse mapping: id -> token.
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        Convert a tokenised sentence to a list of ids.
        :param sentence: [str, str, str]
        :param max_len: truncate or right-pad with PAD to exactly this length;
            None leaves the length unchanged
        :return: [int, int, int]
        """
        sentence = list(sentence)
        if max_len is not None:
            if len(sentence) > max_len:
                sentence = sentence[:max_len]
            else:
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))

        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, incides):
        """
        Convert a list of ids back to tokens.
        :param incides: [int, int, int]
        :return: [str, str, str]; unknown ids map to the UNK tag
        """
        return [self.inverse_dict.get(i, self.UNK_TAG) for i in incides]

    def __len__(self):
        return len(self.dict)

def collate_fn(batch):
    """
    Regroup a batch of ``(review, label)`` items into two parallel tuples.
    :param batch: list of ``__getitem__`` results
    :return: ``(reviews, labels)``, each a tuple with one entry per item
    """
    columns = zip(*batch)
    reviews, labels = columns
    return (reviews, labels)



def get_dataloader(train=True):
    """
    Build a shuffled DataLoader (200 items per batch) over the chosen split.
    :param train: forwarded to ``dataset.ImdbDataset`` to pick the split
    :return: DataLoader whose batches are shaped by ``collate_fn``
    """
    return DataLoader(
        dataset.ImdbDataset(train),
        batch_size=200,
        shuffle=True,
        collate_fn=collate_fn,
    )


if __name__ == '__main__':
    import os

    ws = Vocab()
    # NOTE(review): word counts are collected from the test split as well as
    # the training split, as in the original - confirm this is intended.
    for loader in (get_dataloader(True), get_dataloader(False)):
        for reviews, label in tqdm(loader, total=len(loader)):
            for sentence in reviews:
                ws.fit(sentence)
    ws.build_vocab()
    print(len(ws))

    # Create the output directory up front so the pass over the data is not
    # lost to a missing folder, and close the file once the dump is written.
    os.makedirs("./models", exist_ok=True)
    with open("./models/vocab.pkl", "wb") as vocab_file:
        pickle.dump(ws, vocab_file)

模型训练

# -*-coding:utf-8-*-
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader
from tqdm import tqdm

from 情感分析.weibo_many_emotion import dataset
from 情感分析.中文情感分类.vocab import Vocab

train_batch_size = 512
test_batch_size = 128
sequence_max_len = 100  # reviews are truncated/padded to this many tokens

# Load the fitted vocabulary once at import time.  The context manager closes
# the file; the original left the handle open.
with open("./models/vocab.pkl", "rb") as _vocab_file:
    voc_model = pickle.load(_vocab_file)


def collate_fn(batch):
    """
    Turn a batch of ``(tokens, label)`` items into two LongTensors.
    :param batch: list of ``__getitem__`` results
    :return: ``(reviews, labels)``; every review is encoded with
        ``voc_model.transform`` at length ``sequence_max_len``
    """
    token_lists, label_values = zip(*batch)
    encoded = []
    for tokens in token_lists:
        encoded.append(voc_model.transform(tokens, max_len=sequence_max_len))
    reviews = torch.LongTensor(encoded)
    labels = torch.LongTensor(label_values)
    return reviews, labels


def get_dataloader(train=True):
    """
    Build a shuffled DataLoader over the training or the test split.
    :param train: truthy uses ``train_batch_size``, otherwise ``test_batch_size``
    :return: DataLoader whose batches are produced by ``collate_fn``
    """
    split = dataset.ImdbDataset(train)
    if train:
        size = train_batch_size
    else:
        size = test_batch_size
    loader = DataLoader(split, batch_size=size, shuffle=True, collate_fn=collate_fn)
    return loader


class ImdbModel(nn.Module):
    """Two-layer bidirectional LSTM classifier over token-id sequences.

    Pipeline: embedding -> BiLSTM -> linear + ReLU -> linear -> log-softmax.
    """

    def __init__(self, vocab_size=None, padding_idx=None, embedding_dim=200,
                 hidden_size=64, num_classes=4):
        """
        :param vocab_size: number of embedding rows; defaults to ``len(voc_model)``
        :param padding_idx: id of the padding token; defaults to ``voc_model.PAD``
        :param embedding_dim: size of each word vector
        :param hidden_size: LSTM hidden size per direction
        :param num_classes: number of output classes
        """
        super(ImdbModel, self).__init__()
        if vocab_size is None:
            vocab_size = len(voc_model)
        if padding_idx is None:
            padding_idx = voc_model.PAD
        # The original chained an argument-less .to() here, which moved
        # nothing; callers place the whole model with model.to(device).
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim,
                                      padding_idx=padding_idx)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size, num_layers=2,
                            batch_first=True, bidirectional=True, dropout=0.1)
        self.fc1 = nn.Linear(hidden_size * 2, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, input):
        """
        :param input: LongTensor [batch_size, max_len] of token ids
        :return: log-probabilities [batch_size, num_classes]
        """
        input_embeded = self.embedding(input)  # [batch_size, max_len, embedding_dim]

        # NOTE(review): the embedded sequence goes to the LSTM unpacked, so
        # any PAD positions in the input are processed like real tokens.
        output, (h_n, c_n) = self.lstm(input_embeded)  # h_n: [num_layers * 2, batch_size, hidden_size]
        # h_n is ordered layer by layer, forward direction before backward,
        # so h_n[-1] is the last layer's backward state and h_n[-2] its
        # forward state.  The concatenation order is kept as it was so the
        # weights of already trained models keep their meaning.
        out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)  # [batch_size, hidden_size * 2]

        out_fc1 = self.fc1(out)
        out_fc1_relu = F.relu(out_fc1)

        out_fc2 = self.fc2(out_fc1_relu)  # [batch_size, num_classes]
        return F.log_softmax(out_fc2, dim=-1)


def device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(name)


def train(imdb_model, epoch):
    """
    Train the model with Adam on NLL loss and save it to ``lstm_model.pkl``.

    :param imdb_model: model to optimise; expected to live on ``device()`` already
    :param epoch: number of passes over the training DataLoader
    :return: None
    """
    train_dataloader = get_dataloader(train=True)
    target_device = device()  # resolved once instead of twice per batch

    optimizer = Adam(imdb_model.parameters())
    # Make sure dropout is active even if the model was left in eval mode
    # (the evaluation routine in this file switches it to eval).
    imdb_model.train()
    for i in range(epoch):
        bar = tqdm(train_dataloader, total=len(train_dataloader))
        for idx, (data, target) in enumerate(bar):
            optimizer.zero_grad()
            data = data.to(target_device)
            target = target.to(target_device)
            output = imdb_model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            # Label spelling corrected from "epcoh".
            bar.set_description("epoch:{}  idx:{}   loss:{:.6f}".format(i, idx, loss.item()))
    # The whole module object is pickled, not only its state_dict.
    torch.save(imdb_model, 'lstm_model.pkl')


def test(imdb_model):
    """
    Evaluate the model on the test split and print average loss and accuracy.

    :param imdb_model: model to evaluate; it is switched to eval mode
    :return: None
    """
    imdb_model.eval()
    loader = get_dataloader(train=False)
    loss_total = 0
    hits = 0
    with torch.no_grad():
        for data, target in tqdm(loader):
            data = data.to(device())
            target = target.to(device())
            log_probs = imdb_model(data)
            batch_loss = F.nll_loss(log_probs, target, reduction='sum')
            loss_total += batch_loss.item()
            # Index of the largest log-probability per row, shape [batch_size, 1].
            pred = log_probs.data.max(1, keepdim=True)[1]
            hits += pred.eq(target.data.view_as(pred)).sum()
    sample_count = len(loader.dataset)
    loss_total /= sample_count
    report = '\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        loss_total, hits, sample_count,
        100. * hits / sample_count)
    print(report)


def xlftest():
    """Classify a few hard-coded sample sentences with the saved model.

    Loads ``lstm_model.pkl`` and ``./models/vocab.pkl`` and prints, for each
    sample, the sentence, the model output, the predicted class id and the
    mood name for that id.

    Changes over the original: the model is put in eval mode and run under
    ``no_grad`` so dropout does not disturb predictions; the checkpoint is
    mapped to the available device; the vocabulary is loaded once, with the
    file closed afterwards, instead of once per sentence.

    :return: None
    """
    from 情感分析.weibo_many_emotion.dataset import tokenlize

    mood_names = ("喜悦", "愤怒", "厌恶", "低落")  # indexed by class id 0-3
    target_device = device()
    # NOTE(review): this file holds a fully pickled module.  Recent PyTorch
    # releases default torch.load to weights_only=True, which rejects such
    # files; weights_only=False may be needed there - confirm against the
    # installed version.
    model = torch.load('lstm_model.pkl', map_location=target_device)
    model.to(target_device)
    model.eval()
    with open("./models/vocab.pkl", "rb") as vocab_file:
        vocab_model = pickle.load(vocab_file)

    lines = ['哈哈哈开心', '真是无语,你们怎么搞的', '小姐姐,祝你生日快乐', '你他妈的有病']
    with torch.no_grad():
        for line in lines:
            print(line)
            review = tokenlize(line)
            result = vocab_model.transform(review, sequence_max_len)
            # One sentence becomes a batch of size 1: [1, sequence_max_len].
            data = torch.LongTensor(result).reshape(1, sequence_max_len).to(target_device)
            output = model(data)
            print(output.data)
            pred = output.data.max(1, keepdim=True)[1]  # index of the best class, [1, 1]
            label_id = pred.item()
            print(label_id)
            if 0 <= label_id < len(mood_names):
                print(mood_names[label_id])

if __name__ == '__main__':
    # To retrain and evaluate, uncomment the three lines below.
    # imdb_model = ImdbModel().to(device())
    # train(imdb_model,20)
    # test(imdb_model)
    # Default action: run the saved model on the built-in sample sentences.
    xlftest()

测试结果:

四分类的准确率只有 40% 左右,一部分原因是数据集不规范,还需要进一步调参优化。

 

 

 

推荐阅读