sqlite扩展

默认值 SqliteDatabase 已经包含许多特定于sqlite的功能:

这个 playhouse.sqlite_ext 包括更多的sqlite功能,包括:

入门

要开始使用本文档中描述的功能,您需要使用 SqliteExtDatabase 类从 playhouse.sqlite_ext 模块。此外,某些功能需要 playhouse._sqlite_ext C扩展——这些特性将在文档中注明。

实例化A SqliteExtDatabase

from playhouse.sqlite_ext import SqliteExtDatabase

db = SqliteExtDatabase('my_app.db', pragmas=(
    ('cache_size', -1024 * 64),  # 64MB page-cache.
    ('journal_mode', 'wal'),  # Use WAL-mode (you should always use this!).
    ('foreign_keys', 1))  # Enforce foreign-key constraints.

APIs

class SqliteExtDatabase(database[, pragmas=None[, timeout=5[, c_extensions=None[, rank_functions=True[, hash_functions=False[, regexp_function=False[, bloomfilter=False]]]]]]])
参数:
  • pragmas (list) -- 包含每次打开连接时要设置的pragma key和value的2元组列表。
  • timeout -- 在sqlite驱动程序上设置忙超时(秒)。
  • c_extensions (bool) -- 声明必须/不得使用C扩展加速。如果设置为 True 扩展模块不可用,将引发 ImproperlyConfigured 例外。
  • rank_functions (bool) -- 使搜索结果排名功能可用。
  • hash_functions (bool) -- 使哈希函数可用(MD5、SHA1等)。
  • regexp_function (bool) -- 使regexp函数可用。
  • bloomfilter (bool) -- 使 bloom filter 可用。

延伸 SqliteDatabase 并继承用于声明用户定义函数、pragma等的方法。

class CSqliteExtDatabase(database[, pragmas=None[, timeout=5[, c_extensions=None[, rank_functions=True[, hash_functions=False[, regexp_function=False[, bloomfilter=False[, replace_busy_handler=False]]]]]]]])
参数:
  • pragmas (list) -- 包含每次打开连接时要设置的pragma key和value的2元组列表。
  • timeout -- 在sqlite驱动程序上设置忙超时(秒)。
  • c_extensions (bool) -- 声明必须/不得使用C扩展加速。如果设置为 True 扩展模块不可用,将引发 ImproperlyConfigured 例外。
  • rank_functions (bool) -- 使搜索结果排名功能可用。
  • hash_functions (bool) -- 使哈希函数可用(MD5、SHA1等)。
  • regexp_function (bool) -- 使regexp函数可用。
  • bloomfilter (bool) -- 使 bloom filter 可用。
  • replace_busy_handler (bool) -- 使用更智能的繁忙处理程序实现。

延伸 SqliteExtDatabase 并要求 playhouse._sqlite_ext 扩展模块可用。

on_commit(fn)

注册在当前连接上提交事务时要执行的回调。回调不接受任何参数,返回值被忽略。

但是,如果回调引发 ValueError ,事务将中止并回滚。

例子:

db = CSqliteExtDatabase(':memory:')

@db.on_commit
def on_commit():
    logger.info('COMMITing changes')
on_rollback(fn)

注册在当前连接上回滚事务时要执行的回调。回调不接受任何参数,返回值被忽略。

例子:

@db.on_rollback
def on_rollback():
    logger.info('Rolling back changes')
on_update(fn)

注册一个回调,以便在数据库写入时执行(通过 UPDATE, INSERTDELETE 查询)。回调应接受以下参数:

  • query -查询的类型,或者 INSERT, UPDATE 或 DELETE.
  • 数据库名称-默认数据库命名为 main.
  • table name-要修改的表的名称。
  • rowid—要修改的行的rowid。

回调的返回值被忽略。

例子:

db = CSqliteExtDatabase(':memory:')

@db.on_update
def on_update(query_type, db, table, rowid):
    # e.g. INSERT row 3 into table users.
    logger.info('%s row %s into table %s', query_type, rowid, table)
changes()

返回当前打开的事务中修改的行数。

autocommit

返回一个布尔值,指示是否启用自动提交。默认情况下,此值将为 True 除非在交易中(或 atomic() 块)。

例子:

>>> db = CSqliteExtDatabase(':memory:')
>>> db.autocommit
True
>>> with db.atomic():
...     print(db.autocommit)
...
False
>>> db.autocommit
True
backup(destination[, pages=None, name=None, progress=None])
参数:
  • destination (SqliteDatabase) -- 用作备份目标的数据库对象。
  • pages (int) -- 每次迭代的页数。默认值-1表示所有页面都应在一个步骤中备份。
  • name (str) -- 源数据库的名称(如果使用附加数据库加载多个数据库,则可能不同)。默认为“主”。
  • progress -- 使用三个参数调用进度回调:剩余页数、总页数以及备份是否完成。

例子:

master = CSqliteExtDatabase('master.db')
replica = CSqliteExtDatabase('replica.db')

# Backup the contents of master to replica.
master.backup(replica)
backup_to_file(filename[, pages, name, progress])
参数:
  • filename -- 存储数据库备份的文件名。
  • pages (int) -- 每次迭代的页数。默认值-1表示所有页面都应在一个步骤中备份。
  • name (str) -- 源数据库的名称(如果使用附加数据库加载多个数据库,则可能不同)。默认为“主”。
  • progress -- 使用三个参数调用进度回调:剩余页数、总页数以及备份是否完成。

将当前数据库备份到文件。备份的数据不是数据库转储,而是实际的sqlite数据库文件。

例子:

db = CSqliteExtDatabase('app.db')

def nightly_backup():
    filename = 'backup-%s.db' % (datetime.date.today())
    db.backup_to_file(filename)
blob_open(table, column, rowid[, read_only=False])
参数:
  • table (str) -- 包含数据的表的名称。
  • column (str) -- 包含数据的列的名称。
  • rowid (int) -- 要检索的行的ID。
  • read_only (bool) -- 打开blob仅供阅读。
返回:

Blob 提供对底层二进制数据的有效访问的实例。

返回类型:

Blob

BlobZeroBlob 更多信息。

例子:

class Image(Model):
    filename = TextField()
    data = BlobField()

buf_size = 1024 * 1024 * 8  # Allocate 8MB for storing file.
rowid = Image.insert({Image.filename: 'thefile.jpg',
                      Image.data: ZeroBlob(buf_size)}).execute()

# Open the blob, returning a file-like object.
blob = db.blob_open('image', 'data', rowid)

# Write some data to the blob.
blob.write(image_data)
img_size = blob.tell()

# Read the data back out of the blob.
blob.seek(0)
image_data = blob.read(img_size)
class RowIDField

对应于sqlite的主键字段 rowid 字段。有关详细信息,请参阅上的sqlite文档 rowid tables

例子:

class Note(Model):
    rowid = RowIDField()  # Will be primary key.
    content = TextField()
    timestamp = TimestampField()
class DocIDField

的子类 RowIDField 用于专门使用 docid 对于主键。据我所知,这只适用于使用fts3和fts4全文搜索扩展的表。

注意

在fts3和fts4中,“docid”只是“rowid”的别名。为了减少混乱,最好总是使用 RowIDField 从不使用 DocIDField .

class NoteIndex(FTSModel):
    docid = DocIDField()  # "docid" is used as an alias for "rowid".
    content = SearchField()

    class Meta:
        database = db
class AutoIncrementField

默认情况下,在删除行后,SQLite可以重用主键值。以确保主键是 always 单调递增,不考虑删除,应使用 AutoIncrementField . 此功能的性能成本很低。有关详细信息,请参阅上的sqlite文档 autoincrement .

class JSONField(json_dumps=None, json_loads=None, ...)

适用于存储JSON数据的字段类,具有设计用于 json1 extension .

添加了sqlite 3.9.0 JSON support in the form of an extension library. The SQLite json1 extension provides a number of helper functions for working with JSON data. These APIs are exposed as methods of a special field-type, JSONField.

要访问或修改JSON结构中的特定对象键或数组索引,可以处理 JSONField 就像是一本字典/一张清单。

参数:
  • json_dumps -- (可选)用于将数据序列化为JSON字符串的函数。如果没有提供,将使用stdlib json.dumps .
  • json_loads -- (可选)用于将JSON反序列化为Python对象的函数。如果没有提供,将使用stdlib json.loads .

注解

要自定义JSON序列化或反序列化,可以指定自定义 json_dumpsjson_loads 召唤物。这些函数应该分别接受一个参数:要序列化的对象和JSON字符串。要修改stdlib json函数的参数,可以使用 functools.partial

# Do not escape unicode code-points.
my_json_dumps = functools.partial(json.dumps, ensure_ascii=False)

class SomeModel(Model):
    # Specify our custom serialization function.
    json_data = JSONField(json_dumps=my_json_dumps)

让我们来看一些使用sqlite json1扩展和peewee的例子。在这里,我们将准备一个数据库和一个简单的模型来测试 json1 extension

>>> from playhouse.sqlite_ext import *
>>> db = SqliteExtDatabase(':memory:')
>>> class KV(Model):
...     key = TextField()
...     value = JSONField()
...     class Meta:
...         database = db
...

>>> KV.create_table()

存储数据可以按预期工作。无需将字典或列表序列化为json,因为这是由peewee自动完成的:

>>> KV.create(key='a', value={'k1': 'v1'})
<KV: 1>
>>> KV.get(KV.key == 'a').value
{'k1': 'v1'}

我们可以使用字典查找访问JSON数据的特定部分:

>>> KV.get(KV.value['k1'] == 'v1').key
'a'

可以使用 update() 方法。注意“k1=v1”保留:

>>> KV.update(value=KV.value.update({'k2': 'v2', 'k3': 'v3'})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}

我们还可以自动更新现有数据,或者通过将键的值设置为来删除键。 None . 在下面的示例中,我们将更新“k1”的值并删除“k3”(“k2”不会被修改):

>>> KV.update(value=KV.value.update({'k1': 'v1-x', 'k3': None})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1-x', 'k2': 'v2'}

我们还可以使用 set() 方法:

>>> KV.update(value=KV.value['k1'].set('v1')).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': 'v2'}

这个 set() 除了标量值之外,方法还可以用于对象:

>>> KV.update(value=KV.value['k2'].set({'x2': 'y2'})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': {'x2': 'y2'}}

也可以使用原子方式删除JSON数据的各个部分。 remove()

>>> KV.update(value=KV.value['k2'].remove()).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1'}

我们还可以使用 json_type() 方法:

>>> KV.select(KV.value.json_type(), KV.value['k1'].json_type()).tuples()[:]
[('object', 'text')]

让我们添加一个嵌套值,然后看看如何使用 tree() 方法:

>>> KV.create(key='b', value={'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]})
<KV: 2>
>>> tree = KV.value.tree().alias('tree')
>>> query = KV.select(KV.key, tree.c.fullkey, tree.c.value).from_(KV, tree)
>>> query.tuples()[:]
[('a', '$', {'k1': 'v1'}),
 ('a', '$.k1', 'v1'),
 ('b', '$', {'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]}),
 ('b', '$.x2', [1, 2]),
 ('b', '$.x2[0]', 1),
 ('b', '$.x2[1]', 2),
 ('b', '$.x1', {'y1': 'z1', 'y2': 'z2'}),
 ('b', '$.x1.y1', 'z1'),
 ('b', '$.x1.y2', 'z2')]

这个 tree()children() 方法是强大的。有关如何使用它们的详细信息,请参阅 json1 extension documentation .

还要注意的是 JSONField 查找可以链接:

>>> query = KV.select().where(KV.value['x1']['y1'] == 'z1')
>>> for obj in query:
...     print(obj.key, obj.value)
...

'b', {'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]}

有关详细信息,请参阅 sqlite json1 documentation .

__getitem__(item)
参数:item -- 访问JSON数据中的特定键或数组索引。
返回:公开对JSON数据访问的特殊对象。
返回类型:JSONPath

访问JSON数据中的特定键或数组索引。返回A JSONPath 对象,它公开了读取或修改JSON对象特定部分的方便方法。

例子:

# If metadata contains {"tags": ["list", "of", "tags"]}, we can
# extract the first tag in this way:
Post.select(Post, Post.metadata['tags'][0].alias('first_tag'))

有关更多示例,请参见 JSONPath API文档。

set(value[, as_json=None])
参数:
  • value -- 标量值、列表或字典。
  • as_json (bool) -- 强制将该值视为JSON,在这种情况下,它将在python中预先序列化为JSON。默认情况下,列表和字典被视为要序列化的JSON,而字符串和整数则按原样传递。

设置存储在 JSONField .

使用 json_set() 来自JSON1扩展的函数。

update(data)
参数:data -- 要与当前存储在 JSONField . 要删除特定的密钥,请将该密钥设置为 None 在更新的数据中。

使用rfc-7396 merge patch算法将新数据合并到json值中以应用补丁( data 参数)。MergePatch可以添加、修改或删除JSON对象的元素,这意味着 update() 是两者的通用替换 set()remove() . MergePatch将JSON数组对象视为原子对象,因此 update() 不能追加到数组,也不能修改数组的单个元素。

有关更多信息和示例,请参见sqlite json_patch() 功能文档。

remove()

删除存储在 JSONField .

使用 json_remove 来自JSON1扩展的函数。

json_type()

返回一个标识列中存储的值类型的字符串。

返回的类型将是以下类型之一:

  • 对象
  • 数组
  • 整数
  • 真实的
  • 文本
  • 空<--字符串“空”表示实际的空值
  • 空<--实际空值表示找不到路径

使用 json_type 来自JSON1扩展的函数。

length()

返回存储在列中的数组的长度。

使用 json_array_length 来自JSON1扩展的函数。

children()

这个 children 函数对应于 json_each ,一个表值函数,用于遍历所提供的JSON值并返回顶级数组或对象的直接子级。如果指定了路径,则该路径将被视为最上面的元素。

调用返回的行 children() 具有以下属性:

  • key :当前元素相对于其父元素的键。
  • value :当前元素的值。
  • type :数据类型之一(请参见 json_type()
  • atom :基元类型的标量值, NULL 用于数组和对象。
  • id :引用树中当前节点的唯一ID。
  • parent :包含节点的ID。
  • fullkey :描述当前元素的完整路径。
  • path :当前行的容器路径。

在内部,此方法使用 json_each (文档链接)json1扩展中的函数。

示例用法(与 tree() 方法:

class KeyData(Model):
    key = TextField()
    data = JSONField()

KeyData.create(key='a', data={'k1': 'v1', 'x1': {'y1': 'z1'}})
KeyData.create(key='b', data={'x1': {'y1': 'z1', 'y2': 'z2'}})

# We will query the KeyData model for the key and all the
# top-level keys and values in it's data field.
kd = KeyData.data.children().alias('children')
query = (KeyData
         .select(kd.c.key, kd.c.value, kd.c.fullkey)
         .from_(KeyData, kd)
         .order_by(kd.c.key)
         .tuples())
print(query[:])

# PRINTS:
[('a', 'k1', 'v1',                    '$.k1'),
 ('a', 'x1', '{"y1":"z1"}',           '$.x1'),
 ('b', 'x1', '{"y1":"z1","y2":"z2"}', '$.x1')]
tree()

这个 tree 函数对应于 json_tree ,一个表值函数,它递归地遍历所提供的JSON值,并返回有关每个级别的键的信息。如果指定了路径,则该路径将被视为最上面的元素。

调用返回的行 tree() 具有与调用返回的行相同的属性 children()

  • key :当前元素相对于其父元素的键。
  • value :当前元素的值。
  • type :数据类型之一(请参见 json_type()
  • atom :基元类型的标量值, NULL 用于数组和对象。
  • id :引用树中当前节点的唯一ID。
  • parent :包含节点的ID。
  • fullkey :描述当前元素的完整路径。
  • path :当前行的容器路径。

在内部,此方法使用 json_tree (文档链接)json1扩展中的函数。

示例用法:

class KeyData(Model):
    key = TextField()
    data = JSONField()

KeyData.create(key='a', data={'k1': 'v1', 'x1': {'y1': 'z1'}})
KeyData.create(key='b', data={'x1': {'y1': 'z1', 'y2': 'z2'}})

# We will query the KeyData model for the key and all the
# keys and values in it's data field, recursively.
kd = KeyData.data.tree().alias('tree')
query = (KeyData
         .select(kd.c.key, kd.c.value, kd.c.fullkey)
         .from_(KeyData, kd)
         .order_by(kd.c.key)
         .tuples())
print(query[:])

# PRINTS:
[('a',  None,  '{"k1":"v1","x1":{"y1":"z1"}}', '$'),
 ('b',  None,  '{"x1":{"y1":"z1","y2":"z2"}}', '$'),
 ('a',  'k1',  'v1',                           '$.k1'),
 ('a',  'x1',  '{"y1":"z1"}',                  '$.x1'),
 ('b',  'x1',  '{"y1":"z1","y2":"z2"}',        '$.x1'),
 ('a',  'y1',  'z1',                           '$.x1.y1'),
 ('b',  'y1',  'z1',                           '$.x1.y1'),
 ('b',  'y2',  'z2',                           '$.x1.y2')]
class JSONPath(field[, path=None])
参数:
  • field (JSONField) -- 我们要访问的字段对象。
  • path (tuple) -- 组成JSON路径的组件。

一种表示JSON路径的方便的python方法,用于 JSONField .

这个 JSONPath 对象实现 __getitem__ ,正在累积路径组件,可以将其转换为相应的JSON路径表达式。

__getitem__(item)
参数:item -- 访问子键或数组索引。
返回:JSONPath 表示新路径。

访问JSON数据中的子键或数组索引。返回A JSONPath 对象,它公开了读取或修改JSON对象特定部分的方便方法。

例子:

# If metadata contains {"tags": ["list", "of", "tags"]}, we can
# extract the first tag in this way:
first_tag = Post.metadata['tags'][0]
query = (Post
         .select(Post, first_tag.alias('first_tag'))
         .order_by(first_tag))
set(value[, as_json=None])
参数:
  • value -- 标量值、列表或字典。
  • as_json (bool) -- 强制将该值视为JSON,在这种情况下,它将在python中预先序列化为JSON。默认情况下,列表和字典被视为要序列化的JSON,而字符串和整数则按原样传递。

在JSON数据中的给定位置设置该值。

使用 json_set() 来自JSON1扩展的函数。

update(data)
参数:data -- 要与JSON数据中给定位置的数据合并的标量值、列表或字典。要删除特定的密钥,请将该密钥设置为 None 在更新的数据中。

使用rfc-7396 merge patch算法将新数据合并到json值中以应用补丁( data 参数)。MergePatch可以添加、修改或删除JSON对象的元素,这意味着 update() 是两者的通用替换 set()remove() . MergePatch将JSON数组对象视为原子对象,因此 update() 不能追加到数组,也不能修改数组的单个元素。

有关更多信息和示例,请参见sqlite json_patch() 功能文档。

remove()

删除JSON数据中给定位置存储的数据。

使用 json_type 来自JSON1扩展的函数。

json_type()

返回一个字符串,标识存储在JSON数据中给定位置的值的类型。

返回的类型将是以下类型之一:

  • 对象
  • 数组
  • 整数
  • 真实的
  • 文本
  • 空<--字符串“空”表示实际的空值
  • 空<--实际空值表示找不到路径

使用 json_type 来自JSON1扩展的函数。

length()

返回存储在JSON数据中给定位置的数组的长度。

使用 json_array_length 来自JSON1扩展的函数。

children()

表值函数,在给定位置公开JSON对象的直接后代。也见 JSONField.children() .

tree()

表值函数,以递归方式公开给定位置上JSON对象的所有子代。另请参见 JSONField.tree() .

class SearchField([unindexed=False[, column_name=None]])

字段类,用于表示全文搜索虚拟表的模型上的列。全文搜索扩展名禁止对列指定任何类型或约束。这种行为是由 SearchField ,如果尝试任何与全文搜索扩展不兼容的配置,则会引发异常。

文档搜索索引的示例模型(时间戳存储在表中,但其数据不可搜索):

class DocumentIndex(FTSModel):
    title = SearchField()
    content = SearchField()
    tags = SearchField()
    timestamp = SearchField(unindexed=True)
match(term)
参数:term (str) -- 全文搜索查询/术语
返回:Expression 对应于 MATCH 操作员。

sqlite的全文搜索支持搜索整个表,包括所有索引列, or 搜索单个列。这个 match() 方法可用于将搜索限制为单个列:

class SearchIndex(FTSModel):
    title = SearchField()
    body = SearchField()

# Search *only* the title field and return results ordered by
# relevance, using bm25.
query = (SearchIndex
         .select(SearchIndex, SearchIndex.bm25().alias('score'))
         .where(SearchIndex.title.match('python'))
         .order_by(SearchIndex.bm25()))

搜索 all 索引列,使用 FTSModel.match() 方法:

# Searches *both* the title and body and return results ordered by
# relevance, using bm25.
query = (SearchIndex
         .select(SearchIndex, SearchIndex.bm25().alias('score'))
         .where(SearchIndex.match('python'))
         .order_by(SearchIndex.bm25()))
class VirtualModel

设计用于表示虚拟表的模型类。默认的元数据设置略有不同,以匹配虚拟表经常使用的元数据设置。

元数据选项:

  • arguments -传递给虚拟表构造函数的参数。
  • extension_module -用于虚拟表的扩展名。
  • options -要在虚拟表中应用的设置字典
    建造师。
  • primary_key -默认为 False ,表示没有主键。

这些都是按以下方式组合的:

CREATE VIRTUAL TABLE <table_name>
USING <extension_module>
([prefix_arguments, ...] fields, ... [arguments, ...], [options...])
class FTSModel

的子类 VirtualModel 用于 FTS3 and FTS4 全文搜索扩展。

FTSModel子类应正常定义,但有几个注意事项:

  • 不支持唯一约束、非空约束、检查约束和外键。
  • 完全忽略字段索引和多列索引
  • sqlite将所有列类型视为 TEXT (尽管您可以存储其他数据类型,但sqlite会将其视为文本)。
  • FTS模型包含 rowid 由sqlite自动创建和管理的字段(除非您选择在模型创建期间显式设置它)。查找此列 快速高效.

鉴于这些约束,强烈建议在 FTSModel 子类是的实例 SearchField (尽管明确声明 RowIDField )使用 SearchField 有助于防止意外创建无效的列约束。如果希望在索引中存储元数据,但不希望将其包含在全文索引中,请指定 unindexed=True 当实例化 SearchField .

上述唯一例外是 rowid 主键,可以使用 RowIDField .查找 rowid 效率很高。如果使用FTS4,也可以使用 DocIDField ,这是rowid的别名(尽管这样做没有好处)。

由于缺少辅助索引,通常使用 rowid 主键作为指向常规表中某一行的指针。例如:

class Document(Model):
    # Canonical source of data, stored in a regular table.
    author = ForeignKeyField(User, backref='documents')
    title = TextField(null=False, unique=True)
    content = TextField(null=False)
    timestamp = DateTimeField()

    class Meta:
        database = db

class DocumentIndex(FTSModel):
    # Full-text search index.
    rowid = RowIDField()
    title = SearchField()
    content = SearchField()

    class Meta:
        database = db
        # Use the porter stemming algorithm to tokenize content.
        options = {'tokenize': 'porter'}

要在文档索引中存储文档,我们将 INSERT 排成一行 DocumentIndex 表,手动设置 rowid 使其与对应的 Document

def store_document(document):
    DocumentIndex.insert({
        DocumentIndex.rowid: document.id,
        DocumentIndex.title: document.title,
        DocumentIndex.content: document.content}).execute()

要执行搜索并返回排名结果,我们可以查询 Document 上的表和联接 DocumentIndex . 此联接将是有效的,因为查找FTSModel的 rowid 字段很快:

def search(phrase):
    # Query the search index and join the corresponding Document
    # object on each search result.
    return (Document
            .select()
            .join(
                DocumentIndex,
                on=(Document.id == DocumentIndex.rowid))
            .where(DocumentIndex.match(phrase))
            .order_by(DocumentIndex.bm25()))

警告

上的所有SQL查询 FTSModel 类将是全表扫描 except 全文搜索和 rowid 查找。

如果正在索引的内容的主源存在于单独的表中,则可以通过指示sqlite不存储搜索索引内容的其他副本来节省一些磁盘空间。SQLite仍将创建对内容执行搜索所需的元数据和数据结构,但内容本身不会存储在搜索索引中。

要完成此操作,可以使用 content 选择权。这个 FTS4 documentation 有更多信息。

下面是一个简短的示例,说明如何使用peewee实现此功能:

class Blog(Model):
    title = TextField()
    pub_date = DateTimeField(default=datetime.datetime.now)
    content = TextField()  # We want to search this.

    class Meta:
        database = db

class BlogIndex(FTSModel):
    content = SearchField()

    class Meta:
        database = db
        options = {'content': Blog.content}  # <-- specify data source.

db.create_tables([Blog, BlogIndex])

# Now, we can manage content in the BlogIndex. To populate the
# search index:
BlogIndex.rebuild()

# Optimize the index.
BlogIndex.optimize()

这个 content 选项接受单个 Field 或A Model 可以减少数据库文件使用的存储量。但是,需要手动将内容移到/移出关联的 FTSModel .

classmethod match(term)
参数:term -- 搜索词或表达式。

生成一个SQL表达式,表示在表中搜索给定的术语或表达式。SQLite使用 MATCH 用于指示全文搜索的运算符。

例子:

# Search index for "search phrase" and return results ranked
# by relevancy using the BM25 algorithm.
query = (DocumentIndex
         .select()
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.bm25()))
for result in query:
    print('Result: %s' % result.title)
classmethod search(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])
参数:
  • term (str) -- 要使用的搜索词。
  • weights -- 列的权重列表,按列在表中的位置排序。 Or, 由字段名或字段名键入并映射到值的字典。
  • with_score -- 分数是否应作为 SELECT 语句。
  • score_alias (str) -- 用于计算的排名分数的别名。这是用于访问分数的属性,如果 with_score=True .
  • explicit_ordering (bool) -- 使用完整的SQL函数计算排名,而不是简单地引用ORDERBY子句中的分数别名。

根据匹配的质量搜索术语和排序结果的简写方法。

注解

该方法使用一种简化的算法来确定结果的相关性等级。要获得更复杂的结果排名,请使用 search_bm25() 方法。

# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print(result.title)

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print(result.title, result.search_score)
classmethod search_bm25(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])
参数:
  • term (str) -- 要使用的搜索词。
  • weights -- 列的权重列表,按列在表中的位置排序。 Or, 由字段名或字段名键入并映射到值的字典。
  • with_score -- 分数是否应作为 SELECT 语句。
  • score_alias (str) -- 用于计算的排名分数的别名。这是用于访问分数的属性,如果 with_score=True .
  • explicit_ordering (bool) -- 使用完整的SQL函数计算排名,而不是简单地引用ORDERBY子句中的分数别名。

使用bm25算法通过匹配的质量搜索术语和排序结果的简捷方法。

注意

BM25排名算法仅适用于FTS4。如果使用FTS3,请使用 search() 方法。

classmethod search_bm25f(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])

等同于 FTSModel.search_bm25() 但使用的是BM25F变种的BM25排序算法。

classmethod search_lucene(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])

等同于 FTSModel.search_bm25() ,但使用Lucene搜索引擎的结果排序算法。

classmethod rank([col1_weight, col2_weight...coln_weight])
参数:col_weight (float) -- (可选)给予 ith 模型的列。默认情况下,所有列的权重都为 1.0 .

生成一个表达式,该表达式将计算并返回搜索匹配的质量。这个 rank 可用于对搜索结果进行排序。等级分数越高表示匹配越好。

这个 rank 函数接受可选参数,这些参数允许您为各个列指定权重。如果未指定权重,则认为所有列的重要性相同。

注解

使用的算法 rank() 比较简单,速度也比较快。要获得更复杂的结果排名,请使用:

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.rank().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.rank()))

for search_result in query:
    print search_result.title, search_result.score
classmethod bm25([col1_weight, col2_weight...coln_weight])
参数:col_weight (float) -- (可选)给予 ith 模型的列。默认情况下,所有列的权重都为 1.0 .

生成一个表达式,该表达式将使用 BM25 algorithm . 该值可用于对搜索结果进行排序,分数越高,匹配越好。

喜欢 rank()bm25 函数接受可选参数,这些参数允许您为各个列指定权重。如果未指定权重,则认为所有列的重要性相同。

注意

BM25结果排序算法需要FTS4。如果使用FTS3,请使用 rank() 相反。

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.bm25().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.bm25()))

for search_result in query:
    print(search_result.title, search_result.score)

注解

上面的代码示例相当于调用 search_bm25() 方法:

query = DocumentIndex.search_bm25('search phrase', with_score=True)
for search_result in query:
    print(search_result.title, search_result.score)
classmethod bm25f([col1_weight, col2_weight...coln_weight])

相同的 bm25() 但它使用了BM25排名算法的BM25F变体。

classmethod lucene([col1_weight, col2_weight...coln_weight])

相同的 bm25() 除了使用Lucene搜索结果排序算法。

classmethod rebuild()

重新生成搜索索引--仅当 content 创建表期间指定了选项。

classmethod optimize()

优化搜索索引。

class FTS5Model

的子类 VirtualModel 用于 FTS5 全文搜索扩展。

FTS5Model子类应正常定义,但有几个注意事项:

  • fts5显式地不允许在列上指定任何约束、数据类型或索引。因此,所有列 must 是实例 SearchField .
  • FTS5模型包含 rowid 由sqlite自动创建和管理的字段(除非您选择在模型创建期间显式设置它)。查找此列 快速高效.
  • 不支持字段索引和多列索引。

这个 FTS5 扩展附带了BM25排名功能的内置实现。因此, searchsearch_bm25 方法已被重写以使用内置的排名函数,而不是用户定义的函数。

classmethod fts5_installed()

返回一个布尔值,指示是否安装了fts5扩展。如果未安装,将尝试加载扩展。

classmethod search(term[, weights=None[, with_score=False[, score_alias='score']]])
参数:
  • term (str) -- 要使用的搜索词。
  • weights -- 列的权重列表,按列在表中的位置排序。 Or, 由字段名或字段名键入并映射到值的字典。
  • with_score -- 分数是否应作为 SELECT 语句。
  • score_alias (str) -- 用于计算的排名分数的别名。这是用于访问分数的属性,如果 with_score=True .
  • explicit_ordering (bool) -- 使用完整的SQL函数计算排名,而不是简单地引用ORDERBY子句中的分数别名。

根据匹配的质量搜索术语和排序结果的简写方法。这个 FTS5 扩展提供了BM25算法的内置实现,用于按相关性对结果进行排序。

分数越高,比赛越好。

# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print(result.title)

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print(result.title, result.search_score)
classmethod search_bm25(term[, weights=None[, with_score=False[, score_alias='score']]])

使用FTS5, search_bm25()search() 方法。

classmethod rank([col1_weight, col2_weight...coln_weight])
参数:col_weight (float) -- (可选)给予 ith 模型的列。默认情况下,所有列的权重都为 1.0 .

生成一个表达式,该表达式将使用 BM25 algorithm . 该值可用于对搜索结果进行排序,分数越高,匹配越好。

这个 rank() 函数接受可选参数,这些参数允许您为各个列指定权重。如果未指定权重,则认为所有列的重要性相同。

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.rank().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.rank()))

