home

Menu
  • ripgrep search

datasette-edit-schema/tests/test_edit_schema.py

from datasette.app import Datasette
from datasette_edit_schema.utils import (
    potential_foreign_keys,
    get_primary_keys,
    examples_for_columns,
    potential_primary_keys,
)
import sqlite_utils
import pytest
import re
from bs4 import BeautifulSoup
from .conftest import Rule
 
whitespace = re.compile(r"\s+")
 
 
@pytest.mark.asyncio
async def test_csrf_required(db_path):
    ds = Datasette([db_path])
    response = await ds.client.post(
        "/edit-schema/data/creatures",
        data={"drop_table": "1"},
        cookies={"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")},
    )
    assert response.status_code == 403
 
 
@pytest.mark.parametrize(
    "actor_id,should_allow",
    (
        (None, False),
        ("user_with_edit_schema", True),
        ("user_with_alter_table", True),
        ("user_with_create_table", False),
        ("user_with_no_perms", False),
    ),
)
@pytest.mark.asyncio
async def test_table_actions(permission_plugin, ds, actor_id, should_allow):
    ds._rules_allow = [
        Rule(
            actor_id="user_with_edit_schema",
            action="edit-schema",
            database="data",
            resource=None,
        ),
        Rule(
            actor_id="user_with_alter_table",
            action="edit-schema-alter-table",
            database="data",
            resource="creatures",
        ),
        Rule(
            actor_id="user_with_create_table",
            action="edit-schema-create-table",
            database="data",
            resource=None,
        ),
    ]
    cookies = None
    if actor_id:
        cookies = {"ds_actor": ds.sign({"a": {"id": actor_id}}, "actor")}
    response = await ds.client.get("/data/creatures", cookies=cookies)
    assert response.status_code == 200
    fragment = '<li><a href="/-/edit-schema/data/creatures">Edit table schema</a></li>'
    if should_allow:
        # Should have table action
        assert fragment in response.text
    else:
        assert fragment not in response.text
 
 
