home

Menu
  • ripgrep search

dogsheep-beta/dogsheep_beta/__init__.py

from datasette import hookimpl
from dogsheep_beta.utils import parse_metadata
import html
import urllib
from jinja2 import Template
import json
 
TIMELINE_SQL = """
select
  search_index.rowid,
  search_index.type,
  search_index.key,
  search_index.title,
  search_index.category,
  search_index.timestamp,
  search_index.search_1
from
  search_index
{where}
  {where_clauses}
order by
  {order_by}
limit 40
"""
 
SEARCH_SQL = """
select
  search_index_fts.rank,
  search_index.rowid,
  search_index.type,
  search_index.key,
  search_index.title,
  search_index.category,
  search_index.timestamp,
  search_index.search_1
from
  search_index join search_index_fts on search_index.rowid = search_index_fts.rowid
{where}
  {where_clauses}
order by
  {order_by}
limit 100
"""
FILTER_COLS = ("type", "category", "is_public")
SORT_ORDERS = {
    "oldest": "search_index.timestamp",
    "newest": "search_index.timestamp desc",
}
 
 
class InnerResponseError(Exception):
    pass
 
 
async def beta(request, datasette):
    from datasette.utils.asgi import Response
    from datasette.utils import path_with_removed_args, path_with_replaced_args
 
    config = datasette.plugin_config("dogsheep-beta") or {}
    database_name = config.get("database") or datasette.get_database().name
    dogsheep_beta_config_file = config["config_file"]
    template_debug = bool(config.get("template_debug"))
    rules = parse_metadata(open(dogsheep_beta_config_file).read())
    q = (request.args.get("q") or "").strip()
    sorted_by = "relevance" if q else "newest"
    if request.args.get("sort") in SORT_ORDERS:
        sorted_by = request.args["sort"]
    other_sort_orders = []
    for sort_order in ("relevance", "newest", "oldest"):
        if not q and sort_order == "relevance":
            continue
        if sort_order != sorted_by:
            other_sort_orders.append(
                {
                    "label": sort_order,
                    "url": path_with_replaced_args(request, {"sort": sort_order})
                    if sort_order != "relevance"
                    else path_with_removed_args(request, {"sort"}),
                }
            )
    results = []
    facets = {}
    count = None
 
    results = await search(datasette, database_name, request)
    count, facets = await get_count_and_facets(datasette, database_name, request)
    await process_results(datasette, results, rules, q, template_debug)
 
    hiddens = [
        {"name": column, "value": request.args[column]}
        for column in FILTER_COLS
        if column in request.args
    ]
    return Response.html(
        await datasette.render_template(
            "beta.html",
            {
                "q": q or "",
                "count": count,
                "results": results,
                "facets": facets,
                "hiddens": hiddens,
                "sorted_by": sorted_by,
                "other_sort_orders": other_sort_orders,
            },
            request=request,
        )
    )
 
 
async def search(datasette, database_name, request):
    from datasette.utils import sqlite3, escape_fts
 
    database = datasette.get_database(database_name)
    q = (request.args.get("q") or "").strip()
 
    if q:
        default_sort = "search_index_fts.rank, search_index.timestamp desc"
    else:
        default_sort = "search_index.timestamp desc"
    order_by = SORT_ORDERS.get(request.args.get("sort"), default_sort)
 
    params = {"query": q}
    where_clauses = []
    if request.args.get("timestamp__date"):
        where_clauses.append('date("timestamp") = :date')
        params["date"] = request.args["timestamp__date"]
    sql = TIMELINE_SQL
    if q:
        sql = SEARCH_SQL
        where_clauses.append("search_index_fts match :query")
    for arg in FILTER_COLS:
        if arg in request.args:
            where_clauses.append("[{arg}]=:{arg}".format(arg=arg))
            params[arg] = request.args[arg]
    sql_to_execute = sql.format(
        where=" where " if where_clauses else "",
        where_clauses=" and ".join(where_clauses),
        order_by=order_by,
    )
    try:
        results = await database.execute(sql_to_execute, params)
    except sqlite3.OperationalError as e:
        params["query"] = escape_fts(params["query"])
        results = await database.execute(sql_to_execute, params)
    return [dict(r) for r in results.rows]
 
 
