大容量写入操作#
本教程介绍如何利用PyMongo的大容量写操作特性。成批执行写操作减少了网络往返的次数,增加了写吞吐量。
大容量插入#
在 2.6 版本加入.
通过将列表传递给 insert_many()
方法。PyMongo将根据MongoDB所接受的最大消息大小自动将批处理拆分为更小的子批,支持非常大的大容量插入操作。
>>> import pymongo
>>> db = pymongo.MongoClient().bulk_example
>>> db.test.insert_many([{"i": i} for i in range(10000)]).inserted_ids
[...]
>>> db.test.count_documents({})
10000
混合大容量写入操作#
在 2.7 版本加入.
PyMongo还支持执行混合的大容量写操作。可以使用大容量写入操作API一起执行一批插入、更新和删除操作。
有序的大容量写入操作#
有序的大容量写入操作是批处理的,并按照为串行执行提供的顺序发送到服务器。返回值是 BulkWriteResult
描述执行的操作的类型和计数。
>>> from pprint import pprint
>>> from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
>>> result = db.test.bulk_write(
... [
... DeleteMany({}), # Remove all documents from the previous example.
... InsertOne({"_id": 1}),
... InsertOne({"_id": 2}),
... InsertOne({"_id": 3}),
... UpdateOne({"_id": 1}, {"$set": {"foo": "bar"}}),
... UpdateOne({"_id": 4}, {"$inc": {"j": 1}}, upsert=True),
... ReplaceOne({"j": 1}, {"j": 2}),
... ]
... )
>>> pprint(result.bulk_api_result)
{'nInserted': 3,
'nMatched': 2,
'nModified': 2,
'nRemoved': 10000,
'nUpserted': 1,
'upserted': [{'_id': 4, 'index': 5}],
'writeConcernErrors': [],
'writeErrors': []}
发生的第一个写入失败(例如,重复密钥错误)将中止其余操作,并引发PyMongo BulkWriteError
。这个 details
属性提供直到故障发生为止的执行结果以及有关故障的详细信息--包括导致故障的操作。
>>> from pymongo import InsertOne, DeleteOne, ReplaceOne
>>> from pymongo.errors import BulkWriteError
>>> requests = [
... ReplaceOne({"j": 2}, {"i": 5}),
... InsertOne({"_id": 4}), # Violates the unique key constraint on _id.
... DeleteOne({"i": 5}),
... ]
>>> try:
... db.test.bulk_write(requests)
... except BulkWriteError as bwe:
... pprint(bwe.details)
...
{'nInserted': 0,
'nMatched': 1,
'nModified': 1,
'nRemoved': 0,
'nUpserted': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [{'code': 11000,
'errmsg': '...E11000...duplicate key error...',
'index': 1,...
'op': {'_id': 4}}]}
无序大容量写入操作#
无序的大容量写入操作被批处理并发送到中的服务器 任意顺序 它们可以并行执行。尝试执行所有操作后,将报告发生的任何错误。
在下一个例子中,第一个和第三个操作由于对_id的唯一约束而失败。
>>> requests = [
... InsertOne({"_id": 1}),
... DeleteOne({"_id": 2}),
... InsertOne({"_id": 3}),
... ReplaceOne({"_id": 4}, {"i": 1}),
... ]
>>> try:
... db.test.bulk_write(requests, ordered=False)
... except BulkWriteError as bwe:
... pprint(bwe.details)
...
{'nInserted': 0,
'nMatched': 1,
'nModified': 1,
'nRemoved': 1,
'nUpserted': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [{'code': 11000,
'errmsg': '...E11000...duplicate key error...',
'index': 0,...
'op': {'_id': 1}},
{'code': 11000,
'errmsg': '...',
'index': 2,...
'op': {'_id': 3}}]}
写关注点#
批量操作使用 write_concern
他们被处决的藏品。无论执行顺序如何,在尝试所有操作后,都将报告写入问题错误(例如wtimeout)。
- ::
>>> from pymongo import WriteConcern >>> coll = db.get_collection( ... 'test', write_concern=WriteConcern(w=3, wtimeout=1)) >>> try: ... coll.bulk_write([InsertOne({'a': i}) for i in range(4)]) ... except BulkWriteError as bwe: ... pprint(bwe.details) ... {'nInserted': 4, 'nMatched': 0, 'nModified': 0, 'nRemoved': 0, 'nUpserted': 0, 'upserted': [], 'writeConcernErrors': [{'code': 64... 'errInfo': {'wtimeout': True}, 'errmsg': 'waiting for replication timed out'}], 'writeErrors': []}