# datasette/tests/plugins/my_plugin.py
import asyncio
from datasette import hookimpl, Permission
from datasette.facets import Facet
from datasette import tracer
from datasette.utils import path_with_added_args
from datasette.utils.asgi import asgi_send_json, Response
import base64
import pint
import json

# Unit registry used by the convert_units() function registered below.
ureg = pint.UnitRegistry()


@hookimpl
def prepare_connection(conn, database, datasette):
    """Register convert_units() and prepare_connection_args() on ``conn``."""

    def convert_units(amount, from_, to_):
        """select convert_units(100, 'm', 'ft');"""
        quantity = amount * ureg(from_)
        return quantity.to(to_).to_tuple()[0]

    def prepare_connection_args():
        # Echoes back the arguments this hook was called with.
        template = 'database={}, datasette.plugin_config("name-of-plugin")={}'
        return template.format(database, datasette.plugin_config("name-of-plugin"))

    conn.create_function("convert_units", 3, convert_units)
    conn.create_function("prepare_connection_args", 0, prepare_connection_args)


@hookimpl
def extra_css_urls(template, database, table, view_name, columns, request, datasette):
    """Return an async callable yielding one CSS URL that embeds the hook arguments.

    The arguments (plus the result of ``select 3 * 5``) are JSON-encoded and
    base64-encoded into the URL path.
    """

    async def inner():
        request_path = request.path if request is not None else None
        result = await datasette.get_database().execute("select 3 * 5")
        payload = {
            "template": template,
            "database": database,
            "table": table,
            "view_name": view_name,
            "request_path": request_path,
            "added": result.first()[0],
            "columns": columns,
        }
        encoded = base64.b64encode(json.dumps(payload).encode("utf8")).decode("utf8")
        return [
            "https://plugin-example.datasette.io/{}/extra-css-urls-demo.css".format(
                encoded
            )
        ]

    return inner


@hookimpl
def extra_js_urls():
    """Return a mix of dict-style and plain-string JavaScript URL entries."""
    with_sri = {
        "url": "https://plugin-example.datasette.io/jquery.js",
        "sri": "SRIHASH",
    }
    plain = "https://plugin-example.datasette.io/plugin1.js"
    as_module = {
        "url": "https://plugin-example.datasette.io/plugin.module.js",
        "module": True,
    }
    return [with_sri, plain, as_module]


@hookimpl
def extra_body_script(
    template, database, table, view_name, columns, request, datasette
):
    """Return an async callable producing a module script that echoes the hook arguments."""

    async def inner():
        config = datasette.plugin_config(
            "name-of-plugin",
            database=database,
            table=table,
        )
        request_path = request.path if request is not None else None
        result = await datasette.get_database().execute("select 3 * 5")
        payload = {
            "template": template,
            "database": database,
            "table": table,
            "config": config,
            "view_name": view_name,
            "request_path": request_path,
            "added": result.first()[0],
            "columns": columns,
        }
        script = "var extra_body_script = {};".format(json.dumps(payload))
        return {"script": script, "module": True}

    return inner


@hookimpl
def render_cell(row, value, column, table, database, datasette, request):
    """Return an async callable that renders two special marker values.

    Any other value makes the callable return ``None``.
    """

    async def inner():
        # Render some debug output in cell with value RENDER_CELL_DEMO
        if value == "RENDER_CELL_DEMO":
            data = {
                "row": dict(row),
                "column": column,
                "table": table,
                "database": database,
                "config": datasette.plugin_config(
                    "name-of-plugin",
                    database=database,
                    table=table,
                ),
            }
            if request.args.get("_render_cell_extra"):
                data["render_cell_extra"] = 1
            return json.dumps(data)
        if value == "RENDER_CELL_ASYNC":
            db = datasette.get_database(database)
            result = await db.execute("select 'RENDER_CELL_ASYNC_RESULT'")
            return result.single_value()
        return None

    return inner


@hookimpl
def extra_template_vars(
    template, database, table, view_name, columns, request, datasette
):
    """Expose a JSON string describing the template, scope path and columns."""

    def decode_bytes(b):
        # json.dumps fallback for values it cannot serialise natively.
        return b.decode("utf8")

    scope_path = request.scope["path"] if request else None
    payload = {
        "template": template,
        "scope_path": scope_path,
        "columns": columns,
    }
    return {"extra_template_vars": json.dumps(payload, default=decode_bytes)}


