tutor/tutor/server.py
2023-01-09 21:22:46 -05:00

261 lines
8.6 KiB
Python

import decimal
import importlib.metadata
import importlib.resources
import json
import re
import typing
import urllib.parse
import psycopg
import psycopg.rows
import psycopg_pool
import tornado.ioloop
import tornado.web
import tornado_openapi3.handler
import yaml
import tutor.database
import tutor.models
import tutor.search
def update_args(url: str, **qargs) -> str:
parts = urllib.parse.urlsplit(url)
return urllib.parse.urlunsplit(
(
parts.scheme,
parts.netloc,
parts.path,
urllib.parse.urlencode(
[
(k, v)
for k, v in urllib.parse.parse_qsl(parts.query)
if k not in qargs.keys()
]
+ list(qargs.items())
),
parts.fragment,
)
)
class OpenAPIRequestHandler(tornado_openapi3.handler.OpenAPIRequestHandler):
@property
def spec_dict(self):
spec = getattr(self.application, "openapi_spec_dict", None)
if not spec:
version = importlib.metadata.version(__package__)
spec = yaml.safe_load(self.render_string("openapi.yaml", version=version))
setattr(self.application, "openapi_spec_dict", spec)
return spec
@property
def spec(self):
spec = getattr(self.application, "openapi_spec", None)
if not spec:
spec = super().spec
setattr(self.application, "openapi_spec", spec)
return spec
class RequestHandler(tornado.web.RequestHandler):
def set_links(self, **links) -> None:
self.set_header(
"Link",
", ".join(
[f'<{self.url(url)}>; rel="{rel}"' for rel, url in links.items()]
),
)
class SearchHandler(RequestHandler):
def url(self, url: str) -> str:
scheme_override = self.application.settings["scheme"]
if not scheme_override:
return url
parts = urllib.parse.urlsplit(url)
return urllib.parse.urlunsplit(
(
scheme_override,
parts.netloc,
parts.path,
parts.query,
parts.fragment,
)
)
async def get(self) -> None:
async with self.application.pool.connection() as conn:
async with conn.cursor() as cursor:
query = self.get_argument("q", "")
in_collection = self.get_argument("in_collection", None)
page = max(1, int(self.get_argument("page", 1)))
limit = int(self.get_argument("limit", 10))
sort_by = self.get_argument("sort_by", "rarity")
search = tutor.search.search.parse(query)
copies = await tutor.database.advanced_search(
cursor,
search,
limit=limit + 1,
offset=limit * (page - 1),
sort_by=sort_by,
in_collection=in_collection in ("yes", "true")
if in_collection
else None,
)
has_more = copies and len(copies) > limit
self.set_header("Content-Type", "application/json")
self.set_header("Access-Control-Allow-Origin", "*")
links = {}
if page > 1:
links["prev"] = update_args(self.request.full_url(), page=page - 1)
if has_more:
links["next"] = update_args(self.request.full_url(), page=page + 1)
self.set_links(**links)
def price(amount: typing.Optional[decimal.Decimal]) -> typing.Optional[str]:
if amount is not None:
return str(amount)
else:
return None
self.write(
json.dumps(
[
{
"scryfall_id": str(copy.card.scryfall_id),
"name": copy.card.name,
"set_code": copy.card.set_code,
"collector_number": copy.card.collector_number,
"rarity": str(copy.card.rarity),
"color_identity": tutor.models.Color.to_string(
copy.card.color_identity
),
"oracle_text": copy.card.oracle_text,
"prices": {
"usd": price(copy.card.price_usd),
"usd_foil": price(copy.card.price_usd_foil),
"eur": price(copy.card.price_eur),
"eur_foil": price(copy.card.price_eur_foil),
"tix": price(copy.card.price_tix),
},
"foil": copy.foil,
"collection": copy.collection,
}
for copy in copies[:limit]
]
)
)
class CollectionHandler(RequestHandler):
async def get(self) -> None:
async with self.application.pool.connection() as conn:
async with conn.cursor() as cursor:
self.write(
json.dumps(
await tutor.database.collection_stats(cursor), default=str
)
)
class DecksHandler(RequestHandler):
async def get(self) -> None:
page = max(1, int(self.get_argument("page", 1)))
limit = int(self.get_argument("limit", 10))
async with self.application.pool.connection() as conn:
async with conn.cursor() as cursor:
decks = await tutor.database.get_decks(
cursor, limit=limit + 1, offset=limit * (page - 1)
)
has_more = decks and len(decks) > limit
self.set_header("Content-Type", "application/json")
self.set_header("Access-Control-Allow-Origin", "*")
links = {}
if page > 1:
links["prev"] = update_args(self.request.full_url(), page=page - 1)
if has_more:
links["next"] = update_args(self.request.full_url(), page=page + 1)
self.set_links(**links)
self.write(
json.dumps(
[
{
"deck_id": deck.deck_id,
"name": deck.name,
"cards": [],
}
for deck in decks
]
)
)
async def post(self) -> None:
...
class TemplateHandler(RequestHandler):
def initialize(
self,
path: str,
content_type: str = "application/octet-stream",
vars: dict = None,
):
self.path = path
self.content_type = content_type
self.vars = vars if vars else dict()
async def get(self) -> None:
self.set_header("Content-Type", self.content_type)
return self.render(self.path, **self.vars)
class StaticFileHandler(tornado.web.StaticFileHandler):
@classmethod
def get_absolute_path(cls, root: str, path: str) -> str:
# Rewrite paths to load the index
if path in ("collection", "decks", "decks/new123123"):
path = "index.html"
return tornado.web.StaticFileHandler.get_absolute_path(root, path)
class Application(tornado.web.Application):
def __init__(self, **settings):
version = importlib.metadata.version(__package__)
template_path = importlib.resources.path(__package__, "templates")
settings.setdefault("template_path", template_path)
paths = [
(
r"/api/",
TemplateHandler,
{"path": "index.html", "content_type": "text/html"},
),
(
r"/api/openapi.yaml",
TemplateHandler,
{
"path": "openapi.yaml",
"content_type": "application/x-yaml; charset=utf-8",
"vars": {"version": version},
},
),
(r"/api/search", SearchHandler),
(r"/api/collection", CollectionHandler),
(r"/api/decks", DecksHandler),
]
if static_path := settings.get("static"):
paths.extend(
[
(
fr"/(.*)",
StaticFileHandler,
{"path": static_path, "default_filename": "index.html"},
),
]
)
tornado.ioloop.IOLoop.current().add_callback(self.async_init)
super().__init__(paths, **settings)
async def async_init(self):
self.pool = psycopg_pool.AsyncConnectionPool(self.settings["database"])