for search_result in query:
    print(search_result.title, search_result.score)

注解

上面的代码示例相当于调用 search() 方法:

query = DocumentIndex.search('search phrase', with_score=True)
for search_result in query:
    print(search_result.title, search_result.score)
classmethod bm25([col1_weight, col2_weight...coln_weight])

因为fts5为bm25提供了内置支持, bm25() 方法与 rank() 方法。

classmethod VocabModel([table_type='row'|'col'|'instance'[, table_name=None]])
参数:
  • table_type (str) -- “row”、“col”或“instance”。
  • table_name -- 声乐表的名称。如果未指定,则为“fts5tablename_v”。

生成适用于访问 vocab table 对应于fts5搜索索引。

class TableFunction

实现用户定义的表值函数。不像一个简单的 scalar or aggregate 函数返回单个标量值,表值函数可以返回任意数量的表格数据行。

简单例子:

from playhouse.sqlite_ext import TableFunction


class Series(TableFunction):
    # Name of columns in each row of generated data.
    columns = ['value']

    # Name of parameters the function may be called with.
    params = ['start', 'stop', 'step']

    def initialize(self, start=0, stop=None, step=1):
        """
        Table-functions declare an initialize() method, which is
        called with whatever arguments the user has called the
        function with.
        """
        self.start = self.current = start
        self.stop = stop or float('Inf')
        self.step = step

    def iterate(self, idx):
        """
        Iterate is called repeatedly by the SQLite database engine
        until the required number of rows has been read **or** the
        function raises a `StopIteration` signalling no more rows
        are available.
        """
        if self.current > self.stop:
            raise StopIteration

        ret, self.current = self.current, self.current + self.step
        return (ret,)

