tutor/tutor/database.py

586 lines
22 KiB
Python

import datetime
import decimal
import itertools
import logging
import typing
import uuid
import psycopg
import psycopg.rows
import psycopg.sql as sql
import tutor.models
import tutor.search
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def convert_price(price: typing.Optional[str]) -> typing.Optional[decimal.Decimal]:
    """Parse a price string into a ``Decimal``.

    Returns ``None`` when ``price`` is ``None`` or otherwise falsy (for
    example an empty string); any other value is handed to
    ``decimal.Decimal`` unchanged.
    """
    return decimal.Decimal(price) if price else None
async def search(
    db: psycopg.AsyncCursor,
    name: typing.Optional[str] = None,
    collector_number: typing.Optional[str] = None,
    set_code: typing.Optional[str] = None,
    set_name: typing.Optional[str] = None,
    foil: typing.Optional[bool] = None,
    alternate_art: typing.Optional[bool] = None,
    scryfall_id: typing.Optional[str] = None,
    limit: int = 10,
    distinct: bool = True,
    in_collection: typing.Optional[bool] = None,
) -> typing.List[tutor.models.Card]:
    """Find cards that match every filter that is not ``None``.

    The text filters (``name``, ``collector_number``, ``set_code`` and
    ``set_name``) are compared with SQL ``LIKE`` and passed through as given,
    so ``%`` and ``_`` in them act as wildcards. ``set_code`` is upper-cased
    before it is compared.

    Args:
        db: Cursor to run the query on. Its ``row_factory`` is switched to
            ``dict_row`` and left that way.
        name: Pattern for ``cards.name``.
        collector_number: Pattern for ``cards.collector_number``.
        set_code: Pattern for ``cards.set_code``.
        set_name: Pattern for ``sets.name``.
        foil: Required value of ``cards.foil``.
        alternate_art: Required value of ``cards.variation``.
        scryfall_id: Exact ``cards.scryfall_id`` to look for.
        limit: Maximum number of rows to return.
        distinct: Currently has no effect on the query.
        in_collection: ``True`` keeps only cards that have a row in
            ``copies`` (one result per copy); ``False`` keeps only cards
            without one; ``None`` does not look at ``copies`` at all.

    Returns:
        One ``Card`` per result row, with ``games`` and ``legalities`` empty.
    """
    db.row_factory = psycopg.rows.dict_row

    # (SQL predicate, parameter name, value) for every optional filter.
    # Booleans are compared with "=": "IS" only accepts the TRUE / FALSE /
    # NULL keywords and is a syntax error in front of a bound parameter.
    filters = [
        ("cards.name LIKE %(name)s", "name", name),
        ("cards.collector_number LIKE %(number)s", "number", collector_number),
        (
            "cards.set_code LIKE %(set_code)s",
            "set_code",
            None if set_code is None else set_code.upper(),
        ),
        ("sets.name LIKE %(set_name)s", "set_name", set_name),
        ("cards.foil = %(foil)s", "foil", foil),
        ("cards.variation = %(alternative)s", "alternative", alternate_art),
        ("cards.scryfall_id = %(scryfall_id)s", "scryfall_id", scryfall_id),
    ]
    constraints: typing.List[str] = [
        predicate for predicate, _, value in filters if value is not None
    ]
    # The limit is bound like every other value instead of being formatted
    # into the SQL text.
    params: typing.Dict[str, typing.Any] = {"limit": limit}
    params.update({key: value for _, key, value in filters if value is not None})

    joins: typing.List[str] = []
    if in_collection is not None:
        if in_collection:
            joins.append("JOIN copies USING (scryfall_id)")
        else:
            # Anti-join: keep only cards for which no copy row matched.
            joins.append("LEFT JOIN copies USING (scryfall_id)")
            constraints.append("copies.id IS NULL")
    joins.append("JOIN sets USING (set_code)")

    clauses = ["SELECT cards.*, sets.name AS set_name FROM cards", *joins]
    if constraints:
        clauses.append("WHERE " + " AND ".join(constraints))
    clauses.append("LIMIT %(limit)s")

    await db.execute(" ".join(clauses), params)
    rows = await db.fetchall()
    return [
        tutor.models.Card(
            scryfall_id=row["scryfall_id"],
            oracle_id=row["oracle_id"],
            name=row["name"],
            set_code=row["set_code"],
            set_name=row["set_name"],
            collector_number=row["collector_number"],
            rarity=tutor.models.Rarity.from_string(row["rarity"]),
            color_identity=tutor.models.Color.from_string(row["color_identity"]),
            cmc=row["cmc"],
            type_line=row["type_line"],
            release_date=datetime.date.fromisoformat(row["release_date"]),
            games=set(),
            legalities={},
            edhrec_rank=row["edhrec_rank"],
            oracle_text=row["oracle_text"],
        )
        for row in rows
    ]
