home

Menu
  • ripgrep search

sqlite-utils/tests/test_extract.py

from sqlite_utils.db import InvalidColumns
import itertools
import pytest
 
 
@pytest.mark.parametrize("table", [None, "Species"])
@pytest.mark.parametrize("fk_column", [None, "species"])
def test_extract_single_column(fresh_db, table, fk_column):
    expected_table = table or "species"
    expected_fk = fk_column or "{}_id".format(expected_table)
    iter_species = itertools.cycle(["Palm", "Spruce", "Mangrove", "Oak"])
    fresh_db["tree"].insert_all(
        (
            {
                "id": i,
                "name": "Tree {}".format(i),
                "species": next(iter_species),
                "end": 1,
            }
            for i in range(1, 1001)
        ),
        pk="id",
    )
    fresh_db["tree"].extract("species", table=table, fk_column=fk_column)
    assert fresh_db["tree"].schema == (
        'CREATE TABLE "tree" (\n'
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [name] TEXT,\n"
        "   [{}] INTEGER REFERENCES [{}]([id]),\n".format(expected_fk, expected_table)
        + "   [end] INTEGER\n"
        + ")"
    )
    assert fresh_db[expected_table].schema == (
        "CREATE TABLE [{}] (\n".format(expected_table)
        + "   [id] INTEGER PRIMARY KEY,\n"
        "   [species] TEXT\n"
        ")"
    )
    assert list(fresh_db[expected_table].rows) == [
        {"id": 1, "species": "Palm"},
        {"id": 2, "species": "Spruce"},
        {"id": 3, "species": "Mangrove"},
        {"id": 4, "species": "Oak"},
    ]
    assert list(itertools.islice(fresh_db["tree"].rows, 0, 4)) == [
        {"id": 1, "name": "Tree 1", expected_fk: 1, "end": 1},
        {"id": 2, "name": "Tree 2", expected_fk: 2, "end": 1},
        {"id": 3, "name": "Tree 3", expected_fk: 3, "end": 1},
        {"id": 4, "name": "Tree 4", expected_fk: 4, "end": 1},
    ]
 
 
def test_extract_multiple_columns_with_rename(fresh_db):
    iter_common = itertools.cycle(["Palm", "Spruce", "Mangrove", "Oak"])
    iter_latin = itertools.cycle(["Arecaceae", "Picea", "Rhizophora", "Quercus"])
    fresh_db["tree"].insert_all(
        (
            {
                "id": i,
                "name": "Tree {}".format(i),
                "common_name": next(iter_common),
                "latin_name": next(iter_latin),
            }
            for i in range(1, 1001)
        ),
        pk="id",
    )
 
    fresh_db["tree"].extract(
        ["common_name", "latin_name"], rename={"common_name": "name"}
    )
    assert fresh_db["tree"].schema == (
        'CREATE TABLE "tree" (\n'
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [name] TEXT,\n"
        "   [common_name_latin_name_id] INTEGER REFERENCES [common_name_latin_name]([id])\n"
        ")"
    )
    assert fresh_db["common_name_latin_name"].schema == (
        "CREATE TABLE [common_name_latin_name] (\n"
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [name] TEXT,\n"
        "   [latin_name] TEXT\n"
        ")"
    )
    assert list(fresh_db["common_name_latin_name"].rows) == [
        {"name": "Palm", "id": 1, "latin_name": "Arecaceae"},
        {"name": "Spruce", "id": 2, "latin_name": "Picea"},
        {"name": "Mangrove", "id": 3, "latin_name": "Rhizophora"},
        {"name": "Oak", "id": 4, "latin_name": "Quercus"},
    ]
    assert list(itertools.islice(fresh_db["tree"].rows, 0, 4)) == [
        {"id": 1, "name": "Tree 1", "common_name_latin_name_id": 1},
        {"id": 2, "name": "Tree 2", "common_name_latin_name_id": 2},
        {"id": 3, "name": "Tree 3", "common_name_latin_name_id": 3},
        {"id": 4, "name": "Tree 4", "common_name_latin_name_id": 4},
    ]
 
 
