Cleaned up database.py

- Moved SQLDatabase to a SQL specific file
- Database class is now an abstract base class
- Cursor moved into SQL specific file
- Allowed for multi-database support in the future
This commit is contained in:
Wessie 2013-12-20 18:16:35 +01:00
parent ec823f56e4
commit f276efdf32
4 changed files with 472 additions and 306 deletions

View file

@ -1,4 +1,4 @@
from dejavu.database import SQLDatabase
from dejavu.database import get_database
import dejavu.decoder as decoder
import fingerprint
from multiprocessing import Process, cpu_count
@ -13,8 +13,9 @@ class Dejavu(object):
self.config = config
# initialize db
self.db = SQLDatabase(**config.get("database", {}))
db_cls = get_database(config.get("database_type", None))
self.db = db_cls(**config.get("database", {}))
self.db.setup()
# get songs previously indexed

View file

@ -1,59 +0,0 @@
from __future__ import unicode_literals
from __future__ import absolute_import
import Queue
import MySQLdb as mysql
import MySQLdb.cursors
def cursor_factory(**factory_options):
def cursor(**options):
options.update(factory_options)
return Cursor(**options)
return cursor
class Cursor(object):
"""
Establishes a connection to the database and returns an open cursor.
```python
# Use as context manager
with Cursor() as cur:
cur.execute(query)
```
"""
_cache = Queue.Queue(maxsize=5)
def __init__(self, cursor_type=mysql.cursors.Cursor, **options):
super(Cursor, self).__init__()
try:
conn = self._cache.get_nowait()
except Queue.Empty:
conn = mysql.connect(**options)
else:
# Ping the connection before using it from the cache.
conn.ping(True)
self.conn = conn
self.cursor_type = cursor_type
def __enter__(self):
self.cursor = self.conn.cursor(self.cursor_type)
return self.cursor
def __exit__(self, extype, exvalue, traceback):
# if we had a MySQL related error we try to rollback the cursor.
if extype is mysql.MySQLError:
self.cursor.rollback()
self.cursor.close()
self.conn.commit()
# Put it back on the queue
try:
self._cache.put_nowait(self.conn)
except Queue.Full:
self.conn.close()

View file