async def advanced_search(
    db: psycopg.AsyncCursor,
    search: tutor.search.Search,
    limit: int = 10,
    offset: int = 0,
    sort_by: str = "rarity",
    in_collection: typing.Optional[bool] = None,
) -> typing.List[tutor.models.CardCopy]:
    """Run a structured search and return the matching card copies.

    Every criterion in ``search.criteria`` becomes one SQL constraint; all
    constraints are combined with ``AND``.

    Args:
        db: Cursor to run the query on. Its ``row_factory`` is switched to
            ``dict_row`` and left that way.
        search: Parsed search whose ``criteria`` are translated to SQL.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.
        sort_by: ``"price"`` puts the most expensive copies first and then
            applies the default ordering; any other value uses the default
            ordering only (rarity, colour count, colour, name).
        in_collection: ``True`` keeps only cards with a row in ``copies``;
            ``False`` keeps only cards without one; ``None`` returns both.
            ``Collection`` criteria are only applied when this is truthy.

    Returns:
        One ``CardCopy`` per result row. Rows without a matching copy get
        ``foil=False`` and ``collection="Default"``.
    """
    db.row_factory = psycopg.rows.dict_row
    joins: typing.List[str] = []
    constraints: typing.List[str] = []
    # Paging values are bound like every other value instead of being
    # formatted into the SQL text.
    params: typing.Dict[str, typing.Any] = {"limit": limit, "offset": offset}
    logger.debug("Performing search for: %s", search)

    for i, criterion in enumerate(search.criteria):
        param = f"param_{i}"
        if isinstance(criterion, tutor.search.Name):
            constraints.append(f"cards.name ILIKE %({param})s")
            params[param] = f"%{criterion.text}%"
        if isinstance(criterion, tutor.search.Type):
            constraints.append(f"cards.type_line ILIKE %({param})s")
            params[param] = f"%{criterion.text}%"
        if isinstance(criterion, tutor.search.Expansion):
            constraints.append(f"cards.set_code ILIKE %({param})s")
            params[param] = criterion.set_code
        if isinstance(criterion, tutor.search.Color):
            if criterion.operator == tutor.search.Operator.matches:
                constraints.append(f"cards.color_identity ILIKE %({param})s")
                params[param] = tutor.models.Color.to_string(criterion.colors)
            elif criterion.operator == tutor.search.Operator.lte:
                # "At most these colours": accept every non-empty subset of
                # the requested colours, plus the empty (colourless) identity.
                # Sorted so the generated SQL is the same on every call.
                subsets = sorted(
                    {
                        tutor.models.Color.to_string(list(combo))
                        for size in range(1, len(criterion.colors) + 1)
                        for combo in itertools.combinations(criterion.colors, size)
                    }
                    | {""}
                )
                alternatives = []
                for j, subset in enumerate(subsets):
                    # Index-based names keep the placeholder valid whatever
                    # characters the colour string contains.
                    key = f"{param}_{j}"
                    alternatives.append(f"cards.color_identity ILIKE %({key})s")
                    params[key] = subset
                constraints.append("({})".format(" OR ".join(alternatives)))
            elif criterion.operator == tutor.search.Operator.gte:
                # "At least these colours": each character of the colour
                # string must appear in order, with anything around it.
                constraints.append(f"cards.color_identity ILIKE %({param})s")
                params[param] = "%{}%".format(
                    "%".join(tutor.models.Color.to_string(criterion.colors))
                )
        if isinstance(criterion, tutor.search.Rarity):
            if criterion.operator == tutor.search.Operator.matches:
                constraints.append(f"cards.rarity = %({param})s::rarity")
                params[param] = str(criterion.rarity)
            elif criterion.operator == tutor.search.Operator.lte:
                constraints.append(f"cards.rarity <= %({param})s::rarity")
                params[param] = criterion.rarity.value
            elif criterion.operator == tutor.search.Operator.gte:
                constraints.append(f"cards.rarity >= %({param})s::rarity")
                params[param] = criterion.rarity.value
        if isinstance(criterion, tutor.search.Oracle):
            constraints.append(f"cards.oracle_text ILIKE %({param})s")
            params[param] = f"%{criterion.text}%"
        if isinstance(criterion, tutor.search.Collection) and in_collection:
            constraints.append(f"copies.collection ILIKE %({param})s")
            params[param] = f"%{criterion.text}%"

    if in_collection:
        joins.append("JOIN copies ON (cards.scryfall_id = copies.scryfall_id)")
    else:
        joins.append("LEFT JOIN copies ON (cards.scryfall_id = copies.scryfall_id)")
        if in_collection is not None:
            # Explicitly "not in collection": keep only cards without a copy.
            constraints.append("copies.id IS NULL")
    joins.append("JOIN sets ON (cards.set_code = sets.set_code)")

    orderings = [
        "cards.rarity DESC",
        "length(cards.color_identity) DESC",
        "CASE "
        " WHEN length(cards.color_identity) > 0 THEN '0'"
        " ELSE cards.color_identity END ASC",
        "cards.name ASC",
    ]
    if sort_by == "price":
        orderings = [
            'CAST(COALESCE(CASE WHEN "copies"."isFoil" THEN cards.price_usd_foil ELSE cards.price_usd END, 0) as decimal) DESC',
            *orderings,
        ]

    clauses = [
        # set_name is selected explicitly (as search() does) because the
        # result mapping below reads row["set_name"].
        "SELECT cards.*, copies.*, sets.name AS set_name",
        ', CASE WHEN "copies"."isFoil" THEN cards.price_usd_foil',
        " ELSE cards.price_usd END AS usd",
        "FROM cards",
        *joins,
    ]
    if constraints:
        clauses.append("WHERE " + " AND ".join(constraints))
    clauses.append("ORDER BY " + ", ".join(orderings))
    clauses.append("LIMIT %(limit)s OFFSET %(offset)s")
    query = " ".join(clauses)

    logger.debug("Query: %s", (query, params))
    await db.execute(query, params)
    rows = await db.fetchall()
    return [
        tutor.models.CardCopy(
            card=tutor.models.Card(
                scryfall_id=row["scryfall_id"],
                oracle_id=row["oracle_id"],
                name=row["name"],
                set_code=row["set_code"],
                set_name=row["set_name"],
                collector_number=row["collector_number"],
                rarity=tutor.models.Rarity.from_string(row["rarity"]),
                color_identity=tutor.models.Color.from_string(row["color_identity"]),
                cmc=row["cmc"],
                mana_cost=row["mana_cost"],
                type_line=row["type_line"],
                release_date=datetime.date.fromisoformat(row["release_date"]),
                games=set(),
                legalities={},
                edhrec_rank=row["edhrec_rank"],
                oracle_text=row["oracle_text"],
                price_usd=convert_price(row["price_usd"]),
                price_usd_foil=convert_price(row["price_usd_foil"]),
                price_eur=convert_price(row["price_eur"]),
                price_eur_foil=convert_price(row["price_eur_foil"]),
                price_tix=convert_price(row["price_tix"]),
            ),
            # Both columns are NULL when the LEFT JOIN found no copy.
            foil=row["isFoil"] if row["isFoil"] is not None else False,
            collection=row["collection"] or "Default",
        )
        for row in rows
    ]
