From f276efdf324c24836d8c3bfc128f9558f7245e23 Mon Sep 17 00:00:00 2001 From: Wessie Date: Fri, 20 Dec 2013 18:16:35 +0100 Subject: [PATCH] Cleaned up database.py - Moved SQLDatabase to a SQL specific file - Database class is now an abstract base class - Cursor moved into SQL specific file - Allowed for multi-database support in the future --- dejavu/__init__.py | 5 +- dejavu/cursor.py | 59 ------- dejavu/database.py | 348 ++++++++++++--------------------------- dejavu/database_sql.py | 366 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 472 insertions(+), 306 deletions(-) delete mode 100644 dejavu/cursor.py create mode 100644 dejavu/database_sql.py diff --git a/dejavu/__init__.py b/dejavu/__init__.py index c10e54d..e93edac 100755 --- a/dejavu/__init__.py +++ b/dejavu/__init__.py @@ -1,4 +1,4 @@ -from dejavu.database import SQLDatabase +from dejavu.database import get_database import dejavu.decoder as decoder import fingerprint from multiprocessing import Process, cpu_count @@ -13,8 +13,9 @@ class Dejavu(object): self.config = config # initialize db - self.db = SQLDatabase(**config.get("database", {})) + db_cls = get_database(config.get("database_type", None)) + self.db = db_cls(**config.get("database", {})) self.db.setup() # get songs previously indexed diff --git a/dejavu/cursor.py b/dejavu/cursor.py deleted file mode 100644 index 55429a7..0000000 --- a/dejavu/cursor.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import unicode_literals -from __future__ import absolute_import -import Queue - -import MySQLdb as mysql -import MySQLdb.cursors - - -def cursor_factory(**factory_options): - def cursor(**options): - options.update(factory_options) - return Cursor(**options) - return cursor - - -class Cursor(object): - """ - Establishes a connection to the database and returns an open cursor. - - - ```python - # Use as context manager - with Cursor() as cur: - cur.execute(query) - ``` - """ - _cache = Queue.Queue(maxsize=5) - - def __init__(self, cursor_type=mysql.cursors.Cursor, **options): - super(Cursor, self).__init__() - - try: - conn = self._cache.get_nowait() - except Queue.Empty: - conn = mysql.connect(**options) - else: - # Ping the connection before using it from the cache. - conn.ping(True) - - self.conn = conn - self.cursor_type = cursor_type - - def __enter__(self): - self.cursor = self.conn.cursor(self.cursor_type) - return self.cursor - - def __exit__(self, extype, exvalue, traceback): - # if we had a MySQL related error we try to rollback the cursor. - if extype is mysql.MySQLError: - self.cursor.rollback() - - self.cursor.close() - self.conn.commit() - - # Put it back on the queue - try: - self._cache.put_nowait(self.conn) - except Queue.Full: - self.conn.close() diff --git a/dejavu/database.py b/dejavu/database.py index 75af1bf..b0bfc4a 100755 --- a/dejavu/database.py +++ b/dejavu/database.py @@ -1,308 +1,166 @@ from __future__ import absolute_import -from itertools import izip_longest - -from dejavu.cursor import cursor_factory -from MySQLdb.cursors import DictCursor +import abc class Database(object): + __metaclass__ = abc.ABCMeta + + # Name of your Database subclass, this is used in configuration + # to refer to your class + type = None + def __init__(self): super(Database, self).__init__() + def before_fork(self): + """ + Called before the database instance is given to the new process + """ + pass -class SQLDatabase(Database): - """ - Queries: + def after_fork(self): + """ + Called after the database instance has been given to the new process - 1) Find duplicates (shouldn't be any, though): - - select `hash`, `song_id`, `offset`, count(*) cnt - from fingerprints - group by `hash`, `song_id`, `offset` - having cnt > 1 - order by cnt asc; - - 2) Get number of hashes by song: - - select song_id, song_name, count(song_id) as num - from fingerprints - natural join songs - group by song_id - order by count(song_id) desc; - - 3) get hashes with highest number of collisions - - select - hash, - count(distinct song_id) as n - from fingerprints - group by `hash` - order by n DESC; - - => 26 different songs with same fingerprint (392 times): - - select songs.song_name, fingerprints.offset - from fingerprints natural join songs - where fingerprints.hash = "08d3c833b71c60a7b620322ac0c0aba7bf5a3e73"; - """ - - # config keys - CONNECTION = "connection" - KEY_USERNAME = "username" - KEY_DATABASE = "database" - KEY_PASSWORD = "password" - KEY_HOSTNAME = "hostname" - - # tables - FINGERPRINTS_TABLENAME = "fingerprints" - SONGS_TABLENAME = "songs" - - # fields - FIELD_HASH = "hash" - FIELD_SONG_ID = "song_id" - FIELD_OFFSET = "offset" - FIELD_SONGNAME = "song_name" - FIELD_FINGERPRINTED = "fingerprinted" - - # creates - CREATE_FINGERPRINTS_TABLE = """ - CREATE TABLE IF NOT EXISTS `%s` ( - `%s` binary(10) not null, - `%s` mediumint unsigned not null, - `%s` int unsigned not null, - PRIMARY KEY(%s), - UNIQUE(%s, %s, %s), - FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE CASCADE - ) ENGINE=INNODB;""" % ( - FINGERPRINTS_TABLENAME, FIELD_HASH, - FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH, - FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH, - FIELD_SONG_ID, SONGS_TABLENAME, FIELD_SONG_ID - ) - - CREATE_SONGS_TABLE = """ - CREATE TABLE IF NOT EXISTS `%s` ( - `%s` mediumint unsigned not null auto_increment, - `%s` varchar(250) not null, - `%s` tinyint default 0, - PRIMARY KEY (`%s`), - UNIQUE KEY `%s` (`%s`) - ) ENGINE=INNODB;""" % ( - SONGS_TABLENAME, FIELD_SONG_ID, FIELD_SONGNAME, FIELD_FINGERPRINTED, - FIELD_SONG_ID, FIELD_SONG_ID, FIELD_SONG_ID, - ) - - # inserts (ignores duplicates) - INSERT_FINGERPRINT = """ - INSERT IGNORE INTO %s (%s, %s, %s) VALUES - (UNHEX(%%s), %%s, %%s); - """ % (FINGERPRINTS_TABLENAME, FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET) - - INSERT_SONG = "INSERT INTO %s (%s) VALUES (%%s);" % ( - SONGS_TABLENAME, FIELD_SONGNAME) - - # selects - SELECT = """ - SELECT %s, %s FROM %s WHERE %s = UNHEX(%%s); - """ % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME, FIELD_HASH) - - SELECT_MULTIPLE = """ - SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s); - """ % (FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET, - FINGERPRINTS_TABLENAME, FIELD_HASH) - - SELECT_ALL = """ - SELECT %s, %s FROM %s; - """ % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME) - - SELECT_SONG = """ - SELECT %s FROM %s WHERE %s = %%s - """ % (FIELD_SONGNAME, SONGS_TABLENAME, FIELD_SONG_ID) - - SELECT_NUM_FINGERPRINTS = """ - SELECT COUNT(*) as n FROM %s - """ % (FINGERPRINTS_TABLENAME) - - SELECT_UNIQUE_SONG_IDS = """ - SELECT COUNT(DISTINCT %s) as n FROM %s WHERE %s = 1; - """ % (FIELD_SONG_ID, SONGS_TABLENAME, FIELD_FINGERPRINTED) - - SELECT_SONGS = """ - SELECT %s, %s FROM %s WHERE %s = 1; - """ % (FIELD_SONG_ID, FIELD_SONGNAME, SONGS_TABLENAME, FIELD_FINGERPRINTED) - - # drops - DROP_FINGERPRINTS = "DROP TABLE IF EXISTS %s;" % FINGERPRINTS_TABLENAME - DROP_SONGS = "DROP TABLE IF EXISTS %s;" % SONGS_TABLENAME - - # update - UPDATE_SONG_FINGERPRINTED = """ - UPDATE %s SET %s = 1 WHERE %s = %%s - """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED, FIELD_SONG_ID) - - # delete - DELETE_UNFINGERPRINTED = """ - DELETE FROM %s WHERE %s = 0; - """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED) - - def __init__(self, **options): - super(SQLDatabase, self).__init__() - self.cursor = cursor_factory(**options) + This will be called in the new process. + """ + pass def setup(self): """ - Creates any non-existing tables required for dejavu to function. - - This also removes all songs that have been added but have no - fingerprints associated with them. + Called on creation or shortly afterwards. """ - with self.cursor() as cur: - cur.execute(self.CREATE_SONGS_TABLE) - cur.execute(self.CREATE_FINGERPRINTS_TABLE) - cur.execute(self.DELETE_UNFINGERPRINTED) + pass + @abc.abstractmethod def empty(self): """ - Drops tables created by dejavu and then creates them again - by calling `SQLDatabase.setup`. - - .. warning: - This will result in a loss of data + Called when the database should be cleared of all data. """ - with self.cursor() as cur: - cur.execute(self.DROP_FINGERPRINTS) - cur.execute(self.DROP_SONGS) - - self.setup() + pass + @abc.abstractmethod def delete_unfingerprinted_songs(self): """ - Removes all songs that have no fingerprints associated with them. + Called to remove any song entries that do not have any fingerprints + associated with them. """ - with self.cursor() as cur: - cur.execute(self.DELETE_UNFINGERPRINTED) + pass + @abc.abstractmethod def get_num_songs(self): """ - Returns number of songs the database has fingerprinted. + Returns the amount of songs in the database. """ - with self.cursor() as cur: - cur.execute(self.SELECT_UNIQUE_SONG_IDS) - - for count, in cur: - return count - return 0 + pass + @abc.abstractmethod def get_num_fingerprints(self): """ - Returns number of fingerprints the database has fingerprinted. + Returns the number of fingerprints in the database. """ - with self.cursor() as cur: - cur.execute(self.SELECT_NUM_FINGERPRINTS) - - for count, in cur: - return count - return 0 + pass + @abc.abstractmethod def set_song_fingerprinted(self, sid): """ - Set the fingerprinted flag to TRUE (1) once a song has been completely - fingerprinted in the database. - """ - with self.cursor() as cur: - cur.execute(self.UPDATE_SONG_FINGERPRINTED, (sid,)) + Sets a specific song as having all fingerprints in the database. + sid: Song identifier + """ + pass + + @abc.abstractmethod def get_songs(self): """ - Return songs that have the fingerprinted flag set TRUE (1). + Returns all fully fingerprinted songs in the database. """ - with self.cursor(cursor_type=DictCursor) as cur: - cur.execute(self.SELECT_SONGS) - for row in cur: - yield row + pass + @abc.abstractmethod def get_song_by_id(self, sid): """ - Returns song by its ID. - """ - with self.cursor(cursor_type=DictCursor) as cur: - cur.execute(self.SELECT_SONG, (sid,)) - return cur.fetchone() + Return a song by its identifier + sid: Song identifier + """ + pass + + @abc.abstractmethod def insert(self, hash, sid, offset): """ - Insert a (sha1, song_id, offset) row into database. - """ - with self.cursor() as cur: - cur.execute(self.INSERT_FINGERPRINT, (hash, sid, offset)) + Inserts a single fingerprint into the database. - def insert_song(self, songname): + hash: Part of a sha1 hash, in hexadecimal format + sid: Song identifier this fingerprint is off + offset: The offset this hash is from """ - Inserts song in the database and returns the ID of the inserted record. - """ - with self.cursor() as cur: - cur.execute(self.INSERT_SONG, (songname,)) - return cur.lastrowid + pass + @abc.abstractmethod + def insert_song(self, song_name): + """ + Inserts a song name into the database, returns the new + identifier of the song. + + song_name: The name of the song. + """ + pass + + @abc.abstractmethod def query(self, hash): """ - Return all tuples associated with hash. + Returns all matching fingerprint entries associated with + the given hash as parameter. - If hash is None, returns all entries in the - database (be careful with that one!). + hash: Part of a sha1 hash, in hexadecimal format """ - # select all if no key - query = self.SELECT_ALL if hash is None else self.SELECT - - with self.cursor() as cur: - cur.execute(query) - for sid, offset in cur: - yield (sid, offset) + pass + @abc.abstractmethod def get_iterable_kv_pairs(self): """ - Returns all tuples in database. + Returns all fingerprints in the database. """ - return self.query(None) + pass + @abc.abstractmethod def insert_hashes(self, sid, hashes): """ - Insert series of hash => song_id, offset - values into the database. + Insert a multitude of fingerprints. + + sid: Song identifier the fingerprints belong to + hashes: A sequence of tuples in the format (hash, offset) + - hash: Part of a sha1 hash, in hexadecimal format + - offset: Offset this hash was created from/at. """ - values = [] - for hash, offset in hashes: - values.append((hash, sid, offset)) - - with self.cursor() as cur: - cur.executemany(self.INSERT_FINGERPRINT, values) + pass + @abc.abstractmethod def return_matches(self, hashes): """ - Return the (song_id, offset_diff) tuples associated with - a list of (sha1, sample_offset) values. + Searches the database for pairs of (hash, offset) values. + + hashes: A sequence of tuples in the format (hash, offset) + - hash: Part of a sha1 hash, in hexadecimal format + - offset: Offset this hash was created from/at. + + Returns a sequence of (sid, offset_difference) tuples. + + sid: Song identifier + offset_difference: (offset - database_offset) """ - # Create a dictionary of hash => offset pairs for later lookups - mapper = {} - for hash, offset in hashes: - mapper[hash.upper()] = offset - - # Get an iteratable of all the hashes we need - values = mapper.keys() - - with self.cursor() as cur: - for split_values in grouper(values, 1000): - # Create our IN part of the query - query = self.SELECT_MULTIPLE - query = query % ', '.join(['UNHEX(%s)'] * len(split_values)) - - cur.execute(query, split_values) - - for hash, sid, offset in cur: - # (sid, db_offset - song_sampled_offset) - yield (sid, offset - mapper[hash]) + pass -def grouper(iterable, n, fillvalue=None): - args = [iter(iterable)] * n - return izip_longest(fillvalue=fillvalue, *args) +def get_database(database_type=None): + # Default to using the mysql database + database_type = database_type or "mysql" + # Lower all the input. + database_type = database_type.lower() + + for db_cls in Database.__subclasses__(): + if db_cls.type == database_type: + return db_cls + + raise TypeError("Unsupported database type supplied.") diff --git a/dejavu/database_sql.py b/dejavu/database_sql.py new file mode 100644 index 0000000..03c8ad6 --- /dev/null +++ b/dejavu/database_sql.py @@ -0,0 +1,366 @@ +from __future__ import absolute_import +from itertools import izip_longest +import Queue + +import MySQLdb as mysql +from MySQLdb.cursors import DictCursor + +from dejavu.database import Database + + +class SQLDatabase(Database): + """ + Queries: + + 1) Find duplicates (shouldn't be any, though): + + select `hash`, `song_id`, `offset`, count(*) cnt + from fingerprints + group by `hash`, `song_id`, `offset` + having cnt > 1 + order by cnt asc; + + 2) Get number of hashes by song: + + select song_id, song_name, count(song_id) as num + from fingerprints + natural join songs + group by song_id + order by count(song_id) desc; + + 3) get hashes with highest number of collisions + + select + hash, + count(distinct song_id) as n + from fingerprints + group by `hash` + order by n DESC; + + => 26 different songs with same fingerprint (392 times): + + select songs.song_name, fingerprints.offset + from fingerprints natural join songs + where fingerprints.hash = "08d3c833b71c60a7b620322ac0c0aba7bf5a3e73"; + """ + + type = "mysql" + + # tables + FINGERPRINTS_TABLENAME = "fingerprints" + SONGS_TABLENAME = "songs" + + # fields + FIELD_HASH = "hash" + FIELD_SONG_ID = "song_id" + FIELD_OFFSET = "offset" + FIELD_SONGNAME = "song_name" + FIELD_FINGERPRINTED = "fingerprinted" + + # creates + CREATE_FINGERPRINTS_TABLE = """ + CREATE TABLE IF NOT EXISTS `%s` ( + `%s` binary(10) not null, + `%s` mediumint unsigned not null, + `%s` int unsigned not null, + PRIMARY KEY(%s), + UNIQUE(%s, %s, %s), + FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE CASCADE + ) ENGINE=INNODB;""" % ( + FINGERPRINTS_TABLENAME, FIELD_HASH, + FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH, + FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH, + FIELD_SONG_ID, SONGS_TABLENAME, FIELD_SONG_ID + ) + + CREATE_SONGS_TABLE = """ + CREATE TABLE IF NOT EXISTS `%s` ( + `%s` mediumint unsigned not null auto_increment, + `%s` varchar(250) not null, + `%s` tinyint default 0, + PRIMARY KEY (`%s`), + UNIQUE KEY `%s` (`%s`) + ) ENGINE=INNODB;""" % ( + SONGS_TABLENAME, FIELD_SONG_ID, FIELD_SONGNAME, FIELD_FINGERPRINTED, + FIELD_SONG_ID, FIELD_SONG_ID, FIELD_SONG_ID, + ) + + # inserts (ignores duplicates) + INSERT_FINGERPRINT = """ + INSERT IGNORE INTO %s (%s, %s, %s) values + (UNHEX(%%s), %%s, %%s); + """ % (FINGERPRINTS_TABLENAME, FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET) + + INSERT_SONG = "INSERT INTO %s (%s) values (%%s);" % ( + SONGS_TABLENAME, FIELD_SONGNAME) + + # selects + SELECT = """ + SELECT %s, %s FROM %s WHERE %s = UNHEX(%%s); + """ % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME, FIELD_HASH) + + SELECT_MULTIPLE = """ + SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s); + """ % (FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET, + FINGERPRINTS_TABLENAME, FIELD_HASH) + + SELECT_ALL = """ + SELECT %s, %s FROM %s; + """ % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME) + + SELECT_SONG = """ + SELECT %s FROM %s WHERE %s = %%s + """ % (FIELD_SONGNAME, SONGS_TABLENAME, FIELD_SONG_ID) + + SELECT_NUM_FINGERPRINTS = """ + SELECT COUNT(*) as n FROM %s + """ % (FINGERPRINTS_TABLENAME) + + SELECT_UNIQUE_SONG_IDS = """ + SELECT COUNT(DISTINCT %s) as n FROM %s WHERE %s = 1; + """ % (FIELD_SONG_ID, SONGS_TABLENAME, FIELD_FINGERPRINTED) + + SELECT_SONGS = """ + SELECT %s, %s FROM %s WHERE %s = 1; + """ % (FIELD_SONG_ID, FIELD_SONGNAME, SONGS_TABLENAME, FIELD_FINGERPRINTED) + + # drops + DROP_FINGERPRINTS = "DROP TABLE IF EXISTS %s;" % FINGERPRINTS_TABLENAME + DROP_SONGS = "DROP TABLE IF EXISTS %s;" % SONGS_TABLENAME + + # update + UPDATE_SONG_FINGERPRINTED = """ + UPDATE %s SET %s = 1 WHERE %s = %%s + """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED, FIELD_SONG_ID) + + # delete + DELETE_UNFINGERPRINTED = """ + DELETE FROM %s WHERE %s = 0; + """ % (SONGS_TABLENAME, FIELD_FINGERPRINTED) + + def __init__(self, **options): + super(SQLDatabase, self).__init__() + self.cursor = cursor_factory(**options) + self._options = options + + def after_fork(self): + # Clear the cursor cache, we don't want any stale connections from + # the previous process. + Cursor.clear_cache() + + def setup(self): + """ + Creates any non-existing tables required for dejavu to function. + + This also removes all songs that have been added but have no + fingerprints associated with them. + """ + with self.cursor() as cur: + cur.execute(self.CREATE_SONGS_TABLE) + cur.execute(self.CREATE_FINGERPRINTS_TABLE) + cur.execute(self.DELETE_UNFINGERPRINTED) + + def empty(self): + """ + Drops tables created by dejavu and then creates them again + by calling `SQLDatabase.setup`. + + .. warning: + This will result in a loss of data + """ + with self.cursor() as cur: + cur.execute(self.DROP_FINGERPRINTS) + cur.execute(self.DROP_SONGS) + + self.setup() + + def delete_unfingerprinted_songs(self): + """ + Removes all songs that have no fingerprints associated with them. + """ + with self.cursor() as cur: + cur.execute(self.DELETE_UNFINGERPRINTED) + + def get_num_songs(self): + """ + Returns number of songs the database has fingerprinted. + """ + with self.cursor() as cur: + cur.execute(self.SELECT_UNIQUE_SONG_IDS) + + for count, in cur: + return count + return 0 + + def get_num_fingerprints(self): + """ + Returns number of fingerprints the database has fingerprinted. + """ + with self.cursor() as cur: + cur.execute(self.SELECT_NUM_FINGERPRINTS) + + for count, in cur: + return count + return 0 + + def set_song_fingerprinted(self, sid): + """ + Set the fingerprinted flag to TRUE (1) once a song has been completely + fingerprinted in the database. + """ + with self.cursor() as cur: + cur.execute(self.UPDATE_SONG_FINGERPRINTED, (sid,)) + + def get_songs(self): + """ + Return songs that have the fingerprinted flag set TRUE (1). + """ + with self.cursor(cursor_type=DictCursor) as cur: + cur.execute(self.SELECT_SONGS) + for row in cur: + yield row + + def get_song_by_id(self, sid): + """ + Returns song by its ID. + """ + with self.cursor(cursor_type=DictCursor) as cur: + cur.execute(self.SELECT_SONG, (sid,)) + return cur.fetchone() + + def insert(self, hash, sid, offset): + """ + Insert a (sha1, song_id, offset) row into database. + """ + with self.cursor() as cur: + cur.execute(self.INSERT_FINGERPRINT, (hash, sid, offset)) + + def insert_song(self, songname): + """ + Inserts song in the database and returns the ID of the inserted record. + """ + with self.cursor() as cur: + cur.execute(self.INSERT_SONG, (songname,)) + return cur.lastrowid + + def query(self, hash): + """ + Return all tuples associated with hash. + + If hash is None, returns all entries in the + database (be careful with that one!). + """ + # select all if no key + query = self.SELECT_ALL if hash is None else self.SELECT + + with self.cursor() as cur: + cur.execute(query) + for sid, offset in cur: + yield (sid, offset) + + def get_iterable_kv_pairs(self): + """ + Returns all tuples in database. + """ + return self.query(None) + + def insert_hashes(self, sid, hashes): + """ + Insert series of hash => song_id, offset + values into the database. + """ + values = [] + for hash, offset in hashes: + values.append((hash, sid, offset)) + + with self.cursor() as cur: + for split_values in grouper(values, 1000): + cur.executemany(self.INSERT_FINGERPRINT, split_values) + + def return_matches(self, hashes): + """ + Return the (song_id, offset_diff) tuples associated with + a list of (sha1, sample_offset) values. + """ + # Create a dictionary of hash => offset pairs for later lookups + mapper = {} + for hash, offset in hashes: + mapper[hash.upper()] = offset + + # Get an iteratable of all the hashes we need + values = mapper.keys() + + with self.cursor() as cur: + for split_values in grouper(values, 1000): + # Create our IN part of the query + query = self.SELECT_MULTIPLE + query = query % ', '.join(['UNHEX(%s)'] * len(split_values)) + + cur.execute(query, split_values) + + for hash, sid, offset in cur: + # (sid, db_offset - song_sampled_offset) + yield (sid, offset - mapper[hash]) + + +def grouper(iterable, n, fillvalue=None): + args = [iter(iterable)] * n + return izip_longest(fillvalue=fillvalue, *args) + + +def cursor_factory(**factory_options): + def cursor(**options): + options.update(factory_options) + return Cursor(**options) + return cursor + + +class Cursor(object): + """ + Establishes a connection to the database and returns an open cursor. + + + ```python + # Use as context manager + with Cursor() as cur: + cur.execute(query) + ``` + """ + _cache = Queue.Queue(maxsize=5) + + def __init__(self, cursor_type=mysql.cursors.Cursor, **options): + super(Cursor, self).__init__() + + try: + conn = self._cache.get_nowait() + except Queue.Empty: + conn = mysql.connect(**options) + else: + # Ping the connection before using it from the cache. + conn.ping(True) + + self.conn = conn + self.conn.autocommit(False) + self.cursor_type = cursor_type + + @classmethod + def clear_cache(cls): + cls._cache = Queue.Queue(maxsize=5) + + def __enter__(self): + self.cursor = self.conn.cursor(self.cursor_type) + return self.cursor + + def __exit__(self, extype, exvalue, traceback): + # if we had a MySQL related error we try to rollback the cursor. + if extype is mysql.MySQLError: + self.cursor.rollback() + + self.cursor.close() + self.conn.commit() + + # Put it back on the queue + try: + self._cache.put_nowait(self.conn) + except Queue.Full: + self.conn.close()