datasette-media/tests/test_media.py
from asgiref.testing import ApplicationCommunicatorfrom datasette.app import Datasettefrom sqlite_utils import Databasefrom PIL import Imageimport ioimport pathlibimport pytestimport httpx@pytest.mark.asyncioasync def test_media_filepath(tmpdir):filepath = tmpdir / "hello.txt"filepath.write_text("hello", "utf-8")app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select '{}' as filepath".format(filepath)}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/key")assert 200 == response.status_codeassert "hello" == response.content.decode("utf8")assert "text/plain" == response.headers["content-type"]@pytest.mark.asyncio@pytest.mark.parametrize("content,content_type", [("hello", "text/plain"), ("GIF", "image/gif")])async def test_media_blob(tmpdir, content, content_type):app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"text": {"sql": "select '{}' as content, '{}' as content_type".format(content, content_type)}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/text/key")assert 200 == response.status_codeassert content == response.content.decode("utf8")assert content_type == response.headers["content-type"]@pytest.mark.asyncioasync def test_media_blob_404():ds = Datasette(metadata={"plugins": {"datasette-media": {"text": {"sql": "select '' as content, 'image/gif' as content_type"}}}})response = await ds.client.get("/-/media/text/key")assert response.status_code == 404assert response.content.startswith(b"\x89PNG")@pytest.mark.asyncioasync def test_media_content_url(httpx_mock):jpeg = pathlib.Path(__file__).parent / "example.jpg"httpx_mock.add_response(content=jpeg.open("rb").read(), headers={"Content-Type": "image/jpeg"})app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select 'http://example/example.jpg' as content_url"}}}},).app()status_code, headers, body = await request(app, "/-/media/photos/1")assert 200 == status_codeimage = Image.open(io.BytesIO(body))assert image.size == (313, 234)assert "JPEG" == image.format@pytest.mark.asyncioasync def test_media_content_url_transform(httpx_mock):jpeg = pathlib.Path(__file__).parent / "example.jpg"httpx_mock.add_response(content=jpeg.open("rb").read(),headers={"Content-Type": "image/jpeg"},)app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select 'http://example/example.jpg' as content_url, 100 as resize_width","enable_transform": True,}}}},).app()status_code, headers, body = await request(app, "/-/media/photos/2?format=PNG")assert 200 == status_codeimage = Image.open(io.BytesIO(body))assert image.size == (100, 74)assert "PNG" == image.format@pytest.mark.asyncioasync def test_database_option(tmpdir):filepath = tmpdir / "hello.txt"filepath.write_text("hello2", "utf-8")one = str(tmpdir / "one.db")two = str(tmpdir / "two.db")Database(one)["t"].insert({"hello": 1})Database(two)["photos"].insert({"pk": 1, "filepath": str(filepath)})app = Datasette([one, two],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select filepath from photos where pk=:key","database": "two",}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/1")assert 200 == response.status_codeassert "hello2" == response.content.decode("utf8")@pytest.mark.asyncio@pytest.mark.parametrize("extra_sql,expected_width,expected_height",(("", 313, 234),(", 99 as resize_width", 99, 74),(", 99 as resize_height", 132, 99),),)async def test_sql_resize(extra_sql, expected_width, expected_height):jpeg = str(pathlib.Path(__file__).parent / "example.jpg")app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select '{}' as filepath{}".format(jpeg, extra_sql)}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/1")assert 200 == response.status_codeimage = Image.open(io.BytesIO(response.content))actual_width, actual_height = image.sizeassert (expected_width, expected_height) == (actual_width, actual_height)assert "JPEG" == image.format@pytest.mark.asyncioasync def test_sql_convert_filepath():jpeg = str(pathlib.Path(__file__).parent / "example.jpg")app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select '{}' as filepath, 'png' as output_format".format(jpeg)}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/1")assert 200 == response.status_codeimage = Image.open(io.BytesIO(response.content))assert (313, 234) == image.sizeassert "PNG" == image.format@pytest.mark.asyncioasync def test_sql_convert_blob(tmpdir):jpeg = pathlib.Path(__file__).parent / "example.jpg"db_path = tmpdir / "photos.db"Database(str(db_path))["photos"].insert({"content": jpeg.open("rb").read(),})app = Datasette([db_path],metadata={"plugins": {"datasette-media": {"photos": {"sql": "select content, 'png' as output_format, 100 as resize_width from photos"}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/1")assert 200 == response.status_codeimage = Image.open(io.BytesIO(response.content))assert image.size == (100, 74)assert "PNG" == image.format@pytest.mark.parametrize("custom_limit,enabled,args,expected_dimensions,expected_format",((None, False, {"w": 100}, (313, 234), "JPEG"),(None, True, {"w": 100}, (100, 74), "JPEG"),(None, True, {"format": "png"}, (313, 234), "PNG"),(None, True, {"h": 100}, (133, 100), "JPEG"),(None, True, {"h": 3999}, (5349, 3999), "JPEG"),(None, True, {"h": 4000}, (313, 234), "JPEG"),(4001, True, {"h": 4000}, (5350, 4000), "JPEG"),),)@pytest.mark.asyncioasync def test_transform_query_string(custom_limit, enabled, args, expected_dimensions, expected_format):jpeg = str(pathlib.Path(__file__).parent / "example.jpg")app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"photos": {"sql": "select '{}' as filepath".format(jpeg),"enable_transform": enabled,"max_width_height": custom_limit,}}}},).app()async with httpx.AsyncClient(app=app) as client:response = await client.get("http://localhost/-/media/photos/1", params=args)assert 200 == response.status_codeimage = Image.open(io.BytesIO(response.content))assert expected_dimensions == image.sizeassert expected_format == image.format@pytest.mark.parametrize("path,expected_size",[("/-/media/proxied/1", (313, 234)),("/-/media/proxied_transformed/1", (100, 74)),("/-/media/blob/1", (313, 234)),("/-/media/on_disk/1", (313, 234)),],)@pytest.mark.asyncioasync def test_content_filename(path, expected_size, httpx_mock):jpeg = pathlib.Path(__file__).parent / "example.jpg"jpeg_bytes = jpeg.open("rb").read()if "proxied" in path:httpx_mock.add_response(content=jpeg_bytes, headers={"Content-Type": "image/jpeg"})app = Datasette([],memory=True,metadata={"plugins": {"datasette-media": {"proxied": {"sql": "select 'http://blah/' as content_url, 'x.jpg' as content_filename"},"proxied_transformed": {"sql": "select 'http://blah/' as content_url, 'x.jpg' as content_filename, 100 as resize_width"},"blob": {"sql": "select X'{}' as content, 'x.jpg' as content_filename".format(jpeg_bytes.hex())},"on_disk": {"sql": "select '{}' as filepath, 'x.jpg' as content_filename".format(jpeg)},}}},).app()status_code, headers, body = await request(app, path)assert 200 == status_codecontent_disposition = [headers[k] for k in headers if k.lower() == "content-disposition"][0]assert content_disposition == 'attachment; filename="x.jpg"'image = Image.open(io.BytesIO(body))assert expected_size == image.sizeassert "JPEG" == image.formatasync def request(app, path):# Sometimes we can't use httpx.AsyncClient to execute the test, because# we've mocked it using httpx_mock - so we do it the harder way insteadif "?" in path:path, query_string = path.split("?", 1)query_string = query_string.encode("utf-8")else:query_string = b""scope = {"type": "http","http_version": "1.0","method": "GET","path": path,"raw_path": path.encode("utf-8"),"query_string": query_string,"headers": [],}instance = ApplicationCommunicator(app, scope)await instance.send_input({"type": "http.request"})messages = []start = await instance.receive_output(2)messages.append(start)assert start["type"] == "http.response.start"headers = dict([(k.decode("utf8"), v.decode("utf8")) for k, v in start["headers"]])status_code = start["status"]# Loop until we run out of response.bodybody = b""while True:message = await instance.receive_output(2)messages.append(message)assert message["type"] == "http.response.body"body += message["body"]if not message.get("more_body"):breakreturn status_code, headers, body