@hookimpl
def prepare_jinja2_environment(env, datasette):
    """Return an async callable that installs the ``select_times_three`` filter."""

    async def select_times_three(s):
        db = datasette.get_database()
        result = await db.execute("select 3 * ?", [int(s)])
        return result.first()[0]

    async def inner():
        env.filters["select_times_three"] = select_times_three

    return inner


@hookimpl
def register_facet_classes():
    """Register the DummyFacet class."""
    return [DummyFacet]


class DummyFacet(Facet):
    """Facet of type "dummy" that suggests every column when ``_dummy_facet`` is set."""

    type = "dummy"

    async def suggest(self):
        """Return one suggestion per column, or ``[]`` without ``?_dummy_facet``."""
        columns = await self.get_columns(self.sql, self.params)
        if not self.request.args.get("_dummy_facet"):
            return []
        suggestions = []
        for column in columns:
            toggle_path = path_with_added_args(self.request, {"_facet_dummy": column})
            suggestions.append(
                {
                    "name": column,
                    "toggle_url": self.ds.absolute_url(self.request, toggle_path),
                    "type": "dummy",
                }
            )
        return suggestions

    async def facet_results(self):
        """Return empty results and an empty timed-out list."""
        return {}, []


@hookimpl
def actor_from_request(datasette, request):
    """Authenticate as the "bot" actor when ``?_bot`` is present."""
    if not request.args.get("_bot"):
        return None
    return {"id": "bot"}


@hookimpl
def asgi_wrapper():
    """Return a wrapper that injects an actor into the scope on ``_actor_in_scope``."""

    def wrap(app):
        async def maybe_set_actor_in_scope(scope, receive, send):
            query_string = scope.get("query_string", b"")
            if b"_actor_in_scope" in query_string:
                # Work on a copy so the caller's scope dict is not mutated.
                scope = dict(scope, actor={"id": "from-scope"})
                print(scope)
            await app(scope, receive, send)

        return maybe_set_actor_in_scope

    return wrap


@hookimpl
def permission_allowed(actor, action):
    """Answer a handful of hard-coded permission checks; ``None`` otherwise."""
    if action == "this_is_allowed":
        return True
    if action == "this_is_denied":
        return False
    if action == "view-database-download":
        return actor.get("can_download") if actor else None
    # Special permissions for latest.datasette.io demos
    # See https://github.com/simonw/todomvc-datasette/issues/2
    actor_id = actor.get("id") if actor else None
    todomvc_actions = (
        "insert-row",
        "create-table",
        "drop-table",
        "delete-row",
        "update-row",
    )
    if actor_id == "todomvc" and action in todomvc_actions:
        return True
    return None


@hookimpl
def register_routes():
    """Register a set of demo routes exercising different view signatures."""

    async def one(datasette):
        result = await datasette.get_database().execute("select 1 + 1")
        return Response.text(result.first()[0])

    async def two(request):
        name = request.url_vars["name"]
        greeting = request.args.get("greeting")
        return Response.text(f"{greeting} {name}")

    async def three(scope, send):
        await asgi_send_json(
            send, {"hello": "world"}, status=200, headers={"x-three": "1"}
        )

    async def post(request):
        if request.method == "GET":
            return Response.html(request.scope["csrftoken"]())
        return Response.json(await request.post_vars())

    async def csrftoken_form(request, datasette):
        html = await datasette.render_template("csrftoken_form.html", request=request)
        return Response.html(html)

    def not_async():
        return Response.html("This was not async")

    def add_message(datasette, request):
        datasette.add_message(request, "Hello from messages")
        return Response.html("Added message")

    async def render_message(datasette, request):
        html = await datasette.render_template("render_message.html", request=request)
        return Response.html(html)

    def login_as_root(datasette, request):
        # Mainly for the latest.datasette.io demo
        if request.method == "POST":
            response = Response.redirect("/")
            signed_actor = datasette.sign({"a": {"id": "root"}}, "actor")
            response.set_cookie("ds_actor", signed_actor)
            return response
        return Response.html(
            """
            <form action="{}" method="POST">
                <p>
                    <input type="hidden" name="csrftoken" value="{}">
                    <input type="submit"
                      value="Sign in as root user"
                      style="font-size: 2em; padding: 0.1em 0.5em;">
                </p>
            </form>
        """.format(
                request.path, request.scope["csrftoken"]()
            )
        )

    def asgi_scope(scope):
        return Response.json(scope, default=repr)

    async def parallel_queries(datasette):
        db = datasette.get_database()
        with tracer.trace_child_tasks():
            first, second = await asyncio.gather(
                db.execute("select coalesce(sleep(0.1), 1)"),
                db.execute("select coalesce(sleep(0.1), 2)"),
            )
        return Response.json(
            {"one": first.single_value(), "two": second.single_value()}
        )

    routes = [
        (r"/one/$", one),
        (r"/two/(?P<name>.*)$", two),
        (r"/three/$", three),
        (r"/post/$", post),
        (r"/csrftoken-form/$", csrftoken_form),
        (r"/login-as-root$", login_as_root),
        (r"/not-async/$", not_async),
        (r"/add-message/$", add_message),
        (r"/render-message/$", render_message),
        (r"/asgi-scope$", asgi_scope),
        (r"/parallel-queries$", parallel_queries),
    ]
    return routes


