datasette-atom/datasette_atom/__init__.py

import bleach
from datasette import hookimpl, __version__
from datasette.utils.asgi import Response
from feedgen.feed import FeedGenerator
 
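# Columns a SQL query must return before rows can be rendered as an Atom feed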
REQUIRED_COLUMNS = {"atom_id", "atom_updated", "atom_title"}
 
 
@hookimpl
def register_output_renderer():
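    """Register the .atom extension as an output format with Datasette."""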
    return {"extension": "atom", "render": render_atom, "can_render": can_render_atom}
 
 
def render_atom(
    datasette, request, sql, columns, rows, database, table, query_name, view_name, data
):
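    """Render the results of a SQL query as an Atom feed."""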
    from datasette.views.base import DatasetteError
 
    if not REQUIRED_COLUMNS.issubset(columns):
        raise DatasetteError(
            "SQL query must return columns {}".format(", ".join(REQUIRED_COLUMNS)),
            status=400,
        )
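    # Feed-level metadata: generator, feed id, self link and latest update time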
    fg = FeedGenerator()
    fg.generator(
        generator="Datasette",
        version=__version__,
        uri="https://github.com/simonw/datasette",
    )
    fg.id(request.url)
    fg.link(href=request.url, rel="self")
    fg.updated(max(row["atom_updated"] for row in rows))
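    # Feed title: ?_feed_title= if provided, otherwise the SQL text, plus
    # the table name and human-readable description where available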
    title = request.args.get("_feed_title", sql)
    if table:
        title += "/" + table
    if data.get("human_description_en"):
        title += ": " + data["human_description_en"]
    # If this is a canned query, its configured title overrides all others
    if query_name:
        try:
            title = datasette.metadata(database=database)["queries"][query_name][
                "title"
            ]
        except (KeyError, TypeError):
            pass
    fg.title(title)
 
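    # HTML content is sanitized with bleach unless this canned query has been
    # explicitly allowlisted in the plugin configuration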
    clean_function = clean
    if query_name:
        # Check allow_unsafe_html_in_canned_queries
        plugin_config = datasette.plugin_config("datasette-atom")
        if plugin_config:
            allow_unsafe_html_in_canned_queries = plugin_config.get(
                "allow_unsafe_html_in_canned_queries"
            )
            if allow_unsafe_html_in_canned_queries is True:
                clean_function = lambda s: s
            elif isinstance(allow_unsafe_html_in_canned_queries, dict):
                allowlist = allow_unsafe_html_in_canned_queries.get(database) or []
                if query_name in allowlist:
                    clean_function = lambda s: s
 
    # Add one entry per row; rows are iterated in reverse because feedgen
    # prepends new entries, so the feed preserves the original row order
    for row in reversed(rows):
        entry = fg.add_entry()
        entry.id(str(row["atom_id"]))
        if "atom_content_html" in columns:
            entry.content(clean_function(row["atom_content_html"]), type="html")
        elif "atom_content" in columns:
            entry.content(row["atom_content"], type="text")
        entry.updated(row["atom_updated"])
        entry.title(str(row["atom_title"]))
        # atom_link is optional
        if "atom_link" in columns:
            entry.link(href=row["atom_link"])
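        # Author details are optional: atom_author_name, plus atom_author_uri
        # and atom_author_email if present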
        if "atom_author_name" in columns and row["atom_author_name"]:
            author = {
                "name": row["atom_author_name"],
            }
            for key in ("uri", "email"):
                colname = "atom_author_{}".format(key)
                if colname in columns and row[colname]:
                    author[key] = row[colname]
            entry.author(author)
 
    return Response(
        fg.atom_str(pretty=True),
        content_type="application/xml; charset=utf-8",
        status=200,
    )
 
 
def can_render_atom(columns):
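    """Only offer the .atom extension when the required columns are present."""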
    return REQUIRED_COLUMNS.issubset(columns)
 
 
def clean(html):
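    """Sanitize HTML, allowing only a safe subset of tags and attributes."""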
    cleaned = bleach.clean(
        html,
        tags=[
            "a",
            "abbr",
            "acronym",
            "b",
            "blockquote",
            "br",
            "code",
            "em",
            "i",
            "li",
            "ol",
            "strong",
            "ul",
            "pre",
            "p",
            "h1",
            "h2",
            "h3",
            "h4",
            "h5",
            "h6",
            "img",
        ],
        attributes={"a": ["href", "title"], "img": ["alt", "src"]},
    )
    return cleaned
 
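# Illustrative examples (assumptions for documentation purposes, not part of
# the plugin itself).
#
# A SQL query that satisfies REQUIRED_COLUMNS and can therefore be rendered
# with the .atom extension; the table and column names ("posts", "id",
# "created", "title", "url", "body") are hypothetical:
#
#   select
#     'tag:example.com,2023:post-' || id as atom_id,
#     created as atom_updated,
#     title as atom_title,
#     url as atom_link,
#     body as atom_content
#   from posts
#   order by created desc
#
# Plugin configuration in metadata.json allowing unsanitized atom_content_html
# for a hypothetical canned query named "feed" in a database named "blog":
#
#   {
#     "plugins": {
#       "datasette-atom": {
#         "allow_unsafe_html_in_canned_queries": {"blog": ["feed"]}
#       }
#     }
#   }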