sqlite-utils/tests/test_create.py
from sqlite_utils.db import (
Index,
Database,
DescIndex,
AlterError,
NoObviousTable,
OperationalError,
ForeignKey,
Table,
View,
)
from sqlite_utils.utils import hash_record, sqlite3
import collections
import datetime
import decimal
import json
import pathlib
import pytest
import uuid
try:
import pandas as pd # type: ignore
except ImportError:
pd = None # type: ignore
def test_create_table(fresh_db):
assert [] == fresh_db.table_names()
table = fresh_db.create_table(
"test_table",
{
"text_col": str,
"float_col": float,
"int_col": int,
"bool_col": bool,
"bytes_col": bytes,
"datetime_col": datetime.datetime,
},
)
assert ["test_table"] == fresh_db.table_names()
assert [
{"name": "text_col", "type": "TEXT"},
{"name": "float_col", "type": "FLOAT"},
{"name": "int_col", "type": "INTEGER"},
{"name": "bool_col", "type": "INTEGER"},
{"name": "bytes_col", "type": "BLOB"},
{"name": "datetime_col", "type": "TEXT"},
] == [{"name": col.name, "type": col.type} for col in table.columns]
assert (
"CREATE TABLE [test_table] (\n"
" [text_col] TEXT,\n"
" [float_col] FLOAT,\n"
" [int_col] INTEGER,\n"
" [bool_col] INTEGER,\n"
" [bytes_col] BLOB,\n"
" [datetime_col] TEXT\n"
")"
) == table.schema
def test_create_table_compound_primary_key(fresh_db):
table = fresh_db.create_table(
"test_table", {"id1": str, "id2": str, "value": int}, pk=("id1", "id2")
)
assert (
"CREATE TABLE [test_table] (\n"
" [id1] TEXT,\n"
" [id2] TEXT,\n"
" [value] INTEGER,\n"
" PRIMARY KEY ([id1], [id2])\n"
")"
) == table.schema
assert ["id1", "id2"] == table.pks
@pytest.mark.parametrize("pk", ("id", ["id"]))
def test_create_table_with_single_primary_key(fresh_db, pk):
fresh_db["foo"].insert({"id": 1}, pk=pk)
assert (
fresh_db["foo"].schema == "CREATE TABLE [foo] (\n [id] INTEGER PRIMARY KEY\n)"
)
def test_create_table_with_invalid_column_characters(fresh_db):
with pytest.raises(AssertionError):
fresh_db.create_table("players", {"name[foo]": str})
def test_create_table_with_defaults(fresh_db):
table = fresh_db.create_table(
"players",
{"name": str, "score": int},
defaults={"score": 1, "name": "bob''bob"},
)
assert ["players"] == fresh_db.table_names()
assert [{"name": "name", "type": "TEXT"}, {"name": "score", "type": "INTEGER"}] == [
{"name": col.name, "type": col.type} for col in table.columns
]
assert (
"CREATE TABLE [players] (\n [name] TEXT DEFAULT 'bob''''bob',\n [score] INTEGER DEFAULT 1\n)"
) == table.schema
def test_create_table_with_bad_not_null(fresh_db):
with pytest.raises(AssertionError):
fresh_db.create_table(
"players", {"name": str, "score": int}, not_null={"mouse"}
)
def test_create_table_with_not_null(fresh_db):
table = fresh_db.create_table(
"players",
{"name": str, "score": int},
not_null={"name", "score"},
defaults={"score": 3},
)
assert ["players"] == fresh_db.table_names()
assert [{"name": "name", "type": "TEXT"}, {"name": "score", "type": "INTEGER"}] == [
{"name": col.name, "type": col.type} for col in table.columns
]
assert (
"CREATE TABLE [players] (\n [name] TEXT NOT NULL,\n [score] INTEGER NOT NULL DEFAULT 3\n)"
) == table.schema
@pytest.mark.parametrize(
"example,expected_columns",
(
(
{"name": "Ravi", "age": 63},
[{"name": "name", "type": "TEXT"}, {"name": "age", "type": "INTEGER"}],
),
(
{"create": "Reserved word", "table": "Another"},
[{"name": "create", "type": "TEXT"}, {"name": "table", "type": "TEXT"}],
),
({"day": datetime.time(11, 0)}, [{"name": "day", "type": "TEXT"}]),
({"decimal": decimal.Decimal("1.2")}, [{"name": "decimal", "type": "FLOAT"}]),
(
{"memoryview": memoryview(b"hello")},
[{"name": "memoryview", "type": "BLOB"}],
),
({"uuid": uuid.uuid4()}, [{"name": "uuid", "type": "TEXT"}]),
({"foo[bar]": 1}, [{"name": "foo_bar_", "type": "INTEGER"}]),
(
{"timedelta": datetime.timedelta(hours=1)},
[{"name": "timedelta", "type": "TEXT"}],
),
),
)
def test_create_table_from_example(fresh_db, example, expected_columns):
people_table = fresh_db["people"]
assert people_table.last_rowid is None
assert people_table.last_pk is None
people_table.insert(example)
assert people_table.last_rowid == 1
assert people_table.last_pk == 1
assert ["people"] == fresh_db.table_names()
assert expected_columns == [
{"name": col.name, "type": col.type} for col in fresh_db["people"].columns
]
def test_create_table_from_example_with_compound_primary_keys(fresh_db):
record = {"name": "Zhang", "group": "staff", "employee_id": 2}
table = fresh_db["people"].insert(record, pk=("group", "employee_id"))
assert ["group", "employee_id"] == table.pks
assert record == table.get(("staff", 2))
@pytest.mark.parametrize(
"method_name", ("insert", "upsert", "insert_all", "upsert_all")
)
def test_create_table_with_custom_columns(fresh_db, method_name):
table = fresh_db["dogs"]
method = getattr(table, method_name)
record = {"id": 1, "name": "Cleo", "age": "5"}
if method_name.endswith("_all"):
record = [record]
method(record, pk="id", columns={"age": int, "weight": float})
assert ["dogs"] == fresh_db.table_names()
expected_columns = [
{"name": "id", "type": "INTEGER"},
{"name": "name", "type": "TEXT"},
{"name": "age", "type": "INTEGER"},
{"name": "weight", "type": "FLOAT"},
]
assert expected_columns == [
{"name": col.name, "type": col.type} for col in table.columns
]
assert [{"id": 1, "name": "Cleo", "age": 5, "weight": None}] == list(table.rows)
@pytest.mark.parametrize("use_table_factory", [True, False])
def test_create_table_column_order(fresh_db, use_table_factory):
row = collections.OrderedDict(
(
("zzz", "third"),
("abc", "first"),
("ccc", "second"),
("bbb", "second-to-last"),
("aaa", "last"),
)
)
column_order = ("abc", "ccc", "zzz")
if use_table_factory:
fresh_db.table("table", column_order=column_order).insert(row)
else:
fresh_db["table"].insert(row, column_order=column_order)
assert [
{"name": "abc", "type": "TEXT"},
{"name": "ccc", "type": "TEXT"},
{"name": "zzz", "type": "TEXT"},
{"name": "bbb", "type": "TEXT"},
{"name": "aaa", "type": "TEXT"},
] == [{"name": col.name, "type": col.type} for col in fresh_db["table"].columns]
@pytest.mark.parametrize(
"foreign_key_specification,expected_exception",
(
# You can specify triples, pairs, or a list of columns
((("one_id", "one", "id"), ("two_id", "two", "id")), False),
((("one_id", "one"), ("two_id", "two")), False),
(("one_id", "two_id"), False),
# You can also specify ForeignKey tuples:
(
(
ForeignKey("m2m", "one_id", "one", "id"),
ForeignKey("m2m", "two_id", "two", "id"),
),
False,
),
# If you specify a column that doesn't point to a table, you get an error:
(("one_id", "two_id", "three_id"), NoObviousTable),
# Tuples of the wrong length get an error:
((("one_id", "one", "id", "five"), ("two_id", "two", "id")), AssertionError),
# Likewise a bad column:
((("one_id", "one", "id2"),), AlterError),
# Or a list of dicts
(({"one_id": "one"},), AssertionError),
),
)
@pytest.mark.parametrize("use_table_factory", [True, False])
def test_create_table_works_for_m2m_with_only_foreign_keys(
fresh_db, foreign_key_specification, expected_exception, use_table_factory
):
if use_table_factory:
fresh_db.table("one", pk="id").insert({"id": 1})
fresh_db.table("two", pk="id").insert({"id": 1})
else:
fresh_db["one"].insert({"id": 1}, pk="id")
fresh_db["two"].insert({"id": 1}, pk="id")
row = {"one_id": 1, "two_id": 1}
def do_it():
if use_table_factory:
fresh_db.table("m2m", foreign_keys=foreign_key_specification).insert(row)
else:
fresh_db["m2m"].insert(row, foreign_keys=foreign_key_specification)
if expected_exception:
with pytest.raises(expected_exception):
do_it()
return
else:
do_it()
assert [
{"name": "one_id", "type": "INTEGER"},
{"name": "two_id", "type": "INTEGER"},
] == [{"name": col.name, "type": col.type} for col in fresh_db["m2m"].columns]
assert sorted(
[
{"column": "one_id", "other_table": "one", "other_column": "id"},
{"column": "two_id", "other_table": "two", "other_column": "id"},
],
key=lambda s: repr(s),
) == sorted(
[
{
"column": fk.column,
"other_table": fk.other_table,
"other_column": fk.other_column,
}
for fk in fresh_db["m2m"].foreign_keys
],
key=lambda s: repr(s),
)
def test_self_referential_foreign_key(fresh_db):
assert [] == fresh_db.table_names()
table = fresh_db.create_table(
"test_table",
columns={
"id": int,
"ref": int,
},
pk="id",
foreign_keys=(("ref", "test_table", "id"),),
)
assert (
"CREATE TABLE [test_table] (\n"
" [id] INTEGER PRIMARY KEY,\n"
" [ref] INTEGER REFERENCES [test_table]([id])\n"
")"
) == table.schema
def test_create_error_if_invalid_foreign_keys(fresh_db):
with pytest.raises(AlterError):
fresh_db["one"].insert(
{"id": 1, "ref_id": 3},
pk="id",
foreign_keys=(("ref_id", "bad_table", "bad_column"),),
)
def test_create_error_if_invalid_self_referential_foreign_keys(fresh_db):
with pytest.raises(AlterError) as ex:
fresh_db["one"].insert(
{"id": 1, "ref_id": 3},
pk="id",
foreign_keys=(("ref_id", "one", "bad_column"),),
)
assert ex.value.args == ("No such column: one.bad_column",)
@pytest.mark.parametrize(
"col_name,col_type,not_null_default,expected_schema",
(
(
"nickname",
str,
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [nickname] TEXT)",
),
(
"dob",
datetime.date,
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [dob] TEXT)",
),
("age", int, None, "CREATE TABLE [dogs] (\n [name] TEXT\n, [age] INTEGER)"),
(
"weight",
float,
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [weight] FLOAT)",
),
("text", "TEXT", None, "CREATE TABLE [dogs] (\n [name] TEXT\n, [text] TEXT)"),
(
"integer",
"INTEGER",
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [integer] INTEGER)",
),
(
"float",
"FLOAT",
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [float] FLOAT)",
),
("blob", "blob", None, "CREATE TABLE [dogs] (\n [name] TEXT\n, [blob] BLOB)"),
(
"default_str",
None,
None,
"CREATE TABLE [dogs] (\n [name] TEXT\n, [default_str] TEXT)",
),
(
"nickname",
str,
"",
"CREATE TABLE [dogs] (\n [name] TEXT\n, [nickname] TEXT NOT NULL DEFAULT '')",
),
(
"nickname",
str,
"dawg's dawg",
"CREATE TABLE [dogs] (\n [name] TEXT\n, [nickname] TEXT NOT NULL DEFAULT 'dawg''s dawg')",
),
),
)
def test_add_column(fresh_db, col_name, col_type, not_null_default, expected_schema):
fresh_db.create_table("dogs", {"name": str})
assert fresh_db["dogs"].schema == "CREATE TABLE [dogs] (\n [name] TEXT\n)"
fresh_db["dogs"].add_column(col_name, col_type, not_null_default=not_null_default)
assert fresh_db["dogs"].schema == expected_schema
def test_add_foreign_key(fresh_db):
fresh_db["authors"].insert_all(
[{"id": 1, "name": "Sally"}, {"id": 2, "name": "Asheesh"}], pk="id"
)
fresh_db["books"].insert_all(
[
{"title": "Hedgehogs of the world", "author_id": 1},
{"title": "How to train your wolf", "author_id": 2},
]
)
assert [] == fresh_db["books"].foreign_keys
t = fresh_db["books"].add_foreign_key("author_id", "authors", "id")
# Ensure it returned self:
assert isinstance(t, Table) and t.name == "books"
assert [
ForeignKey(
table="books", column="author_id", other_table="authors", other_column="id"
)
] == fresh_db["books"].foreign_keys
def test_add_foreign_key_if_column_contains_space(fresh_db):
fresh_db["authors"].insert_all([{"id": 1, "name": "Sally"}], pk="id")
fresh_db["books"].insert_all(
[
{"title": "Hedgehogs of the world", "author id": 1},
]
)
fresh_db["books"].add_foreign_key("author id", "authors", "id")
assert fresh_db["books"].foreign_keys == [
ForeignKey(
table="books", column="author id", other_table="authors", other_column="id"
)
]
def test_add_foreign_key_error_if_column_does_not_exist(fresh_db):
fresh_db["books"].insert(
{"id": 1, "title": "Hedgehogs of the world", "author_id": 1}
)
with pytest.raises(AlterError):
fresh_db["books"].add_foreign_key("author2_id", "books", "id")
def test_add_foreign_key_error_if_other_table_does_not_exist(fresh_db):
fresh_db["books"].insert({"title": "Hedgehogs of the world", "author_id": 1})
with pytest.raises(AlterError):
fresh_db["books"].add_foreign_key("author_id", "authors", "id")
def test_add_foreign_key_error_if_already_exists(fresh_db):
fresh_db["books"].insert({"title": "Hedgehogs of the world", "author_id": 1})
fresh_db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
fresh_db["books"].add_foreign_key("author_id", "authors", "id")
with pytest.raises(AlterError) as ex:
fresh_db["books"].add_foreign_key("author_id", "authors", "id")
assert "Foreign key already exists for author_id => authors.id" == ex.value.args[0]
def test_add_foreign_key_no_error_if_exists_and_ignore_true(fresh_db):
fresh_db["books"].insert({"title": "Hedgehogs of the world", "author_id": 1})
fresh_db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
fresh_db["books"].add_foreign_key("author_id", "authors", "id")
fresh_db["books"].add_foreign_key("author_id", "authors", "id", ignore=True)
def test_add_foreign_keys(fresh_db):
fresh_db["authors"].insert_all(
[{"id": 1, "name": "Sally"}, {"id": 2, "name": "Asheesh"}], pk="id"
)
fresh_db["categories"].insert_all([{"id": 1, "name": "Wildlife"}], pk="id")
fresh_db["books"].insert_all(
[{"title": "Hedgehogs of the world", "author_id": 1, "category_id": 1}]
)
assert [] == fresh_db["books"].foreign_keys
fresh_db.add_foreign_keys(
[
("books", "author_id", "authors", "id"),
("books", "category_id", "categories", "id"),
]
)
assert [
ForeignKey(
table="books", column="author_id", other_table="authors", other_column="id"
),
ForeignKey(
table="books",
column="category_id",
other_table="categories",
other_column="id",
),
] == sorted(fresh_db["books"].foreign_keys)
def test_add_column_foreign_key(fresh_db):
fresh_db.create_table("dogs", {"name": str})
fresh_db.create_table("breeds", {"name": str})
fresh_db["dogs"].add_column("breed_id", fk="breeds")
assert fresh_db["dogs"].schema == (
'CREATE TABLE "dogs" (\n'
" [name] TEXT,\n"
" [breed_id] INTEGER REFERENCES [breeds]([rowid])\n"
")"
)
# And again with an explicit primary key column
fresh_db.create_table("subbreeds", {"name": str, "primkey": str}, pk="primkey")
fresh_db["dogs"].add_column("subbreed_id", fk="subbreeds")
assert fresh_db["dogs"].schema == (
'CREATE TABLE "dogs" (\n'
" [name] TEXT,\n"
" [breed_id] INTEGER REFERENCES [breeds]([rowid]),\n"
" [subbreed_id] TEXT REFERENCES [subbreeds]([primkey])\n"
")"
)
def test_add_foreign_key_guess_table(fresh_db):
fresh_db.create_table("dogs", {"name": str})
fresh_db.create_table("breeds", {"name": str, "id": int}, pk="id")
fresh_db["dogs"].add_column("breed_id", int)
fresh_db["dogs"].add_foreign_key("breed_id")
assert fresh_db["dogs"].schema == (
'CREATE TABLE "dogs" (\n'
" [name] TEXT,\n"
" [breed_id] INTEGER REFERENCES [breeds]([id])\n"
")"
)
def test_index_foreign_keys(fresh_db):
test_add_foreign_key_guess_table(fresh_db)
assert [] == fresh_db["dogs"].indexes
fresh_db.index_foreign_keys()
assert [["breed_id"]] == [i.columns for i in fresh_db["dogs"].indexes]
# Calling it a second time should do nothing
fresh_db.index_foreign_keys()
assert [["breed_id"]] == [i.columns for i in fresh_db["dogs"].indexes]
def test_index_foreign_keys_if_index_name_is_already_used(fresh_db):
# https://github.com/simonw/sqlite-utils/issues/335
test_add_foreign_key_guess_table(fresh_db)
# Add index with a name that will conflict with index_foreign_keys()
fresh_db["dogs"].create_index(["name"], index_name="idx_dogs_breed_id")
fresh_db.index_foreign_keys()
assert {(idx.name, tuple(idx.columns)) for idx in fresh_db["dogs"].indexes} == {
("idx_dogs_breed_id_2", ("breed_id",)),
("idx_dogs_breed_id", ("name",)),
}
@pytest.mark.parametrize(
"extra_data,expected_new_columns",
[
({"species": "squirrels"}, [{"name": "species", "type": "TEXT"}]),
(
{"species": "squirrels", "hats": 5},
[{"name": "species", "type": "TEXT"}, {"name": "hats", "type": "INTEGER"}],
),
(
{"hats": 5, "rating": 3.5},
[{"name": "hats", "type": "INTEGER"}, {"name": "rating", "type": "FLOAT"}],
),
],
)
@pytest.mark.parametrize("use_table_factory", [True, False])
def test_insert_row_alter_table(
fresh_db, extra_data, expected_new_columns, use_table_factory
):
table = fresh_db["books"]
table.insert({"title": "Hedgehogs of the world", "author_id": 1})
assert [
{"name": "title", "type": "TEXT"},
{"name": "author_id", "type": "INTEGER"},
] == [{"name": col.name, "type": col.type} for col in table.columns]
record = {"title": "Squirrels of the world", "author_id": 2}
record.update(extra_data)
if use_table_factory:
fresh_db.table("books", alter=True).insert(record)
else:
fresh_db["books"].insert(record, alter=True)
assert [
{"name": "title", "type": "TEXT"},
{"name": "author_id", "type": "INTEGER"},
] + expected_new_columns == [
{"name": col.name, "type": col.type} for col in table.columns
]
def test_add_missing_columns_case_insensitive(fresh_db):
table = fresh_db["foo"]
table.insert({"id": 1, "name": "Cleo"}, pk="id")
table.add_missing_columns([{"Name": ".", "age": 4}])
assert (
table.schema
== "CREATE TABLE [foo] (\n [id] INTEGER PRIMARY KEY,\n [name] TEXT\n, [age] INTEGER)"
)
@pytest.mark.parametrize("use_table_factory", [True, False])
def test_insert_replace_rows_alter_table(fresh_db, use_table_factory):
first_row = {"id": 1, "title": "Hedgehogs of the world", "author_id": 1}
next_rows = [
{"id": 1, "title": "Hedgehogs of the World", "species": "hedgehogs"},
{"id": 2, "title": "Squirrels of the World", "num_species": 200},
{
"id": 3,
"title": "Badgers of the World",
"significant_continents": ["Europe", "North America"],
},
]
if use_table_factory:
table = fresh_db.table("books", pk="id", alter=True)
table.insert(first_row)
table.insert_all(next_rows, replace=True)
else:
table = fresh_db["books"]
table.insert(first_row, pk="id")
table.insert_all(next_rows, alter=True, replace=True)
assert {
"author_id": int,
"id": int,
"num_species": int,
"significant_continents": str,
"species": str,
"title": str,
} == table.columns_dict
assert [
{
"author_id": None,
"id": 1,
"num_species": None,
"significant_continents": None,
"species": "hedgehogs",
"title": "Hedgehogs of the World",
},
{
"author_id": None,
"id": 2,
"num_species": 200,
"significant_continents": None,
"species": None,
"title": "Squirrels of the World",
},
{
"author_id": None,
"id": 3,
"num_species": None,
"significant_continents": '["Europe", "North America"]',
"species": None,
"title": "Badgers of the World",
},
] == list(table.rows)
def test_insert_all_with_extra_columns_in_later_chunks(fresh_db):
chunk = [
{"record": "Record 1"},
{"record": "Record 2"},
{"record": "Record 3"},
{"record": "Record 4", "extra": 1},
]
fresh_db["t"].insert_all(chunk, batch_size=2, alter=True)
assert list(fresh_db["t"].rows) == [
{"record": "Record 1", "extra": None},
{"record": "Record 2", "extra": None},
{"record": "Record 3", "extra": None},
{"record": "Record 4", "extra": 1},
]
def test_bulk_insert_more_than_999_values(fresh_db):
"Inserting 100 items with 11 columns should work"
fresh_db["big"].insert_all(
(
{
"id": i + 1,
"c2": 2,
"c3": 3,
"c4": 4,
"c5": 5,
"c6": 6,
"c7": 7,
"c8": 8,
"c9": 9,
"c10": 10,
"c11": 11,
}
for i in range(100)
),
pk="id",
)
assert fresh_db["big"].count == 100
@pytest.mark.parametrize(
"num_columns,should_error", ((900, False), (999, False), (1000, True))
)
def test_error_if_more_than_999_columns(fresh_db, num_columns, should_error):
record = dict([("c{}".format(i), i) for i in range(num_columns)])
if should_error:
with pytest.raises(AssertionError):
fresh_db["big"].insert(record)
else:
fresh_db["big"].insert(record)
def test_columns_not_in_first_record_should_not_cause_batch_to_be_too_large(fresh_db):
# https://github.com/simonw/sqlite-utils/issues/145
# sqlite on homebrew and Debian/Ubuntu etc. is typically compiled with
# SQLITE_MAX_VARIABLE_NUMBER set to 250,000, so we need to exceed this value to
# trigger the error on these systems.
THRESHOLD = 250000
batch_size = 999
extra_columns = 1 + (THRESHOLD - 1) // (batch_size - 1)
records = [
{"c0": "first record"}, # one column in first record -> batch size = 999
# fill out the batch with 99 records with enough columns to exceed THRESHOLD
*[
dict([("c{}".format(i), j) for i in range(extra_columns)])
for j in range(batch_size - 1)
],
]
try:
fresh_db["too_many_columns"].insert_all(
records, alter=True, batch_size=batch_size
)
except sqlite3.OperationalError:
raise
@pytest.mark.parametrize(
"columns,index_name,expected_index",
(
(
["is good dog"],
None,
Index(
seq=0,
name="idx_dogs_is good dog",
unique=0,
origin="c",
partial=0,
columns=["is good dog"],
),
),
(
["is good dog", "age"],
None,
Index(
seq=0,
name="idx_dogs_is good dog_age",
unique=0,
origin="c",
partial=0,
columns=["is good dog", "age"],
),
),
(
["age"],
"age_index",
Index(
seq=0,
name="age_index",
unique=0,
origin="c",
partial=0,
columns=["age"],
),
),
),
)
def test_create_index(fresh_db, columns, index_name, expected_index):
dogs = fresh_db["dogs"]
dogs.insert({"name": "Cleo", "twitter": "cleopaws", "age": 3, "is good dog": True})
assert [] == dogs.indexes
dogs.create_index(columns, index_name)
assert expected_index == dogs.indexes[0]
def test_create_index_unique(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"name": "Cleo", "twitter": "cleopaws", "age": 3, "is_good_dog": True})
assert [] == dogs.indexes
dogs.create_index(["name"], unique=True)
assert (
Index(
seq=0,
name="idx_dogs_name",
unique=1,
origin="c",
partial=0,
columns=["name"],
)
== dogs.indexes[0]
)
def test_create_index_if_not_exists(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"name": "Cleo", "twitter": "cleopaws", "age": 3, "is_good_dog": True})
assert [] == dogs.indexes
dogs.create_index(["name"])
assert len(dogs.indexes) == 1
with pytest.raises(Exception, match="index idx_dogs_name already exists"):
dogs.create_index(["name"])
dogs.create_index(["name"], if_not_exists=True)
def test_create_index_desc(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"name": "Cleo", "twitter": "cleopaws", "age": 3, "is good dog": True})
assert [] == dogs.indexes
dogs.create_index([DescIndex("age"), "name"])
sql = fresh_db.execute(
"select sql from sqlite_master where name='idx_dogs_age_name'"
).fetchone()[0]
assert sql == (
"CREATE INDEX [idx_dogs_age_name]\n" " ON [dogs] ([age] desc, [name])"
)
def test_create_index_find_unique_name(fresh_db):
table = fresh_db["t"]
table.insert({"id": 1})
table.create_index(["id"])
# Without find_unique_name should error
with pytest.raises(OperationalError, match="index idx_t_id already exists"):
table.create_index(["id"])
# With find_unique_name=True it should work
table.create_index(["id"], find_unique_name=True)
table.create_index(["id"], find_unique_name=True)
# Should have three now
index_names = {idx.name for idx in table.indexes}
assert index_names == {"idx_t_id", "idx_t_id_2", "idx_t_id_3"}
def test_create_index_analyze(fresh_db):
dogs = fresh_db["dogs"]
assert "sqlite_stat1" not in fresh_db.table_names()
dogs.insert({"name": "Cleo", "twitter": "cleopaws"})
dogs.create_index(["name"], analyze=True)
assert "sqlite_stat1" in fresh_db.table_names()
assert list(fresh_db["sqlite_stat1"].rows) == [
{"tbl": "dogs", "idx": "idx_dogs_name", "stat": "1 1"}
]
@pytest.mark.parametrize(
"data_structure",
(
["list with one item"],
["list with", "two items"],
{"dictionary": "simple"},
{"dictionary": {"nested": "complex"}},
collections.OrderedDict(
[
("key1", {"nested": ["cømplex"]}),
("key2", "foo"),
]
),
[{"list": "of"}, {"two": "dicts"}],
),
)
def test_insert_dictionaries_and_lists_as_json(fresh_db, data_structure):
fresh_db["test"].insert({"id": 1, "data": data_structure}, pk="id")
row = fresh_db.execute("select id, data from test").fetchone()
assert row[0] == 1
assert data_structure == json.loads(row[1])
def test_insert_list_nested_unicode(fresh_db):
fresh_db["test"].insert(
{"id": 1, "data": {"key1": {"nested": ["cømplex"]}}}, pk="id"
)
row = fresh_db.execute("select id, data from test").fetchone()
assert row[1] == '{"key1": {"nested": ["cømplex"]}}'
def test_insert_uuid(fresh_db):
uuid4 = uuid.uuid4()
fresh_db["test"].insert({"uuid": uuid4})
row = list(fresh_db["test"].rows)[0]
assert {"uuid"} == row.keys()
assert isinstance(row["uuid"], str)
assert row["uuid"] == str(uuid4)
def test_insert_memoryview(fresh_db):
fresh_db["test"].insert({"data": memoryview(b"hello")})
row = list(fresh_db["test"].rows)[0]
assert {"data"} == row.keys()
assert isinstance(row["data"], bytes)
assert row["data"] == b"hello"
def test_insert_thousands_using_generator(fresh_db):
fresh_db["test"].insert_all(
{"i": i, "word": "word_{}".format(i)} for i in range(10000)
)
assert [{"name": "i", "type": "INTEGER"}, {"name": "word", "type": "TEXT"}] == [
{"name": col.name, "type": col.type} for col in fresh_db["test"].columns
]
assert fresh_db["test"].count == 10000
def test_insert_thousands_raises_exception_with_extra_columns_after_first_100(fresh_db):
# https://github.com/simonw/sqlite-utils/issues/139
with pytest.raises(Exception, match="table test has no column named extra"):
fresh_db["test"].insert_all(
[{"i": i, "word": "word_{}".format(i)} for i in range(100)]
+ [{"i": 101, "extra": "This extra column should cause an exception"}],
)
def test_insert_thousands_adds_extra_columns_after_first_100_with_alter(fresh_db):
# https://github.com/simonw/sqlite-utils/issues/139
fresh_db["test"].insert_all(
[{"i": i, "word": "word_{}".format(i)} for i in range(100)]
+ [{"i": 101, "extra": "Should trigger ALTER"}],
alter=True,
)
rows = list(fresh_db.query("select * from test where i = 101"))
assert rows == [{"i": 101, "word": None, "extra": "Should trigger ALTER"}]
def test_insert_ignore(fresh_db):
fresh_db["test"].insert({"id": 1, "bar": 2}, pk="id")
# Should raise an error if we try this again
with pytest.raises(Exception, match="UNIQUE constraint failed"):
fresh_db["test"].insert({"id": 1, "bar": 2}, pk="id")
# Using ignore=True should cause our insert to be silently ignored
fresh_db["test"].insert({"id": 1, "bar": 3}, pk="id", ignore=True)
# Only one row, and it should be bar=2, not bar=3
rows = list(fresh_db.query("select * from test"))
assert rows == [{"id": 1, "bar": 2}]
def test_insert_hash_id(fresh_db):
dogs = fresh_db["dogs"]
id = dogs.insert({"name": "Cleo", "twitter": "cleopaws"}, hash_id="id").last_pk
assert "f501265970505d9825d8d9f590bfab3519fb20b1" == id
assert dogs.count == 1
# Insert replacing a second time should not create a new row
id2 = dogs.insert(
{"name": "Cleo", "twitter": "cleopaws"}, hash_id="id", replace=True
).last_pk
assert "f501265970505d9825d8d9f590bfab3519fb20b1" == id2
assert dogs.count == 1
@pytest.mark.parametrize("use_table_factory", [True, False])
def test_insert_hash_id_columns(fresh_db, use_table_factory):
if use_table_factory:
dogs = fresh_db.table("dogs", hash_id_columns=("name", "twitter"))
insert_kwargs = {}
else:
dogs = fresh_db["dogs"]
insert_kwargs = dict(hash_id_columns=("name", "twitter"))
id = dogs.insert(
{"name": "Cleo", "twitter": "cleopaws", "age": 5},
**insert_kwargs,
).last_pk
expected_hash = hash_record({"name": "Cleo", "twitter": "cleopaws"})
assert id == expected_hash
assert dogs.count == 1
# Insert replacing a second time should not create a new row
id2 = dogs.insert(
{"name": "Cleo", "twitter": "cleopaws", "age": 6},
**insert_kwargs,
replace=True,
).last_pk
assert id2 == expected_hash
assert dogs.count == 1
def test_vacuum(fresh_db):
fresh_db["data"].insert({"foo": "foo", "bar": "bar"})
fresh_db.vacuum()
def test_works_with_pathlib_path(tmpdir):
path = pathlib.Path(tmpdir / "test.db")
db = Database(path)
db["demo"].insert_all([{"foo": 1}])
assert db["demo"].count == 1
@pytest.mark.skipif(pd is None, reason="pandas and numpy are not installed")
def test_create_table_numpy(fresh_db):
df = pd.DataFrame({"col 1": range(3), "col 2": range(3)})
fresh_db["pandas"].insert_all(df.to_dict(orient="records"))
assert [
{"col 1": 0, "col 2": 0},
{"col 1": 1, "col 2": 1},
{"col 1": 2, "col 2": 2},
] == list(fresh_db["pandas"].rows)
# Now try all the different types
df = pd.DataFrame(
{
"np.int8": [-8],
"np.int16": [-16],
"np.int32": [-32],
"np.int64": [-64],
"np.uint8": [8],
"np.uint16": [16],
"np.uint32": [32],
"np.uint64": [64],
"np.float16": [16.5],
"np.float32": [32.5],
"np.float64": [64.5],
}
)
df = df.astype(
{
"np.int8": "int8",
"np.int16": "int16",
"np.int32": "int32",
"np.int64": "int64",
"np.uint8": "uint8",
"np.uint16": "uint16",
"np.uint32": "uint32",
"np.uint64": "uint64",
"np.float16": "float16",
"np.float32": "float32",
"np.float64": "float64",
}
)
assert [
"int8",
"int16",
"int32",
"int64",
"uint8",
"uint16",
"uint32",
"uint64",
"float16",
"float32",
"float64",
] == [str(t) for t in df.dtypes]
fresh_db["types"].insert_all(df.to_dict(orient="records"))
assert [
{
"np.float16": 16.5,
"np.float32": 32.5,
"np.float64": 64.5,
"np.int16": -16,
"np.int32": -32,
"np.int64": -64,
"np.int8": -8,
"np.uint16": 16,
"np.uint32": 32,
"np.uint64": 64,
"np.uint8": 8,
}
] == list(fresh_db["types"].rows)
def test_cannot_provide_both_filename_and_memory():
with pytest.raises(
AssertionError, match="Either specify a filename_or_conn or pass memory=True"
):
Database("/tmp/foo.db", memory=True)
def test_creates_id_column(fresh_db):
last_pk = fresh_db.table("cats", pk="id").insert({"name": "barry"}).last_pk
assert [{"name": "barry", "id": last_pk}] == list(fresh_db["cats"].rows)
def test_drop(fresh_db):
fresh_db["t"].insert({"foo": 1})
assert ["t"] == fresh_db.table_names()
assert None is fresh_db["t"].drop()
assert [] == fresh_db.table_names()
def test_drop_view(fresh_db):
fresh_db.create_view("foo_view", "select 1")
assert ["foo_view"] == fresh_db.view_names()
assert None is fresh_db["foo_view"].drop()
assert [] == fresh_db.view_names()
def test_drop_ignore(fresh_db):
with pytest.raises(sqlite3.OperationalError):
fresh_db["does_not_exist"].drop()
fresh_db["does_not_exist"].drop(ignore=True)
# Testing view is harder, we need to create it in order
# to get a View object, then drop it twice
fresh_db.create_view("foo_view", "select 1")
view = fresh_db["foo_view"]
assert isinstance(view, View)
view.drop()
with pytest.raises(sqlite3.OperationalError):
view.drop()
view.drop(ignore=True)
def test_insert_all_empty_list(fresh_db):
fresh_db["t"].insert({"foo": 1})
assert fresh_db["t"].count == 1
fresh_db["t"].insert_all([])
assert fresh_db["t"].count == 1
fresh_db["t"].insert_all([], replace=True)
assert fresh_db["t"].count == 1
def test_insert_all_single_column(fresh_db):
table = fresh_db["table"]
table.insert_all([{"name": "Cleo"}], pk="name")
assert [{"name": "Cleo"}] == list(table.rows)
assert table.pks == ["name"]
@pytest.mark.parametrize("method_name", ("insert_all", "upsert_all"))
def test_insert_all_analyze(fresh_db, method_name):
table = fresh_db["table"]
table.insert_all([{"id": 1, "name": "Cleo"}], pk="id")
assert "sqlite_stat1" not in fresh_db.table_names()
table.create_index(["name"], analyze=True)
assert list(fresh_db["sqlite_stat1"].rows) == [
{"tbl": "table", "idx": "idx_table_name", "stat": "1 1"}
]
method = getattr(table, method_name)
method([{"id": 2, "name": "Suna"}], pk="id", analyze=True)
assert "sqlite_stat1" in fresh_db.table_names()
assert list(fresh_db["sqlite_stat1"].rows) == [
{"tbl": "table", "idx": "idx_table_name", "stat": "2 1"}
]
def test_create_with_a_null_column(fresh_db):
record = {"name": "Name", "description": None}
fresh_db["t"].insert(record)
assert [record] == list(fresh_db["t"].rows)
def test_create_with_nested_bytes(fresh_db):
record = {"id": 1, "data": {"foo": b"bytes"}}
fresh_db["t"].insert(record)
assert [{"id": 1, "data": '{"foo": "b\'bytes\'"}'}] == list(fresh_db["t"].rows)
@pytest.mark.parametrize(
"input,expected", [("hello", "'hello'"), ("hello'there'", "'hello''there'''")]
)
def test_quote(fresh_db, input, expected):
assert fresh_db.quote(input) == expected
@pytest.mark.parametrize(
"columns,expected_sql_middle",
(
(
{"id": int},
"[id] INTEGER",
),
(
{"col": dict},
"[col] TEXT",
),
(
{"col": tuple},
"[col] TEXT",
),
(
{"col": list},
"[col] TEXT",
),
),
)
def test_create_table_sql(fresh_db, columns, expected_sql_middle):
sql = fresh_db.create_table_sql("t", columns)
middle = sql.split("(")[1].split(")")[0].strip()
assert middle == expected_sql_middle
def test_create(fresh_db):
fresh_db["t"].create(
{
"id": int,
"text": str,
"float": float,
"integer": int,
"bytes": bytes,
},
pk="id",
column_order=("id", "float"),
not_null=("float", "integer"),
defaults={"integer": 0},
)
assert fresh_db["t"].schema == (
"CREATE TABLE [t] (\n"
" [id] INTEGER PRIMARY KEY,\n"
" [float] FLOAT NOT NULL,\n"
" [text] TEXT,\n"
" [integer] INTEGER NOT NULL DEFAULT 0,\n"
" [bytes] BLOB\n"
")"
)
def test_create_if_not_exists(fresh_db):
fresh_db["t"].create({"id": int})
# This should error
with pytest.raises(sqlite3.OperationalError):
fresh_db["t"].create({"id": int})
# This should not
fresh_db["t"].create({"id": int}, if_not_exists=True)
def test_create_if_no_columns(fresh_db):
with pytest.raises(AssertionError) as error:
fresh_db["t"].create({})
assert error.value.args[0] == "Tables must have at least one column"
def test_create_ignore(fresh_db):
fresh_db["t"].create({"id": int})
# This should error
with pytest.raises(sqlite3.OperationalError):
fresh_db["t"].create({"id": int})
# This should not
fresh_db["t"].create({"id": int}, ignore=True)
def test_create_replace(fresh_db):
fresh_db["t"].create({"id": int})
# This should error
with pytest.raises(sqlite3.OperationalError):
fresh_db["t"].create({"id": int})
# This should not
fresh_db["t"].create({"name": str}, replace=True)
assert fresh_db["t"].schema == ("CREATE TABLE [t] (\n" " [name] TEXT\n" ")")
@pytest.mark.parametrize(
"cols,kwargs,expected_schema,should_transform",
(
# Change nothing
(
{"id": int, "name": str},
{"pk": "id"},
"CREATE TABLE [demo] (\n [id] INTEGER PRIMARY KEY,\n [name] TEXT\n)",
False,
),
# Drop name column, remove primary key
({"id": int}, {}, 'CREATE TABLE "demo" (\n [id] INTEGER\n)', True),
# Add a new column
(
{"id": int, "name": str, "age": int},
{"pk": "id"},
'CREATE TABLE "demo" (\n [id] INTEGER PRIMARY KEY,\n [name] TEXT,\n [age] INTEGER\n)',
True,
),
# Change a column type
(
{"id": int, "name": bytes},
{"pk": "id"},
'CREATE TABLE "demo" (\n [id] INTEGER PRIMARY KEY,\n [name] BLOB\n)',
True,
),
# Change the primary key
(
{"id": int, "name": str},
{"pk": "name"},
'CREATE TABLE "demo" (\n [id] INTEGER,\n [name] TEXT PRIMARY KEY\n)',
True,
),
# Change in column order
(
{"id": int, "name": str},
{"pk": "id", "column_order": ["name"]},
'CREATE TABLE "demo" (\n [name] TEXT,\n [id] INTEGER PRIMARY KEY\n)',
True,
),
# Same column order is ignored
(
{"id": int, "name": str},
{"pk": "id", "column_order": ["id", "name"]},
"CREATE TABLE [demo] (\n [id] INTEGER PRIMARY KEY,\n [name] TEXT\n)",
False,
),
# Change not null
(
{"id": int, "name": str},
{"pk": "id", "not_null": {"name"}},
'CREATE TABLE "demo" (\n [id] INTEGER PRIMARY KEY,\n [name] TEXT NOT NULL\n)',
True,
),
# Change default values
(
{"id": int, "name": str},
{"pk": "id", "defaults": {"id": 0, "name": "Bob"}},
"CREATE TABLE \"demo\" (\n [id] INTEGER PRIMARY KEY DEFAULT 0,\n [name] TEXT DEFAULT 'Bob'\n)",
True,
),
),
)
def test_create_transform(fresh_db, cols, kwargs, expected_schema, should_transform):
fresh_db.create_table("demo", {"id": int, "name": str}, pk="id")
fresh_db["demo"].insert({"id": 1, "name": "Cleo"})
traces = []
with fresh_db.tracer(lambda sql, parameters: traces.append((sql, parameters))):
fresh_db["demo"].create(cols, **kwargs, transform=True)
at_least_one_create_table = any(sql.startswith("CREATE TABLE") for sql, _ in traces)
assert should_transform == at_least_one_create_table
new_schema = fresh_db["demo"].schema
assert new_schema == expected_schema, repr(new_schema)
assert fresh_db["demo"].count == 1
def test_rename_table(fresh_db):
fresh_db["t"].insert({"foo": "bar"})
assert ["t"] == fresh_db.table_names()
fresh_db.rename_table("t", "renamed")
assert ["renamed"] == fresh_db.table_names()
assert [{"foo": "bar"}] == list(fresh_db["renamed"].rows)
# Should error if table does not exist:
with pytest.raises(sqlite3.OperationalError):
fresh_db.rename_table("does_not_exist", "renamed")
@pytest.mark.parametrize("strict", (False, True))
def test_database_strict(strict):
db = Database(memory=True, strict=strict)
table = db.table("t", columns={"id": int})
table.insert({"id": 1})
assert table.strict == strict or not db.supports_strict
@pytest.mark.parametrize("strict", (False, True))
def test_database_strict_override(strict):
db = Database(memory=True, strict=strict)
table = db.table("t", columns={"id": int}, strict=not strict)
table.insert({"id": 1})
assert table.strict != strict or not db.supports_strict
@pytest.mark.parametrize(
"method_name", ("insert", "upsert", "insert_all", "upsert_all")
)
@pytest.mark.parametrize("strict", (False, True))
def test_insert_upsert_strict(fresh_db, method_name, strict):
table = fresh_db["t"]
method = getattr(table, method_name)
record = {"id": 1}
if method_name.endswith("_all"):
record = [record]
method(record, pk="id", strict=strict)
assert table.strict == strict or not fresh_db.supports_strict
@pytest.mark.parametrize("strict", (False, True))
def test_create_table_strict(fresh_db, strict):
table = fresh_db.create_table("t", {"id": int}, strict=strict)
assert table.strict == strict or not fresh_db.supports_strict
@pytest.mark.parametrize("strict", (False, True))
def test_create_strict(fresh_db, strict):
table = fresh_db["t"]
table.create({"id": int}, strict=strict)
assert table.strict == strict or not fresh_db.supports_strict