datasette-auth-tokens/datasette_auth_tokens/views.py
from datasette import Forbidden, Response, NotFoundfrom datasette.utils import (tilde_encode,tilde_decode,display_actor,)from .utils import ago_difference, format_permissionsimport datetimeimport jsonimport timeTOKEN_PAGE_SIZE = 30async def create_api_token(request, datasette):await check_permission(datasette, request.actor)if request.method == "GET":return Response.html(await datasette.render_template("create_api_token.html",await _shared(datasette, request),request=request,))elif request.method == "POST":post = await request.post_vars()errors = []expires_after = Noneif post.get("expire_type"):duration_string = post.get("expire_duration")if (not duration_stringor not duration_string.isdigit()or not int(duration_string) > 0):errors.append("Invalid expire duration")else:unit = post["expire_type"]if unit == "minutes":expires_after = int(duration_string) * 60elif unit == "hours":expires_after = int(duration_string) * 60 * 60elif unit == "days":expires_after = int(duration_string) * 60 * 60 * 24else:errors.append("Invalid expire duration unit")# Are there any restrictions?restrict_all = []restrict_database = {}restrict_resource = {}for key in post:if key.startswith("all:") and key.count(":") == 1:restrict_all.append(key.split(":")[1])elif key.startswith("database:") and key.count(":") == 2:bits = key.split(":")database = tilde_decode(bits[1])action = bits[2]restrict_database.setdefault(database, []).append(action)elif key.startswith("resource:") and key.count(":") == 3:bits = key.split(":")database = tilde_decode(bits[1])resource = tilde_decode(bits[2])action = bits[3]restrict_resource.setdefault(database, {}).setdefault(resource, []).append(action)# Reuse Datasette signed tokens mechanism to create parts of the tokenthrowaway_signed_token = datasette.create_token(request.actor["id"],expires_after=expires_after,restrict_all=restrict_all,restrict_database=restrict_database,restrict_resource=restrict_resource,)token_bits = datasette.unsign(throwaway_signed_token[len("dstok_") :], namespace="token")permissions = token_bits.get("_r") or Noneconfig = Config(datasette)db = config.dbcursor = await db.execute_write("""insert into _datasette_auth_tokens(secret_version, description, permissions, actor_id, created_timestamp, expires_after_seconds)values(:secret_version, :description, :permissions, :actor_id, :created_timestamp, :expires_after_seconds)""",{"secret_version": 0,"permissions": json.dumps(permissions),"description": post.get("description") or None,"actor_id": request.actor["id"],"created_timestamp": int(time.time()),"expires_after_seconds": expires_after,},)token = "dsatok_{}".format(datasette.sign(cursor.lastrowid, "dsatok"))context = await _shared(datasette, request)context.update({"errors": errors, "token": token, "token_bits": token_bits})return Response.html(await datasette.render_template("create_api_token.html", context, request=request))else:raise Forbidden("Invalid method")async def check_permission(datasette, actor):if not actor or not actor.get("id"):raise Forbidden("You must be logged in as an actor with an ID to create a token")if not await datasette.permission_allowed(actor, "auth-tokens-create"):raise Forbidden("You do not have permission to create a token")async def _shared(datasette, request):await check_permission(datasette, request.actor)db = Config(datasette).dbtokens_exist = bool((await db.execute("select 1 from _datasette_auth_tokens limit 1")).first())# Build list of databases and tables the user has permission to viewdatabase_with_tables = []for database in datasette.databases.values():if database.name in ("_internal", "_memory"):continueif not await datasette.permission_allowed(request.actor, "view-database", database.name):continuehidden_tables = await database.hidden_table_names()tables = []for table in await database.table_names():if table in hidden_tables:continueif not await datasette.permission_allowed(request.actor,"view-table",resource=(database.name, table),):continuetables.append({"name": table, "encoded": tilde_encode(table)})database_with_tables.append({"name": database.name,"encoded": tilde_encode(database.name),"tables": tables,})return {"actor": request.actor,"all_permissions": [{"name": key, "description": value.description}for key, value in datasette.permissions.items()if keynot in ("auth-tokens-create","auth-tokens-revoke-all","debug-menu","permissions-debug",)],"database_permissions": [{"name": key, "description": value.description}for key, value in datasette.permissions.items()if value.takes_database],"resource_permissions": [{"name": key, "description": value.description}for key, value in datasette.permissions.items()if value.takes_resource],"database_with_tables": database_with_tables,"tokens_exist": tokens_exist,}async def tokens_index(datasette, request):from . import TOKEN_STATUSES, make_expire_functiondb = Config(datasette).db# Expire any tokens that are due for expiringawait db.execute_write_fn(make_expire_function())next = request.args.get("next")where_bits = []params = {}if next:where_bits.append("id <= :next")params["next"] = nextwhere = " and ".join(where_bits)# Users can only see their own tokens, unless they have the# auth-tokens-view-all permissionif not await datasette.permission_allowed(request.actor, "auth-tokens-view-all"):where_bits.append("actor_id = :actor_id")params["actor_id"] = request.actor["id"] if request.actor else Nonetokens = [dict(row)for row in (await db.execute("""select * from _datasette_auth_tokens{where} order by id desc limit {limit}""".format(where="where {}".format(where) if where else "",limit=TOKEN_PAGE_SIZE + 1,),params,)).rows]next = Noneif len(tokens) == TOKEN_PAGE_SIZE + 1:next = tokens[-1]["id"]tokens = tokens[:-1]for token in tokens:token["status"] = TOKEN_STATUSES.get(token["token_status"], token["token_status"])# Resolve actorsactor_ids = set([token["actor_id"] for token in tokens])actors = await datasette.actors_from_ids(list(actor_ids))for token in tokens:actor = actors.get(token["actor_id"])token["actor"] = actortoken["actor_display"] = display_actor(actor) if actor else Nonedef _format_permissions(json_string):return format_permissions(datasette, json.loads(json_string))return Response.html(await datasette.render_template("tokens_index.html",{"tokens": tokens,"next": next,"is_first_page": not bool(request.args.get("next")),"timestamp": _timestamp,"ago_difference": ago_difference,"format_permissions": _format_permissions,"can_create_tokens": await datasette.permission_allowed(request.actor, "auth-tokens-create"),},request=request,))async def token_details(request, datasette):from . import TOKEN_STATUSESconfig = Config(datasette)db = config.dbid = request.url_vars["id"]async def fetch_row():return (await db.execute("select * from _datasette_auth_tokens where id = ?", (id,))).first()row = await fetch_row()if row is None:raise NotFound("Token not found")# User can manage if they own the token or they have auth-tokens-revoke-allif not await actor_can_view(datasette, request.actor, row["actor_id"]):raise Forbidden("You do not have permission to manage this token")can_revoke = await actor_can_revoke(datasette, request.actor, row["actor_id"])if (row["expires_after_seconds"]and (row["created_timestamp"] + row["expires_after_seconds"]) < time.time()):await db.execute_write("update _datasette_auth_tokens set token_status='E' where id=:token_id",{"token_id": id},)row = await fetch_row()if request.method == "POST":post_vars = await request.post_vars()if post_vars.get("revoke"):if not can_revoke:raise Forbidden("You do not have permission to revoke this token")else:await db.execute_write("""update _datasette_auth_tokenssettoken_status = 'R',ended_timestamp = :nowwhere id = :id""",{"id": id, "now": int(time.time())},)return Response.redirect(request.path)restrictions = "None"permissions = json.loads(row["permissions"])if permissions:restrictions = format_permissions(datasette, permissions)actors = await datasette.actors_from_ids([row["actor_id"]])actor_display = Noneif actors and actors.get(row["actor_id"]):actor_display = display_actor(actors[row["actor_id"]])return Response.html(await datasette.render_template("token_details.html",{"token": row,"actor_display": actor_display,"token_status": TOKEN_STATUSES.get(row["token_status"], row["token_status"]),"timestamp": _timestamp,"ago_difference": ago_difference,"restrictions": restrictions,"can_revoke": can_revoke,},request=request,))def _timestamp(ts):if ts:return datetime.datetime.fromtimestamp(ts).isoformat()else:return ""async def actor_can_view(datasette, actor, token_actor_id):if not actor or not actor.get("id"):# Only works for actors that have an ID setreturn Falseif token_actor_id and str(token_actor_id) == str(actor.get("id")):return True# User with auth-tokens-view-all can view any tokenreturn await datasette.permission_allowed(actor, "auth-tokens-view-all")async def actor_can_revoke(datasette, actor, token_actor_id):if not actor or not actor.get("id"):# Only works for actors that have an ID setreturn Falseif token_actor_id and str(token_actor_id) == str(actor.get("id")):return True# User with auth-tokens-revoke-all can revoke any tokenreturn await datasette.permission_allowed(actor, "auth-tokens-revoke-all")class Config:def __init__(self, datasette):self._plugin_config = datasette.plugin_config("datasette-auth-tokens") or {}self._datasette = datasetteself.enabled = self._plugin_config.get("manage_tokens")def get(self, key):return self._plugin_config.get(key)@propertydef db(self):db_name = self._plugin_config.get("manage_tokens_database") or Noneif db_name is None:return self._datasette.get_internal_database()else:return self._datasette.get_database(db_name)