home

Menu
  • ripgrep search

datasette-auth-github/datasette_auth_github/views.py

from datasette.utils.asgi import Response
from urllib.parse import parse_qsl
from .utils import load_orgs_and_teams
import httpx
 
 
DEPRECATED_KEYS = ("allow_users", "allow_orgs", "allow_teams")
 
 
def verify_config(config):
    config = config or {}
    for key in DEPRECATED_KEYS:
        assert key not in config, "{} is no longer a supported option".format(key)
 
 
async def github_auth_start(datasette):
    config = datasette.plugin_config("datasette-auth-github")
    verify_config(config)
    if config.get("load_teams") or config.get("load_orgs"):
        scope = "read:org"
    else:
        scope = "user:email"
    github_login_url = (
        "https://github.com/login/oauth/authorize?scope={}&client_id={}".format(
            scope, config["client_id"]
        )
    )
    return Response.redirect(github_login_url)
 
 
async def response_error(datasette, error):
    return Response.html(
        await datasette.render_template(
            "datasette_auth_github_error.html", {"error": error}
        ),
        status=500,
    )
 
 
async def github_auth_callback(datasette, request, scope, receive, send):
    config = datasette.plugin_config("datasette-auth-github")
    verify_config(config)
    if not request.args.get("code"):
        return await response_error(datasette, "Authentication failed, no code")
 
    # Exchange that code for a token
    async with httpx.AsyncClient() as client:
        github_response = await client.post(
            "https://github.com/login/oauth/access_token",
            data={
                "client_id": config["client_id"],
                "client_secret": config["client_secret"],
                "code": request.args["code"],
            },
        )
    parsed = dict(parse_qsl(github_response.text))
    # b'error=bad_verification_code&error_description=The+code+passed...'
    if parsed.get("error"):
        return await response_error(
            datasette, parsed["error"] + ": " + (parsed.get("error_description") or "")
        )
    access_token = parsed.get("access_token")
    if not access_token:
        return await response_error(datasette, "No valid access token")
 
    # Use access_token to verify user
    profile_url = "https://api.github.com/user"
    try:
        async with httpx.AsyncClient() as client:
            profile = (
                await client.get(
                    profile_url,
                    headers={"Authorization": "token {}".format(access_token)},
                )
            ).json()
    except ValueError:
        return await response_error(datasette, "Could not load GitHub profile")
    actor = {
        "display": profile["login"],
        "gh_id": str(profile["id"]),
        "gh_name": profile["name"],
        "gh_login": profile["login"],
        "gh_email": profile["email"],
    }
    extras = await load_orgs_and_teams(config, profile, access_token)
    actor.update(extras)
 
    # Set a signed cookie and redirect to homepage
    response = Response.redirect("/")
    response.set_cookie("ds_actor", datasette.sign({"a": actor}, "actor"))
    return response
 
Powered by Datasette