13. 文本挖掘

中国谚语

文章的内容超出了预期。 --沈相龙

images/sen_word_freq.png

13.1. 文本采集

13.1.1. 图像到文本

  • 我的 img2txt 功能

def img2txt(img_dir):
    """
    convert images to text
    """
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2

    from PIL import Image
    import pytesseract

    f = open('doc4img.txt','wa')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:

        start_time = datetime.now()

        input_img = img_dir + "/" + img

        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img +'.......')
        print('--------------------------------------------------------------------')

        # extract the text information from images
        text = pytesseract.image_to_string(Image.open(input_img))
        print(text)

        # ouput text file
        f.write( img + "\n")
        f.write(text.encode('utf-8'))


        print "CPU Time for converting" + img +":"+ str(datetime.now() - start_time) +"\n"
        f.write( "\n-------------------------------------------------------------\n")

    f.close()
  • 演示

我申请了我的 img2txt 图像的函数 Image folder .

image_dir = r"Image"

img2txt(image_dir)

然后我得到了以下结果:

--------------------------------------------------------------------
feng.pdf_0.png
Converting feng.pdf_0.png.......
--------------------------------------------------------------------
l I l w

Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP



Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s responsibilities
include providing DST clients with access to cutting—edge skills and technologies, including Big
Data analytic solutions, advanced analytic and data enhancement techniques and modeling.

Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning
algorithms, business intelligence, and applying Big Data tools to strategically solve industry
problems in a cross—functional business. Before joining the DST Applied Analytics Group, Dr.
Feng holds a MA Data Science Fellow at The Institute for Mathematics and Its Applications
{IMA) at the University of Minnesota. While there, he helped startup companies make
marketing decisions based on deep predictive analytics.

Dr. Feng graduated from University of Tennessee, Knoxville with PhD in Computational
mathematics and Master’s degree in Statistics. He also holds Master’s degree in Computational
Mathematics at Missouri University of Science and Technology (MST) and Master’s degree in
Applied Mathematics at University of science and technology of China (USTC).
CPU Time for convertingfeng.pdf_0.png:0:00:02.061208

13.1.2. 图像增强到文本

  • 我的 img2txt_enhance 功能

def img2txt_enhance(img_dir,scaler):
    """
    convert images files to text
    """

    import numpy as np
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2

    from PIL import Image, ImageEnhance, ImageFilter
    import pytesseract

    f = open('doc4img.txt','wa')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:

        start_time = datetime.now()

        input_img = img_dir + "/" + img
        enhanced_img = img_dir + "/" +"Enhanced" + "/"+ img

        im = Image.open(input_img) # the second one
        im = im.filter(ImageFilter.MedianFilter())
        enhancer = ImageEnhance.Contrast(im)
        im = enhancer.enhance(1)
        im = im.convert('1')
        im.save(enhanced_img)

        for scale in np.ones(scaler):
            im = Image.open(enhanced_img) # the second one
            im = im.filter(ImageFilter.MedianFilter())
            enhancer = ImageEnhance.Contrast(im)
            im = enhancer.enhance(scale)
            im = im.convert('1')
            im.save(enhanced_img)



        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img +'.......')
        print('--------------------------------------------------------------------')

        # extract the text information from images
        text = pytesseract.image_to_string(Image.open(enhanced_img))
        print(text)

        # ouput text file
        f.write( img + "\n")
        f.write(text.encode('utf-8'))


        print "CPU Time for converting" + img +":"+ str(datetime.now() - start_time) +"\n"
        f.write( "\n-------------------------------------------------------------\n")

    f.close()
  • 演示

我申请了我的 img2txt_enhance 以下噪声图像的函数 Enhance folder .

images/noised.jpg
image_dir = r"Enhance"

pdf2txt_enhance(image_dir)

然后我得到了以下结果:

--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
zHHH
CPU Time for convertingnoised.jpg:0:00:00.135465

而结果来自 img2txt 函数是

--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
,2 WW
CPU Time for convertingnoised.jpg:0:00:00.133508

这是不正确的。

13.1.3. 文本的PDF

  • 我的 pdf2txt 功能

def pdf2txt(pdf_dir,image_dir):
    """
    convert PDF to text
    """

    import os, PythonMagick
    from datetime import datetime
    import PyPDF2

    from PIL import Image
    import pytesseract

    f = open('doc.txt','wa')
    for pdf in [pdf_file for pdf_file in os.listdir(pdf_dir) if pdf_file.endswith(".pdf")]:

        start_time = datetime.now()

        input_pdf = pdf_dir + "/" + pdf

        pdf_im = PyPDF2.PdfFileReader(file(input_pdf, "rb"))
        npage = pdf_im.getNumPages()

        print('--------------------------------------------------------------------')
        print(pdf)
        print('Converting %d pages.' % npage)
        print('--------------------------------------------------------------------')

        f.write( "\n--------------------------------------------------------------------\n")

        for p in range(npage):

            pdf_file = input_pdf + '[' + str(p) +']'
            image_file =  image_dir  + "/" + pdf+ '_' + str(p)+ '.png'

            # convert PDF files to Images
            im = PythonMagick.Image()
            im.density('300')
            im.read(pdf_file)
            im.write(image_file)

            # extract the text information from images
            text = pytesseract.image_to_string(Image.open(image_file))

            #print(text)

            # ouput text file
            f.write( pdf + "\n")
            f.write(text.encode('utf-8'))


        print "CPU Time for converting" + pdf +":"+ str(datetime.now() - start_time) +"\n"

    f.close()
  • 演示

