home

Menu
  • ripgrep search

datasette-insert/datasette_insert/__init__.py

from datasette import hookimpl
from datasette.utils.asgi import Response
from datasette.utils import actor_matches_allow, sqlite3
import json
import sqlite_utils
 
 
class MissingTable(Exception):
    """Signals that the target table does not exist and may not be created.

    Raised inside the write function when the table is absent and the
    actor lacks create-table permission; the request handler converts it
    into a 400 "missing_table" JSON response.
    """
 
 
async def insert_or_upsert(request, datasette):
    """Route handler: run the insert/upsert logic, then apply CORS headers.

    Delegates all real work to ``insert_or_upsert_implementation``. When
    ``datasette.cors`` is truthy, the Access-Control-Allow-* headers are
    set on the response before it is returned.
    """
    response = await insert_or_upsert_implementation(request, datasette)
    if not datasette.cors:
        return response

    # Headers are assigned one at a time, in this order, onto the response.
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "content-type,authorization"),
        ("Access-Control-Allow-Methods", "POST"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
 
 
async def insert_or_upsert_implementation(request, datasette):
    """Insert or upsert the JSON rows in the request body into a table.

    The target database, table and verb ("insert" or "upsert") are read
    from ``request.url_vars``. Two query string options are honoured:

    - ``pk``: primary key column handed to sqlite-utils; mandatory for
      upsert.
    - ``alter``: when truthy, passed to sqlite-utils as ``alter=`` and
      requires the alter-table permission.

    The body must be a JSON object (a single row) or a JSON array of rows.

    Returns a ``Response``:

    - ``{"table_count": N}`` on success.
    - 400 for a missing ``?pk=`` on upsert, an invalid JSON body, a missing
      table the actor may not create, or unknown columns.
    - 403 when the actor lacks insert or alter permission.
    - 404 when the database does not exist.
    - 500 for any other ``sqlite3.OperationalError``.
    """
    database = request.url_vars["database"]
    table = request.url_vars["table"]
    upsert = request.url_vars["verb"] == "upsert"
    try:
        db = datasette.get_database(database)
    except KeyError:
        # get_database() raises KeyError for an unknown name; report it as
        # a client error instead of letting it surface as a 500.
        return Response.json(
            {
                "status": 404,
                "error": "Database {} does not exist".format(database),
                "error_code": "missing_database",
            },
            status=404,
        )

    # Needed for CORS:
    if request.method == "OPTIONS":
        return Response.text("ok")

    pk = request.args.get("pk")
    alter = request.args.get("alter")

    if upsert and not pk:
        return Response.json(
            {
                "status": 400,
                "error": "Upsert requires ?pk=",
                "error_code": "upsert_requires_pk",
            },
            status=400,
        )

    # Check permissions
    allow_insert_update = False
    allow_create_table = False
    allow_alter_table = False
    allow_all = await datasette.permission_allowed(
        request.actor, "insert:all", database, default=False
    )
    if allow_all:
        allow_insert_update = True
        allow_create_table = True
        allow_alter_table = True
    else:
        # Check for finely grained permissions
        allow_insert_update = await datasette.permission_allowed(
            request.actor, "insert:insert-update", (database, table), default=False
        )
        allow_create_table = await datasette.permission_allowed(
            request.actor, "insert:create-table", database, default=False
        )
        allow_alter_table = await datasette.permission_allowed(
            request.actor, "insert:alter-table", (database, table), default=False
        )

    if not allow_insert_update:
        return Response.json({"error": "Permission denied", "status": 403}, status=403)

    if alter and not allow_alter_table:
        return Response.json(
            {"error": "Alter permission denied", "status": 403}, status=403
        )

    # Parse the body only after the permission checks, as before. A
    # malformed, empty or undecodable body raises ValueError
    # (JSONDecodeError / UnicodeDecodeError are both subclasses).
    try:
        post_json = json.loads(await request.post_body())
    except ValueError as ex:
        return Response.json(
            {
                "status": 400,
                "error": "Invalid JSON: {}".format(ex),
                "error_code": "invalid_json",
            },
            status=400,
        )
    if isinstance(post_json, dict):
        # A single object is treated as a one-row batch
        post_json = [post_json]
    if not isinstance(post_json, list):
        # Scalars and null cannot be interpreted as rows
        return Response.json(
            {
                "status": 400,
                "error": "JSON must be an object or an array of objects",
                "error_code": "invalid_json",
            },
            status=400,
        )

    def write_in_thread(conn):
        # Runs via execute_write_fn with the write connection. Named
        # write_db so it does not shadow the outer Datasette ``db``.
        write_db = sqlite_utils.Database(conn)
        if not allow_create_table and not write_db[table].exists():
            raise MissingTable()
        if upsert:
            write_db[table].upsert_all(post_json, pk=pk, alter=alter)
        else:
            write_db[table].insert_all(post_json, replace=True, pk=pk, alter=alter)
        return write_db[table].count

    try:
        table_count = await db.execute_write_fn(write_in_thread, block=True)
    except MissingTable:
        return Response.json(
            {
                "status": 400,
                "error": "Table {} does not exist".format(table),
                "error_code": "missing_table",
            },
            status=400,
        )
    except sqlite3.OperationalError as ex:
        if "has no column" in str(ex):
            # Row contained keys that are not columns and alter was not set
            return Response.json(
                {"status": 400, "error": str(ex), "error_code": "unknown_keys"},
                status=400,
            )
        else:
            return Response.json({"error": str(ex), "status": 500}, status=500)

    return Response.json({"table_count": table_count})
 
 
@hookimpl
def permission_allowed(datasette, actor, action):
    """Decide the "insert:all" permission from the plugin's "allow" block.

    Returns the result of ``actor_matches_allow`` when the
    "datasette-insert" plugin configuration contains an "allow" key.
    Returns ``None`` (no opinion) for every other action, or when no
    "allow" key is configured.
    """
    if action == "insert:all":
        config = datasette.plugin_config("datasette-insert") or {}
        if "allow" in config:
            return actor_matches_allow(actor, config["allow"])
    return None
 
 
@hookimpl
def register_routes():
    """Register the /-/insert and /-/upsert endpoints.

    The pattern captures the verb, database and table as named groups,
    which the handler reads from ``request.url_vars``.
    """
    route_pattern = (
        r"^/-/(?P<verb>(insert|upsert))/(?P<database>[^/]+)/(?P<table>[^/]+)$"
    )
    return [(route_pattern, insert_or_upsert)]
 
Powered by Datasette