# datasette-ripgrep/datasette_ripgrep/__init__.py
from datasette import hookimpl
from datasette.utils.asgi import Response, Forbidden
import asyncio
import json
from pathlib import Path
import urllib
async def run_ripgrep(
    pattern,
    path,
    globs=None,
    time_limit=1.0,
    max_lines=2000,
    ignore=False,
    literal=False,
    context=2,
):
    """Run ``rg --json`` against *path* and collect its decoded output.

    Args:
        pattern: Search pattern, handed to ``rg`` through ``-e``.
        path: Directory to search; also used as the subprocess working
            directory.
        globs: Optional iterable of strings, each passed as ``--glob``.
        time_limit: Seconds allowed for reading output before giving up.
        max_lines: Stop reading once this many JSON messages are collected.
        ignore: When truthy, ``-i`` is added to the command line.
        literal: When truthy, ``-F`` is added to the command line.
        context: Value for ``-C``; the flag is omitted when falsy.

    Returns:
        ``(results, time_limit_hit)`` where ``results`` is the list of JSON
        messages decoded so far and ``time_limit_hit`` is True if reading
        was cut short by *time_limit*. Hitting *max_lines* is not reported
        separately; the list is simply truncated.
    """
    args = ["-e", pattern, path, "--json"]
    if context:
        args.extend(["-C", str(context)])
    if ignore:
        args.append("-i")
    if literal:
        args.append("-F")
    for glob in globs or ():
        args.extend(["--glob", glob])

    proc = await asyncio.create_subprocess_exec(
        "rg",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
        limit=1024 * 1024,
        cwd=path,
    )
    time_limit_hit = False
    results = []

    async def inner(results):
        # Appends into the shared list so that partial output survives the
        # cancellation performed by wait_for() on timeout.
        while True:
            try:
                line = await proc.stdout.readline()
            except (asyncio.exceptions.LimitOverrunError, ValueError):
                # Skip 'Separator is not found, and chunk exceed the limit' lines
                continue
            if line == b"":
                # EOF: the process closed its stdout.
                break
            try:
                results.append(json.loads(line))
            except json.decoder.JSONDecodeError:
                # Usually this means a really long line which was
                # truncated as invalid JSON
                print(line)
            if len(results) >= max_lines:
                break

    try:
        await asyncio.wait_for(inner(results), timeout=time_limit)
    except asyncio.TimeoutError:
        time_limit_hit = True
    finally:
        # Always stop the child, whether we finished normally, hit
        # max_lines, timed out, or are being cancelled; otherwise rg could
        # keep running after we have stopped reading from it.
        try:
            proc.kill()
        except OSError:
            # Ignore 'no such process' error
            pass
    # We should have accumulated some results anyway
    return results, time_limit_hit
async def ripgrep(request, datasette):
    """Render the search page, running ripgrep when a pattern was submitted.

    Reads ``pattern``, ``ignore``, ``literal`` and repeated ``glob`` values
    from the query string, and ``time_limit``, ``max_lines`` and ``path``
    from the ``datasette-ripgrep`` plugin configuration. Responds with a 500
    page if ``path`` is not configured; otherwise renders ``ripgrep.html``.
    """
    await check_permission(request, datasette)

    query = request.args
    pattern = (query.get("pattern") or "").strip()
    ignore = query.get("ignore")
    literal = query.get("literal")
    globs = []
    for raw_glob in query.getlist("glob"):
        if raw_glob.strip():
            globs.append(raw_glob.strip())

    plugin_config = datasette.plugin_config("datasette-ripgrep") or {}
    time_limit = plugin_config.get("time_limit") or 1.0
    max_lines = plugin_config.get("max_lines") or 2000
    root = plugin_config.get("path")
    if not root:
        return Response.html("The path plugin configuration is required.", status=500)

    results, time_limit_hit = [], False
    if pattern:
        results, time_limit_hit = await run_ripgrep(
            pattern,
            root,
            globs=globs,
            time_limit=time_limit,
            max_lines=max_lines,
            ignore=ignore,
            literal=literal,
        )

    def fix_path(path_):
        # Template helper: express a path relative to the configured root.
        return str(Path(path_).relative_to(root))

    # Width, in digits, of the largest line number among the messages that
    # carry one; 1 when there are none.
    line_numbers = [
        message["data"]["line_number"]
        for message in results
        if "line_number" in message["data"]
    ]
    if line_numbers:
        widest_line_number = len(str(max(line_numbers)))
    else:
        widest_line_number = 1

    template_context = {
        "pattern": pattern,
        "results": results,
        "fix_path": fix_path,
        "time_limit_hit": time_limit_hit,
        "url_quote": urllib.parse.quote,
        "literal": literal,
        "ignore": ignore,
        "globs": globs,
        "widest_line_number": widest_line_number,
    }
    html = await datasette.render_template(
        "ripgrep.html",
        template_context,
        request=request,
    )
    return Response.html(html)
