# datasette-hashed-urls/datasette_hashed_urls/__init__.py
import hashlib
from functools import wraps

from datasette import hookimpl


@hookimpl
def startup(datasette):
    """Assign a hash-suffixed route to every database that has a hash.

    Populates ``datasette._hashed_url_databases`` with a mapping of
    database name to the first seven characters of ``database.hash`` and
    sets ``database.route`` to ``"<name>-<short hash>"``. Databases whose
    ``hash`` is falsy are left untouched.

    When ``datasette.crossdb`` is enabled and at least one database was
    hashed, the ``_memory`` database also gets a hashed route derived from
    all of the other hashes.
    """
    datasette._hashed_url_databases = {}
    all_hashes = []
    for name, database in datasette.databases.items():
        if not database.hash:
            continue
        all_hashes.append(database.hash)
        short_hash = database.hash[:7]
        database.route = "{}-{}".format(name, short_hash)
        datasette._hashed_url_databases[name] = short_hash
    if datasette.crossdb and all_hashes:
        # Set up a hashed route for _memory too, as a combo
        # of all of the other hashes
        memory_hash = hashlib.sha256(
            "\n".join(all_hashes).encode("latin-1")
        ).hexdigest()[:7]
        memory = datasette.get_database("_memory")
        memory.route = "_memory-{}".format(memory_hash)
        datasette._hashed_url_databases["_memory"] = memory_hash


@hookimpl
def asgi_wrapper(datasette):
    """Return an ASGI wrapper that intercepts requests for hashed databases.

    The wrapped application hands a request to ``handle_hashed_urls`` when
    the first path component (ignoring any ``.format`` suffix and any
    trailing ``-hash``) is a key of ``datasette._hashed_url_databases``.
    Every other request, and every non-HTTP scope, is passed straight
    through to the inner application.
    """

    def wrap_with_hashed_urls(app):
        @wraps(app)
        async def hashed_urls(scope, receive, send):
            if scope.get("type") != "http":
                await app(scope, receive, send)
                return
            # Only trigger on pages with a path that starts with /xxx
            # or /xxx-yyy where xxx is the name of an immutable database
            # and where the first page component matches a database name
            path = scope["path"].lstrip("/")
            first_component = path.split("/")[0]
            # Might have a format like .json on the end
            without_format = first_component.split(".")[0]
            without_hash_or_format = without_format.rsplit("-", 1)[0]
            hashed = datasette._hashed_url_databases
            if without_format in hashed or without_hash_or_format in hashed:
                await handle_hashed_urls(datasette, app, scope, receive, send)
                return
            await app(scope, receive, send)

        return hashed_urls

    return wrap_with_hashed_urls


async def handle_hashed_urls(datasette, app, scope, receive, send):
    """Redirect to the current hashed URL, or serve with a far-future cache header.

    The first path component is split into a database name, an optional
    incoming hash (the text after the last ``-``) and an optional format
    (the text after the first ``.``).

    * If the database name is not in ``datasette._hashed_url_databases``
      the request is passed through to ``app`` unchanged.
    * If the incoming hash differs from the stored one, a 302 redirect to
      ``/<name>-<current hash>[.<format>]/<rest of path>`` is sent, keeping
      the query string and adding CORS headers when ``datasette.cors`` is
      truthy.
    * Otherwise the request is served by ``app`` and any ``cache-control``
      header on the response start event is replaced with
      ``max-age=<max_age>, public``, where ``max_age`` comes from the
      ``datasette-hashed-urls`` plugin configuration (default 31536000).
    """
    path = scope["path"].lstrip("/")
    path_bits = path.split("/")
    first_component = path_bits[0]
    # partition() yields an empty extension when there is no "." at all,
    # which is treated the same as "no format" below.
    without_format, _, extension = first_component.partition(".")

    hashed = datasette._hashed_url_databases
    if "-" not in without_format or without_format in hashed:
        # Either there is no hash at all, or the database name itself
        # contains a "-" and was requested without a hash.
        db_name = without_format
        incoming_hash = ""
    else:
        db_name, incoming_hash = without_format.rsplit("-", 1)

    current_hash = hashed.get(db_name)
    if current_hash is None:
        # Not a database we manage: hand the request on instead of
        # failing with a KeyError.
        await app(scope, receive, send)
        return

    if current_hash != incoming_hash:
        # Send the redirect
        new_first_component = "{}-{}{}".format(
            db_name, current_hash, ".{}".format(extension) if extension else ""
        )
        new_path = "/" + "/".join([new_first_component] + path_bits[1:])
        if scope.get("query_string"):
            new_path += "?" + scope["query_string"].decode("latin-1")
        # NOTE(review): str.encode("latin1") raises UnicodeEncodeError if
        # the path contains characters outside Latin-1 - confirm whether
        # such paths can reach this point.
        redirect_headers = [[b"location", new_path.encode("latin1")]]
        if datasette.cors:
            redirect_headers.extend(
                [
                    [b"access-control-allow-origin", b"*"],
                    [b"access-control-allow-headers", b"authorization"],
                    [b"access-control-expose-headers", b"link"],
                ]
            )
        await send(
            {
                "type": "http.response.start",
                "status": 302,
                "headers": redirect_headers,
            }
        )
        await send({"type": "http.response.body", "body": b""})
        return

    plugin_config = datasette.plugin_config("datasette-hashed-urls") or {}
    max_age = plugin_config.get("max_age", 31536000)
    cache_control = [
        b"cache-control",
        "max-age={}, public".format(max_age).encode("latin-1"),
    ]

    # Hash is correct, add a far-future cache header
    async def wrapped_send(event):
        if event["type"] == "http.response.start":
            # "headers" may be absent from the event; treat that as empty.
            kept_headers = [
                pair
                for pair in (event.get("headers") or [])
                if pair[0].lower() != b"cache-control"
            ]
            # Copy the event so any additional keys it carries are kept.
            event = {**event, "headers": kept_headers + [cache_control]}
        await send(event)

    return await app(scope, receive, wrapped_send)