async def oracle_id_by_name(
    db: psycopg.AsyncCursor, name: str
) -> typing.Optional[uuid.UUID]:
    """Return the ``oracle_id`` of the first ``oracle`` row matching ``name``.

    The name is compared with ``ILIKE`` (case-insensitive; ``%`` and ``_`` in
    ``name`` act as wildcards). Returns ``None`` when nothing matches. The
    cursor's ``row_factory`` is switched to ``dict_row`` and left that way.
    """
    lookup = 'SELECT "oracle_id" FROM "oracle" WHERE "name" ILIKE %(name)s'
    db.row_factory = psycopg.rows.dict_row
    await db.execute(lookup, {"name": name})
    match = await db.fetchone()
    return match["oracle_id"] if match else None
async def store_card(db: psycopg.AsyncCursor, card: tutor.models.Card) -> None:
    """Insert ``card`` into the ``tmp_cards`` table.

    Ids, rarity, cmc and the release date are stored in their string form;
    a falsy ``oracle_id`` or ``release_date`` is stored as NULL.
    """
    oracle_id = str(card.oracle_id) if card.oracle_id else None
    release_date = str(card.release_date) if card.release_date else None
    values = {
        "scryfall_id": str(card.scryfall_id),
        "oracle_id": oracle_id,
        "name": card.name,
        "set_code": card.set_code,
        "collector_number": card.collector_number,
        "rarity": str(card.rarity),
        "color_identity": tutor.models.Color.to_string(card.color_identity),
        "cmc": str(card.cmc),
        "mana_cost": card.mana_cost,
        "type_line": card.type_line,
        "release_date": release_date,
        "edhrec_rank": card.edhrec_rank,
        "oracle_text": card.oracle_text,
    }
    insert = """
        INSERT INTO tmp_cards
        ("scryfall_id", "oracle_id", "name", "set_code", "collector_number",
        "rarity", "color_identity", "cmc", "mana_cost", "type_line",
        "release_date", "edhrec_rank", "oracle_text")
        VALUES (%(scryfall_id)s, %(oracle_id)s, %(name)s, %(set_code)s,
        %(collector_number)s, %(rarity)s, %(color_identity)s, %(cmc)s,
        %(mana_cost)s, %(type_line)s, %(release_date)s, %(edhrec_rank)s,
        %(oracle_text)s)
        """
    await db.execute(insert, values)