async def process_results(datasette, results, rules, q, template_debug=False):
    # Adds a 'display' property with HTML to the results
    templates_by_type = {}
    rules_by_type = {}
    for db_name, types in rules.items():
        for type_, meta in types.items():
            rules_by_type["{}/{}".format(db_name, type_)] = meta
 
    for result in results:
        type_ = result["type"]
        meta = rules_by_type[type_]
        result["display"] = {}
        if meta.get("display_sql"):
            db = datasette.get_database(type_.split(".")[0])
            display_results = await db.execute(
                meta["display_sql"], {"key": result["key"], "q": q}
            )
            first = display_results.first()
            if first:
                result["display"] = dict(first)
        output = None
        if meta.get("display"):
            if type_ not in templates_by_type:
                compiled = Template(meta["display"], autoescape=True)
                templates_by_type[type_] = compiled
            else:
                compiled = templates_by_type[type_]
            try:
                output = compiled.render({**result, **{"json": json}})
            except Exception as e:
                if not template_debug:
                    raise
                output = '<p style="color: red">{}</p><pre>{}</pre><p>Template:</p><pre>{}</pre>'.format(
                    html.escape(str(e)),
                    html.escape(json.dumps(result, default=repr, indent=4)),
                    html.escape(meta["display"]),
                )
        else:
            output = "<pre>{}</pre>".format(
                html.escape(json.dumps(result, default=repr, indent=4))
            )
        result["output"] = output
 
 
async def get_count_and_facets(datasette, database_name, request):
    from datasette.utils.asgi import Request, Response
    from datasette.utils import sqlite3, escape_fts
 
    q = (request.args.get("q") or "").strip()
    timestamp__date = request.args.get("timestamp__date") or ""
 
    async def execute_search(searchmode_raw):
        args = {
            "_facet": ["type", "category", "is_public"],
            "_facet_date": ["timestamp"],
            "_size": 0,
        }
        if q:
            args["_search"] = q
            if searchmode_raw:
                args["_searchmode"] = "raw"
        if timestamp__date:
            args["timestamp__date"] = timestamp__date
        for column in FILTER_COLS:
            if column in request.args:
                args[column] = request.args[column]
 
        path_with_query_string = "/{}/search_index.json?{}".format(
            database_name,
            urllib.parse.urlencode(
                args,
                doseq=True,
            ),
        )
        inner_response = await datasette.client.get(
            path_with_query_string, cookies=request.cookies
        )
        if inner_response.status_code != 200:
            raise InnerResponseError(inner_response.status_code)
        data = inner_response.json()
        count, facets = data["filtered_table_rows_count"], data["facet_results"]
        return count, facets
 
    try:
        count, facets = await execute_search(True)
    except InnerResponseError as e:
        count, facets = await execute_search(False)
 
    facets = facets.values()
    # Rewrite toggle_url on facet_results
    # ../search_index.json?_search=wolf&_facet=type&_facet=category&_facet=is_public&_size=0&category=2
    for facet in facets:
        for result in facet["results"]:
            bits = urllib.parse.urlparse(result["toggle_url"])
            qs_bits = dict(urllib.parse.parse_qsl(bits.query))
            to_remove = [k for k in qs_bits if k.startswith("_")]
            for k in to_remove:
                qs_bits.pop(k)
            qs_bits["q"] = q
            result["toggle_url"] = "?" + urllib.parse.urlencode(qs_bits)
    return count, facets
 
 
@hookimpl
def register_routes():
    return [("/-/beta", beta)]
 
 
@hookimpl
def extra_template_vars():
    return {"intcomma": lambda s: "{:,}".format(int(s))}
 
Powered by Datasette