527 lines
20 KiB
Python
527 lines
20 KiB
Python
import datetime
|
|
import decimal
|
|
import itertools
|
|
import logging
|
|
import typing
|
|
import uuid
|
|
|
|
import psycopg
|
|
import psycopg.rows
|
|
import psycopg.sql as sql
|
|
|
|
import tutor.models
|
|
import tutor.search
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def search(
|
|
db: psycopg.Cursor,
|
|
name: typing.Optional[str] = None,
|
|
collector_number: typing.Optional[str] = None,
|
|
set_code: typing.Optional[str] = None,
|
|
set_name: typing.Optional[str] = None,
|
|
foil: typing.Optional[bool] = None,
|
|
alternate_art: typing.Optional[bool] = None,
|
|
scryfall_id: typing.Optional[str] = None,
|
|
limit: int = 10,
|
|
distinct: bool = True,
|
|
in_collection: typing.Optional[bool] = None,
|
|
) -> typing.List[tutor.models.Card]:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
joins = []
|
|
constraints = []
|
|
params = {}
|
|
if name is not None:
|
|
constraints.append("cards.name LIKE %(name)s")
|
|
params["name"] = name
|
|
if collector_number is not None:
|
|
constraints.append("cards.collector_number LIKE %(number)s")
|
|
params["number"] = collector_number
|
|
if set_code is not None:
|
|
constraints.append("cards.set_code LIKE %(set_code)s")
|
|
params["set_code"] = set_code.upper()
|
|
if set_name is not None:
|
|
constraints.append("sets.name LIKE %(set_name)s")
|
|
params["set_name"] = set_name
|
|
if foil is not None:
|
|
constraints.append("cards.foil IS %(foil)s")
|
|
params["foil"] = foil
|
|
if alternate_art is not None:
|
|
constraints.append("cards.variation IS %(alternative)s")
|
|
params["alternative"] = alternate_art
|
|
if scryfall_id is not None:
|
|
constraints.append("cards.scryfall_id = %(scryfall_id)s")
|
|
params["scryfall_id"] = scryfall_id
|
|
if in_collection is not None:
|
|
if in_collection:
|
|
joins.append("JOIN copies USING (scryfall_id)")
|
|
else:
|
|
joins.append("LEFT JOIN copies USING (scryfall_id)")
|
|
constraints.append("copies.id IS NULL")
|
|
joins.append("JOIN sets USING (set_code)")
|
|
query = " ".join(
|
|
[
|
|
"SELECT cards.* FROM cards",
|
|
" ".join(joins),
|
|
"WHERE" if constraints else "",
|
|
" AND ".join(constraints),
|
|
f"LIMIT {limit}",
|
|
]
|
|
)
|
|
await db.execute(query, params)
|
|
rows = await db.fetchall()
|
|
return [
|
|
tutor.models.Card(
|
|
scryfall_id=row["scryfall_id"],
|
|
oracle_id=row["oracle_id"],
|
|
name=row["name"],
|
|
set_code=row["set_code"],
|
|
collector_number=row["collector_number"],
|
|
rarity=tutor.models.Rarity.from_string(row["rarity"]),
|
|
color_identity=tutor.models.Color.from_string(row["color_identity"]),
|
|
cmc=row["cmc"],
|
|
type_line=row["type_line"],
|
|
release_date=datetime.date.fromisoformat(row["release_date"]),
|
|
games=set(),
|
|
legalities={},
|
|
edhrec_rank=row["edhrec_rank"],
|
|
oracle_text=row["oracle_text"],
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
|
|
async def advanced_search(
|
|
db: psycopg.Cursor,
|
|
search: tutor.search.Search,
|
|
limit: int = 10,
|
|
offset: int = 0,
|
|
sort_by: str = "rarity",
|
|
in_collection: typing.Optional[bool] = None,
|
|
) -> typing.List[tutor.models.CardCopy]:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
joins = []
|
|
constraints = []
|
|
params = {}
|
|
sets = []
|
|
|
|
logger.debug("Performing search for: %s", search)
|
|
for i, criterion in enumerate(search.criteria):
|
|
param = f"param_{i}"
|
|
if isinstance(criterion, tutor.search.Name):
|
|
constraints.append(f"cards.name ILIKE %({param})s")
|
|
params[param] = f"%{criterion.text}%"
|
|
if isinstance(criterion, tutor.search.Type):
|
|
constraints.append(f"cards.type_line ILIKE %({param})s")
|
|
params[param] = f"%{criterion.text}%"
|
|
if isinstance(criterion, tutor.search.Expansion):
|
|
constraints.append(f"cards.set_code ILIKE %({param})s")
|
|
params[param] = criterion.set_code
|
|
if isinstance(criterion, tutor.search.Color):
|
|
if criterion.operator == tutor.search.Operator.matches:
|
|
constraints.append(f"cards.color_identity ILIKE %({param})s")
|
|
params[param] = tutor.models.Color.to_string(criterion.colors)
|
|
if criterion.operator == tutor.search.Operator.lte:
|
|
colors = list(
|
|
{
|
|
tutor.models.Color.to_string(list(combo))
|
|
for n in range(len(criterion.colors))
|
|
for combo in itertools.combinations(criterion.colors, n + 1)
|
|
}
|
|
| {""}
|
|
)
|
|
constraints.append(
|
|
"({})".format(
|
|
" OR ".join(
|
|
[
|
|
f"cards.color_identity ILIKE %({param}_{color})s"
|
|
for color in colors
|
|
]
|
|
)
|
|
)
|
|
)
|
|
params.update({f"{param}_{color}": color for color in colors})
|
|
if criterion.operator == tutor.search.Operator.gte:
|
|
constraints.append(f"cards.color_identity ILIKE %({param})s")
|
|
params[param] = "%{}%".format(
|
|
"%".join(tutor.models.Color.to_string(criterion.colors))
|
|
)
|
|
if isinstance(criterion, tutor.search.Rarity):
|
|
if criterion.operator == tutor.search.Operator.matches:
|
|
constraints.append(f"cards.rarity = %({param})s::rarity")
|
|
params[param] = str(criterion.rarity)
|
|
if criterion.operator == tutor.search.Operator.lte:
|
|
constraints.append(f"cards.rarity <= %({param})s::rarity")
|
|
params[param] = criterion.rarity.value
|
|
if criterion.operator == tutor.search.Operator.gte:
|
|
constraints.append(f"cards.rarity >= %({param})s::rarity")
|
|
params[param] = criterion.rarity.value
|
|
if isinstance(criterion, tutor.search.Oracle):
|
|
constraints.append(f"cards.oracle_text ILIKE %({param})s")
|
|
params[param] = f"%{criterion.text}%"
|
|
|
|
if in_collection is not None:
|
|
if in_collection:
|
|
joins.append("JOIN copies ON (cards.scryfall_id = copies.scryfall_id)")
|
|
else:
|
|
joins.append("LEFT JOIN copies ON (cards.scryfall_id = copies.scryfall_id)")
|
|
constraints.append("copies.id IS NULL")
|
|
else:
|
|
joins.append("LEFT JOIN copies ON (cards.scryfall_id = copies.scryfall_id)")
|
|
joins.append("JOIN sets ON (cards.set_code = sets.set_code)")
|
|
joins.append(
|
|
"JOIN card_prices ON (cards.scryfall_id = card_prices.scryfall_id "
|
|
"AND card_prices.date = (select value from vars where key = %(last_update_key)s))"
|
|
)
|
|
orderings = [
|
|
"cards.rarity DESC",
|
|
"length(cards.color_identity) DESC",
|
|
"CASE "
|
|
" WHEN length(cards.color_identity) > 0 THEN '0'"
|
|
" ELSE cards.color_identity END ASC",
|
|
"cards.name ASC",
|
|
]
|
|
if sort_by == "price":
|
|
orderings = [
|
|
'CAST(COALESCE(CASE WHEN "copies"."isFoil" THEN card_prices.usd_foil ELSE card_prices.usd END, 0) as decimal) DESC',
|
|
*orderings,
|
|
]
|
|
params["last_update_key"] = "last_update"
|
|
query = " ".join(
|
|
[
|
|
"SELECT cards.*, card_prices.*, copies.*",
|
|
', CASE WHEN "copies"."isFoil" THEN card_prices.usd_foil',
|
|
" ELSE card_prices.usd END AS usd",
|
|
"FROM cards",
|
|
" ".join(joins),
|
|
"WHERE" if constraints else "",
|
|
" AND ".join(constraints),
|
|
"ORDER BY " if orderings else "",
|
|
", ".join(orderings),
|
|
f"LIMIT {limit} OFFSET {offset}",
|
|
]
|
|
)
|
|
logger.debug("Query: %s", (query, params))
|
|
await db.execute(query, params)
|
|
rows = await db.fetchall()
|
|
|
|
def convert_price(price: typing.Optional[str]) -> typing.Optional[decimal.Decimal]:
|
|
if price:
|
|
return decimal.Decimal(price)
|
|
else:
|
|
return None
|
|
|
|
return [
|
|
tutor.models.CardCopy(
|
|
card=tutor.models.Card(
|
|
scryfall_id=row["scryfall_id"],
|
|
oracle_id=row["oracle_id"],
|
|
name=row["name"],
|
|
set_code=row["set_code"],
|
|
collector_number=row["collector_number"],
|
|
rarity=tutor.models.Rarity.from_string(row["rarity"]),
|
|
color_identity=tutor.models.Color.from_string(row["color_identity"]),
|
|
cmc=row["cmc"],
|
|
type_line=row["type_line"],
|
|
release_date=datetime.date.fromisoformat(row["release_date"]),
|
|
games=set(),
|
|
legalities={},
|
|
edhrec_rank=row["edhrec_rank"],
|
|
oracle_text=row["oracle_text"],
|
|
price_usd=convert_price(row["usd"]),
|
|
price_usd_foil=convert_price(row["usd_foil"]),
|
|
price_eur=convert_price(row["eur"]),
|
|
price_eur_foil=convert_price(row["eur_foil"]),
|
|
price_tix=convert_price(row["tix"]),
|
|
),
|
|
foil=row["isFoil"] if row["isFoil"] is not None else False,
|
|
collection=row["collection"] or "Default",
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
|
|
async def oracle_id_by_name(
|
|
db: psycopg.Cursor, name: str
|
|
) -> typing.Optional[uuid.UUID]:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
await db.execute(
|
|
'SELECT "oracle_id" FROM "oracle" WHERE "name" ILIKE %(name)s', {"name": name}
|
|
)
|
|
row = await db.fetchone()
|
|
if row:
|
|
return row["oracle_id"]
|
|
|
|
|
|
async def store_card(db: psycopg.Cursor, card: tutor.models.Card) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO tmp_cards
|
|
("scryfall_id", "oracle_id", "name", "set_code", "collector_number",
|
|
"rarity", "color_identity", "cmc", "mana_cost", "type_line",
|
|
"release_date", "edhrec_rank", "oracle_text")
|
|
VALUES (%(scryfall_id)s, %(oracle_id)s, %(name)s, %(set_code)s,
|
|
%(collector_number)s, %(rarity)s, %(color_identity)s, %(cmc)s,
|
|
%(mana_cost)s, %(type_line)s, %(release_date)s, %(edhrec_rank)s,
|
|
%(oracle_text)s)
|
|
""",
|
|
{
|
|
"scryfall_id": str(card.scryfall_id),
|
|
"oracle_id": str(card.oracle_id) if card.oracle_id else None,
|
|
"name": card.name,
|
|
"set_code": card.set_code,
|
|
"collector_number": card.collector_number,
|
|
"rarity": str(card.rarity),
|
|
"color_identity": tutor.models.Color.to_string(card.color_identity),
|
|
"cmc": str(card.cmc),
|
|
"mana_cost": card.mana_cost,
|
|
"type_line": card.type_line,
|
|
"release_date": str(card.release_date) if card.release_date else None,
|
|
"edhrec_rank": card.edhrec_rank,
|
|
"oracle_text": card.oracle_text,
|
|
},
|
|
)
|
|
|
|
|
|
async def store_price(
|
|
db: psycopg.Cursor, date: datetime.date, card: tutor.models.Card
|
|
) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO "tmp_prices"
|
|
("scryfall_id", "date", "usd", "usd_foil", "eur", "eur_foil", "tix")
|
|
VALUES (%(scryfall_id)s, %(date)s, %(usd)s, %(usd_foil)s, %(eur)s, %(eur_foil)s, %(tix)s)
|
|
""",
|
|
{
|
|
"scryfall_id": card.scryfall_id,
|
|
"date": str(date),
|
|
"usd": str(card.price_usd) if card.price_usd else None,
|
|
"usd_foil": str(card.price_usd_foil) if card.price_usd_foil else None,
|
|
"eur": str(card.price_eur) if card.price_eur else None,
|
|
"eur_foil": str(card.price_eur_foil) if card.price_eur_foil else None,
|
|
"tix": str(card.price_tix) if card.price_tix else None,
|
|
},
|
|
)
|
|
|
|
|
|
async def store_set(db: psycopg.Cursor, set_code: str, name: str) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO "sets" ("set_code", "name")
|
|
VALUES (%(set_code)s, %(name)s)
|
|
ON CONFLICT ("set_code") DO NOTHING
|
|
""",
|
|
{"set_code": set_code, "name": name},
|
|
)
|
|
|
|
|
|
async def store_copy(db: psycopg.Cursor, copy: tutor.models.CardCopy) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO copies ("scryfall_id", "isFoil", "collection", "condition")
|
|
VALUES (%(scryfall_id)s, %(foil)s, %(collection)s, %(condition)s)
|
|
""",
|
|
{
|
|
"scryfall_id": str(copy.card.scryfall_id),
|
|
"foil": copy.foil,
|
|
"collection": copy.collection,
|
|
"condition": copy.condition,
|
|
},
|
|
)
|
|
|
|
|
|
async def clear_copies(db: psycopg.Cursor, collection: typing.Optional[str] = None):
|
|
if collection:
|
|
await db.execute("DELETE FROM copies WHERE collection = %s", collection)
|
|
else:
|
|
await db.execute("DELETE FROM copies")
|
|
|
|
|
|
async def store_deck(db: psycopg.Cursor, name: str) -> None:
|
|
await db.execute(
|
|
'INSERT INTO "decks" ("name") VALUES (%(name)s) RETURNING "deck_id"',
|
|
{"name": name},
|
|
)
|
|
result = await db.fetchone()
|
|
return result[0]
|
|
|
|
|
|
async def store_deck_card(
|
|
db: psycopg.Cursor, deck_id: int, oracle_id: uuid.UUID, quantity: int = 1
|
|
) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO "deck_list" ("deck_id", "oracle_id", "quantity")
|
|
VALUES (%(deck_id)s, %(oracle_id)s, %(quantity)s)
|
|
""",
|
|
{"deck_id": deck_id, "oracle_id": str(oracle_id), "quantity": quantity},
|
|
)
|
|
|
|
|
|
async def get_decks(
|
|
db: psycopg.Cursor, limit: int = 10, offset: int = 0
|
|
) -> typing.List[tutor.models.Deck]:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
await db.execute(
|
|
"""
|
|
SELECT "decks"."deck_id"
|
|
, "decks"."name"
|
|
, JSON_STRIP_NULLS(JSON_AGG(JSON_BUILD_OBJECT(
|
|
'oracle_id', "deck_list"."oracle_id",
|
|
'name', "oracle_latest"."name",
|
|
'color_identity', "oracle_latest"."color_identity",
|
|
'cmc', "oracle_latest"."cmc",
|
|
'type_line', "oracle_latest"."type_line",
|
|
'edhrec_rank', "oracle_latest"."edhrec_rank",
|
|
'oracle_text', "oracle_latest"."oracle_text",
|
|
'scryfall_id', "oracle_latest"."scryfall_id",
|
|
'set_code', "oracle_latest"."set_code",
|
|
'collector_number', "oracle_latest"."collector_number",
|
|
'rarity', "oracle_latest"."rarity",
|
|
'release_date', "oracle_latest"."release_date",
|
|
'quantity', "deck_list"."quantity"
|
|
))) AS "cards"
|
|
FROM "decks"
|
|
LEFT JOIN "deck_list" USING ("deck_id")
|
|
LEFT JOIN "oracle_latest" USING ("oracle_id")
|
|
GROUP BY "decks"."deck_id"
|
|
, "decks"."name"
|
|
ORDER BY "decks"."deck_id"
|
|
LIMIT %(limit)s OFFSET %(offset)s
|
|
""",
|
|
{"limit": limit, "offset": offset},
|
|
)
|
|
rows = await db.fetchall()
|
|
return [
|
|
tutor.models.Deck(
|
|
deck_id=row["deck_id"],
|
|
name=row["name"],
|
|
cards=[
|
|
tutor.models.DeckCard(
|
|
card=tutor.models.Card(
|
|
oracle_id=card["oracle_id"],
|
|
name=card["name"],
|
|
color_identity=tutor.models.Color.from_string(
|
|
card["color_identity"]
|
|
),
|
|
cmc=card["cmc"],
|
|
type_line=card["type_line"],
|
|
games=set(),
|
|
legalities={},
|
|
edhrec_rank=card.get("edhrec_rank"),
|
|
oracle_text=card.get("oracle_text"),
|
|
scryfall_id=card["scryfall_id"],
|
|
set_code=card["set_code"],
|
|
collector_number=card["collector_number"],
|
|
rarity=card["rarity"],
|
|
release_date=card["release_date"],
|
|
),
|
|
quantity=card["quantity"],
|
|
)
|
|
for card in row["cards"]
|
|
if card and card.get("oracle_id")
|
|
],
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
|
|
async def get_deck(
|
|
db: psycopg.Cursor, deck_id: int
|
|
) -> typing.Optional[tutor.models.Deck]:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
await db.execute(
|
|
"""
|
|
SELECT "decks"."deck_id"
|
|
, "decks"."name"
|
|
, JSON_STRIP_NULLS(JSON_AGG(JSON_BUILD_OBJECT(
|
|
'oracle_id', "deck_list"."oracle_id",
|
|
'name', "oracle_latest"."name",
|
|
'color_identity', "oracle_latest"."color_identity",
|
|
'cmc', "oracle_latest"."cmc",
|
|
'type_line', "oracle_latest"."type_line",
|
|
'edhrec_rank', "oracle_latest"."edhrec_rank",
|
|
'oracle_text', "oracle_latest"."oracle_text",
|
|
'scryfall_id', "oracle_latest"."scryfall_id",
|
|
'set_code', "oracle_latest"."set_code",
|
|
'collector_number', "oracle_latest"."collector_number",
|
|
'rarity', "oracle_latest"."rarity",
|
|
'release_date', "oracle_latest"."release_date",
|
|
'quantity', "deck_list"."quantity"
|
|
))) AS "cards"
|
|
FROM "decks"
|
|
LEFT JOIN "deck_list" USING ("deck_id")
|
|
LEFT JOIN "oracle_latest" USING ("oracle_id")
|
|
WHERE "decks"."deck_id" = %(deck_id)s
|
|
GROUP BY "decks"."deck_id"
|
|
, "decks"."name"
|
|
""",
|
|
{"deck_id": deck_id},
|
|
)
|
|
row = await db.fetchone()
|
|
if row:
|
|
return tutor.models.Deck(
|
|
deck_id=row["deck_id"],
|
|
name=row["name"],
|
|
cards=[
|
|
tutor.models.DeckCard(
|
|
card=tutor.models.Card(
|
|
oracle_id=card["oracle_id"],
|
|
name=card["name"],
|
|
color_identity=tutor.models.Color.from_string(
|
|
card["color_identity"]
|
|
),
|
|
cmc=card["cmc"],
|
|
type_line=card["type_line"],
|
|
games=set(),
|
|
legalities={},
|
|
edhrec_rank=card.get("edhrec_rank"),
|
|
oracle_text=card.get("oracle_text"),
|
|
scryfall_id=card["scryfall_id"],
|
|
set_code=card["set_code"],
|
|
collector_number=card["collector_number"],
|
|
rarity=card["rarity"],
|
|
release_date=card["release_date"],
|
|
),
|
|
quantity=card["quantity"],
|
|
)
|
|
for card in row["cards"]
|
|
if card and card.get("oracle_id")
|
|
],
|
|
)
|
|
|
|
|
|
async def store_var(db: psycopg.Cursor, key: str, value: str) -> None:
|
|
await db.execute(
|
|
"""
|
|
INSERT INTO "vars" ("key", "value")
|
|
VALUES (%(key)s, %(value)s)
|
|
ON CONFLICT ("key") DO UPDATE
|
|
SET "value" = %(value)s
|
|
""",
|
|
{"key": key, "value": value},
|
|
)
|
|
|
|
|
|
async def collection_stats(db: psycopg.Cursor) -> dict:
|
|
db.row_factory = psycopg.rows.dict_row
|
|
await db.execute(
|
|
"""
|
|
SELECT COUNT("copies"."id") AS cards
|
|
, SUM(
|
|
CASE WHEN "copies"."isFoil"
|
|
THEN "card_prices"."usd_foil"
|
|
ELSE "card_prices"."usd"
|
|
END
|
|
) AS value
|
|
, COUNT(DISTINCT cards.set_code) AS sets
|
|
FROM "copies"
|
|
JOIN "cards" USING ("scryfall_id")
|
|
LEFT JOIN "card_prices" USING ("scryfall_id")
|
|
WHERE "card_prices"."date" =
|
|
(SELECT "value" FROM "vars" where "key" = 'last_update')
|
|
"""
|
|
)
|
|
return await db.fetchone()
|