async def store_price(
    db: psycopg.AsyncCursor, date: datetime.date, card: tutor.models.Card
) -> None:
    """Insert the prices of ``card`` for ``date`` into ``tmp_prices``.

    Args:
        db: Cursor to run the insert on.
        date: Day the prices apply to; stored in its string form.
        card: Card whose ``price_*`` attributes are stored. A price that is
            ``None`` is stored as NULL; every other price, including zero,
            is stored in its string form.
    """
    prices = {
        "usd": card.price_usd,
        "usd_foil": card.price_usd_foil,
        "eur": card.price_eur,
        "eur_foil": card.price_eur_foil,
        "tix": card.price_tix,
    }
    params: typing.Dict[str, typing.Any] = {
        # Passed as a string, the same way store_card and store_copy pass it.
        "scryfall_id": str(card.scryfall_id),
        "date": str(date),
    }
    # Compare against None rather than truthiness: a zero Decimal is falsy
    # and would otherwise be written as NULL.
    params.update(
        {
            column: None if amount is None else str(amount)
            for column, amount in prices.items()
        }
    )
    await db.execute(
        """
        INSERT INTO "tmp_prices"
        ("scryfall_id", "date", "usd", "usd_foil", "eur", "eur_foil", "tix")
        VALUES (%(scryfall_id)s, %(date)s, %(usd)s, %(usd_foil)s, %(eur)s, %(eur_foil)s, %(tix)s)
        """,
        params,
    )
