# datasette/tests/test_cli.py
"""Tests for the ``datasette`` command-line interface (``datasette.cli``)."""

from .fixtures import (
    app_client,
    make_app_client,
    TestClient as _TestClient,
    EXPECTED_PLUGINS,
)
import asyncio
from datasette.app import SETTINGS
from datasette.plugins import DEFAULT_PLUGINS
from datasette.cli import cli, serve
from datasette.version import __version__
from datasette.utils import tilde_encode
from datasette.utils.sqlite import sqlite3
from click.testing import CliRunner
import io
import json
import pathlib
import pytest
import sys
import textwrap
from unittest import mock
import urllib


def _vacuum_new_database(path):
    """Create a SQLite database file at ``path`` by running ``vacuum`` on it.

    The connection is always closed afterwards so the tests do not leak
    open handles to files living in pytest's temporary directory.
    """
    conn = sqlite3.connect(path)
    try:
        conn.execute("vacuum")
    finally:
        conn.close()


def test_inspect_cli(app_client):
    """``datasette inspect fixtures.db`` prints JSON describing the database."""
    runner = CliRunner()
    result = runner.invoke(cli, ["inspect", "fixtures.db"])
    data = json.loads(result.output)
    assert ["fixtures"] == list(data.keys())
    database = data["fixtures"]
    assert "fixtures.db" == database["file"]
    assert isinstance(database["hash"], str)
    assert 64 == len(database["hash"])
    for table_name, expected_count in {
        "Table With Space In Name": 0,
        "facetable": 15,
    }.items():
        assert expected_count == database["tables"][table_name]["count"]


def test_inspect_cli_writes_to_file(app_client, tmpdir):
    """``--inspect-file`` writes the inspect JSON to the given path."""
    # Write into pytest's temporary directory rather than the current working
    # directory, so the test does not leave a stray foo.json behind.
    inspect_path = str(tmpdir / "foo.json")
    runner = CliRunner()
    result = runner.invoke(
        cli, ["inspect", "fixtures.db", "--inspect-file", inspect_path]
    )
    assert 0 == result.exit_code, result.output
    with open(inspect_path) as fp:
        data = json.load(fp)
    assert ["fixtures"] == list(data.keys())


def test_serve_with_inspect_file_prepopulates_table_counts_cache():
    """Inspect data passed to the app populates ``cached_table_counts``."""
    inspect_data = {"fixtures": {"tables": {"hithere": {"count": 44}}}}
    with make_app_client(inspect_data=inspect_data, is_immutable=True) as client:
        assert inspect_data == client.ds.inspect_data
        db = client.ds.databases["fixtures"]
        assert {"hithere": 44} == db.cached_table_counts


@pytest.mark.parametrize(
    "spatialite_paths,should_suggest_load_extension",
    (
        ([], False),
        (["/tmp"], True),
    ),
)
def test_spatialite_error_if_attempt_to_open_spatialite(
    spatialite_paths, should_suggest_load_extension
):
    """Serving spatialite.db without the extension fails with a helpful error.

    The ``--load-extension=spatialite`` hint is only expected in the output
    when ``SPATIALITE_PATHS`` is patched to a non-empty list.
    """
    with mock.patch("datasette.utils.SPATIALITE_PATHS", spatialite_paths):
        runner = CliRunner()
        result = runner.invoke(
            cli, ["serve", str(pathlib.Path(__file__).parent / "spatialite.db")]
        )
        assert result.exit_code != 0
        assert "It looks like you're trying to load a SpatiaLite" in result.output
        suggestion = "--load-extension=spatialite"
        if should_suggest_load_extension:
            assert suggestion in result.output
        else:
            assert suggestion not in result.output


@mock.patch("datasette.utils.SPATIALITE_PATHS", ["/does/not/exist"])
def test_spatialite_error_if_cannot_find_load_extension_spatialite():
    """``--load-extension spatialite`` errors when no candidate path exists."""
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            "serve",
            str(pathlib.Path(__file__).parent / "spatialite.db"),
            "--load-extension",
            "spatialite",
        ],
    )
    assert result.exit_code != 0
    assert "Could not find SpatiaLite extension" in result.output


