datasette/tests/plugins/my_plugin_2.py
from datasette import hookimplfrom datasette.utils.asgi import Responsefrom functools import wrapsimport markupsafeimport json@hookimpldef extra_js_urls():return [{"url": "https://plugin-example.datasette.io/jquery.js","sri": "SRIHASH",},"https://plugin-example.datasette.io/plugin2.js",]@hookimpldef render_cell(value, database):# Render {"href": "...", "label": "..."} as linkif not isinstance(value, str):return Nonestripped = value.strip()if not stripped.startswith("{") and stripped.endswith("}"):return Nonetry:data = json.loads(value)except ValueError:return Noneif not isinstance(data, dict):return Noneif set(data.keys()) != {"href", "label"}:return Nonehref = data["href"]if not (href.startswith("/")or href.startswith("http://")or href.startswith("https://")):return Nonereturn markupsafe.Markup('<a data-database="{database}" href="{href}">{label}</a>'.format(database=database,href=markupsafe.escape(data["href"]),label=markupsafe.escape(data["label"] or "") or " ",))@hookimpldef extra_template_vars(template, database, table, view_name, request, datasette):# This helps unit tests that want to run assertions against the request object:datasette._last_request = requestasync def query_database(sql):first_db = list(datasette.databases.keys())[0]return (await datasette.execute(first_db, sql)).rows[0][0]async def inner():return {"extra_template_vars_from_awaitable": json.dumps({"template": template,"scope_path": request.scope["path"] if request else None,"awaitable": True,},default=lambda b: b.decode("utf8"),),"query_database": query_database,}return inner@hookimpldef asgi_wrapper(datasette):def wrap_with_databases_header(app):@wraps(app)async def add_x_databases_header(scope, receive, send):async def wrapped_send(event):if event["type"] == "http.response.start":original_headers = event.get("headers") or []event = {"type": event["type"],"status": event["status"],"headers": original_headers+ [[b"x-databases",", ".join(datasette.databases.keys()).encode("utf-8"),]],}await send(event)await app(scope, receive, wrapped_send)return add_x_databases_headerreturn wrap_with_databases_header@hookimpldef actor_from_request(datasette, request):async def inner():if request.args.get("_bot2"):result = await datasette.get_database().execute("select 1 + 1")return {"id": "bot2", "1+1": result.first()[0]}else:return Nonereturn inner@hookimpldef permission_allowed(datasette, actor, action):# Testing asyncio version of permission_allowedasync def inner():assert (2== (await datasette.get_internal_database().execute("select 1 + 1")).first()[0])if action == "this_is_allowed_async":return Trueelif action == "this_is_denied_async":return Falsereturn inner@hookimpldef prepare_jinja2_environment(env, datasette):env.filters["format_numeric"] = lambda s: f"{float(s):,.0f}"env.filters["to_hello"] = lambda s: datasette._HELLO@hookimpldef startup(datasette):async def inner():# Run against _internal so tests that use the ds_client fixture# (which has no databases yet on startup) do not fail:internal_db = datasette.get_internal_database()result = await internal_db.execute("select 1 + 1")datasette._startup_hook_calculation = result.first()[0]return inner@hookimpldef canned_queries(datasette, database):async def inner():return {"from_async_hook": "select {}".format((await datasette.get_database(database).execute("select 1 + 1")).first()[0])}return inner@hookimpl(trylast=True)def menu_links(datasette, actor):async def inner():if actor:return [{"href": datasette.urls.instance(), "label": "Hello 2"}]return inner@hookimpldef table_actions(datasette, database, table, actor, request):async def inner():if actor:label = "From async"if request.args.get("_hello"):label += " " + request.args["_hello"]return [{"href": datasette.urls.instance(), "label": label}]return inner@hookimpldef register_routes(datasette):config = datasette.plugin_config("register-route-demo")if not config:returnpath = config["path"]def new_table(request):return Response.text("/db/table: {}".format(sorted(request.url_vars.items())))return [(r"/{}/$".format(path), lambda: Response.text(path.upper())),# Also serves to demonstrate over-ride of default paths:(r"/(?P<db_name>[^/]+)/(?P<table_and_format>[^/]+?$)", new_table),]@hookimpldef handle_exception(datasette, request, exception):datasette._exception_hook_fired = (request, exception)if request.args.get("_custom_error"):return Response.text("_custom_error")elif request.args.get("_custom_error_async"):async def inner():return Response.text("_custom_error_async")return inner@hookimpl(specname="register_routes")def register_triger_error():return ((r"/trigger-error", lambda: 1 / 0),)