@pytest.mark.asyncio
async def test_post_without_operation_raises_error(db_path):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    # Get a csrftoken
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data={"csrftoken": csrftoken},
        cookies=cookies,
    )
    assert response.status_code == 400
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "actor_id,should_allow",
    (
        (None, False),
        ("user_with_edit_schema", True),
        ("user_with_just_create_table", False),
        ("user_with_just_alter_table", False),
        ("user_with_alter_table_and_drop_table", True),
    ),
)
async def test_drop_table(permission_plugin, db_path, actor_id, should_allow):
    ds = Datasette([db_path], pdb=True)
    ds._rules_allow = [
        Rule(
            actor_id="user_with_edit_schema",
            action="edit-schema",
            database="data",
            resource=None,
        ),
        Rule(
            actor_id="user_with_alter_table_and_drop_table",
            action="edit-schema-drop-table",
            database="data",
            resource="creatures",
        ),
        Rule(
            actor_id="user_with_alter_table_and_drop_table",
            action="edit-schema-alter-table",
            database="data",
            resource="creatures",
        ),
        Rule(
            actor_id="user_with_just_create_table",
            action="edit-schema-create-table",
            database="data",
            resource=None,
        ),
        Rule(
            actor_id="user_with_just_alter_table",
            action="edit-schema-alter-table",
            database="data",
            resource="creatures",
        ),
    ]
    db = sqlite_utils.Database(db_path)
    assert "creatures" in db.table_names()
    cookies = {}
    if actor_id:
        cookies = {"ds_actor": ds.sign({"a": {"id": actor_id}}, "actor")}
    # Get a csrftoken
    form_response = await ds.client.get(
        "/-/edit-schema/data/creatures", cookies=cookies
    )
    if actor_id in (None, "user_with_just_create_table"):
        assert form_response.status_code == 403
        return
    assert form_response.status_code == 200
    csrftoken = form_response.cookies["ds_csrftoken"]
    if should_allow:
        assert 'name="drop_table"' in form_response.text
    else:
        assert 'name="drop_table"' not in form_response.text
    # Try submitting form anyway
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data={"drop_table": "1", "csrftoken": csrftoken},
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    if should_allow:
        assert response.status_code == 302
        assert "creatures" not in db.table_names()
    else:
        assert response.status_code == 403
        assert "creatures" in db.table_names()
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "col_type,expected_type",
    [("text", str), ("integer", int), ("real", float), ("blob", bytes)],
)
async def test_add_column(db_path, col_type, expected_type):
    ds = Datasette([db_path])
    db = sqlite_utils.Database(db_path)
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    table = db["creatures"]
    assert {"name": str, "description": str} == table.columns_dict
    # Get a csrftoken
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data={
            "add_column": "1",
            "csrftoken": csrftoken,
            "name": "new_col",
            "type": col_type,
        },
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    assert response.status_code == 302
    if "ds_messages" in response.cookies:
        messages = ds.unsign(response.cookies["ds_messages"], "messages")
        # None of these should be errors
        assert all(m[1] == Datasette.INFO for m in messages), "Got an error: {}".format(
            messages
        )
    assert {
        "name": str,
        "description": str,
        "new_col": expected_type,
    } == table.columns_dict
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "name,type,expected_error",
    [
        ("name", "text", "A column called 'name' already exists"),
        ("", "text", "Column name is required"),
        ("]]]", "integer", 'unrecognized token: "]"'),
        ("name", "blop", "Invalid type: blop"),
    ],
)
async def test_add_column_errors(db_path, name, type, expected_error):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data={
            "add_column": "1",
            "name": name,
            "type": type,
            "csrftoken": csrftoken,
        },
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    assert response.status_code == 302
    assert response.headers["location"] == "/-/edit-schema/data/creatures"
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert len(messages) == 1
    assert messages[0][0] == expected_error
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "post_data,action,expected_columns_dict,expected_order,expected_message",
    [
        # Change column type
        (
            {
                "type.name": "REAL",
            },
            "update_columns",
            {"name": float, "description": str},
            ["name", "description"],
            "Changes to table have been saved",
        ),
        (
            {
                "type.name": "INTEGER",
            },
            "update_columns",
            {"name": int, "description": str},
            ["name", "description"],
            "Changes to table have been saved",
        ),
        # Changing order
        (
            {
                "sort.description": "0",
                "sort.name": "2",
            },
            "update_columns",
            {"name": str, "description": str},
            ["description", "name"],
            "Changes to table have been saved",
        ),
        # Change names
        (
            {
                "name.name": "name2",
                "name.description": "description2",
            },
            "update_columns",
            {"name2": str, "description2": str},
            ["name2", "description2"],
            "Changes to table have been saved",
        ),
        # Add new columns
        (
            {
                "add_column": "1",
                "name": "new_text",
                "type": "text",
            },
            None,
            {"name": str, "description": str, "new_text": str},
            ["name", "description", "new_text"],
            "Column has been added",
        ),
        (
            {
                "add_column": "1",
                "name": "new_integer",
                "type": "integer",
            },
            None,
            {"name": str, "description": str, "new_integer": int},
            ["name", "description", "new_integer"],
            "Column has been added",
        ),
        (
            {
                "add_column": "1",
                "name": "new_float",
                "type": "real",
            },
            None,
            {"name": str, "description": str, "new_float": float},
            ["name", "description", "new_float"],
            "Column has been added",
        ),
        (
            {
                "add_column": "1",
                "name": "new_blob",
                "type": "blob",
            },
            None,
            {"name": str, "description": str, "new_blob": bytes},
            ["name", "description", "new_blob"],
            "Column has been added",
        ),
        # Drop column
        (
            {
                "delete.description": "1",
            },
            "update_columns",
            {"name": str},
            ["name"],
            "Changes to table have been saved",
        ),
    ],
)
async def test_transform_table(
    db_path, action, post_data, expected_columns_dict, expected_order, expected_message
):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    db = sqlite_utils.Database(db_path)
    table = db["creatures"]
    assert table.columns_dict == {"name": str, "description": str}
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    post_data["csrftoken"] = csrftoken
    if action:
        post_data["action"] = action
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data=post_data,
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    assert response.status_code == 302
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert table.columns_dict == expected_columns_dict
    assert [c.name for c in table.columns] == expected_order
    assert len(messages) == 1
    assert messages[0][0] == expected_message
 
 
