home

Menu
  • ripgrep search

datasette-indieauth/datasette_indieauth/__init__.py

from datasette import hookimpl
from .utils import (
    build_authorization_url,
    canonicalize_url,
    discover_endpoints,
    display_url,
    verify_profile_url,
    verify_same_domain,
)
import httpx
import itsdangerous
from markupsafe import escape
import json
import urllib
 
# Namespace passed to datasette.sign()/unsign() for the "state" value that
# travels through the authorization redirect.
DATASETTE_INDIEAUTH_STATE = "datasette-indieauth-state"
# Namespace passed to datasette.sign()/unsign() for the "ds_indieauth" cookie
# payload (code verifier plus the originally submitted "me" URL).
DATASETTE_INDIEAUTH_COOKIE = "datasette-indieauth-cookie"
 
 
async def indieauth(request, datasette):
    """Route handler for the sign-in URL.

    Delegates to ``indieauth_page`` using its default status and error.
    """
    page = await indieauth_page(request, datasette)
    return page
 
 
async def indieauth_page(request, datasette, status=200, error=None):
    """Render the IndieAuth sign-in page, or start the flow on a POST.

    For a POST request the submitted ``me`` value is canonicalized and
    validated and its endpoints are discovered. When that succeeds the
    result is a redirect to the authorization URL carrying a signed
    ``ds_indieauth`` cookie. In every other case the ``indieauth.html``
    template is rendered with ``error`` and returned with ``status``.
    """
    from datasette.utils.asgi import Response

    urls = Urls(request, datasette)

    if request.method == "POST":
        redirect, error = await _begin_authorization(request, datasette, urls)
        if redirect is not None:
            return redirect

    page_title = datasette.metadata("title") or "Datasette"
    instance_url = datasette.absolute_url(request, datasette.urls.instance())
    context = {
        "error": error,
        "title": page_title,
        "absolute_instance_url": instance_url,
    }
    html = await datasette.render_template(
        "indieauth.html", context, request=request
    )
    return Response.html(html, status=status)


async def _begin_authorization(request, datasette, urls):
    """Process the submitted sign-in form.

    Returns a ``(redirect_response, error_message)`` pair in which exactly
    one element is ``None``.
    """
    from datasette.utils.asgi import Response

    form = await request.post_vars()
    me = form.get("me")
    if me:
        me = canonicalize_url(me)
    if not (me and verify_profile_url(me)):
        return None, "Invalid IndieAuth identifier"

    # Start the auth process
    try:
        me, authorization_endpoint, _token_endpoint = await discover_endpoints(me)
    except httpx.RequestError as ex:
        return None, "Invalid IndieAuth identifier: {}".format(ex)
    if not authorization_endpoint:
        return None, "Invalid IndieAuth identifier - no authorization_endpoint found"

    def sign_state(value):
        # The state is signed under its own namespace, separate from the cookie.
        return datasette.sign(value, DATASETTE_INDIEAUTH_STATE)

    authorization_url, _state, verifier = build_authorization_url(
        authorization_endpoint=authorization_endpoint,
        client_id=urls.client_id,
        redirect_uri=urls.redirect_uri,
        me=me,
        signing_function=sign_state,
    )

    redirect = Response.redirect(authorization_url)
    cookie_payload = {"v": verifier, "m": me}
    redirect.set_cookie(
        "ds_indieauth",
        datasette.sign(cookie_payload, DATASETTE_INDIEAUTH_COOKIE),
    )
    return redirect, None
 
 
