扩展到大型数据集#
Pandas为内存分析提供了数据结构,这使得使用Pandas分析比内存数据集更大的数据集有些棘手。即使是占内存相当大一部分的数据集也变得笨拙,因为一些Pandas操作需要制作中间副本。
本文档提供了一些建议,帮助您将分析扩展到更大的数据集。它是对 提升性能 ,它专注于加快对适合内存的数据集的分析。
但首先,值得考虑的是 不使用Pandas 。Pandas并不是适用于所有情况的工具。如果您正在处理非常大的数据集,并且像PostgreSQL这样的工具符合您的需求,那么您可能应该使用它。假设你想要或需要Pandas的表现力和力量,让我们继续下去。
加载更少的数据#
假设磁盘上的原始数据集有许多列:
id_0 name_0 x_0 y_0 id_1 name_1 x_1 ... name_8 x_8 y_8 id_9 name_9 x_9 y_9
timestamp ...
2000-01-01 00:00:00 1015 Michael -0.399453 0.095427 994 Frank -0.176842 ... Dan -0.315310 0.713892 1025 Victor -0.135779 0.346801
2000-01-01 00:01:00 969 Patricia 0.650773 -0.874275 1003 Laura 0.459153 ... Ursula 0.913244 -0.630308 1047 Wendy -0.886285 0.035852
2000-01-01 00:02:00 1016 Victor -0.721465 -0.584710 1046 Michael 0.524994 ... Ray -0.656593 0.692568 1064 Yvonne 0.070426 0.432047
2000-01-01 00:03:00 939 Alice -0.746004 -0.908008 996 Ingrid -0.414523 ... Jerry -0.958994 0.608210 978 Wendy 0.855949 -0.648988
2000-01-01 00:04:00 1017 Dan 0.919451 -0.803504 1048 Jerry -0.569235 ... Frank -0.577022 -0.409088 994 Bob -0.270132 0.335176
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2000-12-30 23:56:00 999 Tim 0.162578 0.512817 973 Kevin -0.403352 ... Tim -0.380415 0.008097 1041 Charlie 0.191477 -0.599519
2000-12-30 23:57:00 970 Laura -0.433586 -0.600289 958 Oliver -0.966577 ... Zelda 0.971274 0.402032 1038 Ursula 0.574016 -0.930992
2000-12-30 23:58:00 1065 Edith 0.232211 -0.454540 971 Tim 0.158484 ... Alice -0.222079 -0.919274 1022 Dan 0.031345 -0.657755
2000-12-30 23:59:00 1019 Ingrid 0.322208 -0.615974 981 Hannah 0.607517 ... Sarah -0.424440 -0.117274 990 George -0.375530 0.563312
2000-12-31 00:00:00 937 Ursula -0.906523 0.943178 1018 Alice -0.564513 ... Jerry 0.236837 0.807650 985 Oliver 0.777642 0.783392
[525601 rows x 40 columns]
它可以由以下代码片段生成:
In [1]: import pandas as pd
In [2]: import numpy as np
In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
...: index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
...: n = len(index)
...: state = np.random.RandomState(seed)
...: columns = {
...: "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
...: "id": state.poisson(1000, size=n),
...: "x": state.rand(n) * 2 - 1,
...: "y": state.rand(n) * 2 - 1,
...: }
...: df = pd.DataFrame(columns, index=index, columns=sorted(columns))
...: if df.index[-1] == end:
...: df = df.iloc[:-1]
...: return df
...:
In [4]: timeseries = [
...: make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
...: for i in range(10)
...: ]
...:
In [5]: ts_wide = pd.concat(timeseries, axis=1)
In [6]: ts_wide.to_parquet("timeseries_wide.parquet")
要加载我们需要的列,我们有两个选项。选项1加载所有数据,然后筛选出我们需要的数据。
In [7]: columns = ["id_0", "name_0", "x_0", "y_0"]
In [8]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[8]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 977 Alice -0.821225 0.906222
2000-01-01 00:01:00 1018 Bob -0.219182 0.350855
2000-01-01 00:02:00 927 Alice 0.660908 -0.798511
2000-01-01 00:03:00 997 Bob -0.852458 0.735260
2000-01-01 00:04:00 965 Bob 0.717283 0.393391
... ... ... ... ...
2000-12-30 23:56:00 1037 Bob -0.814321 0.612836
2000-12-30 23:57:00 980 Bob 0.232195 -0.618828
2000-12-30 23:58:00 965 Alice -0.231131 0.026310
2000-12-30 23:59:00 984 Alice 0.942819 0.853128
2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655
[525601 rows x 4 columns]
选项2仅加载我们请求的列。
In [9]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[9]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 977 Alice -0.821225 0.906222
2000-01-01 00:01:00 1018 Bob -0.219182 0.350855
2000-01-01 00:02:00 927 Alice 0.660908 -0.798511
2000-01-01 00:03:00 997 Bob -0.852458 0.735260
2000-01-01 00:04:00 965 Bob 0.717283 0.393391
... ... ... ... ...
2000-12-30 23:56:00 1037 Bob -0.814321 0.612836
2000-12-30 23:57:00 980 Bob 0.232195 -0.618828
2000-12-30 23:58:00 965 Alice -0.231131 0.026310
2000-12-30 23:59:00 984 Alice 0.942819 0.853128
2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655
[525601 rows x 4 columns]
如果我们要测量这两个调用的内存使用情况,我们会看到指定 columns
在这种情况下,使用大约十分之一的内存。
使用 pandas.read_csv()
,您可以指定 usecols
以限制读取到内存的列。并非所有可由Pandas读取的文件格式都提供了读取列的子集的选项。
使用高效的数据类型#
默认的PANDA数据类型不是最高效的内存。对于唯一值相对较少的文本数据列(通常称为“低基数”数据),情况尤其如此。通过使用更高效的数据类型,您可以在内存中存储更大的数据集。
In [10]: ts = make_timeseries(freq="30S", seed=0)
In [11]: ts.to_parquet("timeseries.parquet")
In [12]: ts = pd.read_parquet("timeseries.parquet")
In [13]: ts
Out[13]:
id name x y
timestamp
2000-01-01 00:00:00 1041 Alice 0.889987 0.281011
2000-01-01 00:00:30 988 Bob -0.455299 0.488153
2000-01-01 00:01:00 1018 Alice 0.096061 0.580473
2000-01-01 00:01:30 992 Bob 0.142482 0.041665
2000-01-01 00:02:00 960 Bob -0.036235 0.802159
... ... ... ... ...
2000-12-30 23:58:00 1022 Alice 0.266191 0.875579
2000-12-30 23:58:30 974 Alice -0.009826 0.413686
2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789
2000-12-30 23:59:30 1002 Alice 0.202602 0.541335
2000-12-31 00:00:00 987 Alice 0.200832 0.615972
[1051201 rows x 4 columns]
现在,让我们检查数据类型和内存使用情况,看看我们应该将注意力集中在哪里。
In [14]: ts.dtypes
Out[14]:
id int64
name object
x float64
y float64
dtype: object
In [15]: ts.memory_usage(deep=True) # memory usage in bytes
Out[15]:
Index 8409608
id 8409608
name 65176434
x 8409608
y 8409608
dtype: int64
这个 name
列占用的内存比其他任何列都多。它只有几个唯一的值,因此它是转换为 pandas.Categorical
。使用一个 pandas.Categorical
,我们将每个唯一的名称存储一次,并使用节省空间的整数来了解每行中使用的特定名称。
In [16]: ts2 = ts.copy()
In [17]: ts2["name"] = ts2["name"].astype("category")
In [18]: ts2.memory_usage(deep=True)
Out[18]:
Index 8409608
id 8409608
name 1051495
x 8409608
y 8409608
dtype: int64
我们可以更进一步,使用以下命令将数值列向下转换为最小类型 pandas.to_numeric()
。
In [19]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
In [20]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")
In [21]: ts2.dtypes
Out[21]:
id uint16
name category
x float32
y float32
dtype: object
In [22]: ts2.memory_usage(deep=True)
Out[22]:
Index 8409608
id 2102402
name 1051495
x 4204804
y 4204804
dtype: int64
In [23]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
In [24]: print(f"{reduction:0.2f}")
0.20
总而言之,我们已经将该数据集的内存占用减少到其原始大小的1/5。
看见 分类数据 了解更多信息 pandas.Categorical
和 数据类型 获取所有大Pandasdtype的概述。
使用分块#
有些工作负载可以通过分块来实现:将一个大问题(如“将CSV目录转换为Parquet”)分解为一系列小问题(“将这个单独的CSV文件转换为一个Parquet文件。现在对此目录中的每个文件重复此操作。”)。只要每个块都可以放入内存,您就可以处理比内存大得多的数据集。
备注
当您正在执行的操作需要块之间的零协调或最小协调时,分块工作得很好。对于更复杂的工作流,您最好 using another library 。
假设我们在磁盘上有一个更大的“逻辑数据集”,它是拼图文件的目录。目录中的每个文件代表整个数据集的不同年份。
In [25]: import pathlib
In [26]: N = 12
In [27]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [28]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [29]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)
In [30]: for i, (start, end) in enumerate(zip(starts, ends)):
....: ts = make_timeseries(start=start, end=end, freq="1T", seed=i)
....: ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
....:
data
└── timeseries
├── ts-00.parquet
├── ts-01.parquet
├── ts-02.parquet
├── ts-03.parquet
├── ts-04.parquet
├── ts-05.parquet
├── ts-06.parquet
├── ts-07.parquet
├── ts-08.parquet
├── ts-09.parquet
├── ts-10.parquet
└── ts-11.parquet
现在,我们将实现一个内核外 pandas.Series.value_counts()
。此工作流的峰值内存使用量是单个最大的区块,外加存储到目前为止的唯一值的小系列。只要每个单独的文件都能存储在内存中,这将适用于任意大小的数据集。
In [31]: %%time
....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
....: counts = pd.Series(dtype=int)
....: for path in files:
....: df = pd.read_parquet(path)
....: counts = counts.add(df["name"].value_counts(), fill_value=0)
....: counts.astype(int)
....:
CPU times: user 581 ms, sys: 70.6 ms, total: 652 ms
Wall time: 431 ms
Out[31]:
Alice 1994645
Bob 1993692
Charlie 1994875
dtype: int64
一些读者,比如 pandas.read_csv()
,提供参数以控制 chunksize
当读取单个文件时。
对于不需要太复杂操作的工作流来说,手动分块是一个不错的选择。一些操作,如 pandas.DataFrame.groupby()
,都很难按大块计算。在这些情况下,您可能更好地切换到为您实现这些核外算法的不同库。
使用其他库#
Pandas只是提供DataFrame API的一个库。由于其受欢迎,Pandas的API已经成为其他库实现的某种标准。Pandas文档维护实现DataFrame API的库列表 our ecosystem page 。
例如, Dask ,一个并行计算库,有 dask.dataframe ,这是一种类似Pandas的API,用于并行处理大于内存的数据集。DASK可以在一台机器上使用多个线程或进程,也可以在一群机器上并行处理数据。
我们将进口 dask.dataframe
请注意,API的感觉类似于Pandas。我们可以用达斯克的 read_parquet
函数,但提供要读入的文件的全局字符串。
In [32]: import dask.dataframe as dd
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Input In [32], in <cell line: 1>()
----> 1 import dask.dataframe as dd
ModuleNotFoundError: No module named 'dask'
In [33]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [33], in <cell line: 1>()
----> 1 ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
NameError: name 'dd' is not defined
In [34]: ddf
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [34], in <cell line: 1>()
----> 1 ddf
NameError: name 'ddf' is not defined
正在检查 ddf
对象,我们看到了一些东西
有一些熟悉的属性,如
.columns
和.dtypes
有一些熟悉的方法,比如
.groupby
,.sum
等。有新的属性,如
.npartitions
和.divisions
分区和划分是DASK并行化计算的方式。一个 Dask DataFrame由许多Pandas组成 pandas.DataFrame
。对DaskDataFrame的单个方法调用最终会进行许多Pandas方法调用,而Dask知道如何协调所有内容以获得结果。
In [35]: ddf.columns
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [35], in <cell line: 1>()
----> 1 ddf.columns
NameError: name 'ddf' is not defined
In [36]: ddf.dtypes
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [36], in <cell line: 1>()
----> 1 ddf.dtypes
NameError: name 'ddf' is not defined
In [37]: ddf.npartitions
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [37], in <cell line: 1>()
----> 1 ddf.npartitions
NameError: name 'ddf' is not defined
一个主要的区别是: dask.dataframe
接口为 lazy 。如果您查看上面的epr,您会注意到实际上并没有打印值;只是列名和dtype。这是因为达斯克还没有真正读过数据。执行操作不是立即执行,而是构建一个 任务图 。
In [38]: ddf
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [38], in <cell line: 1>()
----> 1 ddf
NameError: name 'ddf' is not defined
In [39]: ddf["name"]
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [39], in <cell line: 1>()
----> 1 ddf["name"]
NameError: name 'ddf' is not defined
In [40]: ddf["name"].value_counts()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [40], in <cell line: 1>()
----> 1 ddf["name"].value_counts()
NameError: name 'ddf' is not defined
这些调用中的每个都是即时的,因为结果还没有计算出来。我们只是在建立一个计算清单,当有人需要结果时要做的事情。DASK知道一个 pandas.Series.value_counts
是一只Pandas吗 pandas.Series
具有特定的数据类型和特定的名称。因此,DASK版本返回具有相同数据类型和相同名称的DASK系列。
要获得实际结果,您可以调用 .compute()
。
In [41]: %time ddf["name"].value_counts().compute()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
File <timed eval>:1, in <module>
NameError: name 'ddf' is not defined
在这一点上,你会得到和Pandas一样的东西,在这种情况下,你会得到一只混凝土Pandas pandas.Series
每一次的计数 name
。
呼叫 .compute
导致执行完整的任务图。这包括读取数据、选择列和执行 value_counts
。行刑结束了 并行的 在可能的情况下,DASK会尽量保持较小的总内存占用量。您可以使用比内存大得多的数据集,只要每个分区(一个普通的Pandas pandas.DataFrame
)符合记忆。
默认情况下, dask.dataframe
操作使用线程池并行执行操作。我们还可以连接到一个集群,将工作分配到多台机器上。在本例中,我们将连接到由这台机器上的几个进程组成的本地“集群”。
>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>
一旦这一次 client
创建后,所有Dask值的计算都将在集群上进行(在本例中就是进程)。
DASK实现了PandasAPI中最常用的部分。例如,我们可以进行熟悉的Groupby聚合。
In [42]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
File <timed eval>:1, in <module>
NameError: name 'ddf' is not defined
分组和聚合是在核心外并行完成的。
当达斯克知道 divisions
对于数据集,某些优化是可能的。当读取DASK编写的镶木地板数据集时,将自动知道分区。在本例中,由于我们手动创建镶木地板文件,因此需要手动提供分区。
In [43]: N = 12
In [44]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [45]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [46]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
In [47]: ddf.divisions = divisions
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [47], in <cell line: 1>()
----> 1 ddf.divisions = divisions
NameError: name 'ddf' is not defined
In [48]: ddf
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [48], in <cell line: 1>()
----> 1 ddf
NameError: name 'ddf' is not defined
现在我们可以做一些事情,如快速随机访问 .loc
。
In [49]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [49], in <cell line: 1>()
----> 1 ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
NameError: name 'ddf' is not defined
达斯克知道只需查看第三个分区就可以选择2002年的值。它不需要查看任何其他数据。
许多工作流涉及大量数据,并以一种将大小缩小到内存可以容纳的大小的方式对其进行处理。在这种情况下,我们将重新采样到每日频率并取平均值。一旦我们取了平均值,我们知道结果将适合在内存中,所以我们可以安全地调用 compute
而不会耗尽内存。在这一点上,它只是一个普通的Pandas物体。
In [50]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Input In [50], in <cell line: 1>()
----> 1 ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
NameError: name 'ddf' is not defined

这些DASK示例都是在一台机器上使用多个进程完成的。达斯克可以是 deployed on a cluster 以扩大到更大的数据集。
您可以在https://examples.dask.org.上看到更多DASK示例