@ -1,308 +1,166 @@
from __future__ import absolute_import
from itertools import izip_longest
from dejavu.cursor import cursor_factory
from MySQLdb.cursors import DictCursor
import abc
class Database(object):
__metaclass__ = abc.ABCMeta
# Name of your Database subclass, this is used in configuration
# to refer to your class
type = None
def __init__(self):
super(Database, self).__init__()
class SQLDatabase(Database):
def before_fork(self):
"""
Queries:
1) Find duplicates (shouldn't be any, though):
select `hash`, `song_id`, `offset`, count(*) cnt
from fingerprints
group by `hash`, `song_id`, `offset`
having cnt > 1
order by cnt asc;
2) Get number of hashes by song:
select song_id, song_name, count(song_id) as num
from fingerprints
natural join songs
group by song_id
order by count(song_id) desc;
3) get hashes with highest number of collisions
select
hash,
count(distinct song_id) as n
from fingerprints
group by `hash`
order by n DESC;
=> 26 different songs with same fingerprint (392 times):
select songs.song_name, fingerprints.offset
from fingerprints natural join songs
where fingerprints.hash = "08d3c833b71c60a7b620322ac0c0aba7bf5a3e73";
Called before the database instance is given to the new process
"""
pass
# config keys
CONNECTION = "connection"
KEY_USERNAME = "username"
KEY_DATABASE = "database"
KEY_PASSWORD = "password"
KEY_HOSTNAME = "hostname"
def after_fork(self):
"""
Called after the database instance has been given to the new process
# tables
FINGERPRINTS_TABLENAME = "fingerprints"
SONGS_TABLENAME = "songs"
# fields
FIELD_HASH = "hash"
FIELD_SONG_ID = "song_id"
FIELD_OFFSET = "offset"
FIELD_SONGNAME = "song_name"
FIELD_FINGERPRINTED = "fingerprinted"
# creates
CREATE_FINGERPRINTS_TABLE = """
CREATE TABLE IF NOT EXISTS `%s` (
`%s` binary(10) not null,
`%s` mediumint unsigned not null,
`%s` int unsigned not null,
PRIMARY KEY(%s),
UNIQUE(%s, %s, %s),
FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE CASCADE
) ENGINE=INNODB;""" % (
FINGERPRINTS_TABLENAME, FIELD_HASH,
FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH,
FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH,
FIELD_SONG_ID, SONGS_TABLENAME, FIELD_SONG_ID
)
CREATE_SONGS_TABLE = """
CREATE TABLE IF NOT EXISTS `%s` (
`%s` mediumint unsigned not null auto_increment,
`%s` varchar(250) not null,
`%s` tinyint default 0,
PRIMARY KEY (`%s`),
UNIQUE KEY `%s` (`%s`)
) ENGINE=INNODB;""" % (
SONGS_TABLENAME, FIELD_SONG_ID, FIELD_SONGNAME, FIELD_FINGERPRINTED,
FIELD_SONG_ID, FIELD_SONG_ID, FIELD_SONG_ID,
)
# inserts (ignores duplicates)
INSERT_FINGERPRINT = """
INSERT IGNORE INTO %s (%s, %s, %s) VALUES
(UNHEX(%%s), %%s, %%s);
""" % (FINGERPRINTS_TABLENAME, FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET)
INSERT_SONG = "INSERT INTO %s (%s) VALUES (%%s);" % (
SONGS_TABLENAME, FIELD_SONGNAME)
# selects
SELECT = """
SELECT %s, %s FROM %s WHERE %s = UNHEX(%%s);
""" % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME, FIELD_HASH)
SELECT_MULTIPLE = """
SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s);
""" % (FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET,
FINGERPRINTS_TABLENAME, FIELD_HASH)
SELECT_ALL = """
SELECT %s, %s FROM %s;
""" % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME)
SELECT_SONG = """
SELECT %s FROM %s WHERE %s = %%s
""" % (FIELD_SONGNAME, SONGS_TABLENAME, FIELD_SONG_ID)
SELECT_NUM_FINGERPRINTS = """
SELECT COUNT(*) as n FROM %s
""" % (FINGERPRINTS_TABLENAME)
SELECT_UNIQUE_SONG_IDS = """
SELECT COUNT(DISTINCT %s) as n FROM %s WHERE %s = 1;
""" % (FIELD_SONG_ID, SONGS_TABLENAME, FIELD_FINGERPRINTED)
SELECT_SONGS = """
SELECT %s, %s FROM %s WHERE %s = 1;
""" % (FIELD_SONG_ID, FIELD_SONGNAME, SONGS_TABLENAME, FIELD_FINGERPRINTED)
# drops
DROP_FINGERPRINTS = "DROP TABLE IF EXISTS %s;" % FINGERPRINTS_TABLENAME
DROP_SONGS = "DROP TABLE IF EXISTS %s;" % SONGS_TABLENAME
# update
UPDATE_SONG_FINGERPRINTED = """
UPDATE %s SET %s = 1 WHERE %s = %%s
""" % (SONGS_TABLENAME, FIELD_FINGERPRINTED, FIELD_SONG_ID)
# delete
DELETE_UNFINGERPRINTED = """
DELETE FROM %s WHERE %s = 0;
""" % (SONGS_TABLENAME, FIELD_FINGERPRINTED)
def __init__(self, **options):
super(SQLDatabase, self).__init__()
self.cursor = cursor_factory(**options)
This will be called in the new process.
"""
pass
def setup(self):
"""
Creates any non-existing tables required for dejavu to function.
This also removes all songs that have been added but have no
fingerprints associated with them.
Called on creation or shortly afterwards.
"""
with self.cursor() as cur:
cur.execute(self.CREATE_SONGS_TABLE)
cur.execute(self.CREATE_FINGERPRINTS_TABLE)
cur.execute(self.DELETE_UNFINGERPRINTED)
pass
@abc.abstractmethod
def empty(self):
"""
Drops tables created by dejavu and then creates them again
by calling `SQLDatabase.setup`.
.. warning:
This will result in a loss of data
Called when the database should be cleared of all data.
"""
with self.cursor() as cur:
cur.execute(self.DROP_FINGERPRINTS)
cur.execute(self.DROP_SONGS)
self.setup()
pass
@abc.abstractmethod
def delete_unfingerprinted_songs(self):
"""
Removes all songs that have no fingerprints associated with them.
Called to remove any song entries that do not have any fingerprints
associated with them.
"""
with self.cursor() as cur:
cur.execute(self.DELETE_UNFINGERPRINTED)
pass
@abc.abstractmethod
def get_num_songs(self):
"""
Returns number of songs the database has fingerprinted.
Returns the amount of songs in the database.
"""
with self.cursor() as cur:
cur.execute(self.SELECT_UNIQUE_SONG_IDS)
for count, in cur:
return count
return 0
pass
@abc.abstractmethod
def get_num_fingerprints(self):
"""
Returns number of fingerprints the database has fingerprinted.
Returns the number of fingerprints in the database.
"""
with self.cursor() as cur:
cur.execute(self.SELECT_NUM_FINGERPRINTS)
for count, in cur:
return count
return 0
pass
@abc.abstractmethod
def set_song_fingerprinted(self, sid):
"""
Set the fingerprinted flag to TRUE (1) once a song has been completely
fingerprinted in the database.
"""
with self.cursor() as cur:
cur.execute(self.UPDATE_SONG_FINGERPRINTED, (sid,))
Sets a specific song as having all fingerprints in the database.
sid: Song identifier
"""
pass
@abc.abstractmethod
def get_songs(self):
"""
Return songs that have the fingerprinted flag set TRUE (1).
Returns all fully fingerprinted songs in the database.
"""
with self.cursor(cursor_type=DictCursor) as cur:
cur.execute(self.SELECT_SONGS)
for row in cur:
yield row
pass
@abc.abstractmethod
def get_song_by_id(self, sid):
"""
Returns song by its ID.
"""
with self.cursor(cursor_type=DictCursor) as cur:
cur.execute(self.SELECT_SONG, (sid,))
return cur.fetchone()
Return a song by its identifier
sid: Song identifier
"""
pass
@abc.abstractmethod
def insert(self, hash, sid, offset):
"""
Insert a (sha1, song_id, offset) row into database.
"""
with self.cursor() as cur:
cur.execute(self.INSERT_FINGERPRINT, (hash, sid, offset))
Inserts a single fingerprint into the database.
def insert_song(self, songname):
hash: Part of a sha1 hash, in hexadecimal format
sid: Song identifier this fingerprint is off
offset: The offset this hash is from
"""
Inserts song in the database and returns the ID of the inserted record.
"""
with self.cursor() as cur:
cur.execute(self.INSERT_SONG, (songname,))
return cur.lastrowid
pass
@abc.abstractmethod
def insert_song(self, song_name):
"""
Inserts a song name into the database, returns the new
identifier of the song.
song_name: The name of the song.
"""
pass
@abc.abstractmethod
def query(self, hash):
"""
Return all tuples associated with hash.
Returns all matching fingerprint entries associated with
the given hash as parameter.
If hash is None, returns all entries in the
database (be careful with that one!).
hash: Part of a sha1 hash, in hexadecimal format
"""
# select all if no key
query = self.SELECT_ALL if hash is None else self.SELECT
with self.cursor() as cur:
cur.execute(query)
for sid, offset in cur:
yield (sid, offset)
pass
@abc.abstractmethod
def get_iterable_kv_pairs(self):
"""
Returns all tuples in database.
Returns all fingerprints in the database.
"""
return self.query(None)
pass
@abc.abstractmethod
def insert_hashes(self, sid, hashes):
"""
Insert series of hash => song_id, offset
values into the database.
Insert a multitude of fingerprints.
sid: Song identifier the fingerprints belong to
hashes: A sequence of tuples in the format (hash, offset)
- hash: Part of a sha1 hash, in hexadecimal format
- offset: Offset this hash was created from/at.
"""
values = []
for hash, offset in hashes:
values.append((hash, sid, offset))
with self.cursor() as cur:
cur.executemany(self.INSERT_FINGERPRINT, values)
pass
@abc.abstractmethod
def return_matches(self, hashes):
"""
Return the (song_id, offset_diff) tuples associated with
a list of (sha1, sample_offset) values.
Searches the database for pairs of (hash, offset) values.
hashes: A sequence of tuples in the format (hash, offset)
- hash: Part of a sha1 hash, in hexadecimal format
- offset: Offset this hash was created from/at.
Returns a sequence of (sid, offset_difference) tuples.
sid: Song identifier
offset_difference: (offset - database_offset)
"""
# Create a dictionary of hash => offset pairs for later lookups
mapper = {}
for hash, offset in hashes:
mapper[hash.upper()] = offset
# Get an iteratable of all the hashes we need
values = mapper.keys()
with self.cursor() as cur:
for split_values in grouper(values, 1000):
# Create our IN part of the query
query = self.SELECT_MULTIPLE
query = query % ', '.join(['UNHEX(%s)'] * len(split_values))
cur.execute(query, split_values)
for hash, sid, offset in cur:
# (sid, db_offset - song_sampled_offset)
yield (sid, offset - mapper[hash])
pass
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
def get_database(database_type=None):
# Default to using the mysql database
database_type = database_type or "mysql"
# Lower all the input.
database_type = database_type.lower()
for db_cls in Database.__subclasses__():
if db_cls.type == database_type:
return db_cls
raise TypeError("Unsupported database type supplied.")