async def view_file(request, datasette):
    """Render a single file from beneath the configured ``path`` directory.

    The file is selected by the URL-quoted ``subpath`` route variable.

    Responses:
        500 if the ``path`` plugin setting is missing.
        403 if the resolved file lies outside the configured directory.
        404 if the target does not exist or is not a regular file.
        Otherwise the ``ripgrep_view_file.html`` template, given the file's
        lines.
    """
    await check_permission(request, datasette)
    config = datasette.plugin_config("datasette-ripgrep") or {}
    subpath = urllib.parse.unquote(request.url_vars["subpath"])
    path = config.get("path")
    if not path:
        return Response.html("The path plugin configuration is required.", status=500)

    # Resolve the root as well as the target so both sides of the comparison
    # are absolute and symlink-free.
    root = Path(path).resolve()
    try:
        filepath = (root / subpath).resolve()
        # Component-wise containment check. A plain string-prefix test would
        # accept siblings such as "<root>-other/secret" and would also reject
        # everything when the configured root is relative or a symlink.
        filepath.relative_to(root)
    except ValueError:
        # Raised by relative_to() when filepath is not under root; resolve()
        # may also raise it for malformed input such as an embedded NUL.
        return Response.html("File must be inside path directory", status=403)

    # is_file() rather than exists(): a directory cannot be read as text.
    if not filepath.is_file():
        return Response.text("File not found: {}".format(subpath), status=404)

    lines = filepath.read_text().split("\n")
    widest_line_number = len(str(len(lines) + 1))
    return Response.html(
        await datasette.render_template(
            "ripgrep_view_file.html",
            {
                "subpath": subpath,
                "lines": enumerate(lines),
                "widest_line_number": widest_line_number,
            },
            request=request,
        )
    )
async def check_permission(request, datasette):
    """Raise ``Forbidden`` if the actor is explicitly denied ``view-instance``.

    Only a result of exactly ``False`` from ``permission_allowed`` blocks the
    request; any other result (including ``None``) lets it proceed.
    """
    verdict = await datasette.permission_allowed(
        request.actor,
        "view-instance",
        default=None,
    )
    if verdict is False:
        raise Forbidden("view-instance denied")
@hookimpl
def register_routes():
    """Return the (regex, view) pairs for the search page and file viewer."""
    search_route = ("^/-/ripgrep$", ripgrep)
    file_route = ("^/-/ripgrep/view/(?P<subpath>.*)$", view_file)
    return (search_route, file_route)
@hookimpl
def menu_links(datasette, actor):
    """Return a menu entry for the search page when ``path`` is configured.

    Returns ``None`` when the ``datasette-ripgrep`` plugin has no ``path``
    setting.
    """
    plugin_config = datasette.plugin_config("datasette-ripgrep") or {}
    if plugin_config.get("path"):
        search_link = {
            "href": datasette.urls.path("/-/ripgrep"),
            "label": "ripgrep search",
        }
        return [search_link]
    return None