async def store_set(db: psycopg.AsyncCursor, set_code: str, name: str) -> None:
    """Insert a set into ``sets``; an already known ``set_code`` is left as is."""
    values = {"set_code": set_code, "name": name}
    insert = """
        INSERT INTO "sets" ("set_code", "name")
        VALUES (%(set_code)s, %(name)s)
        ON CONFLICT ("set_code") DO NOTHING
        """
    await db.execute(insert, values)
async def store_copy(db: psycopg.AsyncCursor, copy: tutor.models.CardCopy) -> None:
    """Insert one owned copy into ``copies``.

    Stores the card's scryfall id (as a string) together with the copy's
    foil flag, collection and condition.
    """
    values = {
        "scryfall_id": str(copy.card.scryfall_id),
        "foil": copy.foil,
        "collection": copy.collection,
        "condition": copy.condition,
    }
    insert = """
        INSERT INTO copies ("scryfall_id", "isFoil", "collection", "condition")
        VALUES (%(scryfall_id)s, %(foil)s, %(collection)s, %(condition)s)
        """
    await db.execute(insert, values)
async def clear_copies(
    db: psycopg.AsyncCursor, collection: typing.Optional[str] = None
) -> None:
    """Delete copies from the ``copies`` table.

    Args:
        db: Cursor to run the delete on.
        collection: Name of the collection to empty. When ``None`` or an
            empty string, every row in ``copies`` is deleted.
    """
    if collection:
        # Parameters must be a sequence or a mapping; a bare string is
        # rejected by psycopg, so wrap the single value in a tuple.
        await db.execute("DELETE FROM copies WHERE collection = %s", (collection,))
    else:
        await db.execute("DELETE FROM copies")
async def store_deck(db: psycopg.AsyncCursor, name: str) -> typing.Optional[int]:
    """Insert a new deck called ``name`` and return its ``deck_id``.

    Returns ``None`` if the insert did not return a row. The cursor's
    ``row_factory`` is switched to ``tuple_row`` and left that way.
    """
    # The result is read by position below. Other functions in this module
    # leave the cursor on dict_row, which would make result[0] a KeyError,
    # so select the row shape this function needs explicitly.
    db.row_factory = psycopg.rows.tuple_row
    await db.execute(
        'INSERT INTO "decks" ("name") VALUES (%(name)s) RETURNING "deck_id"',
        {"name": name},
    )
    result = await db.fetchone()
    if result:
        return result[0]
    return None
async def store_deck_card(
    db: psycopg.AsyncCursor, deck_id: int, oracle_id: uuid.UUID, quantity: int = 1
) -> None:
    """Add ``quantity`` of a card to a deck's list.

    If the deck already lists ``oracle_id``, ``quantity`` is added to the
    existing quantity instead of inserting a second row.
    """
    values = {"deck_id": deck_id, "oracle_id": str(oracle_id), "quantity": quantity}
    upsert = """
        INSERT INTO "deck_list" ("deck_id", "oracle_id", "quantity")
        VALUES (%(deck_id)s, %(oracle_id)s, %(quantity)s)
        ON CONFLICT ("deck_id", "oracle_id") DO UPDATE
        SET "quantity" = "deck_list"."quantity" + EXCLUDED."quantity"
        """
    await db.execute(upsert, values)
