datasette-graphql/datasette_graphql/__init__.py
from click import ClickException
from datasette import hookimpl
from datasette.utils.asgi import Response, NotFound, Forbidden
from datasette.plugins import pm
from graphql import graphql, print_schema
import json
from .utils import schema_for_database_via_cache
from . import hookspecs
import pathlib
import time
import urllib
pm.add_hookspecs(hookspecs)
DEFAULT_TIME_LIMIT_MS = 1000
DEFAULT_NUM_QUERIES_LIMIT = 100
async def post_body(request):
body = b""
more_body = True
while more_body:
message = await request.receive()
assert message["type"] == "http.request", message
body += message.get("body", b"")
more_body = message.get("more_body", False)
return body
async def view_graphql_schema(request, datasette):
database = request.url_vars.get("database")
try:
db = datasette.get_database(database)
except KeyError:
raise NotFound("Database does not exist")
await check_permissions(request, datasette, db.name)
schema = await schema_for_database_via_cache(datasette, database=database)
return Response.text(print_schema(schema.schema.graphql_schema))
CORS_HEADERS = {
"Access-Control-Allow-Headers": "content-type",
"Access-Control-Allow-Method": "POST",
"Access-Control-Allow-Origin": "*",
"Vary": "accept",
}
async def view_graphql(request, datasette):
if request.method == "OPTIONS":
return Response.text("ok", headers=CORS_HEADERS if datasette.cors else {})
body = await post_body(request)
database = request.url_vars.get("database")
try:
db = datasette.get_database(database)
except KeyError:
raise NotFound("Database does not exist")
await check_permissions(request, datasette, db.name)
if not body and "text/html" in request.headers.get("accept", ""):
static = pathlib.Path(__file__).parent / "static"
js_filenames = [p.name for p in static.glob("*.js")]
# These need to be sorted so react loads first
order = "react", "react-dom", "graphiql"
js_filenames.sort(key=lambda filename: order.index(filename.split(".")[0]))
return Response.html(
await datasette.render_template(
"graphiql.html",
{
"database": database,
"graphiql_css": [p.name for p in static.glob("*.css")],
"graphiql_js": js_filenames,
"graphql_path": _graphql_path(datasette),
},
request=request,
),
headers=CORS_HEADERS if datasette.cors else {},
)
schema = (await schema_for_database_via_cache(datasette, database=database)).schema
if request.args.get("schema"):
return Response.text(print_schema(schema.graphql_schema))
incoming = {}
if body:
incoming = json.loads(body)
query = incoming.get("query")
variables = incoming.get("variables")
operation_name = incoming.get("operationName")
else:
query = request.args.get("query")
variables = request.args.get("variables", "")
if variables:
variables = json.loads(variables)
operation_name = request.args.get("operationName")
if not query:
return Response.json(
{"error": "Missing query"},
status=400,
headers=CORS_HEADERS if datasette.cors else {},
)
config = datasette.plugin_config("datasette-graphql") or {}
context = {
"time_started": time.monotonic(),
"time_limit_ms": config.get("time_limit_ms") or DEFAULT_TIME_LIMIT_MS,
"num_queries_executed": 0,
"num_queries_limit": config.get("num_queries_limit")
or DEFAULT_NUM_QUERIES_LIMIT,
}
result = await schema.execute_async(
query,
operation_name=operation_name,
variable_values=variables or {},
context_value=context,
)
response = {"data": result.data}
if result.errors:
response["errors"] = [error.formatted for error in result.errors]
return Response.json(
response,
status=200 if not result.errors else 500,
headers=CORS_HEADERS if datasette.cors else {},
)
async def check_permissions(request, datasette, database):
# First check database permission
ok = await datasette.permission_allowed(
request.actor,
"view-database",
resource=database,
default=None,
)
if ok is not None:
if ok:
return
else:
raise Forbidden("view-database")
# Fall back to checking instance permission
ok2 = await datasette.permission_allowed(
request.actor,
"view-instance",
default=None,
)
if ok2 is False:
raise Forbidden("view-instance")
@hookimpl
def menu_links(datasette, actor):
graphql_path = _graphql_path(datasette)
return [
{"href": datasette.urls.path(graphql_path), "label": "GraphQL API"},
]
def _graphql_path(datasette):
config = datasette.plugin_config("datasette-graphql") or {}
graphql_path = None
if "path" not in config:
graphql_path = "/graphql"
else:
graphql_path = config["path"]
assert graphql_path.startswith("/") and not graphql_path.endswith(
"/"
), '"path" must start with / and must not end with /'
return graphql_path
@hookimpl
def register_routes(datasette):
graphql_path = _graphql_path(datasette)
return [
(
r"^{}/(?P<database>[^/]+)\.graphql$".format(graphql_path),
view_graphql_schema,
),
(r"^{}/(?P<database>[^/]+)$".format(graphql_path), view_graphql),
(r"^{}$".format(graphql_path), view_graphql),
]
@hookimpl
def extra_template_vars(datasette):
async def graphql_template_tag(query, database=None, variables=None):
schema = (
await schema_for_database_via_cache(datasette, database=database)
).schema
result = await schema.execute_async(
query,
variable_values=variables or {},
)
if result.errors:
raise Exception(result.errors)
else:
return result.data
return {
"graphql": graphql_template_tag,
}
@hookimpl
def startup(datasette):
# Validate configuration
config = datasette.plugin_config("datasette-graphql") or {}
if "databases" in config:
for database_name in config["databases"].keys():
try:
datasette.get_database(database_name)
except KeyError:
raise ClickException(
"datasette-graphql config error: '{}' is not a connected database".format(
database_name
)
)
@hookimpl
def table_actions(datasette, actor, database, table):
async def inner():
graphql_path = datasette.urls.path(
"{}/{}".format(_graphql_path(datasette), database)
)
db_schema = await schema_for_database_via_cache(datasette, database=database)
try:
example_query = await db_schema.table_classes[table].example_query()
except KeyError:
# https://github.com/simonw/datasette-graphql/issues/90
return []
return [
{
"href": "{}?query={}".format(
graphql_path, urllib.parse.quote(example_query)
),
"label": "GraphQL API for {}".format(table),
}
]
return inner
@hookimpl
def database_actions(datasette, actor, database):
graphql_path = _graphql_path(datasette)
if len(datasette.databases) > 1:
return [
{
"href": datasette.urls.path("{}/{}".format(graphql_path, database)),
"label": "GraphQL API for {}".format(database),
}
]
else:
return [
{
"href": datasette.urls.path(graphql_path),
"label": "GraphQL API",
}
]