12. RFM Analysis

Figure source: Blast Analytics Marketing
RFM is a method used for analyzing customer value. It is commonly used in database marketing and direct marketing and has received particular attention in retail and professional services industries. More details can be found on Wikipedia: RFM_wikipedia.
RFM stands for the three dimensions:
Recency – How recently did the customer purchase? i.e. the duration since the last purchase
Frequency – How often do they purchase? i.e. the total number of purchases
Monetary Value – How much do they spend? i.e. the total amount of money this customer spent
12.1. RFM Analysis Methodology
RFM analysis consists of three main steps:
12.1.1. 1. Build the RFM features matrix for each customer
+----------+-------+---------+---------+
|CustomerID|Recency|Frequency| Monetary|
+----------+-------+---------+---------+
| 14911| 1| 248|132572.62|
| 12748| 0| 224| 29072.1|
| 17841| 1| 169| 40340.78|
| 14606| 1| 128| 11713.85|
| 15311| 0| 118| 59419.34|
+----------+-------+---------+---------+
only showing top 5 rows
12.1.2. 2. Determine the cutting points for each feature
+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
| 17420| 50| 3| 598.83| 2| 3| 2|
| 16861| 59| 3| 151.65| 3| 3| 1|
| 16503| 106| 5| 1421.43| 3| 2| 3|
| 15727| 16| 7| 5178.96| 1| 1| 4|
| 17389| 0| 43|31300.08| 1| 1| 4|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows
12.1.3. 3. Determine the RFM scores and summarize the corresponding business value
+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
| 17988| 11| 8| 191.17| 1| 1| 1| 111|
| 16892| 1| 7| 496.84| 1| 1| 2| 112|
| 16668| 15| 6| 306.72| 1| 1| 2| 112|
| 16554| 3| 7| 641.55| 1| 1| 2| 112|
| 16500| 4| 6| 400.86| 1| 1| 2| 112|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows
The corresponding business descriptions and marketing values:

Source: Blast Analytics Marketing
12.2. Demo
The Jupyter notebook can be downloaded from Data Exploration.
The data can be downloaded from German Credit.
12.2.1. Load and clean data
Set up the Spark context and SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark RFM example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
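The config line above is just a placeholder copied from the Spark documentation; if you have nothing to configure, a minimal session (a sketch of mine) is simply:
spark = SparkSession.builder.appName("Python Spark RFM example").getOrCreate()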
Load the dataset
df_raw = spark.read.format('com.databricks.spark.csv').\
    options(header='true', \
    inferschema='true').\
    load("Online Retail.csv", header=True)
Check the dataset
df_raw.show(5)
df_raw.printSchema()
Then you will get
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/10 8:26| 2.55| 17850|United Kingdom|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/10 8:26| 2.75| 17850|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)
Data cleaning and data manipulation
Check and remove the null values
from pyspark.sql.functions import count

def my_count(df_in):
    df_in.agg(*[count(c).alias(c) for c in df_in.columns]).show()

my_count(df_raw)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
| 541909| 541909| 540455| 541909| 541909| 541909| 406829| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
Since the counts are not the same, there are null values in the CustomerID column. We can drop those records from the dataset.
df = df_raw.dropna(how='any')
my_count(df)
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
| 406829| 406829| 406829| 406829| 406829| 406829| 406829| 406829|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
Deal with the InvoiceDate column
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col

timeFmt = "MM/dd/yy HH:mm"
df = df.withColumn('NewInvoiceDate',
                   to_utc_timestamp(unix_timestamp(col('InvoiceDate'), timeFmt).cast('timestamp'),
                                    'UTC'))
df.show(5)
df.show(5)
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| NewInvoiceDate|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/10 8:26| 2.55| 17850|United Kingdom|2010-12-01 08:26:...|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:...|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/10 8:26| 2.75| 17850|United Kingdom|2010-12-01 08:26:...|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:...|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:...|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------------+
only showing top 5 rows
Warning
Spark is very sensitive to the date format!
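A quick sanity check (my addition, not from the original text): a mismatched pattern makes unix_timestamp return null silently, so it is worth verifying the parsed column before moving on.
df.filter(col('NewInvoiceDate').isNull()).count()   # should be 0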
Calculate the total price
from pyspark.sql.functions import round
df = df.withColumn('TotalPrice', round( df.Quantity * df.UnitPrice, 2 ) )
Calculate the time difference
from pyspark.sql.functions import mean, min, max, sum, datediff, to_date

date_max = df.select(max('NewInvoiceDate')).toPandas()
current = to_utc_timestamp(unix_timestamp(lit(str(date_max.iloc[0][0])),
                           'yy-MM-dd HH:mm').cast('timestamp'), 'UTC')

# Calculate Duration
df = df.withColumn('Duration', datediff(lit(current), 'NewInvoiceDate'))
Build the Recency, Frequency and Monetary features
recency = df.groupBy('CustomerID').agg(min('Duration').alias('Recency'))

frequency = df.groupBy('CustomerID', 'InvoiceNo').count()\
              .groupBy('CustomerID')\
              .agg(count("*").alias("Frequency"))

monetary = df.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))

rfm = recency.join(frequency, 'CustomerID', how='inner')\
             .join(monetary, 'CustomerID', how='inner')

rfm.show(5)
+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
| 17420| 50| 3| 598.83|
| 16861| 59| 3| 151.65|
| 16503| 106| 5| 1421.43|
| 15727| 16| 7| 5178.96|
| 17389| 0| 43|31300.08|
+----------+-------+---------+--------+
only showing top 5 rows
12.2.2. RFM Segmentation
Determine the cutting points
In this section, you can use the methods (statistical results and visualizations) from the Data Exploration section to help you determine the cutting points for each attribute. In my opinion, the cutting points mainly depend on business sense. You had better talk to the decision makers and get feedback and suggestions from them. I will use the quantiles as the cutting points in this demo.
cols = ['Recency','Frequency','Monetary']
describe_pd(rfm,cols,1)
+-------+-----------------+-----------------+------------------+
|summary| Recency| Frequency| Monetary|
+-------+-----------------+-----------------+------------------+
| count| 4372.0| 4372.0| 4372.0|
| mean|91.58119853613907| 5.07548032936871|1898.4597003659655|
| stddev|100.7721393138483|9.338754163574727| 8219.345141139722|
| min| 0.0| 1.0| -4287.63|
| max| 373.0| 248.0| 279489.02|
| 25%| 16.0| 1.0|293.36249999999995|
| 50%| 50.0| 3.0| 648.075|
| 75%| 143.0| 5.0| 1611.725|
+-------+-----------------+-----------------+------------------+
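describe_pd is a helper presumably defined in the Data Exploration section and is not shown here. If you do not have it at hand, the quartile rows above can be reproduced approximately (a sketch of mine, not the original helper) with approxQuantile:
for c in ['Recency', 'Frequency', 'Monetary']:
    q25, q50, q75 = rfm.approxQuantile(c, [0.25, 0.5, 0.75], 0.01)  # 0.01 is the relative error
    print(c, q25, q50, q75)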
The user-defined functions that apply the cutting points:
def RScore(x):
    if x <= 16:
        return 1
    elif x <= 50:
        return 2
    elif x <= 143:
        return 3
    else:
        return 4

def FScore(x):
    if x <= 1:
        return 4
    elif x <= 3:
        return 3
    elif x <= 5:
        return 2
    else:
        return 1

def MScore(x):
    if x <= 293:
        return 4
    elif x <= 648:
        return 3
    elif x <= 1611:
        return 2
    else:
        return 1
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())
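As a side note (my own sketch, not part of the original demo), the same segments can also be computed without Python UDFs by staying inside Spark SQL with when/otherwise, which avoids Python serialization overhead. For example, for the R segment:
from pyspark.sql import functions as F
r_seg_expr = (F.when(F.col('Recency') <= 16, '1')
               .when(F.col('Recency') <= 50, '2')
               .when(F.col('Recency') <= 143, '3')
               .otherwise('4'))
# rfm.withColumn('r_seg', r_seg_expr) would then replace the R_udf call below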
RFM segmentation
rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))
rfm_seg.show(5)
+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
| 17420| 50| 3| 598.83| 2| 3| 2|
| 16861| 59| 3| 151.65| 3| 3| 1|
| 16503| 106| 5| 1421.43| 3| 2| 3|
| 15727| 16| 7| 5178.96| 1| 1| 4|
| 17389| 0| 43|31300.08| 1| 1| 4|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows
from pyspark.sql import functions as F   # F is used below for concat, col and sort

rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'), F.col('f_seg'), F.col('m_seg')))
rfm_seg.sort(F.col('RFMScore')).show(5)
+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
| 17988| 11| 8| 191.17| 1| 1| 1| 111|
| 16892| 1| 7| 496.84| 1| 1| 2| 112|
| 16668| 15| 6| 306.72| 1| 1| 2| 112|
| 16554| 3| 7| 641.55| 1| 1| 2| 112|
| 16500| 4| 6| 400.86| 1| 1| 2| 112|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows
12.2.3. Statistical Summary
Simple summary
rfm_seg.groupBy('RFMScore')\
       .agg({'Recency': 'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'})\
       .sort(F.col('RFMScore')).show(5)
+--------+-----------------+------------------+------------------+
|RFMScore| avg(Recency)| avg(Monetary)| avg(Frequency)|
+--------+-----------------+------------------+------------------+
| 111| 11.0| 191.17| 8.0|
| 112| 8.0| 505.9775| 7.5|
| 113|7.237113402061856|1223.3604123711339| 7.752577319587629|
| 114|6.035123966942149| 8828.888595041324|18.882231404958677|
| 121| 9.6| 207.24| 4.4|
+--------+-----------------+------------------+------------------+
only showing top 5 rows
Complex summary
grp = 'RFMScore'
num_cols = ['Recency','Frequency','Monetary']
df_input = rfm_seg
quantile_grouped = quantile_agg(df_input,grp,num_cols)
quantile_grouped.toPandas().to_csv(output_dir+'quantile_grouped.csv')
deciles_grouped = deciles_agg(df_input,grp,num_cols)
deciles_grouped.toPandas().to_csv(output_dir+'deciles_grouped.csv')
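quantile_agg and deciles_agg are helpers from the Data Exploration section, and output_dir is assumed to be a directory path you have defined; none of them is defined in this section. A minimal sketch of what a per-group quantile aggregation could look like (my assumption, not the author's implementation):
from pyspark.sql import functions as F

def quantile_agg(df_in, grp, num_cols, probs=(0.25, 0.5, 0.75)):
    # approximate per-group quantiles for each numeric column
    exprs = [F.expr("percentile_approx({c}, {p})".format(c=c, p=p))
              .alias("{c}_q{q}".format(c=c, q=int(p * 100)))
             for c in num_cols for p in probs]
    return df_in.groupBy(grp).agg(*exprs)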
12.3. Extension
You can also use the clustering methods in the Clustering section to do the segmentation.
12.3.1. Build feature matrix
Build the dense feature matrix
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# method 1 (good for a small feature set):
# def transData(row):
#     return Row(label=row["Sales"],
#                features=Vectors.dense([row["TV"],
#                                        row["Radio"],
#                                        row["Newspaper"]]))

# method 2 (good for a large feature set):
def transData(data):
    return data.rdd.map(lambda r: [r[0], Vectors.dense(r[1:])]).toDF(['CustomerID', 'rfm'])

transformed = transData(rfm)
transformed.show(5)
+----------+-------------------+
|CustomerID| rfm|
+----------+-------------------+
| 17420| [50.0,3.0,598.83]|
| 16861| [59.0,3.0,151.65]|
| 16503|[106.0,5.0,1421.43]|
| 15727| [16.0,7.0,5178.96]|
| 17389|[0.0,43.0,31300.08]|
+----------+-------------------+
only showing top 5 rows
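An alternative worth knowing (my sketch, not part of the original demo) is pyspark.ml.feature.VectorAssembler, which builds the same dense feature column without dropping down to the RDD API:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Recency', 'Frequency', 'Monetary'], outputCol='rfm')
transformed = assembler.transform(rfm).select('CustomerID', 'rfm')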
Scale the feature matrix
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="rfm",
                      outputCol="features")
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5, False)
+----------+-------------------+--------------------------------------------------------------+
|CustomerID|rfm |features |
+----------+-------------------+--------------------------------------------------------------+
|17420 |[50.0,3.0,598.83] |[0.13404825737265416,0.008097165991902834,0.01721938714830836]|
|16861 |[59.0,3.0,151.65] |[0.1581769436997319,0.008097165991902834,0.01564357039241953] |
|16503 |[106.0,5.0,1421.43]|[0.28418230563002683,0.016194331983805668,0.02011814573186342]|
|15727 |[16.0,7.0,5178.96] |[0.04289544235924933,0.024291497975708502,0.03335929858922501]|
|17389 |[0.0,43.0,31300.08]|[0.0,0.1700404858299595,0.12540746393334334] |
+----------+-------------------+--------------------------------------------------------------+
only showing top 5 rows
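MinMaxScaler rescales each feature to the [0, 1] range by default (min=0.0, max=1.0), which puts Recency, Frequency and Monetary on a comparable scale before clustering; a different range can be requested through the min and max parameters, e.g.
scaler = MinMaxScaler(inputCol="rfm", outputCol="features", min=0.0, max=1.0)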
12.3.2. K-means clustering
Find the optimal number of clusters
I will introduce two popular methods for determining the optimal number of clusters.
Elbow analysis
# PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  # For Python 3.x
from pyspark.ml.clustering import KMeans
# from pyspark.ml.evaluation import ClusteringEvaluator  # requires Spark 2.4 or later

import numpy as np

cost = np.zeros(20)
for k in range(2, 20):
    kmeans = KMeans()\
        .setK(k)\
        .setSeed(1)\
        .setFeaturesCol("features")\
        .setPredictionCol("cluster")
    model = kmeans.fit(scaledData)
    cost[k] = model.computeCost(scaledData)  # requires Spark 2.0 or later
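A note of mine: KMeansModel.computeCost was deprecated in Spark 2.4 and removed in Spark 3.0; on newer versions the within-cluster cost is available from the training summary instead:
# cost[k] = model.summary.trainingCost   # Spark 2.4+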
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20], marker = "o")
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()

Cost vs. the number of clusters
In my opinion, it is sometimes hard to choose the number of clusters. As shown in the figure Cost vs. the number of clusters, you could choose 3, 5 or even 8. I will choose 3 in this demo.
Silhouette analysis
# PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  # For Python 3.x
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

import time
import numpy as np
import pandas as pd

def optimal_k(df_in, index_col, k_min, k_max, num_runs):
    '''
    Determine the optimal number of clusters by using Silhouette Score Analysis.

    :param df_in: the input dataframe
    :param index_col: the name of the index column
    :param k_min: the minimum number of clusters
    :param k_max: the maximum number of clusters
    :param num_runs: the number of runs for each fixed number of clusters

    :return k: optimal number of clusters
    :return silh_lst: Silhouette scores
    :return r_table: the running results table

    :author: Wenqiang Feng
    :email: von198@gmail.com
    '''
    start = time.time()
    silh_lst = []
    k_lst = np.arange(k_min, k_max + 1)

    r_table = df_in.select(index_col).toPandas()
    r_table = r_table.set_index(index_col)
    centers = pd.DataFrame()

    for k in k_lst:
        silh_val = []
        for run in np.arange(1, num_runs + 1):
            # Train a k-means model
            kmeans = KMeans()\
                .setK(k)\
                .setSeed(int(np.random.randint(100, size=1)))
            model = kmeans.fit(df_in)

            # Make predictions
            predictions = model.transform(df_in)
            r_table['cluster_{k}_{run}'.format(k=k, run=run)] = predictions.select('prediction').toPandas()

            # Evaluate clustering by computing the Silhouette score
            evaluator = ClusteringEvaluator()
            silhouette = evaluator.evaluate(predictions)
            silh_val.append(silhouette)

        silh_array = np.asanyarray(silh_val)
        silh_lst.append(silh_array.mean())

    elapsed = time.time() - start

    silhouette = pd.DataFrame(list(zip(k_lst, silh_lst)), columns=['k', 'silhouette'])

    print('+------------------------------------------------------------+')
    print("| The finding optimal k phase took %8.0f s. |" % (elapsed))
    print('+------------------------------------------------------------+')

    return k_lst[np.argmax(silh_lst, axis=0)], silhouette, r_table
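The call below uses index_col, k_min, k_max and num_runs, which are not defined in the text. A plausible setup (my assumption, chosen to be consistent with the silhouette table that follows, which runs from k=3 to k=10) is:
index_col = 'CustomerID'
k_min, k_max = 3, 10
num_runs = 5   # assumed number of restarts per k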
k, silh_lst, r_table = optimal_k(scaledData,index_col,k_min, k_max,num_runs)
+------------------------------------------------------------+
| The finding optimal k phase took 1783 s. |
+------------------------------------------------------------+
spark.createDataFrame(silh_lst).show()
+---+------------------+
| k| silhouette|
+---+------------------+
| 3|0.8045154385557953|
| 4|0.6993528775512052|
| 5|0.6689286654221447|
| 6|0.6356184024841809|
| 7|0.7174102265711756|
| 8|0.6720861758298997|
| 9| 0.601771359881241|
| 10|0.6292447334578428|
+---+------------------+
From the silhouette list, we can choose 3 as the optimal number of clusters.
Warning
ClusteringEvaluator in pyspark.ml.evaluation requires Spark 2.4 or later!
K-means clustering
k = 3
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(5,False)
+----------+-------------------+--------------------+----------+
|CustomerID| rfm| features|prediction|
+----------+-------------------+--------------------+----------+
| 17420| [50.0,3.0,598.83]|[0.13404825737265...| 0|
| 16861| [59.0,3.0,151.65]|[0.15817694369973...| 0|
| 16503|[106.0,5.0,1421.43]|[0.28418230563002...| 2|
| 15727| [16.0,7.0,5178.96]|[0.04289544235924...| 0|
| 17389|[0.0,43.0,31300.08]|[0.0,0.1700404858...| 0|
+----------+-------------------+--------------------+----------+
only showing top 5 rows
12.3.3. Statistical Summary
results = rfm.join(predictions.select('CustomerID','prediction'),'CustomerID',how='left')
results.show(5)
+----------+-------+---------+--------+----------+
|CustomerID|Recency|Frequency|Monetary|prediction|
+----------+-------+---------+--------+----------+
| 13098| 1| 41|28658.88| 0|
| 13248| 124| 2| 465.68| 2|
| 13452| 259| 2| 590.0| 1|
| 13460| 29| 2| 183.44| 0|
| 13518| 85| 1| 659.44| 0|
+----------+-------+---------+--------+----------+
only showing top 5 rows
Simple summary
results.groupBy('prediction')\
       .agg({'Recency': 'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'})\
       .sort(F.col('prediction')).show(5)
+----------+------------------+------------------+------------------+
|prediction| avg(Recency)| avg(Monetary)| avg(Frequency)|
+----------+------------------+------------------+------------------+
| 0|30.966337980278816|2543.0355321319284| 6.514450867052023|
| 1|296.02403846153845|407.16831730769206|1.5592948717948718|
| 2|154.40148698884758| 702.5096406443623| 2.550185873605948|
+----------+------------------+------------------+------------------+
Complex summary
grp = 'prediction'   # group by the K-means cluster label (results has no RFMScore column)
num_cols = ['Recency', 'Frequency', 'Monetary']
df_input = results

quantile_grouped = quantile_agg(df_input, grp, num_cols)
quantile_grouped.toPandas().to_csv(output_dir + 'quantile_grouped.csv')

deciles_grouped = deciles_agg(df_input, grp, num_cols)
deciles_grouped.toPandas().to_csv(output_dir + 'deciles_grouped.csv')