我申请了我的 pdf2txt 函数到我的扫描的bio pdf文件 pdf folder .

pdf_dir = r"pdf"
image_dir = r"Image"

pdf2txt(pdf_dir,image_dir)

然后我得到了以下结果:

--------------------------------------------------------------------
feng.pdf
Converting 1 pages.
--------------------------------------------------------------------
l I l w

Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP



Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s responsibilities
include providing DST clients with access to cutting—edge skills and technologies, including Big
Data analytic solutions, advanced analytic and data enhancement techniques and modeling.

Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning
algorithms, business intelligence, and applying Big Data tools to strategically solve industry
problems in a cross—functional business. Before joining the DST Applied Analytics Group, Dr.
Feng holds a MA Data Science Fellow at The Institute for Mathematics and Its Applications
{IMA) at the University of Minnesota. While there, he helped startup companies make
marketing decisions based on deep predictive analytics.

Dr. Feng graduated from University of Tennessee, Knoxville with PhD in Computational
mathematics and Master’s degree in Statistics. He also holds Master’s degree in Computational
Mathematics at Missouri University of Science and Technology (MST) and Master’s degree in
Applied Mathematics at University of science and technology of China (USTC).
CPU Time for convertingfeng.pdf:0:00:03.143800

13.1.4. 音频到文本

  • 我的 audio2txt 功能

def audio2txt(audio_dir):
    ''' convert audio to text'''

    import speech_recognition as sr
    r = sr.Recognizer()

    f = open('doc.txt','wa')
    for audio_n in [audio_file for audio_file in os.listdir(audio_dir) \
                  if audio_file.endswith(".wav")]:

        filename = audio_dir + "/" + audio_n

        # Read audio data
        with sr.AudioFile(filename) as source:
            audio = r.record(source)  # read the entire audio file

        # Google Speech Recognition
        text = r.recognize_google(audio)

        # ouput text file
        f.write( audio_n + ": ")
        f.write(text.encode('utf-8'))
        f.write("\n")

        print('You said: ' + text)

    f.close()
  • 演示

我申请了我的 audio2txt 功能到我的音频记录 audio folder .

audio_dir = r"audio"

audio2txt(audio_dir)

然后我得到了以下结果:

You said: hello this is George welcome to my tutorial
You said: mathematics is important in daily life
You said: call me tomorrow
You said: do you want something to eat
You said: I want to speak with him
You said: nice to see you
You said: can you speak slowly
You said: have a good day

顺便说一下,您可以使用我下面的python代码来录制您自己的音频并播放 audio2txt 命令行中的函数 python record.py "demo2.wav"

import sys, getopt

import speech_recognition as sr

audio_filename = sys.argv[1]

r = sr.Recognizer()
with sr.Microphone() as source:
    r.adjust_for_ambient_noise(source)
    print("Hey there, say something, I am recording!")
    audio = r.listen(source)
    print("Done listening!")

with open(audio_filename, "wb") as f:
    f.write(audio.get_wav_data())

13.2. 文本预处理

  • 检查一行是否只包含空格

def check_blanks(data_str):
    is_blank = str(data_str.isspace())
    return is_blank
  • 确定文本内容的语言是否为英语:使用langid模块对语言进行分类,以确保对英语langid应用正确的清理操作。

def check_lang(data_str):
    predict_lang = langid.classify(data_str)
    if predict_lang[1] >= .9:
        language = predict_lang[0]
    else:
        language = 'NA'
    return language
  • 删除特征

def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 3 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    return cleaned_str
  • 删除停止字

def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
  • 标记文本

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str
  • 勒马石化

def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str

定义pyspark中的预处理函数

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp

check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())

13.3. 文本分类

从理论上讲,您可以应用任何分类算法来进行分类。我将只介绍朴素的贝叶斯方法如下。

images/text_classification.png

13.3.1. 介绍

13.3.2. 演示

  1. 创建Spark上下文

import pyspark
from pyspark.sql import SQLContext

# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
  1. 负载数据集

