5. Programming with RDDs

Chinese proverb

If you know the enemy and know yourself, you need not fear the result of a hundred battles. If you know yourself but not the enemy, for every victory gained you will also suffer a defeat. If you know neither the enemy nor yourself, you will succumb in every battle. – Sun Tzu, The Art of War, Attack by Stratagem

RDD stands for Resilient Distributed Dataset. An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions (smaller subsets with the same pattern), which may be computed on different nodes of the cluster.
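
A quick way to see this partitioning is to ask an RDD for its partitions. Below is a minimal sketch; the partition count of 4 is an arbitrary choice, and spark is the SparkSession created in the next section.

rdd = spark.sparkContext.parallelize(range(10), 4)  # 4 partitions, chosen arbitrarily
print(rdd.getNumPartitions())   # 4
print(rdd.glom().collect())     # the elements, grouped by partition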

5.1. Create RDD

Usually, there are two popular ways to create RDDs: loading an external dataset, or distributing a collection of objects in your program. The following examples show how to create an RDD with the parallelize() function, which takes an existing collection in your program and passes it to the SparkContext.

  1. By using the parallelize() function

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),
             (4, 5, 6, 'd e f'),
             (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])

Then you will get the DataFrame:

df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])

Then you will get the RDD data:

myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
  2. By using the createDataFrame() function

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Employee = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
                        ['Id', 'Name', 'Salary', 'DepartmentId']
                       )

Then you will get the DataFrame:

Employee.show()

+---+-----+------+------------+
| Id| Name|Salary|DepartmentId|
+---+-----+------+------------+
|  1|  Joe| 70000|           1|
|  2|Henry| 80000|           2|
|  3|  Sam| 60000|           2|
|  4|  Max| 90000|           1|
+---+-----+------+------------+
  3. By using the read and load functions

  1. Read dataset from .csv file

## set up  SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.format('com.databricks.spark.csv')\
               .options(header='true', inferschema='true')\
               .load("/home/feng/Spark/Code/data/Advertising.csv")

df.show(5)
df.printSchema()

Then you will get the DataFrame:

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)

Once created, RDDs offer two types of operations: transformations and actions.

  2. Read dataset from database

## set up  SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
            .builder \
            .appName("Python Spark create RDD example") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()

## User information
user = 'your_username'
pw   = 'your_password'

## Database information
table_name = 'table_name'
url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
properties ={'driver': 'org.postgresql.Driver', 'password': pw,'user': user}

df = spark.read.jdbc(url=url, table=table_name, properties=properties)

df.show(5)
df.printSchema()

Then you will get the DataFrame:

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)

Note

Reading tables from a database requires the proper driver for the corresponding database. For example, the demo above needs org.postgresql.Driver, which you need to download and put in the jars folder of your Spark installation path. I downloaded postgresql-42.1.1.jar from the official website and put it in the jars folder.
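
Alternatively, you can point Spark at the driver jar when you build the session, instead of copying it into the installation folder. A minimal sketch, reusing the builder from the examples above (the local path below is an assumption):

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.jars", "/path/to/postgresql-42.1.1.jar") \
    .getOrCreate()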

  3. Read dataset from HDFS

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext('local', 'example')
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://cdhstltest/user/data/demo.CSV")
print(tf1.first())

hc.sql("use intg_cme_w")
spf = hc.sql("SELECT * FROM spf LIMIT 100")
print(spf.show(5))
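
Note that HiveContext belongs to the older Spark 1.x API. With Spark 2.x you can do the same through a SparkSession with Hive support; the following is a sketch, assuming your cluster exposes the same HDFS path and Hive database:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("HDFS read example") \
    .enableHiveSupport() \
    .getOrCreate()

tf1 = spark.sparkContext.textFile("hdfs://cdhstltest/user/data/demo.CSV")
print(tf1.first())

spark.sql("use intg_cme_w")
spark.sql("SELECT * FROM spf LIMIT 100").show(5)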

5.2. Spark Operations

Warning

All the figures below are from Jeffrey Thompson. The interested reader is referred to pyspark pictures.

There are two main types of Spark operations: transformations and actions [Karau2015].

images/visualapi_006.png

Note

Some people define three types of operations: transformations, actions, and shuffles.

5.2.1. Spark Transformations

Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate, as sketched below.

images/transforms1.png
images/transforms2.png
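
A minimal sketch of the filter transformation (spark is a SparkSession as above). Nothing is computed when the transformation is defined; Spark only records the lineage:

nums = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
evens = nums.filter(lambda x: x % 2 == 0)  # transformation: lazily defines a new RDD
evens.collect()                            # [2, 4] -- only this action triggers computation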

5.2.2. Spark Actions

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

images/actions1.png
images/actions2.png
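
A few common actions, as a sketch (the output path is an assumption and must not already exist):

nums = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(nums.count())                      # 5, returned to the driver
print(nums.reduce(lambda a, b: a + b))   # 15, returned to the driver
nums.saveAsTextFile('file:///tmp/nums')  # saved to external storage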

5.3. rdd.DataFrame vs pd.DataFrame

5.3.1. Create DataFrame

  1. From List

my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']

:: Python Code:

# caution for the columns=
pd.DataFrame(my_list,columns= col_name)
#
spark.createDataFrame(my_list, col_name).show()

:: Comparison:

                  +---+---+---+
                  |  A|  B|  C|
   A  B  C        +---+---+---+
0  a  1  2        |  a|  1|  2|
1  b  2  3        |  b|  2|  3|
2  c  3  4        |  c|  3|  4|
                  +---+---+---+

Attention

Pay attention to the parameter columns= in pd.DataFrame. Without it, the second positional argument is taken as the row index, so your column names end up as row labels:

:: Python Code:

# caution for the columns=
pd.DataFrame(my_list, columns= col_name)
#
pd.DataFrame(my_list, col_name)

:: Comparison:

   A  B  C             0  1  2
0  a  1  2          A  a  1  2
1  b  2  3          B  b  2  3
2  c  3  4          C  c  3  4
  2. From Dict

d = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}

:: Python Code:

pd.DataFrame(d)
# Tedious for PySpark
import numpy as np
spark.createDataFrame(np.array(list(d.values())).T.tolist(), list(d.keys())).show()

:: Comparison:

                   +---+---+---+
                   |  A|  B|  C|
   A  B  C         +---+---+---+
0  0  1  1         |  0|  1|  1|
1  1  0  0         |  1|  0|  0|
2  0  1  0         |  0|  1|  0|
                   +---+---+---+
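
A less tedious route, assuming pandas interoperability is acceptable in your setting, is to let createDataFrame() convert a pandas DataFrame directly:

import pandas as pd
spark.createDataFrame(pd.DataFrame(d)).show()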

5.3.2. Load DataFrame

  1. From Database

Most of the time, you need to share your code with your colleagues or release it for Code Review or Quality Assurance (QA). You definitely do not want your User Information exposed in the code, so you can save it in login.txt:

runawayhorse001
PythonTips

and use the following code to import your User Information:

# User Information
try:
    login = pd.read_csv(r'login.txt', header=None)
    user = login[0][0]
    pw = login[0][1]
    print('User information is ready!')
except Exception:
    print('Login information is not available!!!')

#Database information
host = '##.###.###.##'
db_name = 'db_name'
table_name = 'table_name'

:: Comparison:

conn = psycopg2.connect(host=host, database=db_name, user=user, password=pw)
cur = conn.cursor()

sql = """
      select *
      from {table_name}
      """.format(table_name=table_name)
dp = pd.read_sql(sql, conn)
# connect to database
url = 'jdbc:postgresql://'+host+':5432/'+db_name+'?user='+user+'&password='+pw
properties ={'driver': 'org.postgresql.Driver', 'password': pw,'user': user}
ds = spark.read.jdbc(url=url, table=table_name, properties=properties)

Note

Reading tables from a database with PySpark needs the proper driver for the corresponding database. For example, the demo above needs org.postgresql.Driver, which you need to download and put in the jars folder of your Spark installation path. I downloaded postgresql-42.1.1.jar from the official website and put it in the jars folder.

  2. From .csv

:: Comparison:

# pandas DataFrame: dp
dp = pd.read_csv('Advertising.csv')
# Spark DataFrame: ds
ds = spark.read.csv(path='Advertising.csv',
#                sep=',',
#                encoding='UTF-8',
#                comment=None,
               header=True,
               inferSchema=True)
  3. From .json

Data from: http://api.luftdaten.info/static/v1/data.json

dp = pd.read_json("data/data.json")
ds = spark.read.json('data/data.json')

:: Python Code:

dp[['id','timestamp']].head(4)
#
ds[['id','timestamp']].show(4)

:: Comparison:

                                                +----------+-------------------+
                                                |        id|          timestamp|
            id  timestamp                       +----------+-------------------+
0   2994551481  2019-02-28 17:23:52             |2994551481|2019-02-28 17:23:52|
1   2994551482  2019-02-28 17:23:52             |2994551482|2019-02-28 17:23:52|
2   2994551483  2019-02-28 17:23:52             |2994551483|2019-02-28 17:23:52|
3   2994551484  2019-02-28 17:23:52             |2994551484|2019-02-28 17:23:52|
                                                +----------+-------------------+
                                                only showing top 4 rows

5.3.3. First n Rows

:: Python Code:

dp.head(4)
#
ds.show(4)

:: Comparison:

                                        +-----+-----+---------+-----+
                                        |   TV|Radio|Newspaper|Sales|
      TV  Radio  Newspaper  Sales       +-----+-----+---------+-----+
0  230.1   37.8       69.2   22.1       |230.1| 37.8|     69.2| 22.1|
1   44.5   39.3       45.1   10.4       | 44.5| 39.3|     45.1| 10.4|
2   17.2   45.9       69.3    9.3       | 17.2| 45.9|     69.3|  9.3|
3  151.5   41.3       58.5   18.5       |151.5| 41.3|     58.5| 18.5|
                                        +-----+-----+---------+-----+
                                        only showing top 4 rows

5.3.4. Column Names

:: Python Code:

dp.columns
#
ds.columns

:: Comparison:

Index(['TV', 'Radio', 'Newspaper', 'Sales'], dtype='object')
['TV', 'Radio', 'Newspaper', 'Sales']

5.3.5. Data Types

:: Python Code:

dp.dtypes
#
ds.dtypes

:: Comparison:

TV           float64                    [('TV', 'double'),
Radio        float64                     ('Radio', 'double'),
Newspaper    float64                     ('Newspaper', 'double'),
Sales        float64                     ('Sales', 'double')]
dtype: object

5.3.6. Fill Null

my_list = [['male', 1, None], ['female', 2, 3], ['male', 3, 4]]
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
#
dp.head()
ds.show()

:: Comparison:

                                        +------+---+----+
                                        |     A|  B|   C|
        A  B    C                       +------+---+----+
0    male  1  NaN                       |  male|  1|null|
1  female  2  3.0                       |female|  2|   3|
2    male  3  4.0                       |  male|  3|   4|
                                        +------+---+----+

:: Python Code:

dp.fillna(-99)
#
ds.fillna(-99).show()

:: Comparison:

                                        +------+---+----+
                                        |     A|  B|   C|
        A  B    C                       +------+---+----+
0    male  1  -99                       |  male|  1| -99|
1  female  2  3.0                       |female|  2|   3|
2    male  3  4.0                       |  male|  3|   4|
                                        +------+---+----+

5.3.7. Replace Values

:: Python Code:

# caution: you need to choose the specific column
dp.A.replace(['male', 'female'], [1, 0], inplace=True)
dp
# caution: mixed type replacements are not supported
ds.na.replace(['male', 'female'], ['1', '0']).show()

:: Comparison:

                                +---+---+----+
                                |  A|  B|   C|
   A  B    C                    +---+---+----+
0  1  1  NaN                    |  1|  1|null|
1  0  2  3.0                    |  0|  2|   3|
2  1  3  4.0                    |  1|  3|   4|
                                +---+---+----+

5.3.8. Rename Columns

  1. Rename all columns

:: Python Code:

dp.columns = ['a','b','c','d']
dp.head(4)
#
ds.toDF('a','b','c','d').show(4)

:: Comparison:

                                                +-----+----+----+----+
                                                |    a|   b|   c|   d|
       a     b     c     d                      +-----+----+----+----+
0  230.1  37.8  69.2  22.1                      |230.1|37.8|69.2|22.1|
1   44.5  39.3  45.1  10.4                      | 44.5|39.3|45.1|10.4|
2   17.2  45.9  69.3   9.3                      | 17.2|45.9|69.3| 9.3|
3  151.5  41.3  58.5  18.5                      |151.5|41.3|58.5|18.5|
                                                +-----+----+----+----+
                                                only showing top 4 rows
  2. Rename one or more columns

mapping = {'Newspaper':'C','Sales':'D'}

:: Python Code:

dp.rename(columns=mapping).head(4)
#
new_names = [mapping.get(col,col) for col in ds.columns]
ds.toDF(*new_names).show(4)

:: Comparison:

                                        +-----+-----+----+----+
                                        |   TV|Radio|   C|   D|
      TV  Radio     C     D             +-----+-----+----+----+
0  230.1   37.8  69.2  22.1             |230.1| 37.8|69.2|22.1|
1   44.5   39.3  45.1  10.4             | 44.5| 39.3|45.1|10.4|
2   17.2   45.9  69.3   9.3             | 17.2| 45.9|69.3| 9.3|
3  151.5   41.3  58.5  18.5             |151.5| 41.3|58.5|18.5|
                                        +-----+-----+----+----+
                                        only showing top 4 rows

Note

You can also use withColumnRenamed to rename one column in PySpark.

:: Python Code:

ds.withColumnRenamed('Newspaper','Paper').show(4)

:: Comparison:

+-----+-----+-----+-----+
|   TV|Radio|Paper|Sales|
+-----+-----+-----+-----+
|230.1| 37.8| 69.2| 22.1|
| 44.5| 39.3| 45.1| 10.4|
| 17.2| 45.9| 69.3|  9.3|
|151.5| 41.3| 58.5| 18.5|
+-----+-----+-----+-----+
only showing top 4 rows

5.3.9. Drop Columns

drop_name = ['Newspaper','Sales']

:: Python Code:

dp.drop(drop_name,axis=1).head(4)
#
ds.drop(*drop_name).show(4)

:: Comparison:

                                +-----+-----+
                                |   TV|Radio|
      TV  Radio                 +-----+-----+
0  230.1   37.8                 |230.1| 37.8|
1   44.5   39.3                 | 44.5| 39.3|
2   17.2   45.9                 | 17.2| 45.9|
3  151.5   41.3                 |151.5| 41.3|
                                +-----+-----+
                                only showing top 4 rows

5.3.10. Filter

dp = pd.read_csv('Advertising.csv')
#
ds = spark.read.csv(path='Advertising.csv',
                    header=True,
                    inferSchema=True)

:: Python Code:

dp[dp.Newspaper<20].head(4)
#
ds[ds.Newspaper<20].show(4)

:: Comparison:

                                                +-----+-----+---------+-----+
                                                |   TV|Radio|Newspaper|Sales|
       TV  Radio  Newspaper  Sales              +-----+-----+---------+-----+
7   120.2   19.6       11.6   13.2              |120.2| 19.6|     11.6| 13.2|
8     8.6    2.1        1.0    4.8              |  8.6|  2.1|      1.0|  4.8|
11  214.7   24.0        4.0   17.4              |214.7| 24.0|      4.0| 17.4|
13   97.5    7.6        7.2    9.7              | 97.5|  7.6|      7.2|  9.7|
                                                +-----+-----+---------+-----+
                                                only showing top 4 rows

:: Python Code:

dp[(dp.Newspaper<20)&(dp.TV>100)].head(4)
#
ds[(ds.Newspaper<20)&(ds.TV>100)].show(4)

:: Comparison:

                                                +-----+-----+---------+-----+
                                                |   TV|Radio|Newspaper|Sales|
       TV  Radio  Newspaper  Sales              +-----+-----+---------+-----+
7   120.2   19.6       11.6   13.2              |120.2| 19.6|     11.6| 13.2|
11  214.7   24.0        4.0   17.4              |214.7| 24.0|      4.0| 17.4|
19  147.3   23.9       19.1   14.6              |147.3| 23.9|     19.1| 14.6|
25  262.9    3.5       19.5   12.0              |262.9|  3.5|     19.5| 12.0|
                                                +-----+-----+---------+-----+
                                                only showing top 4 rows

5.3.11. With New Column

:: Python Code:

dp['tv_norm'] = dp.TV/sum(dp.TV)
dp.head(4)
#
import pyspark.sql.functions as F
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)

:: Comparison:

                                                +-----+-----+---------+-----+--------------------+
                                                |   TV|Radio|Newspaper|Sales|             tv_norm|
      TV  Radio  Newspaper  Sales   tv_norm     +-----+-----+---------+-----+--------------------+
0  230.1   37.8       69.2   22.1  0.007824     |230.1| 37.8|     69.2| 22.1|0.007824268493802813|
1   44.5   39.3       45.1   10.4  0.001513     | 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
2   17.2   45.9       69.3    9.3  0.000585     | 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
3  151.5   41.3       58.5   18.5  0.005152     |151.5| 41.3|     58.5| 18.5|0.005151571824472517|
                                                +-----+-----+---------+-----+--------------------+
                                                only showing top 4 rows

:: Python Code:

dp['cond'] = dp.apply(lambda c: 1 if ((c.TV>100)&(c.Radio<40)) else 2 if c.Sales> 10 else 3,axis=1)
#
ds.withColumn('cond',F.when((ds.TV>100)&(ds.Radio<40),1)\
                      .when(ds.Sales>10, 2)\
                      .otherwise(3)).show(4)

:: Comparison:

                                                +-----+-----+---------+-----+----+
                                                |   TV|Radio|Newspaper|Sales|cond|
      TV  Radio  Newspaper  Sales  cond         +-----+-----+---------+-----+----+
0  230.1   37.8       69.2   22.1     1         |230.1| 37.8|     69.2| 22.1|   1|
1   44.5   39.3       45.1   10.4     2         | 44.5| 39.3|     45.1| 10.4|   2|
2   17.2   45.9       69.3    9.3     3         | 17.2| 45.9|     69.3|  9.3|   3|
3  151.5   41.3       58.5   18.5     2         |151.5| 41.3|     58.5| 18.5|   2|
                                                +-----+-----+---------+-----+----+
                                                only showing top 4 rows

:: Python Code:

import numpy as np
dp['log_tv'] = np.log(dp.TV)
dp.head(4)
#
ds.withColumn('log_tv',F.log(ds.TV)).show(4)

:: Comparison:

                                                +-----+-----+---------+-----+------------------+
                                                |   TV|Radio|Newspaper|Sales|            log_tv|
      TV  Radio  Newspaper  Sales    log_tv     +-----+-----+---------+-----+------------------+
0  230.1   37.8       69.2   22.1  5.438514     |230.1| 37.8|     69.2| 22.1|  5.43851399704132|
1   44.5   39.3       45.1   10.4  3.795489     | 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
2   17.2   45.9       69.3    9.3  2.844909     | 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
3  151.5   41.3       58.5   18.5  5.020586     |151.5| 41.3|     58.5| 18.5| 5.020585624949423|
                                                +-----+-----+---------+-----+------------------+
                                                only showing top 4 rows

:: Python Code:

dp['tv+10'] = dp.TV.apply(lambda x: x+10)
dp.head(4)
#
ds.withColumn('tv+10', ds.TV+10).show(4)

:: Comparison:

                                                +-----+-----+---------+-----+-----+
                                                |   TV|Radio|Newspaper|Sales|tv+10|
      TV  Radio  Newspaper  Sales  tv+10        +-----+-----+---------+-----+-----+
0  230.1   37.8       69.2   22.1  240.1        |230.1| 37.8|     69.2| 22.1|240.1|
1   44.5   39.3       45.1   10.4   54.5        | 44.5| 39.3|     45.1| 10.4| 54.5|
2   17.2   45.9       69.3    9.3   27.2        | 17.2| 45.9|     69.3|  9.3| 27.2|
3  151.5   41.3       58.5   18.5  161.5        |151.5| 41.3|     58.5| 18.5|161.5|
                                                +-----+-----+---------+-----+-----+
                                                only showing top 4 rows

5.3.12. Join

leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                    'B': ['B0', 'B1', 'B2', 'B3'],
                    'C': ['C0', 'C1', 'C2', 'C3'],
                    'D': ['D0', 'D1', 'D2', 'D3']},
                    index=[0, 1, 2, 3])

rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                       index=[4, 5, 6, 7])

lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)
    A   B   C   D                   A   F   G   H
0  A0  B0  C0  D0               4  A0  B4  C4  D4
1  A1  B1  C1  D1               5  A1  B5  C5  D5
2  A2  B2  C2  D2               6  A6  B6  C6  D6
3  A3  B3  C3  D3               7  A7  B7  C7  D7
  1. Left Join

    :: Python Code:

    leftp.merge(rightp,on='A',how='left')
    #
    lefts.join(rights,on='A',how='left')\
         .orderBy('A',ascending=True).show()
    

    :: Comparison:

                                            +---+---+---+---+----+----+----+
                                            |  A|  B|  C|  D|   F|   G|   H|
        A   B   C   D    F    G    H        +---+---+---+---+----+----+----+
    0  A0  B0  C0  D0   B4   C4   D4        | A0| B0| C0| D0|  B4|  C4|  D4|
    1  A1  B1  C1  D1   B5   C5   D5        | A1| B1| C1| D1|  B5|  C5|  D5|
    2  A2  B2  C2  D2  NaN  NaN  NaN        | A2| B2| C2| D2|null|null|null|
    3  A3  B3  C3  D3  NaN  NaN  NaN        | A3| B3| C3| D3|null|null|null|
                                            +---+---+---+---+----+----+----+
    
  2. Right Join

    :: Python Code:

    leftp.merge(rightp,on='A',how='right')
    #
    lefts.join(rights,on='A',how='right')\
         .orderBy('A',ascending=True).show()
    

    :: Comparison:

                                            +---+----+----+----+---+---+---+
                                            |  A|   B|   C|   D|  F|  G|  H|
        A    B    C    D   F   G   H        +---+----+----+----+---+---+---+
    0  A0   B0   C0   D0  B4  C4  D4        | A0|  B0|  C0|  D0| B4| C4| D4|
    1  A1   B1   C1   D1  B5  C5  D5        | A1|  B1|  C1|  D1| B5| C5| D5|
    2  A6  NaN  NaN  NaN  B6  C6  D6        | A6|null|null|null| B6| C6| D6|
    3  A7  NaN  NaN  NaN  B7  C7  D7        | A7|null|null|null| B7| C7| D7|
                                            +---+----+----+----+---+---+---+
    
  3. Inner Join

    :: Python Code:

    leftp.merge(rightp,on='A',how='inner')
    #
    lefts.join(rights,on='A',how='inner')\
         .orderBy('A',ascending=True).show()
    

    :: Comparison:

                                    +---+---+---+---+---+---+---+
                                    |  A|  B|  C|  D|  F|  G|  H|
        A   B   C   D   F   G   H   +---+---+---+---+---+---+---+
    0  A0  B0  C0  D0  B4  C4  D4   | A0| B0| C0| D0| B4| C4| D4|
    1  A1  B1  C1  D1  B5  C5  D5   | A1| B1| C1| D1| B5| C5| D5|
                                    +---+---+---+---+---+---+---+
    
  4. Full Join

    :: Python Code:

    leftp.merge(rightp,on='A',how='outer')
    #
    lefts.join(rights,on='A',how='full')\
         .orderBy('A',ascending=True).show()
    

    :: Comparison:

                                            +---+----+----+----+----+----+----+
                                            |  A|   B|   C|   D|   F|   G|   H|
        A    B    C    D    F    G    H     +---+----+----+----+----+----+----+
    0  A0   B0   C0   D0   B4   C4   D4     | A0|  B0|  C0|  D0|  B4|  C4|  D4|
    1  A1   B1   C1   D1   B5   C5   D5     | A1|  B1|  C1|  D1|  B5|  C5|  D5|
    2  A2   B2   C2   D2  NaN  NaN  NaN     | A2|  B2|  C2|  D2|null|null|null|
    3  A3   B3   C3   D3  NaN  NaN  NaN     | A3|  B3|  C3|  D3|null|null|null|
    4  A6  NaN  NaN  NaN   B6   C6   D6     | A6|null|null|null|  B6|  C6|  D6|
    5  A7  NaN  NaN  NaN   B7   C7   D7     | A7|null|null|null|  B7|  C7|  D7|
                                            +---+----+----+----+----+----+----+
    

5.3.13. Concatenate Columns

my_list = [('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9),
           ('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9)]
col_name = ['col1', 'col2', 'col3']
#
dp = pd.DataFrame(my_list,columns=col_name)
ds = spark.createDataFrame(my_list,schema=col_name)
  col1  col2  col3
0    a     2     3
1    b     5     6
2    c     8     9
3    a     2     3
4    b     5     6
5    c     8     9

:: Python Code:

dp['concat'] = dp.apply(lambda x:'%s%s'%(x['col1'],x['col2']),axis=1)
dp
#
ds.withColumn('concat',F.concat('col1','col2')).show()

:: Comparison:

                                        +----+----+----+------+
                                        |col1|col2|col3|concat|
  col1  col2  col3 concat               +----+----+----+------+
0    a     2     3     a2               |   a|   2|   3|    a2|
1    b     5     6     b5               |   b|   5|   6|    b5|
2    c     8     9     c8               |   c|   8|   9|    c8|
3    a     2     3     a2               |   a|   2|   3|    a2|
4    b     5     6     b5               |   b|   5|   6|    b5|
5    c     8     9     c8               |   c|   8|   9|    c8|
                                        +----+----+----+------+

5.3.14. GroupBy

:: Python Code:

dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})
#
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()

:: Comparison:

                                        +----+---------+---------+
      col2  col3                        |col1|min(col2)|avg(col3)|
col1                                    +----+---------+---------+
a        2     3                        |   c|        8|      9.0|
b        5     6                        |   b|        5|      6.0|
c        8     9                        |   a|        2|      3.0|
                                        +----+---------+---------+

5.3.15. Pivot

:: Python Code:

pd.pivot_table(dp, values='col3', index='col1', columns='col2', aggfunc=np.sum)
#
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

:: Comparison:

                                +----+----+----+----+
col2    2     5     8           |col1|   2|   5|   8|
col1                            +----+----+----+----+
a     6.0   NaN   NaN           |   c|null|null|  18|
b     NaN  12.0   NaN           |   b|null|  12|null|
c     NaN   NaN  18.0           |   a|   6|null|null|
                                +----+----+----+----+

5.3.16. Window

d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)

:: Python Code:

dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False)
#
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w))

:: Comparison:

                        +---+---+---+----+
                        |  A|  B|  C|rank|
   A  B  C  rank        +---+---+---+----+
0  a  m  1   2.0        |  b|  m|  2|   1|
1  b  m  2   1.0        |  a|  m|  1|   2|
2  c  n  3   2.0        |  d|  n|  6|   1|
3  d  n  6   1.0        |  c|  n|  3|   2|
                        +---+---+---+----+

5.3.17. rank vs dense_rank

d ={'Id':[1,2,3,4,5,6],
    'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}
#
data = pd.DataFrame(d)
dp = data.copy()
ds = spark.createDataFrame(data)
   Id  Score
0   1   4.00
1   2   4.00
2   3   3.85
3   4   3.65
4   5   3.65
5   6   3.50

:: Python Code:

dp['Rank_dense'] = dp['Score'].rank(method='dense',ascending=False)
dp['Rank'] = dp['Score'].rank(method='min',ascending=False)
dp
#
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.orderBy(ds.Score.desc())
ds = ds.withColumn('Rank_spark_dense',F.dense_rank().over(w))
ds = ds.withColumn('Rank_spark',F.rank().over(w))
ds.show()

:: Comparison:

                                +---+-----+----------------+----------+
                                | Id|Score|Rank_spark_dense|Rank_spark|
   Id  Score  Rank_dense  Rank  +---+-----+----------------+----------+
0   1   4.00         1.0   1.0  |  1|  4.0|               1|         1|
1   2   4.00         1.0   1.0  |  2|  4.0|               1|         1|
2   3   3.85         2.0   3.0  |  3| 3.85|               2|         3|
3   4   3.65         3.0   4.0  |  4| 3.65|               3|         4|
4   5   3.65         3.0   4.0  |  5| 3.65|               3|         4|
5   6   3.50         4.0   6.0  |  6|  3.5|               4|         6|
                                +---+-----+----------------+----------+