datasette-scale-to-zero/datasette_scale_to_zero/__init__.py
import asyncio
from re import S
from datasette import hookimpl
from functools import wraps
from time import monotonic
import logging
import sys


@hookimpl
def startup(datasette):
    """Validate the plugin configuration when Datasette starts.

    Raises:
        ValueError: if the "duration" or "max-age" setting is malformed.
    """
    # Verify that the config is valid
    get_config(datasette, "duration")
    get_config(datasette, "max-age")


@hookimpl
def asgi_wrapper(datasette):
    """Return an ASGI wrapper that records the time of the latest request.

    If neither "duration" nor "max-age" is configured, the wrapper returns
    the application unchanged.
    """
    duration = get_config(datasette, "duration")
    max_age = get_config(datasette, "max-age")

    def wrap_with_scale_to_zero(app):
        if duration is None and max_age is None:
            return app

        @wraps(app)
        async def record_last_request(scope, receive, send):
            # The attribute doubles as the "watchdog already started" flag:
            # the background task is only launched on the first call.
            if not hasattr(datasette, "_scale_to_zero_last_asgi"):
                start_that_loop(datasette)
            datasette._scale_to_zero_last_asgi = monotonic()
            await app(scope, receive, send)

        return record_last_request

    return wrap_with_scale_to_zero


def start_that_loop(datasette):
    """Schedule a background task that exits the process when idle or too old.

    Must be called while an asyncio event loop is running, because it uses
    ``asyncio.get_running_loop()``. Does nothing if neither "duration" nor
    "max-age" is configured.
    """
    duration = get_config(datasette, "duration")
    max_age = get_config(datasette, "max-age")
    if duration is None and max_age is None:
        return

    async def check_if_server_should_exit():
        server_start = monotonic()
        while True:
            # Poll once per second.
            await asyncio.sleep(1)
            last_asgi = getattr(datasette, "_scale_to_zero_last_asgi", None)
            should_exit = False
            if duration is not None and last_asgi is not None:
                # Have there been no requests for longer than duration?
                if monotonic() - last_asgi > duration:
                    should_exit = True
            if max_age is not None:
                # Has server been running for longer than max_age?
                if monotonic() - server_start > max_age:
                    should_exit = True
            if should_exit:
                # Avoid logging a traceback when the server exits
                # https://github.com/simonw/datasette-scale-to-zero/issues/2
                logger = logging.getLogger("uvicorn.error")
                logger.disabled = True
                loop.call_soon(sys.exit, 0)

    # ``loop`` is assigned before the task first runs, so the closure above
    # can safely reference it.
    loop = asyncio.get_running_loop()
    loop.create_task(check_if_server_should_exit())


def get_config(datasette, key):
    """Read a duration setting for this plugin and convert it to seconds.

    Args:
        datasette: object exposing ``plugin_config(name)``.
        key: name of the setting, e.g. "duration" or "max-age".

    Returns:
        The configured duration as an int number of seconds, or None if the
        setting is absent.

    Raises:
        ValueError: if the value is not a string made of decimal digits
            followed by one of the units "s", "m" or "h".
    """
    duration_s = (datasette.plugin_config("datasette-scale-to-zero") or {}).get(key)
    if duration_s is None:
        return None
    invalid_duration_message = (
        "{} must be a number followed by a unit (s, m, h)".format(key)
    )
    # Reject empty strings here: indexing "" below would otherwise raise
    # IndexError instead of the documented ValueError.
    if not isinstance(duration_s, str) or not duration_s:
        raise ValueError(invalid_duration_message)
    unit = duration_s[-1]
    digits = duration_s[:-1]
    # isdecimal() (unlike isdigit()) only accepts characters that int() can
    # parse, so malformed input always gets the descriptive message.
    if not digits.isdecimal():
        raise ValueError(invalid_duration_message)
    duration = int(digits)
    if unit == "s":
        return duration
    elif unit == "m":
        return duration * 60
    elif unit == "h":
        return duration * 60 * 60
    else:
        raise ValueError("Invalid {}".format(key))