# Load a text file and convert each line to a Row.
data_rdd = sc.textFile("../data/raw_data.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))

# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))

#Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])

# get the raw columns
raw_cols = data_df.columns

#data_df.show()
data_df.printSchema()
root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
+--------------------+------------------+-----+
|                text|                id|label|
+--------------------+------------------+-----+
|Fresh install of ...|        1018769417|  1.0|
|Well. Now I know ...|       10284216536|  1.0|
|"Literally six we...|       10298589026|  1.0|
|Mitsubishi i MiEV...|109017669432377344|  1.0|
+--------------------+------------------+-----+
only showing top 4 rows
  1. 设置pyspark udf函数

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp

# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())
  1. 语言识别

lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")
en_df.show(4)
+--------------------+------------------+-----+----+
|                text|                id|label|lang|
+--------------------+------------------+-----+----+
|RT @goeentertain:...|665305154954989568|  1.0|  en|
|Teforia Uses Mach...|660668007975268352|  1.0|  en|
|   Apple TV or Roku?|       25842461136|  1.0|  en|
|Finished http://t...|        9412369614|  1.0|  en|
+--------------------+------------------+-----+----+
only showing top 4 rows
  1. 删除停止词

rm_stops_df = en_df.select(raw_cols)\
                   .withColumn("stop_text", remove_stops_udf(en_df["text"]))
rm_stops_df.show(4)
+--------------------+------------------+-----+--------------------+
|                text|                id|label|           stop_text|
+--------------------+------------------+-----+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|RT @goeentertain:...|
|Teforia Uses Mach...|660668007975268352|  1.0|Teforia Uses Mach...|
|   Apple TV or Roku?|       25842461136|  1.0|      Apple TV Roku?|
|Finished http://t...|        9412369614|  1.0|Finished http://t...|
+--------------------+------------------+-----+--------------------+
only showing top 4 rows
  1. 删除不相关的功能

rm_features_df = rm_stops_df.select(raw_cols+["stop_text"])\
                            .withColumn("feat_text", \
                            remove_features_udf(rm_stops_df["stop_text"]))
rm_features_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
|                text|                id|label|           stop_text|           feat_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|RT @goeentertain:...|  future blase   ...|
|Teforia Uses Mach...|660668007975268352|  1.0|Teforia Uses Mach...|teforia uses mach...|
|   Apple TV or Roku?|       25842461136|  1.0|      Apple TV Roku?|         apple  roku|
|Finished http://t...|        9412369614|  1.0|Finished http://t...|            finished|
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
  1. 标记单词

tagged_df = rm_features_df.select(raw_cols+["feat_text"]) \
                          .withColumn("tagged_text", \
                           tag_and_remove_udf(rm_features_df.feat_text))

tagged_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
|                text|                id|label|           feat_text|         tagged_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|  future blase   ...| future blase vic...|
|Teforia Uses Mach...|660668007975268352|  1.0|teforia uses mach...| teforia uses mac...|
|   Apple TV or Roku?|       25842461136|  1.0|         apple  roku|         apple roku |
|Finished http://t...|        9412369614|  1.0|            finished|           finished |
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
  1. 词的引理化

lemm_df = tagged_df.select(raw_cols+["tagged_text"]) \
                   .withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))
lemm_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
|                text|                id|label|         tagged_text|           lemm_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0| future blase vic...|future blase vice...|
|Teforia Uses Mach...|660668007975268352|  1.0| teforia uses mac...|teforia use machi...|
|   Apple TV or Roku?|       25842461136|  1.0|         apple roku |          apple roku|
|Finished http://t...|        9412369614|  1.0|           finished |              finish|
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
  1. 删除空行并删除重复项

    check_blanks_df = lemm_df.select(raw_cols+["lemm_text"])\
                             .withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
    # remove blanks
    no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

# drop duplicates
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])

    dedup_df.show(4)
+--------------------+------------------+-----+--------------------+--------+
|                text|                id|label|           lemm_text|is_blank|
+--------------------+------------------+-----+--------------------+--------+
|RT @goeentertain:...|665305154954989568|  1.0|future blase vice...|   False|
|Teforia Uses Mach...|660668007975268352|  1.0|teforia use machi...|   False|
|   Apple TV or Roku?|       25842461136|  1.0|          apple roku|   False|
|Finished http://t...|        9412369614|  1.0|              finish|   False|
+--------------------+------------------+-----+--------------------+--------+
only showing top 4 rows
  1. 添加UNIUQID

from pyspark.sql.functions import monotonically_increasing_id
# Create Unique ID
dedup_df = dedup_df.withColumn("uid", monotonically_increasing_id())
dedup_df.show(4)
+--------------------+------------------+-----+--------------------+--------+------------+
|                text|                id|label|           lemm_text|is_blank|         uid|
+--------------------+------------------+-----+--------------------+--------+------------+
|              dragon|        1546813742|  1.0|              dragon|   False| 85899345920|
|           hurt much|        1558492525|  1.0|           hurt much|   False|111669149696|
|seth blog word se...|383221484023709697|  1.0|seth blog word se...|   False|128849018880|
|teforia use machi...|660668007975268352|  1.0|teforia use machi...|   False|137438953472|
+--------------------+------------------+-----+--------------------+--------+------------+
only showing top 4 rows
  1. 创建最终数据集

