Add Postgres support (#67)

* Add Postgres support

- Add engine switch to command
- Add support for Postgres schema syntax
- Leverage Postgres enum types
- Leverage Postgres array types

* Formatting, types, cleanup
This commit is contained in:
Antony Derham 2021-01-17 06:41:45 +00:00 committed by GitHub
parent 0c889412fd
commit 2c2e51ddd3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 150 additions and 112 deletions

View file

@ -37,6 +37,12 @@ if __name__ == "__main__":
action="store_true",
required=False,
)
parser.add_argument(
"-e",
help="SQL database engine ('postgres' or 'mysql'). Only used if output file has .sql extension.",
default="postgres",
required=False,
)
args = parser.parse_args()
# Define our I/O paths
@ -59,6 +65,7 @@ if __name__ == "__main__":
input_file,
{"path": output_file["path"].joinpath("AllPrintings.sql"), "handle": None},
args.x,
args.e,
)
logging.info("> Creating AllPrintings CSV components")
@ -69,4 +76,4 @@ if __name__ == "__main__":
elif str(input_file).endswith(".sqlite"):
sql2csv.execute(input_file, output_file)
else:
json2sql.execute(input_file, output_file, args.x)
json2sql.execute(input_file, output_file, args.x, args.e)

View file

@ -7,18 +7,20 @@ import logging
import pathlib
import sqlite3
import time
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Union, Literal
LOGGER = logging.getLogger(__name__)
JsonDict = Dict[str, any]
Engine = Literal["postgres", "mysql", "sqlite"]
def execute(json_input, output_file, check_extras=False) -> None:
def execute(json_input, output_file, check_extras=False, engine: Engine = "postgres") -> None:
"""Main function to handle the logic
:param json_input: Input file (JSON)
:param output_file: Output dir
:param extras: additional json files to process
:param check_extras: additional json files to process
:param engine: SQL database engine
"""
if not valid_input_output(json_input, output_file):
exit(1)
@ -27,10 +29,10 @@ def execute(json_input, output_file, check_extras=False) -> None:
LOGGER.info("Loading json file into memory")
with json_input.open("r", encoding="utf8") as json_file:
json_data = json.load(json_file)
build_sql_database(output_file, json_data)
build_sql_schema(json_data, output_file)
parse_and_import_cards(json_data, json_input, output_file)
parse_and_import_extras(json_input, output_file)
build_sql_database(output_file, json_data, engine)
build_sql_schema(json_data, output_file, engine)
parse_and_import_cards(json_data, json_input, output_file, engine)
parse_and_import_extras(json_input, output_file, engine)
commit_changes_and_close_db(output_file)
@ -76,7 +78,7 @@ def check_extra_inputs(input_file: pathlib.Path,
output_dir[extra] = True
def build_sql_database(output_file: str, json_data: JsonDict) -> None:
def build_sql_database(output_file: Dict, json_data: JsonDict, engine: Engine) -> None:
if output_file["path"].suffix == ".sql":
version = get_version(json_data)
output_file["handle"] = open(output_file["path"], "w", encoding="utf8")
@ -89,7 +91,7 @@ def build_sql_database(output_file: str, json_data: JsonDict) -> None:
"-- MTGJSON Version: {}".format(version),
"",
"START TRANSACTION;",
"SET names 'utf8mb4';",
"SET names 'utf8mb4';" if engine != "postgres" else "",
"",
"",
)
@ -112,24 +114,24 @@ def get_version(json_data: Dict) -> str:
return "Unknown"
def build_sql_schema(json_data: Dict, output_file: Dict) -> None:
def build_sql_schema(json_data: Dict, output_file: Dict, engine: Engine) -> None:
"""
Create the SQLite DB schema
"""
LOGGER.info("Building SQLite schema")
if output_file["path"].suffix == ".sql":
schema = generate_sql_schema(json_data, output_file, "mysql")
LOGGER.info("Building SQL schema")
schema = generate_sql_schema(json_data, output_file, engine)
output_file["handle"].write(schema)
output_file["handle"].write("COMMIT;\n\n")
else:
LOGGER.info("Building SQLite schema")
schema = generate_sql_schema(json_data, output_file, "sqlite")
cursor = output_file["handle"].cursor()
cursor.executescript(schema)
output_file["handle"].commit()
def generate_sql_schema(json_data: Dict,
output_file: Dict, engine: str) -> str:
def generate_sql_schema(json_data: Dict, output_file: Dict, engine: Engine) -> str:
"""
Generate the SQL database schema from the JSON input
@ -151,19 +153,23 @@ def generate_sql_schema(json_data: Dict,
"date": {"type": "DATE"},
},
"legalities": {
"format": {"type": "TEXT" if engine == "sqlite" else "ENUM"},
"status": {"type": "TEXT" if engine == "sqlite" else "ENUM"},
"format": {
"type": "legalities_format" if engine == "postgres" else "TEXT" if engine == "sqlite" else "ENUM"},
"status": {
"type": "legalities_status" if engine == "postgres" else "TEXT" if engine == "sqlite" else "ENUM"},
},
"foreign_data": {
"flavorText": {"type": "TEXT"},
"language": {"type": "TEXT" if engine == "sqlite" else "ENUM"},
"language": {
"type": "foreign_data_language" if engine == "postgres" else "TEXT" if engine == "sqlite" else "ENUM"},
"multiverseid": {"type": "INTEGER"},
"name": {"type": "TEXT"},
"text": {"type": "TEXT"},
"type": {"type": "TEXT"},
},
"set_translations": {
"language": {"type": "TEXT" if engine == "sqlite" else "ENUM"},
"language": {
"type": "set_translations_language" if engine == "postgres" else "TEXT" if engine == "sqlite" else "ENUM"},
"translation": {"type": "TEXT"},
},
}
@ -213,22 +219,23 @@ def generate_sql_schema(json_data: Dict,
# handle enum options
if cardKey in enums:
if cardKey == "foreign_data":
if schema[cardKey]["language"]["type"] == "ENUM":
if schema[cardKey]["language"]["type"] != "TEXT":
for foreign in cardValue:
if "options" in schema[cardKey]["language"]:
if foreign["language"] not in schema[cardKey]["language"]["options"]:
schema[cardKey]["language"]["options"].append(foreign["language"])
schema[cardKey]["language"]["options"].append(
foreign["language"])
else:
schema[cardKey]["language"]["options"] = [foreign["language"]]
elif cardKey == "legalities":
if schema[cardKey]["format"]["type"] == "ENUM":
if schema[cardKey]["format"]["type"] != "TEXT":
for format in cardValue.keys():
if "options" in schema[cardKey]["format"]:
if format not in schema[cardKey]["format"]["options"]:
schema[cardKey]["format"]["options"].append(format)
else:
schema[cardKey]["format"]["options"] = [format]
if schema[cardKey]["status"]["type"] == "ENUM":
if schema[cardKey]["status"]["type"] != "TEXT":
for status in cardValue.values():
if "options" in schema[cardKey]["status"]:
if status not in schema[cardKey]["status"]["options"]:
@ -236,7 +243,7 @@ def generate_sql_schema(json_data: Dict,
else:
schema[cardKey]["status"]["options"] = [status]
elif cardKey == "prices":
if schema[cardKey]["type"]["type"] == "ENUM":
if schema[cardKey]["type"]["type"] != "TEXT":
for type in cardValue.keys():
if "options" in schema[cardKey]["type"]:
if type not in schema[cardKey]["type"]["options"]:
@ -249,6 +256,10 @@ def generate_sql_schema(json_data: Dict,
schema[cardKey]["uuid"] = {
"type": "TEXT(36) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE"
}
if engine == "postgres":
schema[cardKey]["uuid"] = {
"type": "CHAR(36) NOT NULL,\n FOREIGN KEY (uuid) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE"
}
else:
schema[cardKey]["uuid"] = {
"type": "CHAR(36) NOT NULL,\n INDEX(uuid),\n FOREIGN KEY (uuid) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE"
@ -260,8 +271,12 @@ def generate_sql_schema(json_data: Dict,
if cardValue not in schema[setKey][cardKey]["options"]:
schema[setKey][cardKey]["options"].append(cardValue)
else:
if cardKey in enums[setKey] and not engine == "sqlite":
schema[setKey][cardKey] = {"type": "ENUM", "options": [cardValue]}
if cardKey in enums[setKey]:
if engine == "postgres":
schema[setKey][cardKey] = {"type": f"{setKey}_{cardKey}",
"options": [cardValue]}
if engine == "mysql":
schema[setKey][cardKey] = {"type": "ENUM", "options": [cardValue]}
else:
# determine type of the property
schema[setKey][cardKey] = {
@ -271,16 +286,16 @@ def generate_sql_schema(json_data: Dict,
if cardKey in indexes[setKey]:
if engine == "sqlite":
schema[setKey][cardKey]["type"] += (
indexes[setKey][cardKey] + " NOT NULL"
indexes[setKey][cardKey] + " NOT NULL"
)
else:
schema[setKey][cardKey]["type"] = (
"CHAR"
+ indexes[setKey][cardKey]
+ " NOT NULL"
"CHAR"
+ indexes[setKey][cardKey]
+ " NOT NULL"
)
if setKey == "set_translations":
if schema[setKey]["language"]["type"] == "ENUM":
if schema[setKey]["language"]["type"] != "TEXT":
if setValue:
for language in setValue.keys():
if "options" not in schema[setKey]["language"]:
@ -294,6 +309,10 @@ def generate_sql_schema(json_data: Dict,
schema[setKey]["setCode"] = {
"type": "TEXT(8) REFERENCES sets(code) ON UPDATE CASCADE ON DELETE CASCADE"
}
if engine == "postgres":
schema[setKey]["setCode"] = {
"type": "VARCHAR(8) NOT NULL,\n FOREIGN KEY (setCode) REFERENCES sets(code) ON UPDATE CASCADE ON DELETE CASCADE"
}
else:
schema[setKey]["setCode"] = {
"type": "VARCHAR(8) NOT NULL,\n INDEX(setCode),\n FOREIGN KEY (setCode) REFERENCES sets(code) ON UPDATE CASCADE ON DELETE CASCADE"
@ -307,14 +326,17 @@ def generate_sql_schema(json_data: Dict,
else:
# handle boosters
if setKey == "booster":
if engine == "sqlite":
if engine == "sqlite" or engine == "postgres":
schema["sets"]["booster"] = {"type": "TEXT"}
else:
schema["sets"]["booster"] = {"type": "LONGTEXT"}
continue
# determine type of the set property
if setKey in enums["sets"] and not engine == "sqlite":
schema["sets"][setKey] = {"type": "ENUM", "options": [setValue]}
if setKey in enums["sets"]:
if engine == "postgres":
schema["sets"][setKey] = {"type": f"sets_{setKey}", "options": [setValue]}
if engine == "mysql":
schema["sets"][setKey] = {"type": "ENUM", "options": [setValue]}
elif setKey == "releaseDate":
schema["sets"][setKey] = {"type": "DATE"}
else:
@ -324,11 +346,11 @@ def generate_sql_schema(json_data: Dict,
if setKey in indexes["sets"]:
if engine == "sqlite":
schema["sets"][setKey]["type"] += (
indexes["sets"][setKey] + " NOT NULL"
indexes["sets"][setKey] + " NOT NULL"
)
else:
schema["sets"][setKey]["type"] = (
"VARCHAR" + indexes["sets"][setKey] + " NOT NULL"
"VARCHAR" + indexes["sets"][setKey] + " NOT NULL"
)
# add extra tables manually if necessary
@ -339,7 +361,8 @@ def generate_sql_schema(json_data: Dict,
}
if output_file["AllPrices.json"] or version.startswith("4"):
schema["prices"] = {
"uuid": { "type": "TEXT(36) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE" if engine == "sqlite" else "CHAR(36) NOT NULL,\n INDEX(uuid),\n FOREIGN KEY (uuid) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE" },
"uuid": {
"type": "TEXT(36) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE" if engine == "sqlite" else "CHAR(36) NOT NULL,\n INDEX(uuid),\n FOREIGN KEY (uuid) REFERENCES cards(uuid) ON UPDATE CASCADE ON DELETE CASCADE"},
"price": {"type": "FLOAT" if engine == "sqlite" else "DECIMAL(8,2)"},
"type": {"type": "TEXT" if engine == "sqlite" else "ENUM"},
"date": {"type": "DATE"},
@ -373,16 +396,20 @@ def generate_sql_schema(json_data: Dict,
return get_query_from_dict(schema, engine)
def get_sql_type(mixed, engine: str) -> str:
def get_sql_type(mixed, engine: Engine) -> str:
"""
Return a string with the type of the parameter mixed
The type depends on the SQL engine in some cases
"""
if isinstance(mixed, str) or isinstance(mixed, list) or isinstance(mixed, dict):
if isinstance(mixed, list) and engine == "postgres":
return "TEXT[]"
elif isinstance(mixed, str) or isinstance(mixed, list) or isinstance(mixed, dict):
return "TEXT"
elif isinstance(mixed, bool):
if engine == "sqlite":
if engine == "postgres":
return "BOOLEAN NOT NULL DEFAULT false"
elif engine == "sqlite":
return "INTEGER NOT NULL DEFAULT 0"
else:
return "TINYINT(1) NOT NULL DEFAULT 0"
@ -393,11 +420,21 @@ def get_sql_type(mixed, engine: str) -> str:
return "TEXT"
def get_query_from_dict(schema, engine):
def get_query_from_dict(schema, engine: Engine):
q = ""
for table_name, table_data in schema.items():
q += f"CREATE TABLE `{table_name}` (\n"
if engine == "sqlite":
if engine == "postgres":
for attribute in sorted(table_data.keys()):
if "options" in table_data[attribute]:
q += f"CREATE TYPE {table_data[attribute]['type']} AS ENUM ('" + "', '".join(
table_data[attribute]["options"]) + "');\n"
q += f"CREATE TABLE \"{table_name}\" (\n"
else:
q += f"CREATE TABLE `{table_name}` (\n"
if engine == "postgres":
q += " id SERIAL PRIMARY KEY,\n"
elif engine == "sqlite":
q += " id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
else:
q += " id INTEGER PRIMARY KEY AUTO_INCREMENT,\n"
@ -408,7 +445,7 @@ def get_query_from_dict(schema, engine):
if table_data[attribute]["type"] == "ENUM":
q += "('" + "', '".join(table_data[attribute]["options"]) + "')"
q += ",\n"
if engine == "sqlite":
if engine == "sqlite" or engine == "postgres":
q = q[:-2] + "\n);\n\n"
else:
q = q[:-2] + "\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n\n"
@ -416,9 +453,7 @@ def get_query_from_dict(schema, engine):
return q
def parse_and_import_cards(
json_data: Dict, input_file: pathlib.Path, output_file: Dict
) -> None:
def parse_and_import_cards(json_data: Dict, input_file: pathlib.Path, output_file: Dict, engine: Engine) -> None:
"""
Parse the JSON cards and input them into the database
@ -427,32 +462,32 @@ def parse_and_import_cards(
"""
LOGGER.info("Building sets")
if "data" in json_data:
sql_dict_insert(json_data["meta"], "meta", output_file)
sql_dict_insert(json_data["meta"], "meta", output_file, engine)
json_data = json_data["data"]
for set_code, set_data in json_data.items():
LOGGER.info(f"Inserting set row for {set_code}")
set_insert_values = handle_set_row_insertion(set_data)
sql_dict_insert(set_insert_values, "sets", output_file)
set_insert_values = handle_set_row_insertion(set_data, engine)
sql_dict_insert(set_insert_values, "sets", output_file, engine)
for card in set_data.get("cards"):
LOGGER.debug(f"Inserting card row for {card.get('name')}")
card_attr: JsonDict = handle_card_row_insertion(card, set_code)
sql_insert_all_card_fields(card_attr, output_file)
card_attr: JsonDict = handle_card_row_insertion(card, set_code, engine)
sql_insert_all_card_fields(card_attr, output_file, engine)
for token in set_data.get("tokens"):
LOGGER.debug(f"Inserting token row for {token.get('name')}")
token_attr = handle_token_row_insertion(token, set_code)
sql_dict_insert(token_attr, "tokens", output_file)
token_attr = handle_token_row_insertion(token, set_code, engine)
sql_dict_insert(token_attr, "tokens", output_file, engine)
for language, translation in set_data.get("translations", {}).items():
LOGGER.debug(f"Inserting set_translation row for {language}")
set_translation_attr = handle_set_translation_row_insertion(
language, translation, set_code
)
sql_dict_insert(set_translation_attr, "set_translations", output_file)
sql_dict_insert(set_translation_attr, "set_translations", output_file, engine)
def handle_set_row_insertion(set_data: JsonDict) -> JsonDict:
def handle_set_row_insertion(set_data: JsonDict, engine: Engine) -> JsonDict:
"""
This method will take the set data and convert it,
preparing for SQLite insertion
@ -468,15 +503,15 @@ def handle_set_row_insertion(set_data: JsonDict) -> JsonDict:
continue
if key == "boosterV3":
set_insert_values[key] = modify_for_sql_insert(str(value))
set_insert_values[key] = modify_for_sql_insert(str(value), engine)
continue
set_insert_values[key] = modify_for_sql_insert(value)
set_insert_values[key] = modify_for_sql_insert(value, engine)
return set_insert_values
def handle_card_row_insertion(card_data: JsonDict, set_name: str) -> JsonDict:
def handle_card_row_insertion(card_data: JsonDict, set_name: str, engine: Engine) -> JsonDict:
"""
This method will take the card data and convert it,
preparing for SQLite insertion
@ -494,9 +529,9 @@ def handle_card_row_insertion(card_data: JsonDict, set_name: str) -> JsonDict:
continue
if key == "identifiers":
for idKey, idValue in value.items():
card_insert_values[idKey] = modify_for_sql_insert(idValue)
card_insert_values[idKey] = modify_for_sql_insert(idValue, engine)
else:
card_insert_values[key] = modify_for_sql_insert(value)
card_insert_values[key] = modify_for_sql_insert(value, engine)
foreign_insert_values: List[JsonDict] = []
if card_skip_keys[0] in card_data.keys():
@ -523,9 +558,7 @@ def handle_card_row_insertion(card_data: JsonDict, set_name: str) -> JsonDict:
}
def sql_insert_all_card_fields(
card_attributes: JsonDict, output_file: Dict
) -> None:
def sql_insert_all_card_fields(card_attributes: JsonDict, output_file: Dict, engine: Engine) -> None:
"""
Given all of the card's data, insert the data into the
appropriate SQLite tables.
@ -533,23 +566,26 @@ def sql_insert_all_card_fields(
:param card_attributes: Tuple of data
:param output_file: Output info dictionary
"""
sql_dict_insert(card_attributes["cards"], "cards", output_file)
sql_dict_insert(card_attributes["cards"], "cards", output_file, engine)
for foreign_val in card_attributes["foreign_data"]:
sql_dict_insert(foreign_val, "foreign_data", output_file)
sql_dict_insert(foreign_val, "foreign_data", output_file, engine)
for legal_val in card_attributes["legalities"]:
sql_dict_insert(legal_val, "legalities", output_file)
sql_dict_insert(legal_val, "legalities", output_file, engine)
for rule_val in card_attributes["rulings"]:
sql_dict_insert(rule_val, "rulings", output_file)
sql_dict_insert(rule_val, "rulings", output_file, engine)
if not output_file["AllPrices.json"]:
for price_val in card_attributes["prices"]:
sql_dict_insert(price_val, "prices", output_file)
sql_dict_insert(price_val, "prices", output_file, engine)
def handle_token_row_insertion(token_data: JsonDict, set_name: str) -> JsonDict:
def handle_token_row_insertion(
token_data: JsonDict,
set_name: str,
engine: Engine) -> JsonDict:
"""
This method will take the token data and convert it,
preparing for SQLite insertion
@ -562,16 +598,14 @@ def handle_token_row_insertion(token_data: JsonDict, set_name: str) -> JsonDict:
for key, value in token_data.items():
if key == "identifiers":
for idKey, idValue in value.items():
token_insert_values[idKey] = modify_for_sql_insert(idValue)
token_insert_values[idKey] = modify_for_sql_insert(idValue, engine)
else:
token_insert_values[key] = modify_for_sql_insert(value)
token_insert_values[key] = modify_for_sql_insert(value, engine)
return token_insert_values
def handle_set_translation_row_insertion(
language: str, translation: str, set_name: str
) -> JsonDict:
def handle_set_translation_row_insertion(language: str, translation: str, set_name: str) -> JsonDict:
"""
This method will take the set translation data and convert it,
preparing for SQLite insertion
@ -590,7 +624,7 @@ def handle_set_translation_row_insertion(
return set_translation_insert_values
def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None:
def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict, engine: Engine) -> None:
"""
Parse the extra data files and input them into the database
@ -600,7 +634,7 @@ def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None
if output_file["AllPrices.json"]:
LOGGER.info("Inserting AllPrices rows")
with input_file.parent.joinpath("AllPrices.json").open(
"r", encoding="utf8"
"r", encoding="utf8"
) as f:
json_data = json.load(f)
for card_uuid, price_data in json_data.items():
@ -617,6 +651,7 @@ def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None
},
"prices",
output_file,
engine
)
if output_file["AllDeckFiles"]:
@ -638,12 +673,12 @@ def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None
deck_data[key] = value
if "fileName" not in deck_data:
deck_data["fileName"] = deck_file.stem
sql_dict_insert(deck_data, "decks", output_file)
sql_dict_insert(deck_data, "decks", output_file, engine)
if output_file["Keywords.json"]:
LOGGER.info("Inserting Keyword rows")
with input_file.parent.joinpath("Keywords.json").open(
"r", encoding="utf8"
"r", encoding="utf8"
) as f:
json_data = json.load(f)
for keyword_type in json_data:
@ -651,13 +686,13 @@ def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None
continue
for keyword in json_data[keyword_type]:
sql_dict_insert(
{"word": keyword, "type": keyword_type}, "keywords", output_file
{"word": keyword, "type": keyword_type}, "keywords", output_file, engine
)
if output_file["CardTypes.json"]:
LOGGER.info("Inserting Card Type rows")
with input_file.parent.joinpath("CardTypes.json").open(
"r", encoding="utf8"
"r", encoding="utf8"
) as f:
json_data = json.load(f)
for type in json_data["types"]:
@ -675,12 +710,11 @@ def parse_and_import_extras(input_file: pathlib.Path, output_file: Dict) -> None
},
"types",
output_file,
engine
)
def handle_foreign_rows(
card_data: JsonDict, card_uuid: str
) -> List[JsonDict]:
def handle_foreign_rows(card_data: JsonDict, card_uuid: str) -> List[JsonDict]:
"""
This method will take the card data and convert it,
preparing for SQLite insertion
@ -707,9 +741,7 @@ def handle_foreign_rows(
return foreign_entries
def handle_legal_rows(
card_data: JsonDict, card_uuid: str
) -> List[JsonDict]:
def handle_legal_rows(card_data: JsonDict, card_uuid: str) -> List[JsonDict]:
"""
This method will take the card data and convert it,
preparing for SQLite insertion
@ -727,9 +759,7 @@ def handle_legal_rows(
return legalities
def handle_ruling_rows(
card_data: JsonDict, card_uuid: str
) -> List[JsonDict]:
def handle_ruling_rows(card_data: JsonDict, card_uuid: str) -> List[JsonDict]:
"""This method will take the card data and convert it,
preparing for SQLite insertion
@ -749,9 +779,7 @@ def handle_ruling_rows(
return rulings
def handle_price_rows(
card_data: JsonDict, card_uuid: str
) -> List[JsonDict]:
def handle_price_rows(card_data: JsonDict, card_uuid: str) -> List[JsonDict]:
"""This method will take the card data and convert it,
preparing for SQLite insertion
@ -775,11 +803,12 @@ def handle_price_rows(
return prices
def modify_for_sql_insert(data: Any) -> Union[str, int, float, None]:
def modify_for_sql_insert(data: Any, engine: Engine) -> Union[str, int, float, None]:
"""
Arrays and booleans can't be inserted, so we need to stringify
:param data: Data to modify
:param engine: SQL engine in use
:return: string value
"""
if isinstance(data, (str, int, float)):
@ -789,11 +818,13 @@ def modify_for_sql_insert(data: Any) -> Union[str, int, float, None]:
if not data:
return None
if isinstance(data, list) and data and isinstance(data[0], str):
if isinstance(data, list) and engine == "postgres":
return "{\"" + "\",\"".join(data) + "\"}"
elif isinstance(data, list) and data and isinstance(data[0], str):
return ",".join(data)
if isinstance(data, bool):
return int(data)
return data if engine == "postgres" else int(data)
if isinstance(data, dict):
return str(data)
@ -801,20 +832,20 @@ def modify_for_sql_insert(data: Any) -> Union[str, int, float, None]:
return ""
def modify_for_sql_file(data: JsonDict) -> JsonDict:
def modify_for_sql_file(data: JsonDict, engine: Engine) -> JsonDict:
for key in data.keys():
if isinstance(data[key], str):
data[key] = "'" + data[key].replace("'", "''") + "'"
if str(data[key]) == "False":
data[key] = 0
data[key] = "false" if engine == "postgres" else 0
if str(data[key]) == "True":
data[key] = 1
data[key] = "true" if engine == "postgres" else 1
if data[key] is None:
data[key] = "NULL"
return data
def sql_dict_insert(data: JsonDict, table: str, output_file: Dict) -> None:
def sql_dict_insert(data: JsonDict, table: str, output_file: Dict, engine: Engine) -> None:
"""
Insert a dictionary into a sqlite table
@ -823,16 +854,16 @@ def sql_dict_insert(data: JsonDict, table: str, output_file: Dict) -> None:
:param output_file: Output info dictionary
"""
try:
if output_file["path"].suffix == ".sql":
data = modify_for_sql_file(data)
if engine != "sqlite":
data = modify_for_sql_file(data, engine)
query = (
"INSERT INTO "
+ table
+ " ("
+ ", ".join(data.keys())
+ ") VALUES ({"
+ "}, {".join(data.keys())
+ "});\n"
"INSERT INTO "
+ table
+ " ("
+ ", ".join(data.keys())
+ ") VALUES ({"
+ "}, {".join(data.keys())
+ "});\n"
)
query = query.format(**data)
output_file["handle"].write(query)