@hookimpl
def startup(datasette):
    """Flag that the startup hook ran and exercise the top-level import shortcuts."""
    datasette._startup_hook_fired = True

    # And test some import shortcuts too
    from datasette import Response
    from datasette import Forbidden
    from datasette import NotFound
    from datasette import hookimpl
    from datasette import actor_matches_allow

    # Reference every imported name so none of them is reported as unused.
    _ = (Response, Forbidden, NotFound, hookimpl, actor_matches_allow)


@hookimpl
def canned_queries(datasette, database, actor):
    """Provide a single canned query that echoes the actor id (or 'null')."""
    sql = f"select 1, '{actor['id'] if actor else 'null'}' as actor_id"
    return {"from_hook": sql}


@hookimpl
def register_magic_parameters():
    """Register the ``request`` and ``uuid`` magic parameter handlers."""
    from uuid import uuid4

    def uuid(key, request):
        if key != "new":
            raise KeyError
        return str(uuid4())

    def request(key, request):
        if key != "http_version":
            raise KeyError
        return request.scope["http_version"]

    return [
        ("request", request),
        ("uuid", uuid),
    ]


@hookimpl
def forbidden(datasette, request, message):
    """Record the forbidden message and redirect requests for ``/data2`` to login."""
    datasette._last_forbidden_message = message
    if request.path != "/data2":
        return None
    return Response.redirect("/login?message=" + message)


@hookimpl
def menu_links(datasette, actor, request):
    """Add a "Hello" menu link for signed-in actors, optionally with ``?_hello``."""
    if not actor:
        return None
    label = "Hello"
    if request.args.get("_hello"):
        label += ", " + request.args["_hello"]
    return [{"href": datasette.urls.instance(), "label": label}]


@hookimpl
def table_actions(datasette, database, table, actor):
    """Add database and table action links for signed-in actors."""
    if not actor:
        return None
    database_link = {
        "href": datasette.urls.instance(),
        "label": f"Database: {database}",
    }
    table_link = {"href": datasette.urls.instance(), "label": f"Table: {table}"}
    return [database_link, table_link]


@hookimpl
def database_actions(datasette, database, actor, request):
    """Add a database action link for signed-in actors, optionally with ``?_hello``."""
    if not actor:
        return None
    label = f"Database: {database}"
    if request.args.get("_hello"):
        label += " - " + request.args["_hello"]
    return [
        {
            "href": datasette.urls.instance(),
            "label": label,
        }
    ]


@hookimpl
def skip_csrf(scope):
    """Skip CSRF protection for the ``/skip-csrf`` path only."""
    return scope["path"] == "/skip-csrf"


@hookimpl
def register_permissions(datasette):
    """Register one fixed permission plus any listed in plugin configuration.

    Extra permissions are read from the ``permissions`` list of the
    ``datasette-register-permissions`` plugin configuration, when present.
    """
    extras = datasette.plugin_config("datasette-register-permissions") or {}
    permissions = [
        Permission(
            name="permission-from-plugin",
            abbr="np",
            description="New permission added by a plugin",
            takes_database=True,
            takes_resource=False,
            default=False,
        )
    ]
    if extras:
        for spec in extras["permissions"]:
            permissions.append(
                Permission(
                    name=spec["name"],
                    abbr=spec["abbr"],
                    description=spec["description"],
                    takes_database=spec["takes_database"],
                    takes_resource=spec["takes_resource"],
                    default=spec["default"],
                )
            )
    return permissions