data = dedup_df.select('uid','id', 'text','label')
data.show(4)
+------------+------------------+--------------------+-----+
|         uid|                id|                text|label|
+------------+------------------+--------------------+-----+
| 85899345920|        1546813742|              dragon|  1.0|
|111669149696|        1558492525|           hurt much|  1.0|
|128849018880|383221484023709697|seth blog word se...|  1.0|
|137438953472|660668007975268352|teforia use machi...|  1.0|
+------------+------------------+--------------------+-----+
only showing top 4 rows
  1. 创建训练和测试集

# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
  1. NaiveBayes管道

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
# vectorizer = CountVectorizer(inputCol= "words", outputCol="rawFeatures")
idf = IDF(minDocFreq=3, inputCol="rawFeatures", outputCol="features")

# Naive Bayes model
nb = NaiveBayes()

# Pipeline Architecture
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)
  1. 做出预测

predictions = model.transform(testData)

# Select example rows to display.
predictions.select("text", "label", "prediction").show(5,False)
+-----------------------------------------------+-----+----------+
|text                                           |label|prediction|
+-----------------------------------------------+-----+----------+
|finish                                         |1.0  |1.0       |
|meet rolo dogsofthinkgeek happy nationaldogday |1.0  |1.0       |
|pumpkin family                                 |1.0  |1.0       |
|meet jet dogsofthinkgeek happy nationaldogday  |1.0  |1.0       |
|meet vixie dogsofthinkgeek happy nationaldogday|1.0  |1.0       |
+-----------------------------------------------+-----+----------+
only showing top 5 rows
  1. 评价

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
0.912655971479501

13.4. 情绪分析

images/sen_class.png

13.4.1. 介绍

Sentiment analysis (有时被称为意见挖掘或情绪人工智能)是指使用自然语言处理、文本分析、计算语言学和生物特征来系统地识别、提取、量化和研究情感状态和主观信息。情感分析广泛应用于客户的声音材料,如评论和调查回复、在线和社交媒体,以及医疗保健材料,应用范围从市场营销到客户服务再到临床医学。

一般来说,情绪分析的目的是 决定态度 指演说者、作家或其他主题关于某个主题或整个语境的极性或对某个文档、互动或事件的情感反应。态度可以是一种判断或评价(见评价理论)、情感状态(即作者或演讲者的情感状态)或预期的情感交流(即作者或对话者预期的情感效果)。

商业中的情绪分析,也称为意见挖掘,是一个根据文本所传达的语调来识别和编目文本的过程。它具有广泛的应用:

  • 商务智能构建中的情绪分析

  • 企业竞争优势的情绪分析

  • 通过业务中的情绪分析提升客户体验

13.4.2. 管道

images/sentiment_analysis_pipeline.png

情绪分析管道

13.4.3. 演示

  1. 设置Spark上下文和SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Sentiment Analysis example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
  1. 负载数据集

df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').\
            load("../data/newtwitter.csv",header=True);
+--------------------+----------+-------+
|                text|        id|pubdate|
+--------------------+----------+-------+
|10 Things Missing...|2602860537|  18536|
|RT @_NATURALBWINN...|2602850443|  18536|
|RT @HBO24 yo the ...|2602761852|  18535|
|Aaaaaaaand I have...|2602738438|  18535|
|can I please have...|2602684185|  18535|
+--------------------+----------+-------+
only showing top 5 rows
  1. 文本预处理

  • 删除非ASCII字符

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import string
import re

# remove non ASCII characters
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)
# setup pyspark udf function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())

检查:

df = df.withColumn('text_non_asci',strip_non_ascii_udf(df['text']))
df.show(5,True)

输出:

+--------------------+----------+-------+--------------------+
|                text|        id|pubdate|       text_non_asci|
+--------------------+----------+-------+--------------------+
|10 Things Missing...|2602860537|  18536|10 Things Missing...|
|RT @_NATURALBWINN...|2602850443|  18536|RT @_NATURALBWINN...|
|RT @HBO24 yo the ...|2602761852|  18535|RT @HBO24 yo the ...|
|Aaaaaaaand I have...|2602738438|  18535|Aaaaaaaand I have...|
|can I please have...|2602684185|  18535|can I please have...|
+--------------------+----------+-------+--------------------+
only showing top 5 rows
  • 固定缩写

# fixed abbreviation
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str

fix_abbreviation_udf = udf(fix_abbreviation, StringType())
检查:
df = df.withColumn('fixed_abbrev',fix_abbreviation_udf(df['text_non_asci']))
df.show(5,True)

输出:

+--------------------+----------+-------+--------------------+--------------------+
|                text|        id|pubdate|       text_non_asci|        fixed_abbrev|
+--------------------+----------+-------+--------------------+--------------------+
|10 Things Missing...|2602860537|  18536|10 Things Missing...|10 things missing...|
|RT @_NATURALBWINN...|2602850443|  18536|RT @_NATURALBWINN...|@_naturalbwinner ...|
|RT @HBO24 yo the ...|2602761852|  18535|RT @HBO24 yo the ...|@hbo24 yo the #ne...|
|Aaaaaaaand I have...|2602738438|  18535|Aaaaaaaand I have...|aaaaaaaand i have...|
|can I please have...|2602684185|  18535|can I please have...|can i please have...|
+--------------------+----------+-------+--------------------+--------------------+
only showing top 5 rows
  • 删除不相关的功能

def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())
# setup pyspark udf function
remove_features_udf = udf(remove_features, StringType())
检查:
df = df.withColumn('removed',remove_features_udf(df['fixed_abbrev']))
df.show(5,True)

输出:

+--------------------+----------+-------+--------------------+--------------------+--------------------+
|                text|        id|pubdate|       text_non_asci|        fixed_abbrev|             removed|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
|10 Things Missing...|2602860537|  18536|10 Things Missing...|10 things missing...|things missing in...|
|RT @_NATURALBWINN...|2602850443|  18536|RT @_NATURALBWINN...|@_naturalbwinner ...|oh and do not lik...|
|RT @HBO24 yo the ...|2602761852|  18535|RT @HBO24 yo the ...|@hbo24 yo the #ne...|yo the newtwitter...|
|Aaaaaaaand I have...|2602738438|  18535|Aaaaaaaand I have...|aaaaaaaand i have...|aaaaaaaand have t...|
|can I please have...|2602684185|  18535|can I please have...|can i please have...|can please have t...|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows
  1. 情绪分析主要功能

from pyspark.sql.types import FloatType

from textblob import TextBlob

def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

sentiment_analysis_udf = udf(sentiment_analysis , FloatType())
df  = df.withColumn("sentiment_score", sentiment_analysis_udf( df['removed'] ))
df.show(5,True)
  • 情感评分

+--------------------+---------------+
|             removed|sentiment_score|
+--------------------+---------------+
|things missing in...|    -0.03181818|
|oh and do not lik...|    -0.03181818|
|yo the newtwitter...|      0.3181818|
|aaaaaaaand have t...|     0.11818182|
|can please have t...|     0.13636364|
+--------------------+---------------+
only showing top 5 rows
  • 词频

images/sen_word_freq.png
  • 情绪分类

def condition(r):
    if (r >=0.1):
        label = "positive"
    elif(r <= -0.1):
        label = "negative"
    else:
        label = "neutral"
    return label

sentiment_udf = udf(lambda x: condition(x), StringType())
  1. 产量

  • 情感类

images/sen_class.png
  • 每个情绪班的头条微博

+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|and this #newtwit...|            1.0| positive|
|"RT @SarahsJokes:...|            1.0| positive|
|#newtwitter using...|            1.0| positive|
|The #NewTwitter h...|            1.0| positive|
|You can now undo ...|            1.0| positive|
+--------------------+---------------+---------+
only showing top 5 rows
+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|Lists on #NewTwit...|           -0.1|  neutral|
|Too bad most of m...|           -0.1|  neutral|
|the #newtwitter i...|           -0.1|  neutral|
|Looks like our re...|           -0.1|  neutral|
|i switched to the...|           -0.1|  neutral|
+--------------------+---------------+---------+
only showing top 5 rows
+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|oh. #newtwitter i...|           -1.0| negative|
|RT @chqwn: #NewTw...|           -1.0| negative|
|Copy that - its W...|           -1.0| negative|
|RT @chqwn: #NewTw...|           -1.0| negative|
|#NewTwitter has t...|           -1.0| negative|
+--------------------+---------------+---------+
only showing top 5 rows

13.5. n-克和相关性

13.6. 主题模型:潜在dirichlet分配

images/topic_time.png

13.6.1. 介绍

在文本挖掘中,主题模型是用于发现文档集合中出现的抽象“主题”的无监督模型。

潜在dirichlet分配(lda)是一种同时估计这两个主题的数学方法:找到与每个主题相关联的单词的混合,同时确定描述每个文档的主题的混合。

13.6.2. 演示

  1. 加载数据

rawdata = spark.read.load("../data/airlines.csv", format="csv", header=True)
rawdata.show(5)
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
only showing top 5 rows
  1. 文本预处理

我将使用以下原始列名称来保持表的简洁性:

raw_cols =  rawdata.columns
raw_cols
['id', 'airline', 'date', 'location', 'rating', 'cabin', 'value', 'recommended', 'review']
rawdata = rawdata.dropDuplicates(['review'])
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, DoubleType, DateType