# Register the table-function with our database, which ensures it
# is declared whenever a connection is opened.
db.table_function('series')(Series)

# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
    print(value)

注解

A TableFunction 必须先用数据库连接注册,然后才能使用它。为了确保table函数始终可用,可以使用 SqliteDatabase.table_function() decorator在数据库中注册函数。

TableFunction 实现必须提供两个属性并实现两个方法,如下所述。

columns

包含函数返回数据的列名称的列表。例如,用于在分隔符上拆分字符串的函数可以指定3列: [substring, start_idx, end_idx] .

params

可以用函数调用的参数的名称。应列出所有参数,包括可选参数。例如,用于在分隔符上拆分字符串的函数可以指定2个参数: [string, delimiter] .

name

Optional -指定表函数的名称。如果没有提供,名称将从类名中提取。

print_tracebacks = True

打印表函数回调方法中发生的任何错误的完整跟踪。当设置为false时,只有通用operationalerror可见。

initialize(**parameter_values)
参数:parameter_values -- 调用函数时使用的参数。
返回:没有返回值。

这个 initialize 调用方法以使用用户在调用函数时指定的参数初始化表函数。

iterate(idx)
参数:idx (int) -- 当前迭代步骤
返回:与中命名的列对应的行数据元组。 columns 属性。
引发:StopIteration -- 表示没有更多行可用。