@pytest.mark.asyncio
async def test_drop_column_from_table_that_is_part_of_a_view(db_path):
    # https://github.com/simonw/datasette-edit-schema/issues/35
    ds = Datasette([db_path], pdb=True)
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    db = sqlite_utils.Database(db_path)
    db.create_view("creatures_view", "select * from creatures")
    table = db["creatures"]
    assert table.columns_dict == {"name": str, "description": str}
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    post_data = {
        "delete.description": "1",
        "csrftoken": csrftoken,
        "action": "update_columns",
    }
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data=post_data,
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    assert response.status_code == 302
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert table.columns_dict == {"name": str}
    assert [c.name for c in table.columns] == ["name"]
    assert len(messages) == 1
    assert messages[0][0] == "Changes to table have been saved"
 
 
@pytest.mark.asyncio
async def test_static_assets(db_path):
    ds = Datasette([db_path])
    for path in (
        "/-/static-plugins/datasette-edit-schema/draggable.1.0.0-beta.11.bundle.min.js",
    ):
        response = await ds.client.post(path)
        assert response.status_code == 200
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "path", ["/-/edit-schema", "/-/edit-schema/data", "/-/edit-schema/data/creatures"]
)
async def test_permission_edit_schema(db_path, path):
    # root user has edit-schema which allows access to all
    ds = Datasette([db_path])
    someuser_cookies = {"ds_actor": ds.sign({"a": {"id": "someuser"}}, "actor")}
    root_cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    response = await ds.client.get(path)
    assert response.status_code == 403
    # Should deny with someuser cookie
    response2 = await ds.client.get("" + path, cookies=someuser_cookies)
    assert response2.status_code == 403
    # Should allow with root cookies
    response3 = await ds.client.get("" + path, cookies=root_cookies)
    assert response3.status_code in (200, 302)
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "rules_allow,should_work",
    (
        (
            [
                Rule(
                    actor_id="user",
                    action="edit-schema",
                    database="data",
                    resource=None,
                ),
            ],
            True,
        ),
        (
            [
                Rule(
                    actor_id="user2",
                    action="edit-schema",
                    database="data",
                    resource=None,
                ),
            ],
            False,
        ),
        (
            [
                Rule(
                    actor_id="user",
                    action="edit-schema-create-table",
                    database="data",
                    resource=None,
                ),
            ],
            True,
        ),
        (
            [
                Rule(
                    actor_id="user2",
                    action="edit-schema-create-table",
                    database="data",
                    resource=None,
                ),
            ],
            False,
        ),
    ),
)
async def test_permission_create_table(permission_plugin, ds, rules_allow, should_work):
    ds._rules_allow = rules_allow
    cookies = {"ds_actor": ds.sign({"a": {"id": "user"}}, "actor")}
    csrftoken_r = await ds.client.get("/-/edit-schema/data/-/create", cookies=cookies)
    if not should_work:
        assert csrftoken_r.status_code == 403
        return
    assert csrftoken_r.status_code == 200
    csrftoken = csrftoken_r.cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    post_data = {
        "primary_key_name": "id",
        "primary_key_type": "INTEGER",
        "table_name": "foo",
        "csrftoken": csrftoken,
    }
    response = await ds.client.post(
        "/-/edit-schema/data/-/create",
        data=post_data,
        cookies=cookies,
    )
    assert response.status_code == 302
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "rules_allow,should_work",
    (
        (
            [
                Rule(
                    actor_id="user",
                    action="edit-schema",
                    database="data",
                    resource=None,
                ),
            ],
            True,
        ),
        (
            [
                Rule(
                    actor_id="user2",
                    action="edit-schema",
                    database="data",
                    resource=None,
                ),
            ],
            False,
        ),
        (
            [
                Rule(
                    actor_id="user",
                    action="edit-schema-alter-table",
                    database="data",
                    resource="museums",
                ),
            ],
            True,
        ),
        (
            [
                Rule(
                    actor_id="user2",
                    action="edit-schema-alter-table",
                    database="data",
                    resource="museums",
                ),
            ],
            False,
        ),
    ),
)
async def test_permission_alter_table(permission_plugin, ds, rules_allow, should_work):
    ds._rules_allow = rules_allow
    cookies = {"ds_actor": ds.sign({"a": {"id": "user"}}, "actor")}
    csrftoken_r = await ds.client.get("/-/edit-schema/data/museums", cookies=cookies)
    if not should_work:
        assert csrftoken_r.status_code == 403
        return
    assert csrftoken_r.status_code == 200
    csrftoken = csrftoken_r.cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    post_data = {
        "action": "update_primary_key",
        "primary_key": "name",
        "csrftoken": csrftoken,
    }
    response = await ds.client.post(
        "/-/edit-schema/data/museums",
        data=post_data,
        cookies=cookies,
    )
    assert response.status_code == 302
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "new_name,should_work,expected_message",
    [
        ("valid", True, "Table renamed to 'valid'"),
        ("]]]", False, 'Error renaming table: unrecognized token: "]"'),
        ("creatures", True, "Table name was the same"),
        ("", False, "New table name is required"),
        ("other_table", False, "A table called 'other_table' already exists"),
    ],
)
async def test_rename_table(db_path, new_name, should_work, expected_message):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/creatures", cookies=cookies)
    ).cookies["ds_csrftoken"]
    response = await ds.client.post(
        "/-/edit-schema/data/creatures",
        data={
            "rename_table": "1",
            "name": new_name,
            "csrftoken": csrftoken,
        },
        cookies=dict(cookies, ds_csrftoken=csrftoken),
    )
    assert response.status_code == 302
    if should_work:
        expected_path = "/-/edit-schema/data/{}".format(new_name)
    else:
        expected_path = "/-/edit-schema/data/creatures"
    assert response.headers["location"] == expected_path
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert len(messages) == 1
    assert messages[0][0] == expected_message
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "path,expected_breadcrumbs",
    (
        ("/-/edit-schema/data", ['<a href="/">home</a>', '<a href="/data">data</a>']),
        (
            "/-/edit-schema/data/creatures",
            [
                '<a href="/">home</a>',
                '<a href="/data">data</a>',
                '<a href="/data/creatures">creatures</a>',
            ],
        ),
    ),
)
async def test_breadcrumbs(db_path, path, expected_breadcrumbs):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    response = await ds.client.get(path, cookies=cookies)
    assert response.status_code == 200
    breadcrumbs = response.text.split('<p class="crumbs">')[1].split("</p>")[0]
    for crumb in expected_breadcrumbs:
        assert crumb in breadcrumbs
 
 
