# datasette-ripgrep/datasette_ripgrep/__init__.py
import asyncio
import json
from pathlib import Path
import urllib
import urllib.parse

from datasette import hookimpl
from datasette.utils.asgi import Response, Forbidden


async def run_ripgrep(
    pattern,
    path,
    globs=None,
    time_limit=1.0,
    max_lines=2000,
    ignore=False,
    literal=False,
    context=2,
):
    """Run ``rg --json`` against *path* and collect its decoded output.

    Args:
        pattern: Search pattern, passed to ``rg`` via ``-e``.
        path: Directory to search; also used as the subprocess working
            directory.
        globs: Optional iterable of glob strings, each passed via ``--glob``.
        time_limit: Seconds to spend reading output before giving up.
        max_lines: Stop reading once this many JSON messages were collected.
        ignore: When truthy, pass ``-i`` to ``rg``.
        literal: When truthy, pass ``-F`` to ``rg``.
        context: Number of context lines (``-C``); falsy disables the flag.

    Returns:
        A ``(results, time_limit_hit)`` tuple: the list of decoded JSON
        messages read so far, and whether the time limit expired first.
    """
    args = ["-e", pattern, path, "--json"]
    if context:
        args.extend(["-C", str(context)])
    if ignore:
        args.append("-i")
    if literal:
        args.append("-F")
    for glob in globs or ():
        args.extend(["--glob", glob])

    proc = await asyncio.create_subprocess_exec(
        "rg",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
        limit=1024 * 1024,
        cwd=path,
    )
    time_limit_hit = False
    results = []

    async def inner(results):
        # Appends to the shared list so that partial output survives a
        # timeout cancelling this coroutine.
        while True:
            try:
                line = await proc.stdout.readline()
            except (asyncio.exceptions.LimitOverrunError, ValueError):
                # Skip 'Separator is not found, and chunk exceed the limit'
                # lines
                continue
            if line == b"":
                # End of the subprocess output
                break
            try:
                results.append(json.loads(line))
            except json.decoder.JSONDecodeError:
                # Usually this means a really long line which was
                # truncated as invalid JSON
                print(line)
            if len(results) >= max_lines:
                break

    try:
        await asyncio.wait_for(inner(results), timeout=time_limit)
    except asyncio.TimeoutError:
        time_limit_hit = True

    # Always stop the process: it may still be running after a timeout or
    # after the max_lines cut-off.
    try:
        proc.kill()
    except OSError:
        # Ignore 'no such process' error
        pass

    # We should have accumulated some results anyway
    return results, time_limit_hit


async def ripgrep(request, datasette):
    """Render the search page, running ripgrep when a pattern is supplied.

    Reads ``pattern``, ``ignore``, ``literal`` and repeated ``glob`` values
    from the query string, and ``path``, ``time_limit`` and ``max_lines``
    from the ``datasette-ripgrep`` plugin configuration. Responds with a
    500 error page when ``path`` is not configured.
    """
    await check_permission(request, datasette)
    pattern = (request.args.get("pattern") or "").strip()
    ignore = request.args.get("ignore")
    literal = request.args.get("literal")
    globs = [g.strip() for g in request.args.getlist("glob") if g.strip()]

    config = datasette.plugin_config("datasette-ripgrep") or {}
    time_limit = config.get("time_limit") or 1.0
    max_lines = config.get("max_lines") or 2000
    path = config.get("path")
    if not path:
        return Response.html("The path plugin configuration is required.", status=500)

    results = []
    time_limit_hit = False
    if pattern:
        results, time_limit_hit = await run_ripgrep(
            pattern,
            path,
            globs=globs,
            time_limit=time_limit,
            max_lines=max_lines,
            ignore=ignore,
            literal=literal,
        )

    def fix_path(path_):
        # Express a result path relative to the configured root
        return str(Path(path_).relative_to(path))

    # Width in digits of the largest line number. Messages without a
    # usable line number are skipped; default=0 yields a width of 1 when
    # there is nothing to measure.
    widest_line_number = len(
        str(
            max(
                (
                    result["data"]["line_number"]
                    for result in results
                    if result["data"].get("line_number") is not None
                ),
                default=0,
            )
        )
    )

    return Response.html(
        await datasette.render_template(
            "ripgrep.html",
            {
                "pattern": pattern,
                "results": results,
                "fix_path": fix_path,
                "time_limit_hit": time_limit_hit,
                "url_quote": urllib.parse.quote,
                "literal": literal,
                "ignore": ignore,
                "globs": globs,
                "widest_line_number": widest_line_number,
            },
            request=request,
        )
    )


async def view_file(request, datasette):
    """Render a single file from beneath the configured ``path`` directory.

    Responds with 500 when ``path`` is not configured, 403 when the
    requested subpath resolves outside that directory, and 404 when it does
    not name a regular file.
    """
    await check_permission(request, datasette)
    config = datasette.plugin_config("datasette-ripgrep") or {}
    subpath = urllib.parse.unquote(request.url_vars["subpath"])
    path = config.get("path")
    if not path:
        return Response.html("The path plugin configuration is required.", status=500)

    # Resolve both sides so that symlinks and ".." segments are compared on
    # equal terms.
    root = Path(path).resolve()
    filepath = (root / subpath).resolve()

    # Make absolutely sure it's still inside the root. A plain string prefix
    # test would also accept sibling directories such as "<root>-other", so
    # compare path components instead.
    try:
        filepath.relative_to(root)
    except ValueError:
        return Response.html("File must be inside path directory", status=403)

    # is_file() rather than exists(): a directory cannot be read as text
    if not filepath.is_file():
        return Response.text("File not found: {}".format(subpath), status=404)

    lines = filepath.read_text().split("\n")
    widest_line_number = len(str(len(lines) + 1))
    return Response.html(
        await datasette.render_template(
            "ripgrep_view_file.html",
            {
                "subpath": subpath,
                "lines": enumerate(lines),
                "widest_line_number": widest_line_number,
            },
            request=request,
        )
    )


async def check_permission(request, datasette):
    """Raise ``Forbidden`` if the actor is explicitly denied ``view-instance``.

    Only a result of exactly ``False`` is treated as a denial; any other
    result (including the ``None`` default) lets the request through.
    """
    allowed = await datasette.permission_allowed(
        request.actor,
        "view-instance",
        default=None,
    )
    if allowed is False:
        raise Forbidden("view-instance denied")


@hookimpl
def register_routes():
    """Register the search page and the file viewer routes."""
    return (
        ("^/-/ripgrep$", ripgrep),
        ("^/-/ripgrep/view/(?P<subpath>.*)$", view_file),
    )


@hookimpl
def menu_links(datasette, actor):
    """Return a menu link to the search page when ``path`` is configured.

    Returns ``None`` (no links) when the plugin has no ``path`` setting.
    """
    config = datasette.plugin_config("datasette-ripgrep") or {}
    if not config.get("path"):
        return None
    return [
        {"href": datasette.urls.path("/-/ripgrep"), "label": "ripgrep search"},
    ]