此函数重复调用,并返回连续的数据行。函数可以在使用所有行之前终止(特别是当用户指定了 LIMIT 在结果上)。或者,该函数可以通过提高 StopIteration 例外。

classmethod register(conn)
参数:conn -- A sqlite3.Connection 对象。

用db-api 2.0注册table函数 sqlite3.Connection 对象。表值函数 must 在可以在查询中使用它们之前进行注册。

例子:

class MyTableFunction(TableFunction):
    name = 'my_func'
    # ... other attributes and methods ...

db = SqliteDatabase(':memory:')
db.connect()

MyTableFunction.register(db.connection())

确保 TableFunction 每次打开连接时都注册,请使用 table_function() 装饰者。

ClosureTable(model_class[, foreign_key=None[, referencing_class=None[, referencing_key=None]]])
参数:
  • model_class -- 包含树中节点的模型类。
  • foreign_key -- 模型类上的自引用父节点字段。如果没有提供,Peewee将反省模型以找到合适的密钥。
  • referencing_class -- 多对多关系的中间表。
  • referencing_key -- 对于多对多关系来说,是关系的起源方。
返回:

返回A VirtualModel 用于处理关闭表。

用于创建适用于使用的模型类的工厂函数 transitive closure table. Closure tables are VirtualModel 使用可传递闭包sqlite扩展的子类。这些特殊的表旨在使高效查询分层数据变得容易。sqlite扩展名在后台管理一个avl树,在表更改时透明地更新树,并使对分层数据执行常见查询变得容易。