from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import langid
import string
import re
  • 删除非ASCII字符

# remove non ASCII characters
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)
  • 检查是否为空行

# check to see if a row only contains whitespace
def check_blanks(data_str):
    is_blank = str(data_str.isspace())
    return is_blank
  • 检查语言(有点慢,我跳过了这一步)

# check the language (only apply to english)
def check_lang(data_str):
    from langid.langid import LanguageIdentifier, model
    identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)
    predict_lang = identifier.classify(data_str)

    if predict_lang[1] >= .9:
        language = predict_lang[0]
    else:
        language = predict_lang[0]
    return language
  • 固定缩写

# fixed abbreviation
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str
  • 删除不相关的功能

# remove irrelevant features
def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())
  • 删除停止字

# removes stop words
def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
  • 语音标记的一部分

# Part-of-Speech Tagging
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str
  • 勒马石化

# lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
  • 设置pyspark udf函数

# setup pyspark udf function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
check_blanks_udf = udf(check_blanks, StringType())
check_lang_udf = udf(check_lang, StringType())
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
remove_stops_udf = udf(remove_stops, StringType())
remove_features_udf = udf(remove_features, StringType())
tag_and_remove_udf = udf(tag_and_remove, StringType())
lemmatize_udf = udf(lemmatize, StringType())
  1. 文本处理

  • 更正数据架构

rawdata = rawdata.withColumn('rating', rawdata.rating.cast('float'))
rawdata.printSchema()
root
|-- id: string (nullable = true)
|-- airline: string (nullable = true)
|-- date: string (nullable = true)
|-- location: string (nullable = true)
|-- rating: float (nullable = true)
|-- cabin: string (nullable = true)
|-- value: string (nullable = true)
|-- recommended: string (nullable = true)
|-- review: string (nullable = true)
from datetime import datetime
from pyspark.sql.functions import col

# https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
# 21-Jun-14 <----> %d-%b-%y
to_date =  udf (lambda x: datetime.strptime(x, '%d-%b-%y'), DateType())

rawdata = rawdata.withColumn('date', to_date(col('date')))
rawdata.printSchema()
root
 |-- id: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- cabin: string (nullable = true)
 |-- value: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- review: string (nullable = true)
rawdata.show(5)
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
only showing top 5 rows
rawdata = rawdata.withColumn('non_asci', strip_non_ascii_udf(rawdata['review']))