async def clear_deck(db: psycopg.AsyncCursor, deck_id: int) -> None:
    """Delete every ``deck_list`` row of the deck ``deck_id``.

    Only the card list is removed; the ``decks`` table is not touched here.
    """
    delete = """
        DELETE FROM "deck_list"
        WHERE "deck_id" = %(deck_id)s
        """
    await db.execute(delete, {"deck_id": deck_id})
# SELECT/FROM part shared by get_decks() and get_deck(): one row per deck,
# with its cards aggregated into a JSON array. JSON_STRIP_NULLS removes every
# key whose value is NULL, so nullable columns may be missing from a card.
_DECK_SELECT = """
    SELECT "decks"."deck_id"
    , "decks"."name"
    , JSON_STRIP_NULLS(JSON_AGG(JSON_BUILD_OBJECT(
    'oracle_id', "deck_list"."oracle_id",
    'name', "oracle_latest"."name",
    'color_identity', "oracle_latest"."color_identity",
    'cmc', "oracle_latest"."cmc",
    'mana_cost', "oracle_latest"."mana_cost",
    'type_line', "oracle_latest"."type_line",
    'edhrec_rank', "oracle_latest"."edhrec_rank",
    'oracle_text', "oracle_latest"."oracle_text",
    'scryfall_id', "oracle_latest"."scryfall_id",
    'set_code', "oracle_latest"."set_code",
    'set_name', "oracle_latest"."set_name",
    'collector_number', "oracle_latest"."collector_number",
    'rarity', "oracle_latest"."rarity",
    'release_date', "oracle_latest"."release_date",
    'price_usd', "oracle_latest"."price_usd",
    'price_usd_foil', "oracle_latest"."price_usd_foil",
    'price_eur', "oracle_latest"."price_eur",
    'price_eur_foil', "oracle_latest"."price_eur_foil",
    'price_tix', "oracle_latest"."price_tix",
    'quantity', "deck_list"."quantity"
    ))) AS "cards"
    FROM "decks"
    JOIN "deck_list" USING ("deck_id")
    JOIN "oracle_latest" USING ("oracle_id")
"""

_DECK_GROUP_BY = ' GROUP BY "decks"."deck_id", "decks"."name"'


def _deck_card_from_json(card: typing.Dict[str, typing.Any]) -> tutor.models.DeckCard:
    """Build a ``DeckCard`` from one element of the aggregated ``cards`` array."""
    return tutor.models.DeckCard(
        card=tutor.models.Card(
            oracle_id=card["oracle_id"],
            name=card["name"],
            color_identity=tutor.models.Color.from_string(card["color_identity"]),
            cmc=card["cmc"],
            # .get(): the key is absent when the column is NULL, because
            # JSON_STRIP_NULLS drops it from the object.
            mana_cost=card.get("mana_cost"),
            type_line=card["type_line"],
            games=set(),
            legalities={},
            edhrec_rank=card.get("edhrec_rank"),
            oracle_text=card.get("oracle_text"),
            scryfall_id=card["scryfall_id"],
            set_code=card["set_code"],
            set_name=card["set_name"],
            collector_number=card["collector_number"],
            rarity=card["rarity"],
            release_date=card["release_date"],
            price_usd=convert_price(card.get("price_usd")),
            price_usd_foil=convert_price(card.get("price_usd_foil")),
            price_eur=convert_price(card.get("price_eur")),
            price_eur_foil=convert_price(card.get("price_eur_foil")),
            price_tix=convert_price(card.get("price_tix")),
        ),
        quantity=card["quantity"],
    )


def _deck_from_row(row: typing.Dict[str, typing.Any]) -> tutor.models.Deck:
    """Build a ``Deck`` from one result row of a ``_DECK_SELECT`` query."""
    return tutor.models.Deck(
        deck_id=row["deck_id"],
        name=row["name"],
        cards=[
            _deck_card_from_json(card)
            for card in row["cards"]
            # Skip empty objects and entries without an oracle id.
            if card and card.get("oracle_id")
        ],
    )