要在项目中使用闭包表扩展,需要:

  1. sqlite扩展的副本。源代码可以在 SQLite code repository 或者通过克隆 this gist

    $ git clone https://gist.github.com/coleifer/7f3593c5c2a645913b92 closure
    $ cd closure/
    
  2. 将扩展编译为共享库,例如

    $ gcc -g -fPIC -shared closure.c -o closure.so
    
  3. 为分层数据创建模型。这里唯一的要求是模型有一个整数主键和一个自引用的外键。任何附加字段都可以。

    class Category(Model):
        name = CharField()
        metadata = TextField()
        parent = ForeignKeyField('self', index=True, null=True)  # Required.
    
    # Generate a model for the closure virtual table.
    CategoryClosure = ClosureTable(Category)
    

    自我参照性也可以通过中间表实现(对于多对多关系)。

    class User(Model):
        name = CharField()
    
    class UserRelations(Model):
        user = ForeignKeyField(User)
        knows = ForeignKeyField(User, backref='_known_by')
    
        class Meta:
            primary_key = CompositeKey('user', 'knows') # Alternatively, a unique index on both columns.
    
    # Generate a model for the closure virtual table, specifying the UserRelations as the referencing table
    UserClosure = ClosureTable(
        User,
        referencing_class=UserRelations,
        foreign_key=UserRelations.knows,
        referencing_key=UserRelations.user)
    
  4. 在应用程序代码中,确保在实例化 Database 对象。这是通过将路径传递到共享库 load_extension() 方法。

    db = SqliteExtDatabase('my_database.db')
    db.load_extension('/path/to/closure')
    