def test_potential_foreign_keys(db):
    potentials = potential_foreign_keys(
        db.conn,
        "museums",
        ["name", "city_id"],
        get_primary_keys(db.conn),
    )
    assert potentials == {"name": [], "city_id": [("cities", "id")]}
 
 
@pytest.mark.asyncio
async def test_edit_form_shows_suggestions(db_path):
    # Test for suggested foreign keys and primary keys
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    response = await ds.client.get("/-/edit-schema/data/museums", cookies=cookies)
    assert response.status_code == 200
    # Should suggest two of the three columns as primary keys
    soup = BeautifulSoup(response.text, "html5lib")
    assert "<h2>Change the primary key</h2>" in response.text
    pk_options = get_options(soup, "primary_key")
    assert pk_options == [
        {"value": "id", "text": "id (current)", "selected": True},
        {"value": "name", "text": "name", "selected": False},
    ]
 
    # Test foreign key suggestions
    selects = soup.find_all("select", attrs={"name": re.compile("^fk.")})
    select_options = [(s["name"], get_options(soup, s["name"])) for s in selects]
    assert select_options == [
        (
            "fk.name",
            [
                {
                    "value": "-- no suggestions --",
                    "text": "-- no suggestions --",
                    "selected": False,
                },
                {"value": "cities.id", "text": "cities.id", "selected": False},
                {
                    "value": "distractions.id",
                    "text": "distractions.id",
                    "selected": False,
                },
            ],
        ),
        (
            "fk.city_id",
            [
                {"value": "-- none --", "text": "-- none --", "selected": False},
                {
                    "value": "cities.id",
                    "text": "cities.id (suggested)",
                    "selected": False,
                },
                {
                    "value": "distractions.id",
                    "text": "distractions.id",
                    "selected": False,
                },
            ],
        ),
    ]
 
 
