datasette-atom/datasette_atom/__init__.py
import bleachfrom datasette import hookimpl, __version__from datasette.utils.asgi import Responsefrom feedgen.feed import FeedGeneratorimport hashlibimport htmlREQUIRED_COLUMNS = {"atom_id", "atom_updated", "atom_title"}@hookimpldef register_output_renderer():return {"extension": "atom", "render": render_atom, "can_render": can_render_atom}def render_atom(datasette, request, sql, columns, rows, database, table, query_name, view_name, data):from datasette.views.base import DatasetteErrorif not REQUIRED_COLUMNS.issubset(columns):raise DatasetteError("SQL query must return columns {}".format(", ".join(REQUIRED_COLUMNS)),status=400,)fg = FeedGenerator()fg.generator(generator="Datasette",version=__version__,uri="https://github.com/simonw/datasette",)fg.id(request.url)fg.link(href=request.url, rel="self")fg.updated(max(row["atom_updated"] for row in rows))title = request.args.get("_feed_title", sql)if table:title += "/" + tableif data.get("human_description_en"):title += ": " + data["human_description_en"]# If this is a canned query the configured title for that over-rides all othersif query_name:try:title = datasette.metadata(database=database)["queries"][query_name]["title"]except (KeyError, TypeError):passfg.title(title)clean_function = cleanif query_name:# Check allow_unsafe_html_in_canned_queriesplugin_config = datasette.plugin_config("datasette-atom")if plugin_config:allow_unsafe_html_in_canned_queries = plugin_config.get("allow_unsafe_html_in_canned_queries")if allow_unsafe_html_in_canned_queries is True:clean_function = lambda s: selif isinstance(allow_unsafe_html_in_canned_queries, dict):allowlist = allow_unsafe_html_in_canned_queries.get(database) or []if query_name in allowlist:clean_function = lambda s: s# And the rowsfor row in reversed(rows):entry = fg.add_entry()entry.id(str(row["atom_id"]))if "atom_content_html" in columns:entry.content(clean_function(row["atom_content_html"]), type="html")elif "atom_content" in columns:entry.content(row["atom_content"], type="text")entry.updated(row["atom_updated"])entry.title(str(row["atom_title"]))# atom_link is optionalif "atom_link" in columns:entry.link(href=row["atom_link"])if "atom_author_name" in columns and row["atom_author_name"]:author = {"name": row["atom_author_name"],}for key in ("uri", "email"):colname = "atom_author_{}".format(key)if colname in columns and row[colname]:author[key] = row[colname]entry.author(author)return Response(fg.atom_str(pretty=True),content_type="application/xml; charset=utf-8",status=200,)def can_render_atom(columns):return REQUIRED_COLUMNS.issubset(columns)def clean(html):cleaned = bleach.clean(html,tags=["a","abbr","acronym","b","blockquote","br","code","em","i","li","ol","strong","ul","pre","p","h1","h2","h3","h4","h5","h6","img",],attributes={"a": ["href", "title"], "img": ["alt", "src"]},)return cleaned