async def indieauth_done(request, datasette):
    """Handle the redirect back from the authorization server.

    Steps, in order:

    1. Unsign the ``state`` query parameter; its ``"a"`` entry is the
       authorization endpoint the flow was started with.
    2. Unsign the ``ds_indieauth`` cookie to recover the code verifier and
       the originally submitted ``me`` URL.
    3. POST the ``code`` to the authorization endpoint and parse the reply
       as JSON, falling back to form encoding.
    4. Check that the returned ``me`` is on the same domain as the original
       and resolves to the same authorization endpoint.
    5. Redirect to the instance root with a signed ``ds_actor`` cookie.

    Every failure re-renders the sign-in page with an error message instead
    of raising, including network errors talking to the remote servers.
    """
    # ``import urllib`` alone does not guarantee the ``parse`` submodule is
    # loaded, so import it explicitly where it is used.
    import urllib.parse

    from datasette.utils.asgi import Response

    state = request.args.get("state") or ""
    code = request.args.get("code")
    try:
        state_bits = datasette.unsign(state, DATASETTE_INDIEAUTH_STATE)
    except itsdangerous.BadSignature:
        return await indieauth_page(
            request, datasette, error="Invalid state", status=400
        )
    authorization_endpoint = state_bits["a"]

    urls = Urls(request, datasette)

    # code_verifier should be in a signed cookie
    code_verifier = None
    original_me = None
    if "ds_indieauth" in request.cookies:
        try:
            cookie_bits = datasette.unsign(
                request.cookies["ds_indieauth"], DATASETTE_INDIEAUTH_COOKIE
            )
            code_verifier = cookie_bits["v"]
            original_me = cookie_bits["m"]
        except (itsdangerous.BadSignature, KeyError):
            # Deliberately ignored: the check below reports the bad cookie.
            pass
    if not code_verifier or not original_me:
        return await indieauth_page(
            request, datasette, error="Invalid ds_indieauth cookie"
        )

    data = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": urls.client_id,
        "redirect_uri": urls.redirect_uri,
        "code_verifier": code_verifier,
    }
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(authorization_endpoint, data=data)
    except httpx.RequestError as ex:
        # A network failure is reported on the sign-in page, not as a 500.
        return await indieauth_page(
            request,
            datasette,
            error="Invalid response from authorization server: {}".format(ex),
        )

    if response.status_code != 200:
        return await indieauth_page(
            request,
            datasette,
            error="Invalid response from authorization server",
        )

    body = response.text
    try:
        info = json.loads(body)
    except ValueError:
        info = dict(urllib.parse.parse_qsl(body))
    # Valid JSON need not be an object, and "me" must be a string URL.
    if not isinstance(info, dict) or not isinstance(info.get("me"), str):
        return await indieauth_page(
            request,
            datasette,
            error="Invalid authorization_code response from authorization server",
        )
    me = info["me"]

    # Verify returned me - must be same domain and link to same authorization_endpoint
    me_error = None
    if not verify_same_domain(me, original_me):
        me_error = '"me" value returned by authorization server had a domain that did not match the initial URL'

    # NOTE(review): ``utils`` is not imported by name in this module; it
    # presumably resolves because ``from .utils import ...`` binds the
    # submodule on the package namespace - confirm before changing.
    try:
        canonical_me, me_authorization_endpoint, _ = await utils.discover_endpoints(me)
    except httpx.RequestError as ex:
        # Keep an earlier domain-mismatch message if there is one.
        me_error = me_error or (
            '"me" value returned by authorization server could not be verified: {}'.format(
                ex
            )
        )
    else:
        if me_authorization_endpoint != authorization_endpoint:
            me_error = '"me" value resolves to a different authorization_endpoint'

    if me_error:
        return await indieauth_page(
            request,
            datasette,
            error=me_error,
        )

    me = canonical_me

    actor = {
        "me": me,
        "display": display_url(me),
    }
    if "scope" in info:
        actor["indieauth_scope"] = info["scope"]

    profile = info.get("profile")
    if isinstance(profile, dict):
        # The profile comes from the remote server. setdefault() lets it add
        # keys but never replace the verified "me", "display" or
        # "indieauth_scope" values set above.
        for key, value in profile.items():
            actor.setdefault(key, value)

    response = Response.redirect(datasette.urls.instance())
    response.set_cookie(
        "ds_actor",
        datasette.sign(
            {"a": actor},
            "actor",
        ),
    )
    return response
 
 
class Urls:
    """Absolute URLs for this plugin, built for one request."""

    def __init__(self, request, datasette):
        self.request, self.datasette = request, datasette

    def absolute(self, path):
        """Return the absolute URL for ``path`` within this instance."""
        instance = self.datasette
        routed_path = instance.urls.path(path)
        return instance.absolute_url(self.request, routed_path)

    @property
    def login(self):
        """Absolute URL of the sign-in page."""
        login_url = self.absolute("/-/indieauth")
        return login_url

    @property
    def client_id(self):
        """Client identifier; the same value as ``login``."""
        return self.login

    @property
    def redirect_uri(self):
        """Absolute URL the authorization server redirects back to."""
        callback_url = self.absolute("/-/indieauth/done")
        return callback_url
 
 
@hookimpl
def register_routes():
    """Return the ``(regex, view)`` pairs for the two plugin URLs."""
    sign_in_route = (r"^/-/indieauth$", indieauth)
    callback_route = (r"^/-/indieauth/done$", indieauth_done)
    return [sign_in_route, callback_route]
 
 
@hookimpl
def menu_links(datasette, actor):
    """Offer a sign-in menu link when there is no actor.

    Returns a one-item list holding the ``/-/indieauth`` link when ``actor``
    is falsy, and ``None`` otherwise.
    """
    if actor:
        return None
    sign_in_link = {
        "href": datasette.urls.path("/-/indieauth"),
        "label": "Sign in IndieAuth",
    }
    return [sign_in_link]
 
 
@hookimpl
def permission_allowed(datasette, actor, action):
    """Apply the optional ``restrict_access`` setting to ``view-instance``.

    Returns ``None`` (no opinion) for any other action, or when the
    ``datasette-indieauth`` plugin configuration has no ``restrict_access``
    value. Otherwise returns ``False`` for a missing actor, and for any
    other actor returns whether its ``"me"`` value is in the allowed list.
    A string setting is split on whitespace to form that list.
    """
    restriction = None
    if action == "view-instance":
        settings = datasette.plugin_config("datasette-indieauth") or {}
        restriction = settings.get("restrict_access")
    if restriction is None:
        return None

    # Only actors in the list are allowed
    if not actor:
        return False
    permitted = restriction.split() if isinstance(restriction, str) else restriction
    return actor.get("me") in permitted
 
 
@hookimpl
def forbidden(request, datasette):
    """Return a coroutine function that renders the sign-in page with status 403."""

    async def render_sign_in_page():
        page = await indieauth_page(request, datasette, 403)
        return page

    return render_sign_in_page
 
Powered by Datasette