@pytest.mark.asyncio
async def test_edit_form_for_empty_table(db_path):
    # https://github.com/simonw/datasette-edit-schema/issues/38
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    response = await ds.client.get("/-/edit-schema/data/empty_table", cookies=cookies)
    assert response.status_code == 200
 
    # It shouldn't suggest any foreign keys, since there are no records
    assert " (suggested)" not in response.text
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "table,post_data,expected_fks,expected_pk,expected_message",
    (
        # Foreign key edit
        (
            "museums",
            {"action": "update_foreign_keys", "fk.city_id": "cities.id"},
            [("museums", "city_id", "cities", "id")],
            ["id"],
            "Foreign keys updated to city_id → cities.id",
        ),
        # No changes to foreign keys
        (
            "museums",
            {"action": "update_foreign_keys"},
            [],
            ["id"],
            "No changes to foreign keys",
        ),
        # Remove foreign keys
        (
            "has_foreign_keys",
            {"action": "update_foreign_keys", "fk.distraction_id": ""},
            [],
            ["id"],
            "Foreign keys removed",
        ),
        # Point existing foreign key at something else
        (
            "has_foreign_keys",
            {"action": "update_foreign_keys", "fk.distraction_id": "cities.id"},
            [("has_foreign_keys", "distraction_id", "cities", "id")],
            ["id"],
            "Foreign keys updated to distraction_id → cities.id",
        ),
        # Change primary key in a way that works
        (
            "museums",
            {"action": "update_primary_key", "primary_key": "name"},
            [],
            ["name"],
            "Primary key for 'museums' is now 'name'",
        ),
        # And a way that returns an error
        (
            "museums",
            {"action": "update_primary_key", "primary_key": "city_id"},
            [],
            ["id"],
            "Column 'city_id' is not unique",
        ),
    ),
)
async def test_edit_keys(
    db_path, table, post_data, expected_fks, expected_pk, expected_message
):
    ds = Datasette([db_path])
    # Grab a csrftoken
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    csrftoken_r = await ds.client.get(
        "/-/edit-schema/data/{}".format(table), cookies=cookies
    )
    csrftoken = csrftoken_r.cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    post_data["csrftoken"] = csrftoken
    response = await ds.client.post(
        "/-/edit-schema/data/{}".format(table),
        data=post_data,
        cookies=cookies,
    )
    assert response.status_code == 302
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert len(messages) == 1
    assert messages[0][0] == expected_message
    db = sqlite_utils.Database(db_path)
    assert db[table].foreign_keys == expected_fks
    assert db[table].pks == expected_pk
 
 
def get_options(soup, name):
    select = soup.find("select", attrs={"name": name})
    return [
        {
            "value": o.get("value") or o.text,
            "text": o.text,
            "selected": bool(o.get("selected")),
        }
        for o in select.find_all("option")
    ]
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "post_data,expected_message,expected_schema",
    (
        (
            {"primary_key_name": "id", "primary_key_type": "INTEGER"},
            "Table name is required",
            None,
        ),
        (
            {
                "primary_key_name": "id",
                "primary_key_type": "INTEGER",
                "table_name": "museums",
            },
            "Table already exists",
            None,
        ),
        (
            {
                "primary_key_name": "id",
                "primary_key_type": "INTEGER",
                "table_name": "foo",
            },
            "Table has been created",
            {"id": int},
        ),
        (
            {
                "primary_key_name": "my_pk",
                "primary_key_type": "TEXT",
                "table_name": "foo",
                "column-name.0": "col1_text",
                "column-type.0": "TEXT",
                "column-sort.0": "2",
                "column-name.1": "col2_int",
                "column-type.1": "INTEGER",
                "column-sort.1": "1",
                "column-name.2": "col3_real",
                "column-type.2": "REAL",
                "column-sort.2": "3",
                "column-name.3": "col4_blob",
                "column-type.3": "BLOB",
                "column-sort.3": "4",
            },
            "Table has been created",
            {
                "my_pk": str,
                "col2_int": int,
                "col1_text": str,
                "col3_real": float,
                "col4_blob": bytes,
            },
        ),
    ),
)
async def test_create_table(db_path, post_data, expected_message, expected_schema):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    csrftoken_r = await ds.client.get("/-/edit-schema/data/-/create", cookies=cookies)
    csrftoken = csrftoken_r.cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    post_data["csrftoken"] = csrftoken
    response = await ds.client.post(
        "/-/edit-schema/data/-/create",
        data=post_data,
        cookies=cookies,
    )
    assert response.status_code == 302
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert len(messages) == 1
    assert messages[0][0] == expected_message
    if expected_schema is not None:
        db = sqlite_utils.Database(db_path)
        assert db[post_data["table_name"]].columns_dict == expected_schema
 
 