async def get_decks(
    db: psycopg.AsyncCursor, limit: int = 10, offset: int = 0
) -> typing.List[tutor.models.Deck]:
    """Return one page of decks, ordered by ``deck_id``, with their cards.

    Because the query uses inner joins, a deck only appears if at least one
    of its ``deck_list`` entries has a match in ``oracle_latest``.

    Args:
        db: Cursor to run the query on. Its ``row_factory`` is switched to
            ``dict_row`` and left that way.
        limit: Maximum number of decks to return.
        offset: Number of decks to skip before the first returned deck.
    """
    db.row_factory = psycopg.rows.dict_row
    query = (
        _DECK_SELECT
        + _DECK_GROUP_BY
        + ' ORDER BY "decks"."deck_id"'
        + " LIMIT %(limit)s OFFSET %(offset)s"
    )
    await db.execute(query, {"limit": limit, "offset": offset})
    rows = await db.fetchall()
    return [_deck_from_row(row) for row in rows]


async def get_deck(
    db: psycopg.AsyncCursor, deck_id: int
) -> typing.Optional[tutor.models.Deck]:
    """Return the deck ``deck_id`` with its cards, or ``None`` if no row matches.

    Because the query uses inner joins, ``None`` is also returned for a deck
    that exists but has no ``deck_list`` entry matching ``oracle_latest``.
    The cursor's ``row_factory`` is switched to ``dict_row`` and left that way.
    """
    db.row_factory = psycopg.rows.dict_row
    query = _DECK_SELECT + ' WHERE "decks"."deck_id" = %(deck_id)s' + _DECK_GROUP_BY
    await db.execute(query, {"deck_id": deck_id})
    row = await db.fetchone()
    if row:
        return _deck_from_row(row)
    return None
async def store_var(db: psycopg.AsyncCursor, key: str, value: str) -> None:
    """Store ``value`` under ``key`` in ``vars``, replacing any existing value."""
    upsert = """
        INSERT INTO "vars" ("key", "value")
        VALUES (%(key)s, %(value)s)
        ON CONFLICT ("key") DO UPDATE
        SET "value" = %(value)s
        """
    await db.execute(upsert, {"key": key, "value": value})
async def get_var(db: psycopg.AsyncCursor, key: str) -> typing.Any:
    """Return the value stored under ``key`` in ``vars``.

    Returns ``None`` when there is no row for ``key``. The cursor's
    ``row_factory`` is switched to ``tuple_row`` and left that way.
    """
    # The result is read by position below. Other functions in this module
    # leave the cursor on dict_row, which would make result[0] a KeyError,
    # so select the row shape this function needs explicitly.
    db.row_factory = psycopg.rows.tuple_row
    await db.execute(
        'SELECT "value" FROM "vars" WHERE "key" = %(key)s',
        {"key": key},
    )
    if result := await db.fetchone():
        return result[0]
    return None
async def collection_stats(db: psycopg.AsyncCursor) -> typing.Optional[dict]:
    """Summarise the collection as a single dict row.

    The row has three keys: ``cards`` (number of copies), ``value`` (sum of
    the USD price of each copy, using the foil price for foil copies) and
    ``sets`` (number of distinct set codes). The cursor's ``row_factory`` is
    switched to ``dict_row`` and left that way.
    """
    stats_query = """
        SELECT COUNT("copies"."id") AS cards
        , SUM(
        CASE WHEN "copies"."isFoil"
        THEN "cards"."price_usd_foil"
        ELSE "cards"."price_usd"
        END
        ) AS value
        , COUNT(DISTINCT cards.set_code) AS sets
        FROM "copies"
        JOIN "cards" USING ("scryfall_id")
        """
    db.row_factory = psycopg.rows.dict_row
    await db.execute(stats_query)
    summary = await db.fetchone()
    return summary