def test_plugins_cli(app_client):
    """``datasette plugins`` lists plugins; ``--all`` adds the default ones."""
    runner = CliRunner()
    result1 = runner.invoke(cli, ["plugins"])
    assert json.loads(result1.output) == EXPECTED_PLUGINS
    # Try with --all
    result2 = runner.invoke(cli, ["plugins", "--all"])
    names = [p["name"] for p in json.loads(result2.output)]
    # Should have all the EXPECTED_PLUGINS
    assert set(names).issuperset({p["name"] for p in EXPECTED_PLUGINS})
    # And the following too:
    assert set(names).issuperset(DEFAULT_PLUGINS)
    # --requirements should be empty because there are no installed non-plugins-dir plugins
    result3 = runner.invoke(cli, ["plugins", "--requirements"])
    assert result3.output == ""


def test_metadata_yaml():
    """Metadata supplied as YAML is served back from ``/-/metadata.json``."""
    yaml_file = io.StringIO(
        textwrap.dedent(
            """
            title: Hello from YAML
            """
        )
    )
    # Annoyingly we have to provide all default arguments here:
    ds = serve.callback(
        [],
        metadata=yaml_file,
        immutable=[],
        host="127.0.0.1",
        port=8001,
        uds=None,
        reload=False,
        cors=False,
        sqlite_extensions=[],
        inspect_file=None,
        template_dir=None,
        plugins_dir=None,
        static=[],
        memory=False,
        config=[],
        settings=[],
        secret=None,
        root=False,
        token=None,
        actor=None,
        version_note=None,
        get=None,
        help_settings=False,
        pdb=False,
        crossdb=False,
        nolock=False,
        open_browser=False,
        create=False,
        ssl_keyfile=None,
        ssl_certfile=None,
        return_instance=True,
        internal=None,
    )
    client = _TestClient(ds)
    response = client.get("/-/metadata.json")
    assert {"title": "Hello from YAML"} == response.json


@mock.patch("datasette.cli.run_module")
def test_install(run_module):
    """``datasette install`` runs pip with the requested packages in argv."""
    runner = CliRunner()
    runner.invoke(cli, ["install", "datasette-mock-plugin", "datasette-mock-plugin2"])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == [
        "pip",
        "install",
        "datasette-mock-plugin",
        "datasette-mock-plugin2",
    ]


@pytest.mark.parametrize("flag", ["-U", "--upgrade"])
@mock.patch("datasette.cli.run_module")
def test_install_upgrade(run_module, flag):
    """Both ``-U`` and ``--upgrade`` are passed to pip as ``--upgrade``."""
    runner = CliRunner()
    runner.invoke(cli, ["install", flag, "datasette"])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "install", "--upgrade", "datasette"]


@mock.patch("datasette.cli.run_module")
def test_install_requirements(run_module, tmpdir):
    """``datasette install -r FILE`` forwards the requirements file to pip."""
    path = tmpdir.join("requirements.txt")
    path.write("datasette-mock-plugin\ndatasette-plugin-2")
    runner = CliRunner()
    runner.invoke(cli, ["install", "-r", str(path)])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "install", "-r", str(path)]


def test_install_error_if_no_packages():
    """``datasette install`` with no packages exits with a usage error."""
    runner = CliRunner()
    result = runner.invoke(cli, ["install"])
    assert result.exit_code == 2
    assert "Error: Please specify at least one package to install" in result.output


@mock.patch("datasette.cli.run_module")
def test_uninstall(run_module):
    """``datasette uninstall`` runs pip uninstall with the given arguments."""
    runner = CliRunner()
    runner.invoke(cli, ["uninstall", "datasette-mock-plugin", "-y"])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "uninstall", "datasette-mock-plugin", "-y"]