def test_extract_invalid_columns(fresh_db):
    fresh_db["tree"].insert(
        {
            "id": 1,
            "name": "Tree 1",
            "common_name": "Palm",
            "latin_name": "Arecaceae",
        },
        pk="id",
    )
    with pytest.raises(InvalidColumns):
        fresh_db["tree"].extract(["bad_column"])
 
 
def test_extract_rowid_table(fresh_db):
    fresh_db["tree"].insert(
        {
            "name": "Tree 1",
            "common_name": "Palm",
            "latin_name": "Arecaceae",
        }
    )
    fresh_db["tree"].extract(["common_name", "latin_name"])
    assert fresh_db["tree"].schema == (
        'CREATE TABLE "tree" (\n'
        "   [name] TEXT,\n"
        "   [common_name_latin_name_id] INTEGER REFERENCES [common_name_latin_name]([id])\n"
        ")"
    )
    assert (
        fresh_db.execute(
            """
        select
            tree.name,
            common_name_latin_name.common_name,
            common_name_latin_name.latin_name
        from tree
            join common_name_latin_name
            on tree.common_name_latin_name_id = common_name_latin_name.id
    """
        ).fetchall()
        == [("Tree 1", "Palm", "Arecaceae")]
    )
 
 
def test_reuse_lookup_table(fresh_db):
    fresh_db["species"].insert({"id": 1, "name": "Wolf"}, pk="id")
    fresh_db["sightings"].insert({"id": 10, "species": "Wolf"}, pk="id")
    fresh_db["individuals"].insert(
        {"id": 10, "name": "Terriana", "species": "Fox"}, pk="id"
    )
    fresh_db["sightings"].extract("species", rename={"species": "name"})
    fresh_db["individuals"].extract("species", rename={"species": "name"})
    assert fresh_db["sightings"].schema == (
        'CREATE TABLE "sightings" (\n'
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [species_id] INTEGER REFERENCES [species]([id])\n"
        ")"
    )
    assert fresh_db["individuals"].schema == (
        'CREATE TABLE "individuals" (\n'
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [name] TEXT,\n"
        "   [species_id] INTEGER REFERENCES [species]([id])\n"
        ")"
    )
    assert list(fresh_db["species"].rows) == [
        {"id": 1, "name": "Wolf"},
        {"id": 2, "name": "Fox"},
    ]
 
 
def test_extract_error_on_incompatible_existing_lookup_table(fresh_db):
    fresh_db["species"].insert({"id": 1})
    fresh_db["tree"].insert({"name": "Tree 1", "common_name": "Palm"})
    with pytest.raises(InvalidColumns):
        fresh_db["tree"].extract("common_name", table="species")
 
    # Try again with incompatible existing column type
    fresh_db["species2"].insert({"id": 1, "common_name": 3.5})
    with pytest.raises(InvalidColumns):
        fresh_db["tree"].extract("common_name", table="species2")
 
 
def test_extract_works_with_null_values(fresh_db):
    fresh_db["listens"].insert_all(
        [
            {"id": 1, "track_title": "foo", "album_title": "bar"},
            {"id": 2, "track_title": "baz", "album_title": None},
        ],
        pk="id",
    )
    fresh_db["listens"].extract(
        columns=["album_title"], table="albums", fk_column="album_id"
    )
    assert list(fresh_db["listens"].rows) == [
        {"id": 1, "track_title": "foo", "album_id": 1},
        {"id": 2, "track_title": "baz", "album_id": 2},
    ]
    assert list(fresh_db["albums"].rows) == [
        {"id": 1, "album_title": "bar"},
        {"id": 2, "album_title": None},
    ]
 
Powered by Datasette