13. Text Mining
Chinese proverb
The article conveys more than the author intended. -- Xianglong Shen

13.1. Text Collection
13.1.1. Image to Text
My img2txt function:
def img2txt(img_dir):
    """
    convert images to text
    """
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2
    from PIL import Image
    import pytesseract

    f = open('doc4img.txt', 'w')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:

        start_time = datetime.now()

        input_img = img_dir + "/" + img

        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img + '.......')
        print('--------------------------------------------------------------------')

        # extract the text information from images
        text = pytesseract.image_to_string(Image.open(input_img))
        print(text)

        # output text file
        f.write(img + "\n")
        f.write(text.encode('utf-8'))

        print("CPU Time for converting " + img + ": " + str(datetime.now() - start_time) + "\n")
        f.write("\n-------------------------------------------------------------\n")

    f.close()
Demo

I applied my img2txt function to the images in the Image folder.
image_dir = r"Image"
img2txt(image_dir)
Then I got the following results:
--------------------------------------------------------------------
feng.pdf_0.png
Converting feng.pdf_0.png.......
--------------------------------------------------------------------
l I l w
Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP
Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s responsibilities
include providing DST clients with access to cutting—edge skills and technologies, including Big
Data analytic solutions, advanced analytic and data enhancement techniques and modeling.
Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning
algorithms, business intelligence, and applying Big Data tools to strategically solve industry
problems in a cross—functional business. Before joining the DST Applied Analytics Group, Dr.
Feng holds a MA Data Science Fellow at The Institute for Mathematics and Its Applications
{IMA) at the University of Minnesota. While there, he helped startup companies make
marketing decisions based on deep predictive analytics.
Dr. Feng graduated from University of Tennessee, Knoxville with PhD in Computational
mathematics and Master’s degree in Statistics. He also holds Master’s degree in Computational
Mathematics at Missouri University of Science and Technology (MST) and Master’s degree in
Applied Mathematics at University of science and technology of China (USTC).
CPU Time for convertingfeng.pdf_0.png:0:00:02.061208
13.1.2. Image Enhanced to Text
My img2txt_enhance function:
def img2txt_enhance(img_dir, scaler):
    """
    convert image files to text
    """
    import numpy as np
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2
    from PIL import Image, ImageEnhance, ImageFilter
    import pytesseract

    f = open('doc4img.txt', 'w')
    for img in [img_file for img_file in os.listdir(img_dir)
                if (img_file.endswith(".png") or
                    img_file.endswith(".jpg") or
                    img_file.endswith(".jpeg"))]:

        start_time = datetime.now()

        input_img = img_dir + "/" + img
        enhanced_img = img_dir + "/" + "Enhanced" + "/" + img

        im = Image.open(input_img)
        im = im.filter(ImageFilter.MedianFilter())
        enhancer = ImageEnhance.Contrast(im)
        im = enhancer.enhance(1)
        im = im.convert('1')
        im.save(enhanced_img)

        # repeat the filtering/enhancement 'scaler' times
        for scale in np.ones(scaler):
            im = Image.open(enhanced_img)
            im = im.filter(ImageFilter.MedianFilter())
            enhancer = ImageEnhance.Contrast(im)
            im = enhancer.enhance(scale)
            im = im.convert('1')
            im.save(enhanced_img)

        print('--------------------------------------------------------------------')
        print(img)
        print('Converting ' + img + '.......')
        print('--------------------------------------------------------------------')

        # extract the text information from images
        text = pytesseract.image_to_string(Image.open(enhanced_img))
        print(text)

        # output text file
        f.write(img + "\n")
        f.write(text.encode('utf-8'))

        print("CPU Time for converting " + img + ": " + str(datetime.now() - start_time) + "\n")
        f.write("\n-------------------------------------------------------------\n")

    f.close()
Demo

I applied my img2txt_enhance function to the following noised image in the Enhance folder.

image_dir = r"Enhance"
img2txt_enhance(image_dir, 5)  # the scaler value is assumed; the original call did not pass one
Then I got the following results:
--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
zHHH
CPU Time for convertingnoised.jpg:0:00:00.135465
while the result from the img2txt function is:
--------------------------------------------------------------------
noised.jpg
Converting noised.jpg.......
--------------------------------------------------------------------
,2 WW
CPU Time for convertingnoised.jpg:0:00:00.133508
which is not correct.
13.1.3. PDF to Text
My pdf2txt function:
def pdf2txt(pdf_dir, image_dir):
    """
    convert PDF to text
    """
    import os, PythonMagick
    from datetime import datetime
    import PyPDF2
    from PIL import Image
    import pytesseract

    f = open('doc.txt', 'w')
    for pdf in [pdf_file for pdf_file in os.listdir(pdf_dir) if pdf_file.endswith(".pdf")]:

        start_time = datetime.now()

        input_pdf = pdf_dir + "/" + pdf

        pdf_im = PyPDF2.PdfFileReader(open(input_pdf, "rb"))
        npage = pdf_im.getNumPages()

        print('--------------------------------------------------------------------')
        print(pdf)
        print('Converting %d pages.' % npage)
        print('--------------------------------------------------------------------')

        f.write("\n--------------------------------------------------------------------\n")

        for p in range(npage):

            pdf_file = input_pdf + '[' + str(p) + ']'
            image_file = image_dir + "/" + pdf + '_' + str(p) + '.png'

            # convert PDF pages to images
            im = PythonMagick.Image()
            im.density('300')
            im.read(pdf_file)
            im.write(image_file)

            # extract the text information from images
            text = pytesseract.image_to_string(Image.open(image_file))
            #print(text)

            # output text file
            f.write(pdf + "\n")
            f.write(text.encode('utf-8'))

        print("CPU Time for converting " + pdf + ": " + str(datetime.now() - start_time) + "\n")

    f.close()
Demo

I applied my pdf2txt function to my scanned bio PDF file in the pdf folder.
pdf_dir = r"pdf"
image_dir = r"Image"
pdf2txt(pdf_dir,image_dir)
Then I got the following results:
--------------------------------------------------------------------
feng.pdf
Converting 1 pages.
--------------------------------------------------------------------
l I l w
Wenqiang Feng
Data Scientist
DST APPLIED ANALYTICS GROUP
Wenqiang Feng is Data Scientist for DST’s Applied Analytics Group. Dr. Feng’s responsibilities
include providing DST clients with access to cutting—edge skills and technologies, including Big
Data analytic solutions, advanced analytic and data enhancement techniques and modeling.
Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning
algorithms, business intelligence, and applying Big Data tools to strategically solve industry
problems in a cross—functional business. Before joining the DST Applied Analytics Group, Dr.
Feng holds a MA Data Science Fellow at The Institute for Mathematics and Its Applications
{IMA) at the University of Minnesota. While there, he helped startup companies make
marketing decisions based on deep predictive analytics.
Dr. Feng graduated from University of Tennessee, Knoxville with PhD in Computational
mathematics and Master’s degree in Statistics. He also holds Master’s degree in Computational
Mathematics at Missouri University of Science and Technology (MST) and Master’s degree in
Applied Mathematics at University of science and technology of China (USTC).
CPU Time for convertingfeng.pdf:0:00:03.143800
13.1.4. Audio to Text
My audio2txt function:
def audio2txt(audio_dir):
    ''' convert audio to text'''
    import os
    import speech_recognition as sr
    r = sr.Recognizer()

    f = open('doc.txt', 'w')
    for audio_n in [audio_file for audio_file in os.listdir(audio_dir)
                    if audio_file.endswith(".wav")]:

        filename = audio_dir + "/" + audio_n

        # Read audio data
        with sr.AudioFile(filename) as source:
            audio = r.record(source)  # read the entire audio file

        # Google Speech Recognition
        text = r.recognize_google(audio)

        # output text file
        f.write(audio_n + ": ")
        f.write(text.encode('utf-8'))
        f.write("\n")

        print('You said: ' + text)

    f.close()
Demo

I applied my audio2txt function to my audio records in the audio folder.
audio_dir = r"audio"
audio2txt(audio_dir)
Then I got the following results:
You said: hello this is George welcome to my tutorial
You said: mathematics is important in daily life
You said: call me tomorrow
You said: do you want something to eat
You said: I want to speak with him
You said: nice to see you
You said: can you speak slowly
You said: have a good day
By the way, you can use my following Python code to record your own audio and then apply the audio2txt function to it. Run it from the command line with python record.py "demo2.wav":
import sys, getopt
import speech_recognition as sr

audio_filename = sys.argv[1]

r = sr.Recognizer()
with sr.Microphone() as source:
    r.adjust_for_ambient_noise(source)
    print("Hey there, say something, I am recording!")
    audio = r.listen(source)
    print("Done listening!")

with open(audio_filename, "wb") as f:
    f.write(audio.get_wav_data())
13.2. Text Preprocessing
Check to see if a row only contains whitespace:
def check_blanks(data_str):
    is_blank = str(data_str.isspace())
    return is_blank
Determine whether the language of the text content is English or not: use the langid module to classify the language, so that we only apply the English-specific cleanup actions to English text:
import langid

def check_lang(data_str):
    predict_lang = langid.classify(data_str)
    if predict_lang[1] >= .9:
        language = predict_lang[0]
    else:
        language = 'NA'
    return language
Remove irrelevant features:
import re
import string

def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove punctuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 3 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    return cleaned_str
Remove stop words:
from nltk.corpus import stopwords

def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
Tag the text (keep only nouns, adjectives and verbs):
from nltk import pos_tag

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str
Lemmatization:
from nltk.stem.wordnet import WordNetLemmatizer
from nltk import pos_tag

def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
Set up the preprocessing functions (saved in preproc.py) as PySpark UDFs:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())
13.3. Text Classification
Theoretically speaking, you can apply any classification algorithm to this task. Below I will only demonstrate the Naive Bayes method; a sketch of swapping in a different classifier is given at the end of this section.

13.3.1. Introduction
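This subsection is left empty in the original text. For reference, the idea behind the Naive Bayes classifier used below is to pick the label with the largest posterior probability, assuming the features (here, the term-frequency values) are conditionally independent given the label:

\hat{y} = \arg\max_{y} \; P(y)\prod_{i=1}^{n} P(x_i \mid y)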
13.3.2. Demo
Create the Spark Context:
import pyspark
from pyspark.sql import SQLContext
# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
Load the dataset:
# Load a text file and convert each line to a Row.
data_rdd = sc.textFile("../data/raw_data.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
#Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])
# get the raw columns
raw_cols = data_df.columns
#data_df.show()
data_df.printSchema()
root
|-- text: string (nullable = true)
|-- id: string (nullable = true)
|-- label: double (nullable = true)
+--------------------+------------------+-----+
| text| id|label|
+--------------------+------------------+-----+
|Fresh install of ...| 1018769417| 1.0|
|Well. Now I know ...| 10284216536| 1.0|
|"Literally six we...| 10298589026| 1.0|
|Mitsubishi i MiEV...|109017669432377344| 1.0|
+--------------------+------------------+-----+
only showing top 4 rows
Set up the PySpark UDF functions:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())
Language identification:
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")
en_df.show(4)
+--------------------+------------------+-----+----+
| text| id|label|lang|
+--------------------+------------------+-----+----+
|RT @goeentertain:...|665305154954989568| 1.0| en|
|Teforia Uses Mach...|660668007975268352| 1.0| en|
| Apple TV or Roku?| 25842461136| 1.0| en|
|Finished http://t...| 9412369614| 1.0| en|
+--------------------+------------------+-----+----+
only showing top 4 rows
Remove stop words:
rm_stops_df = en_df.select(raw_cols)\
.withColumn("stop_text", remove_stops_udf(en_df["text"]))
rm_stops_df.show(4)
+--------------------+------------------+-----+--------------------+
| text| id|label| stop_text|
+--------------------+------------------+-----+--------------------+
|RT @goeentertain:...|665305154954989568| 1.0|RT @goeentertain:...|
|Teforia Uses Mach...|660668007975268352| 1.0|Teforia Uses Mach...|
| Apple TV or Roku?| 25842461136| 1.0| Apple TV Roku?|
|Finished http://t...| 9412369614| 1.0|Finished http://t...|
+--------------------+------------------+-----+--------------------+
only showing top 4 rows
Remove irrelevant features:
rm_features_df = rm_stops_df.select(raw_cols+["stop_text"])\
.withColumn("feat_text", \
remove_features_udf(rm_stops_df["stop_text"]))
rm_features_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
| text| id|label| stop_text| feat_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568| 1.0|RT @goeentertain:...| future blase ...|
|Teforia Uses Mach...|660668007975268352| 1.0|Teforia Uses Mach...|teforia uses mach...|
| Apple TV or Roku?| 25842461136| 1.0| Apple TV Roku?| apple roku|
|Finished http://t...| 9412369614| 1.0|Finished http://t...| finished|
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
Tag the words:
tagged_df = rm_features_df.select(raw_cols+["feat_text"]) \
.withColumn("tagged_text", \
tag_and_remove_udf(rm_features_df.feat_text))
tagged_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
| text| id|label| feat_text| tagged_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568| 1.0| future blase ...| future blase vic...|
|Teforia Uses Mach...|660668007975268352| 1.0|teforia uses mach...| teforia uses mac...|
| Apple TV or Roku?| 25842461136| 1.0| apple roku| apple roku |
|Finished http://t...| 9412369614| 1.0| finished| finished |
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
Lemmatize the words:
lemm_df = tagged_df.select(raw_cols+["tagged_text"]) \
.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))
lemm_df.show(4)
+--------------------+------------------+-----+--------------------+--------------------+
| text| id|label| tagged_text| lemm_text|
+--------------------+------------------+-----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568| 1.0| future blase vic...|future blase vice...|
|Teforia Uses Mach...|660668007975268352| 1.0| teforia uses mac...|teforia use machi...|
| Apple TV or Roku?| 25842461136| 1.0| apple roku | apple roku|
|Finished http://t...| 9412369614| 1.0| finished | finish|
+--------------------+------------------+-----+--------------------+--------------------+
only showing top 4 rows
Remove blank rows and drop duplicates:
check_blanks_df = lemm_df.select(raw_cols+["lemm_text"])\
.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
# remove blanks
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")
# drop duplicates
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])
dedup_df.show(4)
+--------------------+------------------+-----+--------------------+--------+
| text| id|label| lemm_text|is_blank|
+--------------------+------------------+-----+--------------------+--------+
|RT @goeentertain:...|665305154954989568| 1.0|future blase vice...| False|
|Teforia Uses Mach...|660668007975268352| 1.0|teforia use machi...| False|
| Apple TV or Roku?| 25842461136| 1.0| apple roku| False|
|Finished http://t...| 9412369614| 1.0| finish| False|
+--------------------+------------------+-----+--------------------+--------+
only showing top 4 rows
Add a unique ID:
from pyspark.sql.functions import monotonically_increasing_id
# Create Unique ID
dedup_df = dedup_df.withColumn("uid", monotonically_increasing_id())
dedup_df.show(4)
+--------------------+------------------+-----+--------------------+--------+------------+
| text| id|label| lemm_text|is_blank| uid|
+--------------------+------------------+-----+--------------------+--------+------------+
| dragon| 1546813742| 1.0| dragon| False| 85899345920|
| hurt much| 1558492525| 1.0| hurt much| False|111669149696|
|seth blog word se...|383221484023709697| 1.0|seth blog word se...| False|128849018880|
|teforia use machi...|660668007975268352| 1.0|teforia use machi...| False|137438953472|
+--------------------+------------------+-----+--------------------+--------+------------+
only showing top 4 rows
Create the final dataset:
data = dedup_df.select('uid','id', 'text','label')
data.show(4)
+------------+------------------+--------------------+-----+
| uid| id| text|label|
+------------+------------------+--------------------+-----+
| 85899345920| 1546813742| dragon| 1.0|
|111669149696| 1558492525| hurt much| 1.0|
|128849018880|383221484023709697|seth blog word se...| 1.0|
|137438953472|660668007975268352|teforia use machi...| 1.0|
+------------+------------------+--------------------+-----+
only showing top 4 rows
Create the training and test sets:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
Naive Bayes pipeline:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
# vectorizer = CountVectorizer(inputCol= "words", outputCol="rawFeatures")
idf = IDF(minDocFreq=3, inputCol="rawFeatures", outputCol="features")
# Naive Bayes model
nb = NaiveBayes()
# Pipeline Architecture
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
Make predictions:
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("text", "label", "prediction").show(5,False)
+-----------------------------------------------+-----+----------+
|text |label|prediction|
+-----------------------------------------------+-----+----------+
|finish |1.0 |1.0 |
|meet rolo dogsofthinkgeek happy nationaldogday |1.0 |1.0 |
|pumpkin family |1.0 |1.0 |
|meet jet dogsofthinkgeek happy nationaldogday |1.0 |1.0 |
|meet vixie dogsofthinkgeek happy nationaldogday|1.0 |1.0 |
+-----------------------------------------------+-----+----------+
only showing top 5 rows
Evaluation:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
0.912655971479501
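As noted at the start of this section, any classifier can be plugged into the same pipeline. Below is a minimal sketch, reusing the trainingData/testData splits and column names above, that swaps Naive Bayes for logistic regression; the maxIter and regParam values are arbitrary illustrative choices, not tuned settings.

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# same feature stages as before; only the estimator changes
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(minDocFreq=3, inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=20, regParam=0.1)

lr_pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])
lr_model = lr_pipeline.fit(trainingData)

lr_predictions = lr_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(evaluator.evaluate(lr_predictions))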
13.4. Sentiment Analysis

13.4.1. Introduction
Sentiment analysis (sometimes known as opinion mining or emotion AI) refers to the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information. Sentiment analysis is widely applied to voice-of-the-customer materials such as reviews and survey responses, to online and social media, and to healthcare materials, for applications that range from marketing to customer service to clinical medicine.
Generally speaking, sentiment analysis aims to determine the attitude of a speaker, writer, or other subject with respect to some topic, or the overall contextual polarity or emotional reaction to a document, interaction, or event. The attitude may be a judgment or evaluation (see appraisal theory), an affective state (that is, the emotional state of the author or speaker), or the intended emotional communication (that is, the emotional effect intended by the author or interlocutor). A small example of such a polarity score is given after the list below.
Sentiment analysis in business, also known as opinion mining, is a process of identifying and cataloging a piece of text according to the tone it conveys. It has broad applications:
Sentiment analysis in business intelligence building
Sentiment analysis in business for competitive advantage
Enhancing the customer experience through sentiment analysis in business
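To make the polarity idea above concrete, here is a minimal sketch using TextBlob, the same library used in the demo below; the example sentences are made up purely for illustration.

from textblob import TextBlob

# polarity ranges from -1.0 (most negative) to +1.0 (most positive)
for sentence in ["I love the new twitter layout",
                 "the new twitter layout is terrible",
                 "I changed my twitter settings"]:
    print(sentence + " -> " + str(TextBlob(sentence).sentiment.polarity))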
13.4.3. Demo
Set up the Spark Context and SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Sentiment Analysis example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Load the dataset:
df = spark.read.format('com.databricks.spark.csv')\
          .options(header='true', inferschema='true')\
          .load("../data/newtwitter.csv")
+--------------------+----------+-------+
| text| id|pubdate|
+--------------------+----------+-------+
|10 Things Missing...|2602860537| 18536|
|RT @_NATURALBWINN...|2602850443| 18536|
|RT @HBO24 yo the ...|2602761852| 18535|
|Aaaaaaaand I have...|2602738438| 18535|
|can I please have...|2602684185| 18535|
+--------------------+----------+-------+
only showing top 5 rows
Text Preprocessing
Remove non-ASCII characters:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import string
import re

# remove non ASCII characters
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)

# setup pyspark udf function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
Check:
df = df.withColumn('text_non_asci',strip_non_ascii_udf(df['text']))
df.show(5,True)
Output:
+--------------------+----------+-------+--------------------+
| text| id|pubdate| text_non_asci|
+--------------------+----------+-------+--------------------+
|10 Things Missing...|2602860537| 18536|10 Things Missing...|
|RT @_NATURALBWINN...|2602850443| 18536|RT @_NATURALBWINN...|
|RT @HBO24 yo the ...|2602761852| 18535|RT @HBO24 yo the ...|
|Aaaaaaaand I have...|2602738438| 18535|Aaaaaaaand I have...|
|can I please have...|2602684185| 18535|can I please have...|
+--------------------+----------+-------+--------------------+
only showing top 5 rows
Fix abbreviations:
# fix abbreviations
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str

fix_abbreviation_udf = udf(fix_abbreviation, StringType())
Check:
df = df.withColumn('fixed_abbrev', fix_abbreviation_udf(df['text_non_asci']))
df.show(5, True)
Output:
+--------------------+----------+-------+--------------------+--------------------+
| text| id|pubdate| text_non_asci| fixed_abbrev|
+--------------------+----------+-------+--------------------+--------------------+
|10 Things Missing...|2602860537| 18536|10 Things Missing...|10 things missing...|
|RT @_NATURALBWINN...|2602850443| 18536|RT @_NATURALBWINN...|@_naturalbwinner ...|
|RT @HBO24 yo the ...|2602761852| 18535|RT @HBO24 yo the ...|@hbo24 yo the #ne...|
|Aaaaaaaand I have...|2602738438| 18535|Aaaaaaaand I have...|aaaaaaaand i have...|
|can I please have...|2602684185| 18535|can I please have...|can i please have...|
+--------------------+----------+-------+--------------------+--------------------+
only showing top 5 rows
Remove irrelevant features:
def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove punctuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 2 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())

# setup pyspark udf function
remove_features_udf = udf(remove_features, StringType())
Check:
df = df.withColumn('removed', remove_features_udf(df['fixed_abbrev']))
df.show(5, True)
Output:
+--------------------+----------+-------+--------------------+--------------------+--------------------+
| text| id|pubdate| text_non_asci| fixed_abbrev| removed|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
|10 Things Missing...|2602860537| 18536|10 Things Missing...|10 things missing...|things missing in...|
|RT @_NATURALBWINN...|2602850443| 18536|RT @_NATURALBWINN...|@_naturalbwinner ...|oh and do not lik...|
|RT @HBO24 yo the ...|2602761852| 18535|RT @HBO24 yo the ...|@hbo24 yo the #ne...|yo the newtwitter...|
|Aaaaaaaand I have...|2602738438| 18535|Aaaaaaaand I have...|aaaaaaaand i have...|aaaaaaaand have t...|
|can I please have...|2602684185| 18535|can I please have...|can i please have...|can please have t...|
+--------------------+----------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows
Main function for sentiment analysis:
from pyspark.sql.types import FloatType
from textblob import TextBlob

def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

sentiment_analysis_udf = udf(sentiment_analysis, FloatType())

df = df.withColumn("sentiment_score", sentiment_analysis_udf(df['removed']))
df.show(5, True)
Sentiment score:
+--------------------+---------------+
| removed|sentiment_score|
+--------------------+---------------+
|things missing in...| -0.03181818|
|oh and do not lik...| -0.03181818|
|yo the newtwitter...| 0.3181818|
|aaaaaaaand have t...| 0.11818182|
|can please have t...| 0.13636364|
+--------------------+---------------+
only showing top 5 rows
Word frequency
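The word-frequency figure itself is not reproduced here. Below is a minimal sketch of how the counts behind such a figure could be computed from the cleaned column; the column name 'removed' follows the DataFrame above, and the explode/split approach is just one possible choice.

from pyspark.sql.functions import explode, split, col

word_counts = (df.select(explode(split(col('removed'), ' ')).alias('word'))
                 .filter(col('word') != '')
                 .groupBy('word')
                 .count()
                 .orderBy(col('count').desc()))
word_counts.show(10)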

Sentiment classification:
def condition(r):
    if (r >= 0.1):
        label = "positive"
    elif (r <= -0.1):
        label = "negative"
    else:
        label = "neutral"
    return label

sentiment_udf = udf(lambda x: condition(x), StringType())
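The step that applies this UDF is not shown in the original text; a minimal sketch, assuming the column names in the DataFrame above, would be:

# label each tweet using the polarity score computed earlier
df = df.withColumn("sentiment", sentiment_udf(df['sentiment_score']))
df.select('text', 'sentiment_score', 'sentiment').show(5, True)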
Output:
Sentiment class

Top tweets from each sentiment class:
+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|and this #newtwit...|            1.0| positive|
|"RT @SarahsJokes:...|            1.0| positive|
|#newtwitter using...|            1.0| positive|
|The #NewTwitter h...|            1.0| positive|
|You can now undo ...|            1.0| positive|
+--------------------+---------------+---------+
only showing top 5 rows

+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|Lists on #NewTwit...|           -0.1|  neutral|
|Too bad most of m...|           -0.1|  neutral|
|the #newtwitter i...|           -0.1|  neutral|
|Looks like our re...|           -0.1|  neutral|
|i switched to the...|           -0.1|  neutral|
+--------------------+---------------+---------+
only showing top 5 rows

+--------------------+---------------+---------+
|                text|sentiment_score|sentiment|
+--------------------+---------------+---------+
|oh. #newtwitter i...|           -1.0| negative|
|RT @chqwn: #NewTw...|           -1.0| negative|
|Copy that - its W...|           -1.0| negative|
|RT @chqwn: #NewTw...|           -1.0| negative|
|#NewTwitter has t...|           -1.0| negative|
+--------------------+---------------+---------+
only showing top 5 rows
13.5. N-grams and Correlations
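This section has no worked example in the original text. As a placeholder, here is a minimal sketch of extracting bigrams with pyspark.ml.feature.NGram; the SparkSession is assumed to be the one created earlier, and the two input sentences are made up purely for illustration.

from pyspark.ml.feature import NGram, Tokenizer

# hypothetical two-row DataFrame just to illustrate the transformer
text_df = spark.createDataFrame([(0, "the new twitter layout is great"),
                                 (1, "i do not like the new layout")], ["id", "text"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
ngram = NGram(n=2, inputCol="words", outputCol="bigrams")

bigram_df = ngram.transform(tokenizer.transform(text_df))
bigram_df.select("bigrams").show(truncate=False)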
13.6. Topic Model: Latent Dirichlet Allocation

13.6.1. Introduction
In text mining, a topic model is an unsupervised model for discovering the abstract "topics" that occur in a collection of documents.
Latent Dirichlet Allocation (LDA) is a mathematical method for estimating both of these at the same time: finding the mixture of words associated with each topic, while also determining the mixture of topics that describes each document.
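A minimal sketch of what these two estimated mixtures look like in PySpark is shown below; the DataFrame doc_df with a 'features' vector column is a hypothetical input that would come from a vectorizer such as CountVectorizer, as in the demo that follows.

from pyspark.ml.clustering import LDA

lda = LDA(k=20, seed=1, optimizer="em")
lda_model = lda.fit(doc_df)   # doc_df is assumed to have a 'features' vector column

# mixture of words associated with each topic
lda_model.describeTopics(maxTermsPerTopic=10).show()

# mixture of topics describing each document (adds a 'topicDistribution' column)
lda_model.transform(doc_df).select("topicDistribution").show(5, truncate=False)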
13.6.2. Demo
Load the data:
rawdata = spark.read.load("../data/airlines.csv", format="csv", header=True)
rawdata.show(5)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
only showing top 5 rows
Text preprocessing
I will use the following raw column names to keep the tables concise:
raw_cols = rawdata.columns
raw_cols

['id', 'airline', 'date', 'location', 'rating', 'cabin', 'value', 'recommended', 'review']

rawdata = rawdata.dropDuplicates(['review'])

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, DoubleType, DateType
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import langid
import string
import re
Remove non-ASCII characters:
# remove non ASCII characters
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)
Check to see if a row is blank:
# check to see if a row only contains whitespace
def check_blanks(data_str):
    is_blank = str(data_str.isspace())
    return is_blank
Check the language (a bit slow, so I skipped this step):
# check the language (only apply to english)
def check_lang(data_str):
    from langid.langid import LanguageIdentifier, model
    identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)
    predict_lang = identifier.classify(data_str)
    if predict_lang[1] >= .9:
        language = predict_lang[0]
    else:
        language = 'NA'
    return language
Fix abbreviations:
# fix abbreviations
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str
Remove irrelevant features:
# remove irrelevant features
def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())
Remove stop words:
# removes stop words
def remove_stops(data_str):
    # expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
            # rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str
Part-of-Speech tagging:
# Part-of-Speech Tagging
def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str
Lemmatization:
# lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str
Set up the PySpark UDF functions:
# setup pyspark udf function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
check_blanks_udf = udf(check_blanks, StringType())
check_lang_udf = udf(check_lang, StringType())
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
remove_stops_udf = udf(remove_stops, StringType())
remove_features_udf = udf(remove_features, StringType())
tag_and_remove_udf = udf(tag_and_remove, StringType())
lemmatize_udf = udf(lemmatize, StringType())
Text processing
Correct the data schema:
rawdata = rawdata.withColumn('rating', rawdata.rating.cast('float'))
rawdata.printSchema()

root
 |-- id: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- cabin: string (nullable = true)
 |-- value: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- review: string (nullable = true)

from datetime import datetime
from pyspark.sql.functions import col

# https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
# 21-Jun-14 <----> %d-%b-%y
to_date = udf(lambda x: datetime.strptime(x, '%d-%b-%y'), DateType())
rawdata = rawdata.withColumn('date', to_date(col('date')))

rawdata.printSchema()

root
 |-- id: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- cabin: string (nullable = true)
 |-- value: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- review: string (nullable = true)

rawdata.show(5)

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+
only showing top 5 rows

rawdata = rawdata.withColumn('non_asci', strip_non_ascii_udf(rawdata['review']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|            non_asci|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|Flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|Flight from Manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|I'm Executive Pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|MSP-JFK-MXP and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|Worst airline I h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['non_asci'])\
                 .withColumn('fixed_abbrev', fix_abbreviation_udf(rawdata['non_asci']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|            non_asci|        fixed_abbrev|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|Flight 3246 from ...|flight 3246 from ...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|Flight from Manch...|flight from manch...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|I'm Executive Pla...|i'm executive pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|MSP-JFK-MXP and r...|msp-jfk-mxp and r...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|Worst airline I h...|worst airline i h...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['fixed_abbrev'])\
                 .withColumn('stop_text', remove_stops_udf(rawdata['fixed_abbrev']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|        fixed_abbrev|           stop_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight 3246 from ...|flight 3246 chica...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight from manch...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|i'm executive pla...|i'm executive pla...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp-jfk-mxp and r...|msp-jfk-mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline i h...|worst airline eve...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['stop_text'])\
                 .withColumn('feat_text', remove_features_udf(rawdata['stop_text']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           stop_text|           feat_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight 3246 chica...|flight chicago mi...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|i'm executive pla...|executive platinu...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp-jfk-mxp retur...|msp jfk mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline eve...|worst airline eve...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['feat_text'])\
                 .withColumn('tagged_text', tag_and_remove_udf(rawdata['feat_text']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           feat_text|         tagged_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...| flight chicago m...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...| flight mancheste...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...| executive platin...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...| msp jfk mxp retu...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline eve...| worst airline ua...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['tagged_text']) \
                 .withColumn('lemm_text', lemmatize_udf(rawdata['tagged_text']))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|         tagged_text|           lemm_text|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...| flight chicago m...|flight chicago mi...|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...| flight mancheste...|flight manchester...|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...| executive platin...|executive platinu...|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...| msp jfk mxp retu...|msp jfk mxp retur...|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...| worst airline ua...|worst airline ual...|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows

rawdata = rawdata.select(raw_cols+['lemm_text']) \
                 .withColumn("is_blank", check_blanks_udf(rawdata["lemm_text"]))

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           lemm_text|is_blank|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...|   False|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|   False|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...|   False|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...|   False|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline ual...|   False|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+
only showing top 5 rows

from pyspark.sql.functions import monotonically_increasing_id

# Create Unique ID
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())
data = rawdata.filter(rawdata["is_blank"] == "False")

+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
|   id|           airline|      date|location|rating|   cabin|value|recommended|              review|           lemm_text|is_blank|uid|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
|10551|Southwest Airlines|2013-11-06|     USA|   1.0|Business|    2|         NO|Flight 3246 from ...|flight chicago mi...|   False|  0|
|10298|        US Airways|2014-03-31|      UK|   1.0|Business|    0|         NO|Flight from Manch...|flight manchester...|   False|  1|
|10564|Southwest Airlines|2013-09-06|     USA|  10.0| Economy|    5|        YES|I'm Executive Pla...|executive platinu...|   False|  2|
|10134|   Delta Air Lines|2013-12-10|     USA|   8.0| Economy|    4|        YES|MSP-JFK-MXP and r...|msp jfk mxp retur...|   False|  3|
|10912|   United Airlines|2014-04-07|     USA|   3.0| Economy|    1|         NO|Worst airline I h...|worst airline ual...|   False|  4|
+-----+------------------+----------+--------+------+--------+-----+-----------+--------------------+--------------------+--------+---+
only showing top 5 rows
LDA model pipeline:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.clustering import LDA
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer

# Configure an ML pipeline, which consists of four stages: tokenizer, vectorizer, idf and lda.
tokenizer = Tokenizer(inputCol="lemm_text", outputCol="words")
#data = tokenizer.transform(data)
vectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
#idfModel = idf.fit(data)
lda = LDA(k=20, seed=1, optimizer="em")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf, lda])

model = pipeline.fit(data)
Results presentation
Topics:
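The call that produces the table below is not shown in the original text. A minimal sketch, assuming the pipeline above (the fitted LDA model is the fourth stage, referred to later as ldatopics), would be:

# the fitted LDA model is the last stage of the fitted pipeline
ldatopics = model.stages[3].describeTopics(maxTermsPerTopic=10)
ldatopics.show(20)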
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[60, 7, 12, 483, ...|[0.01349507958269...|
|    1|[363, 29, 187, 55...|[0.01247250144447...|
|    2|[46, 107, 672, 27...|[0.01188684264641...|
|    3|[76, 43, 285, 152...|[0.01132638300115...|
|    4|[201, 13, 372, 69...|[0.01337529863256...|
|    5|[122, 103, 181, 4...|[0.00930415977117...|
|    6|[14, 270, 18, 74,...|[0.01253817708163...|
|    7|[111, 36, 341, 10...|[0.01269584954257...|
|    8|[477, 266, 297, 1...|[0.01017486869509...|
|    9|[10, 73, 46, 1, 2...|[0.01050875237546...|
|   10|[57, 29, 411, 10,...|[0.01777350667863...|
|   11|[293, 119, 385, 4...|[0.01280305149305...|
|   12|[116, 218, 256, 1...|[0.01570714218509...|
|   13|[433, 171, 176, 3...|[0.00819684813575...|
|   14|[74, 84, 45, 108,...|[0.01700630002172...|
|   15|[669, 215, 14, 58...|[0.00779310974971...|
|   16|[198, 21, 98, 164...|[0.01030577084202...|
|   17|[96, 29, 569, 444...|[0.01297142577633...|
|   18|[18, 60, 140, 64,...|[0.01306356985169...|
|   19|[33, 178, 95, 2, ...|[0.00907425683229...|
+-----+--------------------+--------------------+
Topic terms:
from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
final = ldatopics.withColumn("Terms", termsIdx2Term(vocabList)("termIndices"))

+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
|topic|termIndices                                     |Terms                                                                                |
+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
|0    |[60, 7, 12, 483, 292, 326, 88, 4, 808, 32]      |[pm, plane, board, kid, online, lga, schedule, get, memphis, arrive]                 |
|1    |[363, 29, 187, 55, 48, 647, 30, 9, 204, 457]    |[dublin, class, th, sit, entertainment, express, say, delay, dl, son]                |
|2    |[46, 107, 672, 274, 92, 539, 23, 27, 279, 8]    |[economy, sfo, milwaukee, decent, comfortable, iad, return, united, average, airline]|
|3    |[76, 43, 285, 152, 102, 34, 300, 113, 24, 31]   |[didn, pay, lose, different, extra, bag, mile, baggage, leave, day]                  |
|4    |[201, 13, 372, 692, 248, 62, 211, 187, 105, 110]|[houston, crew, heathrow, louisville, london, great, denver, th, land, jfk]          |
|5    |[122, 103, 181, 48, 434, 10, 121, 147, 934, 169]|[lhr, serve, screen, entertainment, ny, delta, excellent, atl, sin, newark]          |
|6    |[14, 270, 18, 74, 70, 37, 16, 450, 3, 20]       |[check, employee, gate, line, change, wait, take, fll, time, tell]                   |
|7    |[111, 36, 341, 10, 320, 528, 844, 19, 195, 524] |[atlanta, first, toilet, delta, washington, card, global, staff, route, amsterdam]   |
|8    |[477, 266, 297, 185, 1, 33, 22, 783, 17, 908]   |[fuel, group, pas, boarding, seat, trip, minute, orleans, make, select]              |
|9    |[10, 73, 46, 1, 248, 302, 213, 659, 48, 228]    |[delta, lax, economy, seat, london, detroit, comfo, weren, entertainment, wife]      |
|10   |[57, 29, 411, 10, 221, 121, 661, 19, 805, 733]  |[business, class, fra, delta, lounge, excellent, syd, staff, nov, mexico]            |
|11   |[293, 119, 385, 481, 503, 69, 13, 87, 176, 545] |[march, ua, manchester, phx, envoy, drink, crew, american, aa, canada]               |
|12   |[116, 218, 256, 156, 639, 20, 365, 18, 22, 136] |[san, clt, francisco, second, text, tell, captain, gate, minute, available]          |
|13   |[433, 171, 176, 339, 429, 575, 10, 26, 474, 796]|[daughter, small, aa, ba, segment, proceed, delta, passenger, size, similar]         |
|14   |[74, 84, 45, 108, 342, 111, 315, 87, 52, 4]     |[line, agent, next, hotel, standby, atlanta, dallas, american, book, get]            |
|15   |[669, 215, 14, 58, 561, 59, 125, 179, 93, 5]    |[fit, carry, check, people, bathroom, ask, thing, row, don, fly]                     |
|16   |[198, 21, 98, 164, 57, 141, 345, 62, 121, 174]  |[ife, good, nice, much, business, lot, dfw, great, excellent, carrier]               |
|17   |[96, 29, 569, 444, 15, 568, 21, 103, 657, 505]  |[phl, class, diego, lady, food, wheelchair, good, serve, miami, mia]                 |
|18   |[18, 60, 140, 64, 47, 40, 31, 35, 2, 123]       |[gate, pm, phoenix, connection, cancel, connect, day, airpo, hour, charlotte]        |
|19   |[33, 178, 95, 2, 9, 284, 42, 4, 89, 31]         |[trip, counter, philadelphia, hour, delay, stay, way, get, southwest, day]           |
+-----+------------------------------------------------+-------------------------------------------------------------------------------------+
LDA results:
+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
|   id|           airline|      date|      cabin|rating|               words|            features|   topicDistribution|
+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
|10551|Southwest Airlines|2013-11-06|   Business|   1.0|[flight, chicago,...|(4695,[0,2,3,6,11...|[0.03640342580508...|
|10298|        US Airways|2014-03-31|   Business|   1.0|[flight, manchest...|(4695,[0,1,2,6,7,...|[0.01381306271470...|
|10564|Southwest Airlines|2013-09-06|    Economy|  10.0|[executive, plati...|(4695,[0,1,6,7,11...|[0.05063554352934...|
|10134|   Delta Air Lines|2013-12-10|    Economy|   8.0|[msp, jfk, mxp, r...|(4695,[0,1,3,10,1...|[0.01494708959842...|
|10912|   United Airlines|2014-04-07|    Economy|   3.0|[worst, airline, ...|(4695,[0,1,7,8,13...|[0.04421751181232...|
|10089|   Delta Air Lines|2014-02-18|    Economy|   2.0|[dl, mia, lax, im...|(4695,[2,4,5,7,8,...|[0.02158861273876...|
|10385|        US Airways|2013-10-21|    Economy|  10.0|[flew, gla, phl, ...|(4695,[0,1,3,5,14...|[0.03343845991816...|
|10249|        US Airways|2014-06-17|    Economy|   1.0|[friend, book, fl...|(4695,[0,2,3,4,5,...|[0.02362432562165...|
|10289|        US Airways|2014-04-12|    Economy|  10.0|[flew, air, rome,...|(4695,[0,1,5,8,13...|[0.01664012816210...|
|10654|Southwest Airlines|2012-07-10|    Economy|   8.0|[lhr, jfk, think,...|(4695,[0,4,5,6,8,...|[0.01526072330297...|
|10754| American Airlines|2014-05-04|    Economy|  10.0|[san, diego, moli...|(4695,[0,2,8,15,2...|[0.03571177612496...|
|10646|Southwest Airlines|2012-08-17|    Economy|   7.0|[toledo, co, stop...|(4695,[0,2,3,4,7,...|[0.02394775146271...|
|10097|   Delta Air Lines|2014-02-03|First Class|  10.0|[honolulu, la, fi...|(4695,[0,4,6,7,13...|[0.02008375619661...|
|10132|   Delta Air Lines|2013-12-16|    Economy|   7.0|[manchester, uk, ...|(4695,[0,1,2,3,5,...|[0.01463126146601...|
|10560|Southwest Airlines|2013-09-20|    Economy|   9.0|[first, time, sou...|(4695,[0,3,7,8,9,...|[0.04934836409896...|
|10579|Southwest Airlines|2013-07-25|    Economy|   0.0|[plane, land, pm,...|(4695,[2,3,4,5,7,...|[0.06106959241722...|
|10425|        US Airways|2013-08-06|    Economy|   3.0|[airway, bad, pro...|(4695,[2,3,4,7,8,...|[0.01770471771322...|
|10650|Southwest Airlines|2012-07-27|    Economy|   9.0|[flew, jfk, lhr, ...|(4695,[0,1,6,13,1...|[0.02676226245086...|
|10260|        US Airways|2014-06-03|    Economy|   1.0|[february, air, u...|(4695,[0,2,4,17,2...|[0.02887390875079...|
|10202|   Delta Air Lines|2013-09-14|    Economy|  10.0|[aug, lhr, jfk, b...|(4695,[1,2,4,7,10...|[0.02377704988307...|
+-----+------------------+----------+-----------+------+--------------------+--------------------+--------------------+
only showing top 20 rows
Average rating per airline per day

Average rating per airline per month
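The two rating figures above are not reproduced here. Below is a minimal sketch of how the underlying daily and monthly averages could be computed from the rawdata DataFrame above; the column names follow the demo, and the use of date_format for the monthly bucket is just one possible choice.

from pyspark.sql.functions import avg, date_format

# average rating per airline per day
daily_avg = rawdata.groupBy('airline', 'date').agg(avg('rating').alias('avg_rating'))
daily_avg.orderBy('airline', 'date').show(5)

# average rating per airline per month
monthly_avg = (rawdata.withColumn('month', date_format('date', 'yyyy-MM'))
                      .groupBy('airline', 'month')
                      .agg(avg('rating').alias('avg_rating')))
monthly_avg.orderBy('airline', 'month').show(5)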

Topic 1 over the timeline

Reviews (documents) related to Topic 1
