home

Menu
  • ripgrep search

datasette-media/tests/test_media.py

from asgiref.testing import ApplicationCommunicator
from datasette.app import Datasette
from sqlite_utils import Database
from PIL import Image
import io
import pathlib
import pytest
import httpx
 
 
@pytest.mark.asyncio
async def test_media_filepath(tmpdir):
    """Serve a text file from disk when the SQL query returns a ``filepath``."""
    # Create the file that the configured query will point at.
    text_file = tmpdir / "hello.txt"
    text_file.write_text("hello", "utf-8")

    photos_config = {"sql": "select '{}' as filepath".format(text_file)}
    datasette = Datasette(
        [],
        memory=True,
        metadata={"plugins": {"datasette-media": {"photos": photos_config}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/key")

    # Status, body and content type of the served file.
    assert response.status_code == 200
    assert response.content.decode("utf8") == "hello"
    assert response.headers["content-type"] == "text/plain"
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "content,content_type", [("hello", "text/plain"), ("GIF", "image/gif")]
)
async def test_media_blob(tmpdir, content, content_type):
    """Serve inline ``content`` using the ``content_type`` returned by the query."""
    query = "select '{}' as content, '{}' as content_type".format(
        content, content_type
    )
    datasette = Datasette(
        [],
        memory=True,
        metadata={"plugins": {"datasette-media": {"text": {"sql": query}}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/text/key")

    # The response must echo both the content and its declared type.
    assert response.status_code == 200
    assert response.content.decode("utf8") == content
    assert response.headers["content-type"] == content_type
 
 
@pytest.mark.asyncio
async def test_media_blob_404():
    """An empty ``content`` value yields a 404 whose body starts with PNG bytes."""
    text_config = {"sql": "select '' as content, 'image/gif' as content_type"}
    datasette = Datasette(
        metadata={"plugins": {"datasette-media": {"text": text_config}}}
    )

    response = await datasette.client.get("/-/media/text/key")

    assert 404 == response.status_code
    # Only the leading bytes of the body are checked.
    assert response.content.startswith(b"\x89PNG")
 
 
@pytest.mark.asyncio
async def test_media_content_url(httpx_mock):
    """Fetch an image via a ``content_url`` column and serve it as a JPEG.

    ``httpx_mock`` answers the upstream fetch with the bytes of
    ``example.jpg``; the request itself is driven through the ``request``
    helper instead of ``httpx.AsyncClient``.
    """
    jpeg = pathlib.Path(__file__).parent / "example.jpg"
    # read_bytes() opens, reads and closes the file, so no handle is left
    # dangling the way a bare open().read() would leave one.
    httpx_mock.add_response(
        content=jpeg.read_bytes(), headers={"Content-Type": "image/jpeg"}
    )
    app = Datasette(
        [],
        memory=True,
        metadata={
            "plugins": {
                "datasette-media": {
                    "photos": {
                        "sql": "select 'http://example/example.jpg' as content_url"
                    }
                }
            }
        },
    ).app()
    status_code, headers, body = await request(app, "/-/media/photos/1")
    assert 200 == status_code
    # The served bytes must still decode as the original 313x234 JPEG.
    image = Image.open(io.BytesIO(body))
    assert image.size == (313, 234)
    assert "JPEG" == image.format
 
 
@pytest.mark.asyncio
async def test_media_content_url_transform(httpx_mock):
    """Fetch an image via ``content_url`` and apply SQL and query-string transforms.

    The SQL supplies ``resize_width`` and the query string supplies
    ``format=PNG``; the response is expected to be a 100x74 PNG.
    """
    jpeg = pathlib.Path(__file__).parent / "example.jpg"
    # read_bytes() opens, reads and closes the file, so no handle is left
    # dangling the way a bare open().read() would leave one.
    httpx_mock.add_response(
        content=jpeg.read_bytes(),
        headers={"Content-Type": "image/jpeg"},
    )
    app = Datasette(
        [],
        memory=True,
        metadata={
            "plugins": {
                "datasette-media": {
                    "photos": {
                        "sql": "select 'http://example/example.jpg' as content_url, 100 as resize_width",
                        "enable_transform": True,
                    }
                }
            }
        },
    ).app()
    status_code, headers, body = await request(app, "/-/media/photos/2?format=PNG")
    assert 200 == status_code
    image = Image.open(io.BytesIO(body))
    assert image.size == (100, 74)
    assert "PNG" == image.format
 
 
@pytest.mark.asyncio
async def test_database_option(tmpdir):
    """The ``database`` option picks which attached database the SQL runs against."""
    text_file = tmpdir / "hello.txt"
    text_file.write_text("hello2", "utf-8")

    # Two database files: only "two" holds the photos table being queried.
    first_db, second_db = (str(tmpdir / name) for name in ("one.db", "two.db"))
    Database(first_db)["t"].insert({"hello": 1})
    Database(second_db)["photos"].insert({"pk": 1, "filepath": str(text_file)})

    photos_config = {
        "sql": "select filepath from photos where pk=:key",
        "database": "two",
    }
    datasette = Datasette(
        [first_db, second_db],
        memory=True,
        metadata={"plugins": {"datasette-media": {"photos": photos_config}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/1")

    assert response.status_code == 200
    assert response.content.decode("utf8") == "hello2"
 
 
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "extra_sql,expected_width,expected_height",
    (
        ("", 313, 234),
        (", 99 as resize_width", 99, 74),
        (", 99 as resize_height", 132, 99),
    ),
)
async def test_sql_resize(extra_sql, expected_width, expected_height):
    """``resize_width`` / ``resize_height`` columns produce the expected image size."""
    source_image = str(pathlib.Path(__file__).parent / "example.jpg")
    query = "select '{}' as filepath{}".format(source_image, extra_sql)
    datasette = Datasette(
        [],
        memory=True,
        metadata={"plugins": {"datasette-media": {"photos": {"sql": query}}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/1")

    assert response.status_code == 200
    served = Image.open(io.BytesIO(response.content))
    # Compare width and height in one go; the format must stay JPEG.
    assert tuple(served.size) == (expected_width, expected_height)
    assert served.format == "JPEG"
 
 
@pytest.mark.asyncio
async def test_sql_convert_filepath():
    """An ``output_format`` column converts a file on disk to that format."""
    source_image = str(pathlib.Path(__file__).parent / "example.jpg")
    query = "select '{}' as filepath, 'png' as output_format".format(source_image)
    datasette = Datasette(
        [],
        memory=True,
        metadata={"plugins": {"datasette-media": {"photos": {"sql": query}}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/1")

    assert response.status_code == 200
    served = Image.open(io.BytesIO(response.content))
    # Dimensions are unchanged; only the format differs from the source JPEG.
    assert served.size == (313, 234)
    assert served.format == "PNG"
 
 
@pytest.mark.asyncio
async def test_sql_convert_blob(tmpdir):
    """Convert and resize an image stored as a BLOB ``content`` column.

    The JPEG fixture is inserted into a ``photos`` table, and the SQL asks
    for ``output_format`` ``png`` and ``resize_width`` 100; the response is
    expected to be a 100x74 PNG.
    """
    jpeg = pathlib.Path(__file__).parent / "example.jpg"
    db_path = tmpdir / "photos.db"
    Database(str(db_path))["photos"].insert(
        {
            # read_bytes() closes the file once read, unlike open().read().
            "content": jpeg.read_bytes(),
        }
    )
    app = Datasette(
        [db_path],
        metadata={
            "plugins": {
                "datasette-media": {
                    "photos": {
                        "sql": "select content, 'png' as output_format, 100 as resize_width from photos"
                    }
                }
            }
        },
    ).app()
    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/1")
    assert 200 == response.status_code
    image = Image.open(io.BytesIO(response.content))
    assert image.size == (100, 74)
    assert "PNG" == image.format
 
 
@pytest.mark.parametrize(
    "custom_limit,enabled,args,expected_dimensions,expected_format",
    (
        (None, False, {"w": 100}, (313, 234), "JPEG"),
        (None, True, {"w": 100}, (100, 74), "JPEG"),
        (None, True, {"format": "png"}, (313, 234), "PNG"),
        (None, True, {"h": 100}, (133, 100), "JPEG"),
        (None, True, {"h": 3999}, (5349, 3999), "JPEG"),
        (None, True, {"h": 4000}, (313, 234), "JPEG"),
        (4001, True, {"h": 4000}, (5350, 4000), "JPEG"),
    ),
)
@pytest.mark.asyncio
async def test_transform_query_string(
    custom_limit, enabled, args, expected_dimensions, expected_format
):
    """Check ``w`` / ``h`` / ``format`` query-string arguments against each case.

    Each case sets ``enable_transform`` and ``max_width_height`` and lists
    the dimensions and format the served image is expected to have.
    """
    source_image = str(pathlib.Path(__file__).parent / "example.jpg")
    photos_config = {
        "sql": "select '{}' as filepath".format(source_image),
        "enable_transform": enabled,
        "max_width_height": custom_limit,
    }
    datasette = Datasette(
        [],
        memory=True,
        metadata={"plugins": {"datasette-media": {"photos": photos_config}}},
    )
    app = datasette.app()

    async with httpx.AsyncClient(app=app) as client:
        response = await client.get("http://localhost/-/media/photos/1", params=args)

    assert response.status_code == 200
    served = Image.open(io.BytesIO(response.content))
    assert served.size == expected_dimensions
    assert served.format == expected_format
 
 
@pytest.mark.parametrize(
    "path,expected_size",
    [
        ("/-/media/proxied/1", (313, 234)),
        ("/-/media/proxied_transformed/1", (100, 74)),
        ("/-/media/blob/1", (313, 234)),
        ("/-/media/on_disk/1", (313, 234)),
    ],
)
@pytest.mark.asyncio
async def test_content_filename(path, expected_size, httpx_mock):
    """A ``content_filename`` column sets an attachment Content-Disposition header.

    The same check runs against four configured media sources: a proxied URL,
    a proxied URL with a resize, an inline BLOB, and a file on disk.
    """
    jpeg = pathlib.Path(__file__).parent / "example.jpg"
    # read_bytes() closes the file once read, unlike open().read().
    jpeg_bytes = jpeg.read_bytes()
    # Only the proxied sources trigger an outbound fetch that needs a mock.
    if "proxied" in path:
        httpx_mock.add_response(
            content=jpeg_bytes, headers={"Content-Type": "image/jpeg"}
        )
    app = Datasette(
        [],
        memory=True,
        metadata={
            "plugins": {
                "datasette-media": {
                    "proxied": {
                        "sql": "select 'http://blah/' as content_url, 'x.jpg' as content_filename"
                    },
                    "proxied_transformed": {
                        "sql": "select 'http://blah/' as content_url, 'x.jpg' as content_filename, 100 as resize_width"
                    },
                    "blob": {
                        # The image is embedded in the SQL as a hex BLOB literal.
                        "sql": "select X'{}' as content, 'x.jpg' as content_filename".format(
                            jpeg_bytes.hex()
                        )
                    },
                    "on_disk": {
                        "sql": "select '{}' as filepath, 'x.jpg' as content_filename".format(
                            jpeg
                        )
                    },
                }
            }
        },
    ).app()
    status_code, headers, body = await request(app, path)
    assert 200 == status_code
    # Header names are matched case-insensitively.
    content_disposition = [
        headers[k] for k in headers if k.lower() == "content-disposition"
    ][0]
    assert content_disposition == 'attachment; filename="x.jpg"'
    image = Image.open(io.BytesIO(body))
    assert expected_size == image.size
    assert "JPEG" == image.format
 
 
async def request(app, path):
    """Send a GET request straight to an ASGI app and collect the response.

    Sometimes we can't use httpx.AsyncClient to execute the test, because
    we've mocked it using httpx_mock - so we do it the harder way instead.

    Args:
        app: The ASGI application to call.
        path: Request path, optionally followed by ``?`` and a query string.

    Returns:
        A ``(status_code, headers, body)`` tuple. ``headers`` is a dict of
        UTF-8 decoded header names to values; ``body`` is the concatenation
        of every ``http.response.body`` message received.

    Raises:
        AssertionError: If the app emits a message of an unexpected type.
    """
    # Everything after the first "?" is the query string; empty if absent.
    path, _, query_string = path.partition("?")
    scope = {
        "type": "http",
        "http_version": "1.0",
        "method": "GET",
        "path": path,
        "raw_path": path.encode("utf-8"),
        "query_string": query_string.encode("utf-8"),
        "headers": [],
    }
    instance = ApplicationCommunicator(app, scope)
    await instance.send_input({"type": "http.request"})

    # First message carries status and headers (wait up to 2 seconds).
    start = await instance.receive_output(2)
    assert start["type"] == "http.response.start"
    headers = {k.decode("utf8"): v.decode("utf8") for k, v in start["headers"]}
    status_code = start["status"]

    # Loop until we run out of response.body
    chunks = []
    while True:
        message = await instance.receive_output(2)
        assert message["type"] == "http.response.body"
        # "body" is optional in ASGI and defaults to empty bytes.
        chunks.append(message.get("body", b""))
        if not message.get("more_body"):
            break
    return status_code, headers, b"".join(chunks)
 
Powered by Datasette