def test_version():
    """``--version`` prints the package version."""
    runner = CliRunner()
    result = runner.invoke(cli, ["--version"])
    assert result.output == f"cli, version {__version__}\n"


@pytest.mark.parametrize("invalid_port", ["-1", "0.5", "dog", "65536"])
def test_serve_invalid_ports(invalid_port):
    """Invalid ``--port`` values are rejected with exit code 2."""
    runner = CliRunner(mix_stderr=False)
    result = runner.invoke(cli, ["--port", invalid_port])
    assert result.exit_code == 2
    assert "Invalid value for '-p'" in result.stderr


@pytest.mark.parametrize(
    "args",
    (
        ["--setting", "default_page_size", "5"],
        ["--setting", "settings.default_page_size", "5"],
        ["-s", "settings.default_page_size", "5"],
    ),
)
def test_setting(args):
    """Each supported spelling of the setting option changes the setting."""
    runner = CliRunner()
    result = runner.invoke(cli, ["--get", "/-/settings.json"] + args)
    assert result.exit_code == 0, result.output
    settings = json.loads(result.output)
    assert settings["default_page_size"] == 5


def test_plugin_s_overwrite():
    """``-s plugins.<name> VALUE`` overrides the plugin's configuration."""
    runner = CliRunner()
    plugins_dir = str(pathlib.Path(__file__).parent / "plugins")

    # Without the override the plugin configuration is reported as None.
    result = runner.invoke(
        cli,
        [
            "--plugins-dir",
            plugins_dir,
            "--get",
            "/_memory.json?sql=select+prepare_connection_args()",
        ],
    )
    assert result.exit_code == 0, result.output
    assert (
        json.loads(result.output).get("rows")[0].get("prepare_connection_args()")
        == 'database=_memory, datasette.plugin_config("name-of-plugin")=None'
    )

    # With -s the same query reports the overridden value.
    result = runner.invoke(
        cli,
        [
            "--plugins-dir",
            plugins_dir,
            "--get",
            "/_memory.json?sql=select+prepare_connection_args()",
            "-s",
            "plugins.name-of-plugin",
            "OVERRIDE",
        ],
    )
    assert result.exit_code == 0, result.output
    assert (
        json.loads(result.output).get("rows")[0].get("prepare_connection_args()")
        == 'database=_memory, datasette.plugin_config("name-of-plugin")=OVERRIDE'
    )


def test_setting_type_validation():
    """A non-integer ``default_page_size`` is rejected with exit code 2."""
    runner = CliRunner(mix_stderr=False)
    result = runner.invoke(cli, ["--setting", "default_page_size", "dog"])
    assert result.exit_code == 2
    assert '"settings.default_page_size" should be an integer' in result.stderr


@pytest.mark.parametrize("default_allow_sql", (True, False))
def test_setting_default_allow_sql(default_allow_sql):
    """``default_allow_sql`` toggles whether arbitrary SQL may be run."""
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            "--setting",
            "default_allow_sql",
            "on" if default_allow_sql else "off",
            "--get",
            "/_memory.json?sql=select+21&_shape=objects",
        ],
    )
    if default_allow_sql:
        assert result.exit_code == 0, result.output
        assert json.loads(result.output)["rows"][0] == {"21": 21}
    else:
        assert result.exit_code == 1, result.output
        # This isn't JSON at the moment, maybe it should be though
        assert "Forbidden" in result.output


def test_sql_errors_logged_to_stderr():
    """A failing SQL query with ``--get`` exits 1 and logs the error to stderr."""
    runner = CliRunner(mix_stderr=False)
    result = runner.invoke(cli, ["--get", "/_memory.json?sql=select+blah"])
    assert result.exit_code == 1
    assert "sql = 'select blah', params = {}: no such column: blah\n" in result.stderr


