datasette-auth-passwords/datasette_auth_passwords/__init__.py
from datasette import hookimpl
from datasette.utils.asgi import Response
from .utils import hash_password, verify_password, scope_has_valid_authorization
import click
from functools import wraps
import sys
async def password_tool(request, datasette):
post_vars = await request.post_vars()
password = post_vars.get("password")
hashed_password = None
if password:
hashed_password = hash_password(password)
return Response.html(
await datasette.render_template(
"password_tool.html",
{
"hashed_password": hashed_password,
},
request=request,
)
)
async def password_login(request, datasette):
config = datasette.plugin_config("datasette-auth-passwords") or {}
accounts = {
key.split("_password_hash")[0]: value
for key, value in config.items()
if key.endswith("_password_hash")
}
actors = config.get("actors") or {}
error = None
if not accounts:
error = "This instance does not have any configured accounts"
post_vars = await request.post_vars()
username = post_vars.get("username") or ""
password = post_vars.get("password") or ""
if request.method == "POST":
# Look up user
password_hash = accounts.get(username)
if password_hash and verify_password(password, password_hash):
actor = actors.get(username) or {"id": username}
response = Response.redirect("/")
response.set_cookie("ds_actor", datasette.sign({"a": actor}, "actor"))
return response
else:
error = "Invalid username or password"
return Response.html(
await datasette.render_template(
"password_login.html", {"error": error}, request=request
)
)
@hookimpl
def register_routes():
return [
(r"^/-/password-tool$", password_tool),
(r"^/-/login$", password_login),
]
@hookimpl
def asgi_wrapper(datasette):
config = datasette.plugin_config("datasette-auth-passwords") or {}
if not config.get("http_basic_auth"):
return lambda asgi: asgi
def wrap(app):
@wraps(app)
async def require_authorization(scope, recieve, send):
if scope["type"] == "http":
actor = scope_has_valid_authorization(scope, datasette)
if actor is None:
return await Response.text(
"401 Authorization Required",
headers={
"www-authenticate": 'Basic realm="Datasette", charset="UTF-8"'
},
status=401,
).asgi_send(send)
await app(scope, recieve, send)
return require_authorization
return wrap
@hookimpl
def actor_from_request(datasette, request):
actor = scope_has_valid_authorization(request.scope, datasette)
if actor is not None:
return actor
@hookimpl
def register_commands(cli):
def no_confirm(ctx, param, value):
if not value or ctx.resilient_parsing:
return
click.echo(hash_password(sys.stdin.read().strip()))
ctx.exit()
@cli.command(name="hash-password")
@click.password_option()
@click.option(
"--no-confirm",
is_flag=True,
callback=no_confirm,
expose_value=False,
is_eager=True,
)
def _hash_password(password):
"Return hash for provided password"
click.echo(hash_password(password))
@hookimpl
def menu_links(datasette, actor):
if not actor:
return [
{
"href": datasette.urls.path("/-/login"),
"label": "Log in",
},
]