警告

在使用 transitive_closure 延伸。首先,它要求 source model 有一个整数主键。其次,强烈建议您在自引用外键上创建索引。

例子:

class Category(Model):
    name = CharField()
    metadata = TextField()
    parent = ForeignKeyField('self', index=True, null=True)  # Required.

# Generate a model for the closure virtual table.
CategoryClosure = ClosureTable(Category)

 # Create the tables if they do not exist.
 db.create_tables([Category, CategoryClosure], True)

现在可以使用闭包表中的数据执行有趣的查询:

# Get all ancestors for a particular node.
laptops = Category.get(Category.name == 'Laptops')
for parent in Closure.ancestors(laptops):
    print parent.name

# Computer Hardware
# Computers
# Electronics
# All products

# Get all descendants for a particular node.
hardware = Category.get(Category.name == 'Computer Hardware')
for node in Closure.descendants(hardware):
    print node.name

# Laptops
# Desktops
# Hard-drives
# Monitors
# LCD Monitors
# LED Monitors

API VirtualModel 返回的 ClosureTable() .

class BaseClosureTable
id

给定节点的主键字段。

depth

表示给定节点的相对深度的字段。

root

表示相对根节点的字段。

descendants(node[, depth=None[, include_node=False]])

检索给定节点的所有后代。如果指定了深度,则只返回该深度处的节点(相对于给定节点)。

