sqlite-utils/tests/test_introspect.py
from sqlite_utils.db import Index, View, Database, XIndex, XIndexColumn
import pytest
def test_table_names(existing_db):
assert ["foo"] == existing_db.table_names()
def test_view_names(fresh_db):
fresh_db.create_view("foo_view", "select 1")
assert ["foo_view"] == fresh_db.view_names()
def test_table_names_fts4(existing_db):
existing_db["woo"].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS4"
)
existing_db["woo2"].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS5"
)
assert ["woo_fts"] == existing_db.table_names(fts4=True)
assert ["woo2_fts"] == existing_db.table_names(fts5=True)
def test_detect_fts(existing_db):
existing_db["woo"].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS4"
)
existing_db["woo2"].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS5"
)
assert "woo_fts" == existing_db["woo"].detect_fts()
assert "woo_fts" == existing_db["woo_fts"].detect_fts()
assert "woo2_fts" == existing_db["woo2"].detect_fts()
assert "woo2_fts" == existing_db["woo2_fts"].detect_fts()
assert existing_db["foo"].detect_fts() is None
@pytest.mark.parametrize("reverse_order", (True, False))
def test_detect_fts_similar_tables(fresh_db, reverse_order):
# https://github.com/simonw/sqlite-utils/issues/434
table1, table2 = ("demo", "demo2")
if reverse_order:
table1, table2 = table2, table1
fresh_db[table1].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS4"
)
fresh_db[table2].insert({"title": "Hello"}).enable_fts(
["title"], fts_version="FTS4"
)
assert fresh_db[table1].detect_fts() == "{}_fts".format(table1)
assert fresh_db[table2].detect_fts() == "{}_fts".format(table2)
def test_tables(existing_db):
assert len(existing_db.tables) == 1
assert existing_db.tables[0].name == "foo"
def test_views(fresh_db):
fresh_db.create_view("foo_view", "select 1")
assert len(fresh_db.views) == 1
view = fresh_db.views[0]
assert isinstance(view, View)
assert view.name == "foo_view"
assert repr(view) == "<View foo_view (1)>"
assert view.columns_dict == {"1": str}
def test_count(existing_db):
assert existing_db["foo"].count == 3
assert existing_db["foo"].count_where() == 3
assert existing_db["foo"].execute_count() == 3
def test_count_where(existing_db):
assert existing_db["foo"].count_where("text != ?", ["two"]) == 2
assert existing_db["foo"].count_where("text != :t", {"t": "two"}) == 2
def test_columns(existing_db):
table = existing_db["foo"]
assert [{"name": "text", "type": "TEXT"}] == [
{"name": col.name, "type": col.type} for col in table.columns
]
def test_table_schema(existing_db):
assert existing_db["foo"].schema == "CREATE TABLE foo (text TEXT)"
def test_database_schema(existing_db):
assert existing_db.schema == "CREATE TABLE foo (text TEXT);"
def test_table_repr(fresh_db):
table = fresh_db["dogs"].insert({"name": "Cleo", "age": 4})
assert "<Table dogs (name, age)>" == repr(table)
assert "<Table cats (does not exist yet)>" == repr(fresh_db["cats"])
def test_indexes(fresh_db):
fresh_db.executescript(
"""
create table Gosh (c1 text, c2 text, c3 text);
create index Gosh_c1 on Gosh(c1);
create index Gosh_c2c3 on Gosh(c2, c3);
"""
)
assert [
Index(
seq=0,
name="Gosh_c2c3",
unique=0,
origin="c",
partial=0,
columns=["c2", "c3"],
),
Index(seq=1, name="Gosh_c1", unique=0, origin="c", partial=0, columns=["c1"]),
] == fresh_db["Gosh"].indexes
def test_xindexes(fresh_db):
fresh_db.executescript(
"""
create table Gosh (c1 text, c2 text, c3 text);
create index Gosh_c1 on Gosh(c1);
create index Gosh_c2c3 on Gosh(c2, c3 desc);
"""
)
assert fresh_db["Gosh"].xindexes == [
XIndex(
name="Gosh_c2c3",
columns=[
XIndexColumn(seqno=0, cid=1, name="c2", desc=0, coll="BINARY", key=1),
XIndexColumn(seqno=1, cid=2, name="c3", desc=1, coll="BINARY", key=1),
XIndexColumn(seqno=2, cid=-1, name=None, desc=0, coll="BINARY", key=0),
],
),
XIndex(
name="Gosh_c1",
columns=[
XIndexColumn(seqno=0, cid=0, name="c1", desc=0, coll="BINARY", key=1),
XIndexColumn(seqno=1, cid=-1, name=None, desc=0, coll="BINARY", key=0),
],
),
]
@pytest.mark.parametrize(
"column,expected_table_guess",
(
("author", "authors"),
("author_id", "authors"),
("authors", "authors"),
("genre", "genre"),
("genre_id", "genre"),
),
)
def test_guess_foreign_table(fresh_db, column, expected_table_guess):
fresh_db.create_table("authors", {"name": str})
fresh_db.create_table("genre", {"name": str})
assert expected_table_guess == fresh_db["books"].guess_foreign_table(column)
@pytest.mark.parametrize(
"pk,expected", ((None, ["rowid"]), ("id", ["id"]), (["id", "id2"], ["id", "id2"]))
)
def test_pks(fresh_db, pk, expected):
fresh_db["foo"].insert_all([{"id": 1, "id2": 2}], pk=pk)
assert expected == fresh_db["foo"].pks
def test_triggers_and_triggers_dict(fresh_db):
assert [] == fresh_db.triggers
authors = fresh_db["authors"]
authors.insert_all(
[
{"name": "Frank Herbert", "famous_works": "Dune"},
{"name": "Neal Stephenson", "famous_works": "Cryptonomicon"},
]
)
fresh_db["other"].insert({"foo": "bar"})
assert authors.triggers == []
assert authors.triggers_dict == {}
assert fresh_db["other"].triggers == []
assert fresh_db.triggers_dict == {}
authors.enable_fts(
["name", "famous_works"], fts_version="FTS4", create_triggers=True
)
expected_triggers = {
("authors_ai", "authors"),
("authors_ad", "authors"),
("authors_au", "authors"),
}
assert expected_triggers == {(t.name, t.table) for t in fresh_db.triggers}
assert expected_triggers == {
(t.name, t.table) for t in fresh_db["authors"].triggers
}
expected_triggers = {
"authors_ai": (
"CREATE TRIGGER [authors_ai] AFTER INSERT ON [authors] BEGIN\n"
" INSERT INTO [authors_fts] (rowid, [name], [famous_works]) VALUES (new.rowid, new.[name], new.[famous_works]);\n"
"END"
),
"authors_ad": (
"CREATE TRIGGER [authors_ad] AFTER DELETE ON [authors] BEGIN\n"
" INSERT INTO [authors_fts] ([authors_fts], rowid, [name], [famous_works]) VALUES('delete', old.rowid, old.[name], old.[famous_works]);\n"
"END"
),
"authors_au": (
"CREATE TRIGGER [authors_au] AFTER UPDATE ON [authors] BEGIN\n"
" INSERT INTO [authors_fts] ([authors_fts], rowid, [name], [famous_works]) VALUES('delete', old.rowid, old.[name], old.[famous_works]);\n"
" INSERT INTO [authors_fts] (rowid, [name], [famous_works]) VALUES (new.rowid, new.[name], new.[famous_works]);\nEND"
),
}
assert authors.triggers_dict == expected_triggers
assert fresh_db["other"].triggers == []
assert fresh_db["other"].triggers_dict == {}
assert fresh_db.triggers_dict == expected_triggers
def test_has_counts_triggers(fresh_db):
authors = fresh_db["authors"]
authors.insert({"name": "Frank Herbert"})
assert not authors.has_counts_triggers
authors.enable_counts()
assert authors.has_counts_triggers
@pytest.mark.parametrize(
"sql,expected_name,expected_using",
[
(
"""
CREATE VIRTUAL TABLE foo USING FTS5(name)
""",
"foo",
"FTS5",
),
(
"""
CREATE VIRTUAL TABLE "foo" USING FTS4(name)
""",
"foo",
"FTS4",
),
(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS `foo` USING FTS4(name)
""",
"foo",
"FTS4",
),
(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS `foo` USING fts5(name)
""",
"foo",
"FTS5",
),
(
"""
CREATE TABLE IF NOT EXISTS `foo` (id integer primary key)
""",
"foo",
None,
),
],
)
def test_virtual_table_using(fresh_db, sql, expected_name, expected_using):
fresh_db.execute(sql)
assert fresh_db[expected_name].virtual_table_using == expected_using
def test_use_rowid(fresh_db):
fresh_db["rowid_table"].insert({"name": "Cleo"})
fresh_db["regular_table"].insert({"id": 1, "name": "Cleo"}, pk="id")
assert fresh_db["rowid_table"].use_rowid
assert not fresh_db["regular_table"].use_rowid
@pytest.mark.skipif(
not Database(memory=True).supports_strict,
reason="Needs SQLite version that supports strict",
)
@pytest.mark.parametrize(
"create_table,expected_strict",
(
("create table t (id integer) strict", True),
("create table t (id integer) STRICT", True),
("create table t (id integer primary key) StriCt, WITHOUT ROWID", True),
("create table t (id integer primary key) WITHOUT ROWID", False),
("create table t (id integer)", False),
),
)
def test_table_strict(fresh_db, create_table, expected_strict):
fresh_db.execute(create_table)
table = fresh_db["t"]
assert table.strict == expected_strict
@pytest.mark.parametrize(
"value",
(
1,
1.3,
"foo",
True,
b"binary",
),
)
def test_table_default_values(fresh_db, value):
fresh_db["default_values"].insert(
{"nodefault": 1, "value": value}, defaults={"value": value}
)
default_values = fresh_db["default_values"].default_values
assert default_values == {"value": value}