datasette-indieauth/datasette_indieauth/__init__.py
from datasette import hookimplfrom .utils import (build_authorization_url,canonicalize_url,discover_endpoints,display_url,verify_profile_url,verify_same_domain,)import httpximport itsdangerousfrom markupsafe import escapeimport jsonimport urllibDATASETTE_INDIEAUTH_STATE = "datasette-indieauth-state"DATASETTE_INDIEAUTH_COOKIE = "datasette-indieauth-cookie"async def indieauth(request, datasette):return await indieauth_page(request, datasette)async def indieauth_page(request, datasette, status=200, error=None):from datasette.utils.asgi import Responseurls = Urls(request, datasette)if request.method == "POST":while True: # So I can use 'break'post = await request.post_vars()me = post.get("me")if me:me = canonicalize_url(me)if not me or not verify_profile_url(me):error = "Invalid IndieAuth identifier"break# Start the auth processtry:me, authorization_endpoint, token_endpoint = await discover_endpoints(me)except httpx.RequestError as ex:error = "Invalid IndieAuth identifier: {}".format(ex)breakif not authorization_endpoint:error = "Invalid IndieAuth identifier - no authorization_endpoint found"breakauthorization_url, state, verifier = build_authorization_url(authorization_endpoint=authorization_endpoint,client_id=urls.client_id,redirect_uri=urls.redirect_uri,me=me,signing_function=lambda x: datasette.sign(x, DATASETTE_INDIEAUTH_STATE),)response = Response.redirect(authorization_url)response.set_cookie("ds_indieauth",datasette.sign({"v": verifier,"m": me,},DATASETTE_INDIEAUTH_COOKIE,),)return responsereturn Response.html(await datasette.render_template("indieauth.html",{"error": error,"title": datasette.metadata("title") or "Datasette","absolute_instance_url": datasette.absolute_url(request, datasette.urls.instance()),},request=request,),status=status,)async def indieauth_done(request, datasette):from datasette.utils.asgi import Responsestate = request.args.get("state") or ""code = request.args.get("code")try:state_bits = datasette.unsign(state, DATASETTE_INDIEAUTH_STATE)except itsdangerous.BadSignature:return await indieauth_page(request, datasette, error="Invalid state", status=400)authorization_endpoint = state_bits["a"]urls = Urls(request, datasette)# code_verifier should be in a signed cookiecode_verifier = Noneoriginal_me = Noneif "ds_indieauth" in request.cookies:try:cookie_bits = datasette.unsign(request.cookies["ds_indieauth"], DATASETTE_INDIEAUTH_COOKIE)code_verifier = cookie_bits["v"]original_me = cookie_bits["m"]except (itsdangerous.BadSignature, KeyError):passif not code_verifier or not original_me:return await indieauth_page(request, datasette, error="Invalid ds_indieauth cookie")data = {"grant_type": "authorization_code","code": code,"client_id": urls.client_id,"redirect_uri": urls.redirect_uri,"code_verifier": code_verifier,}async with httpx.AsyncClient() as client:response = await client.post(authorization_endpoint, data=data)if response.status_code == 200:body = response.texttry:info = json.loads(body)except ValueError:info = dict(urllib.parse.parse_qsl(body))if "me" not in info:return await indieauth_page(request,datasette,error="Invalid authorization_code response from authorization server",)me = info["me"]# Verify returned me - must be same domain and link to same authorization_endpointme_error = Noneif not verify_same_domain(me, original_me):me_error = '"me" value returned by authorization server had a domain that did not match the initial URL'canonical_me, me_authorization_endpoint, _ = await utils.discover_endpoints(me)if me_authorization_endpoint != authorization_endpoint:me_error = '"me" value resolves to a different authorization_endpoint'if me_error:return await indieauth_page(request,datasette,error=me_error,)me = canonical_meactor = {"me": me,"display": display_url(me),}if "scope" in info:actor["indieauth_scope"] = info["scope"]if "profile" in info and isinstance(info["profile"], dict):actor.update(info["profile"])response = Response.redirect(datasette.urls.instance())response.set_cookie("ds_actor",datasette.sign({"a": actor},"actor",),)return responseelse:return await indieauth_page(request,datasette,error="Invalid response from authorization server",)class Urls:def __init__(self, request, datasette):self.request = requestself.datasette = datasettedef absolute(self, path):return self.datasette.absolute_url(self.request, self.datasette.urls.path(path))@propertydef login(self):return self.absolute("/-/indieauth")@propertydef client_id(self):return self.login@propertydef redirect_uri(self):return self.absolute("/-/indieauth/done")@hookimpldef register_routes():return [(r"^/-/indieauth$", indieauth),(r"^/-/indieauth/done$", indieauth_done),]@hookimpldef menu_links(datasette, actor):if not actor:return [{"href": datasette.urls.path("/-/indieauth"),"label": "Sign in IndieAuth",},]@hookimpldef permission_allowed(datasette, actor, action):if action != "view-instance":return Noneplugin_config = datasette.plugin_config("datasette-indieauth") or {}if plugin_config.get("restrict_access") is None:return None# Only actors in the list are allowedif not actor:return Falseallowed_actors = plugin_config["restrict_access"]if isinstance(allowed_actors, str):allowed_actors = allowed_actors.split()return actor.get("me") in allowed_actors@hookimpldef forbidden(request, datasette):async def inner():return await indieauth_page(request, datasette, 403)return inner