+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|            non_asci|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|Flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|Flight from Manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|I'm Executive Pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|MSP-JFK-MXP and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|Worst airline I h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
only showing top 5 rows
rawdata = rawdata.select(raw_cols+['non_asci'])\
                 .withColumn('fixed_abbrev',fix_abbreviation_udf(rawdata['non_asci']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|            non_asci|        fixed_abbrev|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|Flight 3246 from ...|flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|Flight from Manch...|flight from manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|I'm Executive Pla...|i'm executive pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|MSP-JFK-MXP and r...|msp-jfk-mxp and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|Worst airline I h...|worst airline i h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
 rawdata = rawdata.select(raw_cols+['fixed_abbrev'])\
                  .withColumn('stop_text',remove_stops_udf(rawdata['fixed_abbrev']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|        fixed_abbrev|           stop_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight 3246 from ...|flight 3246 chica...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight from manch...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|i'm executive pla...|i'm executive pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp-jfk-mxp and r...|msp-jfk-mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline i h...|worst airline eve...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
rawdata = rawdata.select(raw_cols+['stop_text'])\
                 .withColumn('feat_text',remove_features_udf(rawdata['stop_text']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           stop_text|           feat_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight 3246 chica...|flight chicago mi...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|i'm executive pla...|executive platinu...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp-jfk-mxp retur...|msp jfk mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline eve...|worst airline eve...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
 rawdata = rawdata.select(raw_cols+['feat_text'])\
                  .withColumn('tagged_text',tag_and_remove_udf(rawdata['feat_text']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           feat_text|         tagged_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...| flight chicago m...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...| flight mancheste...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...| executive platin...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...| msp jfk mxp retu...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline eve...| worst airline ua...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
 rawdata = rawdata.select(raw_cols+['tagged_text']) \
                  .withColumn('lemm_text',lemmatize_udf(rawdata['tagged_text'])


+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|         tagged_text|           lemm_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...| flight chicago m...|flight chicago mi...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...| flight mancheste...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...| executive platin...|executive platinu...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...| msp jfk mxp retu...|msp jfk mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...| worst airline ua...|worst airline ual...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
 rawdata = rawdata.select(raw_cols+['lemm_text']) \
                  .withColumn("is_blank", check_blanks_udf(rawdata["lemm_text"]))


+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           lemm_text|is_blank|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...|   False|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|   False|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...|   False|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...|   False|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline ual...|   False|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
only showing top 5 rows
from pyspark.sql.functions import monotonically_increasing_id
# Create Unique ID
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())
data = rawdata.filter(rawdata["is_blank"] == "False")

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           lemm_text|is_blank|uid|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...|   False|  0|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|   False|  1|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...|   False|  2|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...|   False|  3|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline ual...|   False|  4|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
only showing top 5 rows

#LDA模型管道

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.clustering import LDA
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="words")
#data = tokenizer.transform(data)
vectorizer = CountVectorizer(inputCol= "words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
#idfModel = idf.fit(data)

lda = LDA(k=20, seed=1, optimizer="em")

pipeline = Pipeline(stages=[tokenizer, vectorizer,idf, lda])


model = pipeline.fit(data)
  1. 结果展示

  • 话题

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[60, 7, 12, 483, ...|[0.01349507958269...|
|    1|[363, 29, 187, 55...|[0.01247250144447...|
|    2|[46, 107, 672, 27...|[0.01188684264641...|
|    3|[76, 43, 285, 152...|[0.01132638300115...|
|    4|[201, 13, 372, 69...|[0.01337529863256...|
|    5|[122, 103, 181, 4...|[0.00930415977117...|
|    6|[14, 270, 18, 74,...|[0.01253817708163...|
|    7|[111, 36, 341, 10...|[0.01269584954257...|
|    8|[477, 266, 297, 1...|[0.01017486869509...|
|    9|[10, 73, 46, 1, 2...|[0.01050875237546...|
|   10|[57, 29, 411, 10,...|[0.01777350667863...|
|   11|[293, 119, 385, 4...|[0.01280305149305...|
|   12|[116, 218, 256, 1...|[0.01570714218509...|
|   13|[433, 171, 176, 3...|[0.00819684813575...|
|   14|[74, 84, 45, 108,...|[0.01700630002172...|
|   15|[669, 215, 14, 58...|[0.00779310974971...|
|   16|[198, 21, 98, 164...|[0.01030577084202...|
|   17|[96, 29, 569, 444...|[0.01297142577633...|
|   18|[18, 60, 140, 64,...|[0.01306356985169...|
|   19|[33, 178, 95, 2, ...|[0.00907425683229...|
+-----+--------------------+--------------------+
  • 主题词

from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
final = ldatopics.withColumn("Terms", termsIdx2Term(vocabList)("termIndices"))
+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
|topic|termIndices                                     |Terms                                                                                |
+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
|0    |[60, 7, 12, 483, 292, 326, 88, 4, 808, 32]      |[pm, plane, board, kid, online, lga, schedule, get, memphis, arrive]                 |
|1    |[363, 29, 187, 55, 48, 647, 30, 9, 204, 457]    |[dublin, class, th, sit, entertainment, express, say, delay, dl, son]                |
|2    |[46, 107, 672, 274, 92, 539, 23, 27, 279, 8]    |[economy, sfo, milwaukee, decent, comfortable, iad, return, united, average, airline]|
|3    |[76, 43, 285, 152, 102, 34, 300, 113, 24, 31]   |[didn, pay, lose, different, extra, bag, mile, baggage, leave, day]                  |
|4    |[201, 13, 372, 692, 248, 62, 211, 187, 105, 110]|[houston, crew, heathrow, louisville, london, great, denver, th, land, jfk]          |
|5    |[122, 103, 181, 48, 434, 10, 121, 147, 934, 169]|[lhr, serve, screen, entertainment, ny, delta, excellent, atl, sin, newark]          |
|6    |[14, 270, 18, 74, 70, 37, 16, 450, 3, 20]       |[check, employee, gate, line, change, wait, take, fll, time, tell]                   |
|7    |[111, 36, 341, 10, 320, 528, 844, 19, 195, 524] |[atlanta, first, toilet, delta, washington, card, global, staff, route, amsterdam]   |
|8    |[477, 266, 297, 185, 1, 33, 22, 783, 17, 908]   |[fuel, group, pas, boarding, seat, trip, minute, orleans, make, select]              |
|9    |[10, 73, 46, 1, 248, 302, 213, 659, 48, 228]    |[delta, lax, economy, seat, london, detroit, comfo, weren, entertainment, wife]      |
|10   |[57, 29, 411, 10, 221, 121, 661, 19, 805, 733]  |[business, class, fra, delta, lounge, excellent, syd, staff, nov, mexico]            |
|11   |[293, 119, 385, 481, 503, 69, 13, 87, 176, 545] |[march, ua, manchester, phx, envoy, drink, crew, american, aa, canada]               |
|12   |[116, 218, 256, 156, 639, 20, 365, 18, 22, 136] |[san, clt, francisco, second, text, tell, captain, gate, minute, available]          |
|13   |[433, 171, 176, 339, 429, 575, 10, 26, 474, 796]|[daughter, small, aa, ba, segment, proceed, delta, passenger, size, similar]         |
|14   |[74, 84, 45, 108, 342, 111, 315, 87, 52, 4]     |[line, agent, next, hotel, standby, atlanta, dallas, american, book, get]            |
|15   |[669, 215, 14, 58, 561, 59, 125, 179, 93, 5]    |[fit, carry, check, people, bathroom, ask, thing, row, don, fly]                     |
|16   |[198, 21, 98, 164, 57, 141, 345, 62, 121, 174]  |[ife, good, nice, much, business, lot, dfw, great, excellent, carrier]               |
|17   |[96, 29, 569, 444, 15, 568, 21, 103, 657, 505]  |[phl, class, diego, lady, food, wheelchair, good, serve, miami, mia]                 |
|18   |[18, 60, 140, 64, 47, 40, 31, 35, 2, 123]       |[gate, pm, phoenix, connection, cancel, connect, day, airpo, hour, charlotte]        |
|19   |[33, 178, 95, 2, 9, 284, 42, 4, 89, 31]         |[trip, counter, philadelphia, hour, delay, stay, way, get, southwest, day]           |
+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
  • LDA结果

+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
|   id|           airline|      date|      cabin|rating|               words|            features|   topicDistribution|
+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|   Business|   1.0|[flight, chicago,...|(4695,[0,2,3,6,11...|[0.03640342580508...|
|10298|        US Airways|2014-03-31|   Business|   1.0|[flight, manchest...|(4695,[0,1,2,6,7,...|[0.01381306271470...|
|10564|Southwest Airlines|2013-09-06|    Economy|  10.0|[executive, plati...|(4695,[0,1,6,7,11...|[0.05063554352934...|
|10134|   Delta Air Lines|2013-12-10|    Economy|   8.0|[msp, jfk, mxp, r...|(4695,[0,1,3,10,1...|[0.01494708959842...|
|10912|   United Airlines|2014-04-07|    Economy|   3.0|[worst, airline, ...|(4695,[0,1,7,8,13...|[0.04421751181232...|
|10089|   Delta Air Lines|2014-02-18|    Economy|   2.0|[dl, mia, lax, im...|(4695,[2,4,5,7,8,...|[0.02158861273876...|
|10385|        US Airways|2013-10-21|    Economy|  10.0|[flew, gla, phl, ...|(4695,[0,1,3,5,14...|[0.03343845991816...|
|10249|        US Airways|2014-06-17|    Economy|   1.0|[friend, book, fl...|(4695,[0,2,3,4,5,...|[0.02362432562165...|
|10289|        US Airways|2014-04-12|    Economy|  10.0|[flew, air, rome,...|(4695,[0,1,5,8,13...|[0.01664012816210...|
|10654|Southwest Airlines|2012-07-10|    Economy|   8.0|[lhr, jfk, think,...|(4695,[0,4,5,6,8,...|[0.01526072330297...|
|10754| American Airlines|2014-05-04|    Economy|  10.0|[san, diego, moli...|(4695,[0,2,8,15,2...|[0.03571177612496...|
|10646|Southwest Airlines|2012-08-17|    Economy|   7.0|[toledo, co, stop...|(4695,[0,2,3,4,7,...|[0.02394775146271...|
|10097|   Delta Air Lines|2014-02-03|First Class|  10.0|[honolulu, la, fi...|(4695,[0,4,6,7,13...|[0.02008375619661...|
|10132|   Delta Air Lines|2013-12-16|    Economy|   7.0|[manchester, uk, ...|(4695,[0,1,2,3,5,...|[0.01463126146601...|
|10560|Southwest Airlines|2013-09-20|    Economy|   9.0|[first, time, sou...|(4695,[0,3,7,8,9,...|[0.04934836409896...|
|10579|Southwest Airlines|2013-07-25|    Economy|   0.0|[plane, land, pm,...|(4695,[2,3,4,5,7,...|[0.06106959241722...|
|10425|        US Airways|2013-08-06|    Economy|   3.0|[airway, bad, pro...|(4695,[2,3,4,7,8,...|[0.01770471771322...|
|10650|Southwest Airlines|2012-07-27|    Economy|   9.0|[flew, jfk, lhr, ...|(4695,[0,1,6,13,1...|[0.02676226245086...|
|10260|        US Airways|2014-06-03|    Economy|   1.0|[february, air, u...|(4695,[0,2,4,17,2...|[0.02887390875079...|
|10202|   Delta Air Lines|2013-09-14|    Economy|  10.0|[aug, lhr, jfk, b...|(4695,[1,2,4,7,10...|[0.02377704988307...|
+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
only showing top 20 rows
  • 每天的平均评级和航空公司

images/avg_rating_airlines.png
  • 每月平均评级和航空公司

images/avg_rating_mon.png
  • 主题1对应时间线

images/topic_time.png
  • 与专题1相关的评论(文件)

images/review2topic.png