def test_examples_for_columns():
    db = sqlite_utils.Database(memory=True)
    db["examples"].insert_all(
        [
            {"id": 1, "name": "Name 1", "age": 15, "weight": None, "photo's": b"Blob"},
            {"id": 2, "name": None, "age": 25, "weight": 2.3, "photo's": b"Blob2"},
            {"id": 3, "name": "", "age": None, "weight": 2.0, "photo's": b"Blob3"},
            {"id": 4, "name": "Name 4", "age": 18, "weight": 1.7, "photo's": b"Blob4"},
            {"id": 5, "name": "Name 5", "age": 21, "weight": None, "photo's": b"Blob5"},
            {"id": 6, "name": "Name 6", "age": 35, "weight": 2.5, "photo's": b"Blob6"},
            {"id": 7, "name": "Name 7", "age": 28, "weight": 1.9, "photo's": b"Blob7"},
            {"id": 8, "name": "Name 8", "age": 22, "weight": 2.1, "photo's": b"Blob8"},
            {"id": 9, "name": "Name 9", "age": 20, "weight": 1.5, "photo's": b"Blob9"},
            {
                "id": 10,
                "name": "Name 10",
                "age": 40,
                "weight": 2.8,
                "photo's": b"Blob10",
            },
        ]
    )
    examples = examples_for_columns(db.conn, "examples")
    assert examples == {
        "age": ["15", "25", "18", "21", "35"],
        "id": ["1", "2", "3", "4", "5"],
        "name": ["Name 1", "Name 4", "Name 5", "Name 6", "Name 7"],
        "weight": ["2.3", "2.0", "1.7", "2.5", "1.9"],
    }
 
 
def test_potential_primary_keys():
    db = sqlite_utils.Database(memory=True)
    db["examples"].insert_all(
        [
            {"id": 1, "photo's": b"Blob", "cat": "1"},
            {"id": 2, "photo's": b"Blob2", "cat": "1"},
            {"id": 3, "photo's": b"Blob3", "cat": "2"},
        ]
    )
    potentials = potential_primary_keys(db.conn, "examples", ["id", "photo's", "cat"])
    assert potentials == ["id", "photo's"]
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "table,post_data,expected_message,expected_indexes",
    (
        (
            "museums",
            {"add_index": "1"},
            "Column name is required",
            [],
        ),
        (
            "museums",
            {"add_index": "1", "add_index_column": "name"},
            "Index added on name",
            [{"name": "idx_museums_name", "columns": ["name"], "unique": 0}],
        ),
        (
            "museums",
            {"add_index": "1", "add_index_column": "name", "add_index_unique": 1},
            "Unique index added on name",
            [{"name": "idx_museums_name", "columns": ["name"], "unique": 1}],
        ),
        (
            "museums",
            {"add_index": "1", "add_index_column": "city", "add_index_unique": 1},
            "no such column: city",
            [],
        ),
        (
            "museums",
            {"add_index": "1", "add_index_column": "city_id", "add_index_unique": 1},
            "UNIQUE constraint failed: museums.city_id",
            [],
        ),
        # Tests for removing an index
        (
            "has_indexes",
            {"drop_index_bad": "1"},
            "no such index: bad",
            [
                {"columns": ["name"], "name": "name_unique_index", "unique": 1},
                {"columns": ["name"], "name": "name_index", "unique": 0},
            ],
        ),
        (
            "has_indexes",
            {"drop_index_name_index": "1"},
            "Index dropped: name_index",
            [{"columns": ["name"], "name": "name_unique_index", "unique": 1}],
        ),
        (
            "has_indexes",
            {"drop_index_name_unique_index": "1"},
            "Index dropped: name_unique_index",
            [{"columns": ["name"], "name": "name_index", "unique": 0}],
        ),
    ),
)
async def test_add_remove_index(
    db_path, table, post_data, expected_message, expected_indexes
):
    ds = Datasette([db_path])
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    csrftoken = (
        await ds.client.get("/-/edit-schema/data/{}".format(table), cookies=cookies)
    ).cookies["ds_csrftoken"]
    cookies["ds_csrftoken"] = csrftoken
    post_data["csrftoken"] = csrftoken
    response = await ds.client.post(
        "/-/edit-schema/data/{}".format(table), cookies=cookies, data=post_data
    )
    assert response.status_code == 302
    messages = ds.unsign(response.cookies["ds_messages"], "messages")
    assert len(messages) == 1
    assert messages[0][0] == expected_message
    db = sqlite_utils.Database(db_path)
    indexes = db[table].indexes
    assert [
        {"name": index.name, "columns": index.columns, "unique": index.unique}
        for index in indexes
        if "sqlite_autoindex" not in index.name
    ] == expected_indexes
 
Powered by Datasette