home

Menu
  • ripgrep search

datasette-upload-dbs/datasette_upload_dbs/__init__.py

from datasette import hookimpl
from datasette.database import Database
from datasette.utils.asgi import Response, Forbidden
from datasette.utils import to_css_class
from datasette.utils.sqlite import sqlite3
from starlette.requests import Request
from shutil import copyfileobj
import pathlib
 
 
@hookimpl
def register_permissions(datasette):
    # Only runs on 1.0a2 or higher
    from datasette import Permission
 
    return [
        Permission(
            name="upload-dbs",
            abbr=None,
            description="Upload SQLite database files",
            takes_database=False,
            takes_resource=False,
            default=False,
        )
    ]
 
 
@hookimpl
def permission_allowed(actor, action):
    if action == "upload-dbs" and actor and actor.get("id") == "root":
        return True
 
 
@hookimpl
def register_routes():
    return [
        (r"^/-/upload-dbs$", upload_dbs),
        (r"^/-/upload-db$", lambda: Response.redirect("/-/upload-dbs")),
    ]
 
 
@hookimpl
def menu_links(datasette, actor):
    async def inner():
        if await datasette.permission_allowed(
            actor, "upload-dbs", default=False
        ) and _configured(datasette):
            return [
                {
                    "href": datasette.urls.path("/-/upload-dbs"),
                    "label": "Upload database",
                },
            ]
 
    return inner
 
 
@hookimpl
def startup(datasette):
    # Load any databases located in the directory folder
    config = datasette.plugin_config("datasette-upload-dbs") or {}
    if config.get("skip_startup_scan"):
        return
    directory = config.get("directory")
    if not directory:
        return
    path = pathlib.Path(directory)
    database_files = path.glob("*.db")
    for file_path in database_files:
        # Needs to set is_mutable=True here because the default was False
        # in Datasette versions up to and including 0.62
        datasette.add_database(
            Database(datasette, path=str(file_path), is_mutable=True),
        )
 
 
def _configured(datasette):
    return (datasette.plugin_config("datasette-upload-dbs") or {}).get("directory")
 
 
async def upload_dbs(scope, receive, datasette, request):
    if not await datasette.permission_allowed(
        request.actor, "upload-dbs", default=False
    ):
        raise Forbidden("Permission denied for upload-dbs")
 
    directory = _configured(datasette)
 
    if not directory:
        raise Forbidden("datasette-upload-dbs plugin has not been correctly configured")
 
    path = pathlib.Path(directory)
 
    if request.method != "POST":
        return Response.html(
            await datasette.render_template("upload_dbs.html", request=request)
        )
 
    # We use the Starlette request object to handle file uploads
    starlette_request = Request(scope, receive)
 
    async def error(msg):
        if is_xhr:
            return Response.json({"ok": False, "error": msg})
 
        return Response.html(
            await datasette.render_template(
                "upload_dbs.html",
                {
                    "error": msg,
                },
                request=request,
            )
        )
 
    formdata = await starlette_request.form()
    db_file = formdata["db"]
    is_xhr = formdata.get("xhr")
    db_name = (formdata.get("db_name") or "").strip()
 
    if not db_name:
        db_name = db_file.filename.split(".")[0]
 
    db_name = to_css_class(db_name) or "db"
 
    first_16 = db_file.file.read(16)
    if first_16 != b"SQLite format 3\x00":
        return await error("File is not a valid SQLite database (invalid header)")
 
    path.mkdir(parents=True, exist_ok=True)
 
    # Copy it to its final destination
    filepath = path / (db_name + ".db")
    with open(filepath, "wb+") as target_file:
        db_file.file.seek(0)
        copyfileobj(db_file.file, target_file)
 
    # Now really verify it
    conn = sqlite3.connect(str(filepath))
    try:
        conn.execute("select * from sqlite_master")
    except sqlite3.Error as e:
        # Delete file, it is invalid
        filepath.unlink()
        return await error(f"File is not a valid SQLite database ({e})")
 
    # File is valid - add it to this Datasette instance
    db = Database(datasette, path=str(filepath), is_mutable=True)
    datasette.add_database(db)
 
    redirect_url = datasette.urls.database(db.name)
    if is_xhr:
        return Response.json({"ok": True, "redirect": redirect_url})
    else:
        return Response.redirect(redirect_url)
 
Powered by Datasette