node = Category.get(Category.name == 'Electronics')

# Direct child categories.
children = CategoryClosure.descendants(node, depth=1)

# Grand-child categories.
children = CategoryClosure.descendants(node, depth=2)

# Descendants at all depths.
all_descendants = CategoryClosure.descendants(node)
ancestors(node[, depth=None[, include_node=False]])

检索给定节点的所有祖先。如果指定了深度,则只返回该深度处的节点(相对于给定节点)。

node = Category.get(Category.name == 'Laptops')

# All ancestors.
all_ancestors = CategoryClosure.ancestors(node)

# Grand-parent category.
grandparent = CategoryClosure.ancestores(node, depth=2)
siblings(node[, include_node=False])

检索作为指定节点父级的子级的所有节点。

注解

要深入讨论sqlite可传递闭包扩展,请查看此博客文章, Querying Tree Structures in SQLite using Python and the Transitive Closure Extension .

class LSMTable

VirtualModel 子类适用于 lsm1 extension 这个 lsm1 扩展是一个虚拟表,它为 lsm key/value storage engine from SQLite4 .

注解

LSM1扩展还没有发布(在编写时是3.22版的sqlite),因此考虑到这个特性是实验性的,在随后的版本中可能会发生变化。

LSM表定义一个主键列和任意数量的附加值列(这些列被序列化并存储在存储引擎的单个值字段中)。主键必须全部为同一类型,并使用以下字段类型之一:

由于LSM存储引擎是键/值存储,因此应用程序必须指定主键(包括整数)。

注意

LSM引擎不支持辅助索引,因此唯一有效的查询将是对主键的查找(或范围查询)。可以查询和筛选其他字段,但可能导致完整的表扫描。

示例模型声明:

db = SqliteExtDatabase('my_app.db')
db.load_extension('lsm.so')  # Load shared library.

class EventLog(LSMTable):
    timestamp = IntegerField(primary_key=True)
    action = TextField()
    sender = TextField()
    target = TextField()

    class Meta:
        database = db
        filename = 'eventlog.ldb'  # LSM data is stored in separate db.

# Declare virtual table.
EventLog.create_table()

示例查询:

# Use dictionary operators to get, set and delete rows from the LSM
# table. Slices may be passed to represent a range of key values.
def get_timestamp():
    # Return time as integer expressing time in microseconds.
    return int(time.time() * 1000000)

# Create a new row, at current timestamp.
ts = get_timestamp()
EventLog[ts] = ('pageview', 'search', '/blog/some-post/')

# Retrieve row from event log.
log = EventLog[ts]
print(log.action, log.sender, log.target)
# Prints ("pageview", "search", "/blog/some-post/")

# Delete the row.
del EventLog[ts]

# We can also use the "create()" method.
EventLog.create(
    timestamp=get_timestamp(),
    action='signup',
    sender='newsletter',
    target='sqlite-news')

简单键/值模型声明:

class KV(LSMTable):
    key = TextField(primary_key=True)
    value = TextField()

    class Meta:
        database = db
        filename = 'kv.ldb'

