大容量写入操作#

本教程介绍如何利用PyMongo的大容量写操作特性。成批执行写操作减少了网络往返的次数,增加了写吞吐量。

大容量插入#

在 2.6 版本加入.

通过将列表传递给 insert_many() 方法。PyMongo将根据MongoDB所接受的最大消息大小自动将批处理拆分为更小的子批,支持非常大的大容量插入操作。

>>> import pymongo
>>> db = pymongo.MongoClient().bulk_example
>>> db.test.insert_many([{"i": i} for i in range(10000)]).inserted_ids
[...]
>>> db.test.count_documents({})
10000

混合大容量写入操作#

在 2.7 版本加入.

PyMongo还支持执行混合的大容量写操作。可以使用大容量写入操作API一起执行一批插入、更新和删除操作。

有序的大容量写入操作#

有序的大容量写入操作是批处理的,并按照为串行执行提供的顺序发送到服务器。返回值是 BulkWriteResult 描述执行的操作的类型和计数。

>>> from pprint import pprint
>>> from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
>>> result = db.test.bulk_write(
...     [
...         DeleteMany({}),  # Remove all documents from the previous example.
...         InsertOne({"_id": 1}),
...         InsertOne({"_id": 2}),
...         InsertOne({"_id": 3}),
...         UpdateOne({"_id": 1}, {"$set": {"foo": "bar"}}),
...         UpdateOne({"_id": 4}, {"$inc": {"j": 1}}, upsert=True),
...         ReplaceOne({"j": 1}, {"j": 2}),
...     ]
... )
>>> pprint(result.bulk_api_result)
{'nInserted': 3,
 'nMatched': 2,
 'nModified': 2,
 'nRemoved': 10000,
 'nUpserted': 1,
 'upserted': [{'_id': 4, 'index': 5}],
 'writeConcernErrors': [],
 'writeErrors': []}

发生的第一个写入失败(例如,重复密钥错误)将中止其余操作,并引发PyMongo BulkWriteError 。这个 details 属性提供直到故障发生为止的执行结果以及有关故障的详细信息--包括导致故障的操作。

>>> from pymongo import InsertOne, DeleteOne, ReplaceOne
>>> from pymongo.errors import BulkWriteError
>>> requests = [
...     ReplaceOne({"j": 2}, {"i": 5}),
...     InsertOne({"_id": 4}),  # Violates the unique key constraint on _id.
...     DeleteOne({"i": 5}),
... ]
>>> try:
...     db.test.bulk_write(requests)
... except BulkWriteError as bwe:
...     pprint(bwe.details)
...
{'nInserted': 0,
 'nMatched': 1,
 'nModified': 1,
 'nRemoved': 0,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [],
 'writeErrors': [{'code': 11000,
                  'errmsg': '...E11000...duplicate key error...',
                  'index': 1,...
                  'op': {'_id': 4}}]}

无序大容量写入操作#

无序的大容量写入操作被批处理并发送到中的服务器 任意顺序 它们可以并行执行。尝试执行所有操作后,将报告发生的任何错误。

在下一个例子中,第一个和第三个操作由于对_id的唯一约束而失败。

>>> requests = [
...     InsertOne({"_id": 1}),
...     DeleteOne({"_id": 2}),
...     InsertOne({"_id": 3}),
...     ReplaceOne({"_id": 4}, {"i": 1}),
... ]
>>> try:
...     db.test.bulk_write(requests, ordered=False)
... except BulkWriteError as bwe:
...     pprint(bwe.details)
...
{'nInserted': 0,
 'nMatched': 1,
 'nModified': 1,
 'nRemoved': 1,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [],
 'writeErrors': [{'code': 11000,
                  'errmsg': '...E11000...duplicate key error...',
                  'index': 0,...
                  'op': {'_id': 1}},
                 {'code': 11000,
                  'errmsg': '...',
                  'index': 2,...
                  'op': {'_id': 3}}]}

写关注点#

批量操作使用 write_concern 他们被处决的藏品。无论执行顺序如何,在尝试所有操作后,都将报告写入问题错误(例如wtimeout)。

::
>>> from pymongo import WriteConcern
>>> coll = db.get_collection(
...     'test', write_concern=WriteConcern(w=3, wtimeout=1))
>>> try:
...     coll.bulk_write([InsertOne({'a': i}) for i in range(4)])
... except BulkWriteError as bwe:
...     pprint(bwe.details)
...
{'nInserted': 4,
 'nMatched': 0,
 'nModified': 0,
 'nRemoved': 0,
 'nUpserted': 0,
 'upserted': [],
 'writeConcernErrors': [{'code': 64...
                         'errInfo': {'wtimeout': True},
                         'errmsg': 'waiting for replication timed out'}],
 'writeErrors': []}