def test_serve_create(tmpdir):
    """``--create`` creates the database file if it does not exist yet."""
    runner = CliRunner()
    db_path = tmpdir / "does_not_exist_yet.db"
    assert not db_path.exists()
    result = runner.invoke(
        cli, [str(db_path), "--create", "--get", "/-/databases.json"]
    )
    assert result.exit_code == 0, result.output
    databases = json.loads(result.output)
    # Subset comparison: only these keys of the first database are checked.
    assert {
        "name": "does_not_exist_yet",
        "is_mutable": True,
        "is_memory": False,
        "hash": None,
    }.items() <= databases[0].items()
    assert db_path.exists()


@pytest.mark.parametrize("argument", ("-c", "--config"))
@pytest.mark.parametrize("format_", ("json", "yaml"))
def test_serve_config(tmpdir, argument, format_):
    """``-c``/``--config`` accept both JSON and YAML configuration files."""
    config_path = tmpdir / "datasette.{}".format(format_)
    config_path.write_text(
        (
            "settings:\n default_page_size: 5\n"
            if format_ == "yaml"
            else '{"settings": {"default_page_size": 5}}'
        ),
        "utf-8",
    )
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            argument,
            str(config_path),
            "--get",
            "/-/settings.json",
        ],
    )
    assert result.exit_code == 0, result.output
    assert json.loads(result.output)["default_page_size"] == 5


def test_serve_duplicate_database_names(tmpdir):
    """'datasette db.db nested/db.db' should attach two databases, /db and /db_2"""
    runner = CliRunner()
    db_1_path = str(tmpdir / "db.db")
    nested = tmpdir / "nested"
    nested.mkdir()
    db_2_path = str(tmpdir / "nested" / "db.db")
    for path in (db_1_path, db_2_path):
        _vacuum_new_database(path)
    result = runner.invoke(cli, [db_1_path, db_2_path, "--get", "/-/databases.json"])
    assert result.exit_code == 0, result.output
    databases = json.loads(result.output)
    assert {db["name"] for db in databases} == {"db", "db_2"}


def test_serve_deduplicate_same_database_path(tmpdir):
    """'datasette db.db db.db' should only attach one database, /db"""
    runner = CliRunner()
    db_path = str(tmpdir / "db.db")
    _vacuum_new_database(db_path)
    result = runner.invoke(cli, [db_path, db_path, "--get", "/-/databases.json"])
    assert result.exit_code == 0, result.output
    databases = json.loads(result.output)
    assert {db["name"] for db in databases} == {"db"}


@pytest.mark.parametrize(
    "filename", ["test-database (1).sqlite", "database (1).sqlite"]
)
def test_weird_database_names(tmpdir, filename):
    """Database files with spaces and parentheses in their names are served."""
    # https://github.com/simonw/datasette/issues/1181
    runner = CliRunner()
    db_path = str(tmpdir / filename)
    _vacuum_new_database(db_path)
    result1 = runner.invoke(cli, [db_path, "--get", "/"])
    assert result1.exit_code == 0, result1.output
    filename_no_stem = filename.rsplit(".", 1)[0]
    expected_link = '<a href="/{}">{}</a>'.format(
        tilde_encode(filename_no_stem), filename_no_stem
    )
    assert expected_link in result1.output
    # Now try hitting that database page
    result2 = runner.invoke(
        cli, [db_path, "--get", "/{}".format(tilde_encode(filename_no_stem))]
    )
    assert result2.exit_code == 0, result2.output


def test_help_settings():
    """``--help-settings`` mentions the name of every entry in ``SETTINGS``."""
    runner = CliRunner()
    result = runner.invoke(cli, ["--help-settings"])
    for setting in SETTINGS:
        assert setting.name in result.output


def test_internal_db(tmpdir):
    """``--internal PATH`` creates the internal database file at that path."""
    runner = CliRunner()
    internal_path = tmpdir / "internal.db"
    assert not internal_path.exists()
    result = runner.invoke(
        cli, ["--memory", "--internal", str(internal_path), "--get", "/"]
    )
    assert result.exit_code == 0
    assert internal_path.exists()