db.create_tables([KV])

对于由单个值字段组成的表,Peewee将在获取单个项时直接返回值。您还可以请求行切片,在这种情况下,peewee返回相应的 Select 可以迭代的查询。以下是一些例子:

>>> KV['k0'] = 'v0'
>>> print(KV['k0'])
'v0'

>>> data = [{'key': 'k%d' % i, 'value': 'v%d' % i} for i in range(20)]
>>> KV.insert_many(data).execute()

>>> KV.select().count()
20

>>> KV['k8']
'v8'

>>> list(KV['k4.1':'k7.x']
[Row(key='k5', value='v5'),
 Row(key='k6', value='v6'),
 Row(key='k7', value='v7')]

>>> list(KV['k6xxx':])
[Row(key='k7', value='v7'),
 Row(key='k8', value='v8'),
 Row(key='k9', value='v9')]

您还可以索引 LSMTable 使用表达式:

>>> list(KV[KV.key > 'k6'])
[Row(key='k7', value='v7'),
 Row(key='k8', value='v8'),
 Row(key='k9', value='v9')]

>>> list(KV[(KV.key > 'k6') & (KV.value != 'v8')])
[Row(key='k7', value='v7'),
 Row(key='k9', value='v9')]

可以使用删除单行 del 或使用切片或表达式的多行:

>>> del KV['k1']
>>> del KV['k3x':'k8']
>>> del KV[KV.key.between('k10', 'k18')]

>>> list(KV[:])
[Row(key='k0', value='v0'),
 Row(key='k19', value='v19'),
 Row(key='k2', value='v2'),
 Row(key='k3', value='v3'),
 Row(key='k9', value='v9')]

尝试获取单个不存在的密钥将导致 KeyError ,但切片不会引发异常:

>>> KV['k1']
...
KeyError: 'k1'

>>> list(KV['k1':'k1'])
[]
class ZeroBlob(length)
参数:length (int) -- blob的大小(字节)。

ZeroBlob 仅用于为存储支持增量I/O的blob保留空间。若要使用 SQLite BLOB-store 必须首先将所需大小的零块插入要与增量I/O一起使用的行中。

例如,请参见 Blob .

class Blob(database, table, column, rowid[, read_only=False])
参数:
  • database -- SqliteExtDatabase 实例。
  • table (str) -- 正在访问的表的名称。
  • column (str) -- 正在访问的列的名称。
  • rowid (int) -- 正在访问的行的主键。
  • read_only (bool) -- 阻止对blob数据进行任何修改。

打开存储在给定表/列/行中的blob进行增量I/O。要为新数据分配存储,可以使用 ZeroBlob ,效率很高。

class RawData(Model):
    data = BlobField()

# Allocate 100MB of space for writing a large file incrementally:
query = RawData.insert({'data': ZeroBlob(1024 * 1024 * 100)})
rowid = query.execute()

# Now we can open the row for incremental I/O:
blob = Blob(db, 'rawdata', 'data', rowid)

# Read from the file and write to the blob in chunks of 4096 bytes.
while True:
    data = file_handle.read(4096)
    if not data:
        break
    blob.write(data)

bytes_written = blob.tell()
blob.close()
read([n=None])
参数:n (int) -- 最多只能读取 n 文件中当前位置的字节。

读到 n 来自blob文件中当前位置的字节。如果 n 未指定,将读取整个blob。

seek(offset[, whence=0])
参数:
  • offset (int) -- 在文件中查找给定的偏移量。
  • whence (int) -- 相对于指定的参照系查找。

值为 whence

  • 0 :文件开头
  • 1 :当前位置
  • 2 文件结束
tell()

返回文件中的当前偏移量。

write(data)
参数:data (bytes) -- 要写入的数据

从文件中的当前位置开始写入给定的数据。

close()

关闭文件并释放关联资源。

reopen(rowid)
参数:rowid (int) -- 要打开的行的主键。

如果已经为给定的表/列打开了一个blob,则可以使用 reopen() 重复使用的方法 Blob 用于访问表中多行的对象。

其他功能

这个 SqliteExtDatabase 接受初始化选项以注册对简单 bloom filter . 一旦初始化,Bloom过滤器就可以用于对大型数据集进行有效的成员查询。

下面是一个例子:

db = CSqliteExtDatabase(':memory:', bloomfilter=True)

# Create and define a table to store some data.
db.execute_sql('CREATE TABLE "register" ("data" TEXT)')
Register = Table('register', ('data',)).bind(db)

# Populate the database with a bunch of text.
with db.atomic():
    for i in 'abcdefghijklmnopqrstuvwxyz':
        keys = [i * j for j in range(1, 10)]  # a, aa, aaa, ... aaaaaaaaa
        Register.insert([{'data': key} for key in keys]).execute()

# Collect data into a 16KB bloomfilter.
query = Register.select(fn.bloomfilter(Register.data, 16 * 1024).alias('buf'))
row = query.get()
buf = row['buf']

# Use bloomfilter buf to test whether other keys are members.
test_keys = (
    ('aaaa', True),
    ('abc', False),
    ('zzzzzzz', True),
    ('zyxwvut', False))
for key, is_present in test_keys:
    query = Register.select(fn.bloomfilter_contains(key, buf).alias('is_member'))
    answer = query.get()['is_member']
    assert answer == is_present

这个 SqliteExtDatabase 还可以注册其他有用的函数:

  • rank_functions (默认启用):注册用于排序搜索结果的函数,例如 bm25 和 卢肯.
  • hash_functions :注册md5、sha1、sha256、adler32、crc32和杂音散列函数。
  • regexp_function :注册regexp函数。

实例:

def create_new_user(username, password):
    # DO NOT DO THIS IN REAL LIFE. PLEASE.
    query = User.insert({'username': username, 'password': fn.sha1(password)})
    new_user_id = query.execute()

你可以使用 murmurhash 函数将字节散列为整数以用于压缩存储:

>>> db = SqliteExtDatabase(':memory:', hash_functions=True)
>>> db.execute_sql('SELECT murmurhash(?)', ('abcdefg',)).fetchone()
(4188131059,)