366
dejavu/database_sql.py Normal file
View file

@ -0,0 +1,366 @@
from __future__ import absolute_import
from itertools import izip_longest
import Queue
import MySQLdb as mysql
from MySQLdb.cursors import DictCursor
from dejavu.database import Database
class SQLDatabase(Database):
"""
Queries:
1) Find duplicates (shouldn't be any, though):
select `hash`, `song_id`, `offset`, count(*) cnt
from fingerprints
group by `hash`, `song_id`, `offset`
having cnt > 1
order by cnt asc;
2) Get number of hashes by song:
select song_id, song_name, count(song_id) as num
from fingerprints
natural join songs
group by song_id
order by count(song_id) desc;
3) get hashes with highest number of collisions
select
hash,
count(distinct song_id) as n
from fingerprints
group by `hash`
order by n DESC;
=> 26 different songs with same fingerprint (392 times):
select songs.song_name, fingerprints.offset
from fingerprints natural join songs
where fingerprints.hash = "08d3c833b71c60a7b620322ac0c0aba7bf5a3e73";
"""
type = "mysql"
# tables
FINGERPRINTS_TABLENAME = "fingerprints"
SONGS_TABLENAME = "songs"
# fields
FIELD_HASH = "hash"
FIELD_SONG_ID = "song_id"
FIELD_OFFSET = "offset"
FIELD_SONGNAME = "song_name"
FIELD_FINGERPRINTED = "fingerprinted"
# creates
CREATE_FINGERPRINTS_TABLE = """
CREATE TABLE IF NOT EXISTS `%s` (
`%s` binary(10) not null,
`%s` mediumint unsigned not null,
`%s` int unsigned not null,
PRIMARY KEY(%s),
UNIQUE(%s, %s, %s),
FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE CASCADE
) ENGINE=INNODB;""" % (
FINGERPRINTS_TABLENAME, FIELD_HASH,
FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH,
FIELD_SONG_ID, FIELD_OFFSET, FIELD_HASH,
FIELD_SONG_ID, SONGS_TABLENAME, FIELD_SONG_ID
)
CREATE_SONGS_TABLE = """
CREATE TABLE IF NOT EXISTS `%s` (
`%s` mediumint unsigned not null auto_increment,
`%s` varchar(250) not null,
`%s` tinyint default 0,
PRIMARY KEY (`%s`),
UNIQUE KEY `%s` (`%s`)
) ENGINE=INNODB;""" % (
SONGS_TABLENAME, FIELD_SONG_ID, FIELD_SONGNAME, FIELD_FINGERPRINTED,
FIELD_SONG_ID, FIELD_SONG_ID, FIELD_SONG_ID,
)
# inserts (ignores duplicates)
INSERT_FINGERPRINT = """
INSERT IGNORE INTO %s (%s, %s, %s) values
(UNHEX(%%s), %%s, %%s);
""" % (FINGERPRINTS_TABLENAME, FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET)
INSERT_SONG = "INSERT INTO %s (%s) values (%%s);" % (
SONGS_TABLENAME, FIELD_SONGNAME)
# selects
SELECT = """
SELECT %s, %s FROM %s WHERE %s = UNHEX(%%s);
""" % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME, FIELD_HASH)
SELECT_MULTIPLE = """
SELECT HEX(%s), %s, %s FROM %s WHERE %s IN (%%s);
""" % (FIELD_HASH, FIELD_SONG_ID, FIELD_OFFSET,
FINGERPRINTS_TABLENAME, FIELD_HASH)
SELECT_ALL = """
SELECT %s, %s FROM %s;
""" % (FIELD_SONG_ID, FIELD_OFFSET, FINGERPRINTS_TABLENAME)
SELECT_SONG = """
SELECT %s FROM %s WHERE %s = %%s
""" % (FIELD_SONGNAME, SONGS_TABLENAME, FIELD_SONG_ID)
SELECT_NUM_FINGERPRINTS = """
SELECT COUNT(*) as n FROM %s
""" % (FINGERPRINTS_TABLENAME)
SELECT_UNIQUE_SONG_IDS = """
SELECT COUNT(DISTINCT %s) as n FROM %s WHERE %s = 1;
""" % (FIELD_SONG_ID, SONGS_TABLENAME, FIELD_FINGERPRINTED)
SELECT_SONGS = """
SELECT %s, %s FROM %s WHERE %s = 1;
""" % (FIELD_SONG_ID, FIELD_SONGNAME, SONGS_TABLENAME, FIELD_FINGERPRINTED)
# drops
DROP_FINGERPRINTS = "DROP TABLE IF EXISTS %s;" % FINGERPRINTS_TABLENAME
DROP_SONGS = "DROP TABLE IF EXISTS %s;" % SONGS_TABLENAME
# update
UPDATE_SONG_FINGERPRINTED = """
UPDATE %s SET %s = 1 WHERE %s = %%s
""" % (SONGS_TABLENAME, FIELD_FINGERPRINTED, FIELD_SONG_ID)
# delete
DELETE_UNFINGERPRINTED = """
DELETE FROM %s WHERE %s = 0;
""" % (SONGS_TABLENAME, FIELD_FINGERPRINTED)
def __init__(self, **options):
super(SQLDatabase, self).__init__()
self.cursor = cursor_factory(**options)
self._options = options
def after_fork(self):
# Clear the cursor cache, we don't want any stale connections from
# the previous process.
Cursor.clear_cache()
def setup(self):
"""
Creates any non-existing tables required for dejavu to function.
This also removes all songs that have been added but have no
fingerprints associated with them.
"""
with self.cursor() as cur:
cur.execute(self.CREATE_SONGS_TABLE)
cur.execute(self.CREATE_FINGERPRINTS_TABLE)
cur.execute(self.DELETE_UNFINGERPRINTED)
def empty(self):
"""
Drops tables created by dejavu and then creates them again
by calling `SQLDatabase.setup`.
.. warning:
This will result in a loss of data
"""
with self.cursor() as cur:
cur.execute(self.DROP_FINGERPRINTS)
cur.execute(self.DROP_SONGS)
self.setup()
def delete_unfingerprinted_songs(self):
"""
Removes all songs that have no fingerprints associated with them.
"""
with self.cursor() as cur:
cur.execute(self.DELETE_UNFINGERPRINTED)
def get_num_songs(self):
"""
Returns number of songs the database has fingerprinted.
"""
with self.cursor() as cur:
cur.execute(self.SELECT_UNIQUE_SONG_IDS)
for count, in cur:
return count
return 0
def get_num_fingerprints(self):
"""
Returns number of fingerprints the database has fingerprinted.
"""
with self.cursor() as cur:
cur.execute(self.SELECT_NUM_FINGERPRINTS)
for count, in cur:
return count
return 0
def set_song_fingerprinted(self, sid):
"""
Set the fingerprinted flag to TRUE (1) once a song has been completely
fingerprinted in the database.
"""
with self.cursor() as cur:
cur.execute(self.UPDATE_SONG_FINGERPRINTED, (sid,))
def get_songs(self):
"""
Return songs that have the fingerprinted flag set TRUE (1).
"""
with self.cursor(cursor_type=DictCursor) as cur:
cur.execute(self.SELECT_SONGS)
for row in cur:
yield row
def get_song_by_id(self, sid):
"""
Returns song by its ID.
"""
with self.cursor(cursor_type=DictCursor) as cur:
cur.execute(self.SELECT_SONG, (sid,))
return cur.fetchone()
def insert(self, hash, sid, offset):
"""
Insert a (sha1, song_id, offset) row into database.
"""
with self.cursor() as cur:
cur.execute(self.INSERT_FINGERPRINT, (hash, sid, offset))
def insert_song(self, songname):
"""
Inserts song in the database and returns the ID of the inserted record.
"""
with self.cursor() as cur:
cur.execute(self.INSERT_SONG, (songname,))
return cur.lastrowid
def query(self, hash):
"""
Return all tuples associated with hash.
If hash is None, returns all entries in the
database (be careful with that one!).
"""
# select all if no key
query = self.SELECT_ALL if hash is None else self.SELECT
with self.cursor() as cur:
cur.execute(query)
for sid, offset in cur:
yield (sid, offset)
def get_iterable_kv_pairs(self):
"""
Returns all tuples in database.
"""
return self.query(None)
def insert_hashes(self, sid, hashes):
"""
Insert series of hash => song_id, offset
values into the database.
"""
values = []
for hash, offset in hashes:
values.append((hash, sid, offset))
with self.cursor() as cur:
for split_values in grouper(values, 1000):
cur.executemany(self.INSERT_FINGERPRINT, split_values)
def return_matches(self, hashes):
"""
Return the (song_id, offset_diff) tuples associated with
a list of (sha1, sample_offset) values.
"""
# Create a dictionary of hash => offset pairs for later lookups
mapper = {}
for hash, offset in hashes:
mapper[hash.upper()] = offset
# Get an iteratable of all the hashes we need
values = mapper.keys()
with self.cursor() as cur:
for split_values in grouper(values, 1000):
# Create our IN part of the query
query = self.SELECT_MULTIPLE
query = query % ', '.join(['UNHEX(%s)'] * len(split_values))
cur.execute(query, split_values)
for hash, sid, offset in cur:
# (sid, db_offset - song_sampled_offset)
yield (sid, offset - mapper[hash])
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
def cursor_factory(**factory_options):
def cursor(**options):
options.update(factory_options)
return Cursor(**options)
return cursor
class Cursor(object):
"""
Establishes a connection to the database and returns an open cursor.
```python
# Use as context manager
with Cursor() as cur:
cur.execute(query)
```
"""
_cache = Queue.Queue(maxsize=5)
def __init__(self, cursor_type=mysql.cursors.Cursor, **options):
super(Cursor, self).__init__()
try:
conn = self._cache.get_nowait()
except Queue.Empty:
conn = mysql.connect(**options)
else:
# Ping the connection before using it from the cache.
conn.ping(True)
self.conn = conn
self.conn.autocommit(False)
self.cursor_type = cursor_type
@classmethod
def clear_cache(cls):
cls._cache = Queue.Queue(maxsize=5)
def __enter__(self):
self.cursor = self.conn.cursor(self.cursor_type)
return self.cursor
def __exit__(self, extype, exvalue, traceback):
# if we had a MySQL related error we try to rollback the cursor.
if extype is mysql.MySQLError:
self.cursor.rollback()
self.cursor.close()
self.conn.commit()
# Put it back on the queue
try:
self._cache.put_nowait(self.conn)
except Queue.Full:
self.conn.close()