-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathbasedb.py
113 lines (94 loc) · 4.18 KB
/
basedb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
# http://binux.me
# Created on 2012-08-30 17:43:49
import logging
# Root logger: every generated SQL statement is reported on it at DEBUG level.
logger = logging.getLogger()


class BaseDB(object):
    '''
    BaseDB

    Thin helper that builds simple SQL statements and runs them on a cursor.

    Subclasses must override the ``dbcur`` property so that it returns an
    object exposing ``execute(sql, params)``; the value returned by that call
    is then used for ``fetchall()``, ``description`` and ``lastrowid``.

    NOTE: ``tablename``, ``what`` and ``where`` are interpolated into the SQL
    text verbatim. Only column *values* are bound through ``?`` placeholders,
    so never pass untrusted input in the other arguments.
    '''

    @property
    def dbcur(self):
        '''Return the cursor used to run queries; must be overridden.'''
        # NotImplementedError is still an Exception, so existing
        # ``except Exception`` handlers keep working.
        raise NotImplementedError("NOT IMPLEMENTED")

    def _execute(self, sql_query, values=None):
        '''
        Run ``sql_query`` on ``self.dbcur`` with the bound parameters
        ``values`` (an empty list when omitted) and return whatever the
        cursor's ``execute`` returns.
        '''
        # A fresh list per call avoids sharing one mutable default object.
        if values is None:
            values = []
        return self.dbcur.execute(sql_query, values)

    def _build_select(self, tablename, what, where, offset, limit):
        '''Build the SELECT statement shared by _select and _select2dic.'''
        sql_query = "SELECT %s FROM %s" % (what, tablename)
        if where:
            sql_query += " WHERE %s" % where
        # ``offset`` only takes effect together with a truthy ``limit``.
        if limit:
            sql_query += " LIMIT %d, %d" % (offset, limit)
        return sql_query

    def _write_row(self, verb, tablename, values):
        '''
        Run ``<verb> INTO tablename ...`` (verb is INSERT or REPLACE) for the
        column/value mapping ``values`` and return the cursor's ``lastrowid``.
        '''
        if values:
            _keys = ", ".join("`%s`" % k for k in values)
            _values = ", ".join(["?"] * len(values))
            sql_query = "%s INTO `%s` (%s) VALUES (%s)" % (verb, tablename, _keys, _values)
            # Materialise the values: a Python 3 dict view is not accepted
            # as a parameter sequence by DB-API drivers such as sqlite3.
            params = list(values.values())
        else:
            sql_query = "%s INTO %s DEFAULT VALUES" % (verb, tablename)
            params = []
        logger.debug("<sql: %s>", sql_query)
        return self._execute(sql_query, params).lastrowid

    def _select(self, tablename, what="*", where="", offset=0, limit=None):
        '''
        Select ``what`` from ``tablename`` and return all rows as returned
        by the cursor's ``fetchall()``.

        ``where`` is an optional raw condition; ``offset``/``limit`` add a
        ``LIMIT offset, limit`` clause when ``limit`` is truthy.
        '''
        sql_query = self._build_select(tablename, what, where, offset, limit)
        logger.debug("<sql: %s>", sql_query)
        return self._execute(sql_query).fetchall()

    def _select2dic(self, tablename, what="*", where="", offset=0, limit=None):
        '''
        Same query as ``_select``, but every row is a dict keyed by the
        column names taken from the cursor's ``description``.

        Returns a list when ``limit`` is truthy, otherwise a generator.
        '''
        sql_query = self._build_select(tablename, what, where, offset, limit)
        logger.debug("<sql: %s>", sql_query)
        dbcur = self._execute(sql_query)
        fields = [f[0] for f in dbcur.description]
        if limit:
            return [dict(zip(fields, row)) for row in dbcur.fetchall()]
        return (dict(zip(fields, row)) for row in dbcur.fetchall())

    def _replace(self, tablename, **values):
        '''
        ``REPLACE INTO tablename`` the given column values (or
        ``DEFAULT VALUES`` when none are given); return ``lastrowid``.
        '''
        return self._write_row("REPLACE", tablename, values)

    def _insert(self, tablename, **values):
        '''
        ``INSERT INTO tablename`` the given column values (or
        ``DEFAULT VALUES`` when none are given); return ``lastrowid``.
        '''
        return self._write_row("INSERT", tablename, values)

    def _update(self, tablename, where, **values):
        '''
        Set the given column values on the rows of ``tablename`` matching the
        raw condition ``where`` and return the result of ``_execute``.

        Both ``where`` and at least one column value are required; without
        them the generated statement is not valid SQL.
        '''
        _key_values = ", ".join("`%s` = ?" % k for k in values)
        sql_query = "UPDATE %s SET %s WHERE %s" % (tablename, _key_values, where)
        logger.debug("<sql: %s>", sql_query)
        return self._execute(sql_query, list(values.values()))

    def _delete(self, tablename, where):
        '''
        Delete the rows of ``tablename`` matching the raw condition
        ``where``; a falsy ``where`` deletes every row. Returns the result
        of ``_execute``.
        '''
        sql_query = "DELETE FROM %s" % tablename
        if where:
            sql_query += " WHERE %s" % where
        logger.debug("<sql: %s>", sql_query)
        return self._execute(sql_query)
# Self-test: exercise every helper against an in-memory SQLite database.
if __name__ == "__main__":
    import sqlite3

    class DB(BaseDB):
        '''BaseDB bound to a throw-away in-memory SQLite database.'''
        __tablename__ = "test"

        def __init__(self):
            self.conn = sqlite3.connect(":memory:")
            cursor = self.conn.cursor()
            cursor.execute('''CREATE TABLE `%s` (id INTEGER PRIMARY KEY AUTOINCREMENT, name, age)'''
                           % self.__tablename__)

        @property
        def dbcur(self):
            return self.conn.cursor()

    db = DB()
    assert db._insert(db.__tablename__, name="binux", age=23) == 1

    # _select returns the list produced by fetchall(), not a cursor, so the
    # rows are indexed directly.
    assert db._select(db.__tablename__, "name, age")[0] == ("binux", 23)

    # Without a limit _select2dic returns a generator; materialise it before
    # indexing.
    rows = list(db._select2dic(db.__tablename__, "name, age"))
    assert rows[0]["name"] == "binux"
    assert rows[0]["age"] == 23

    # REPLACE rewrites the whole row, so the unspecified ``name`` becomes NULL.
    db._replace(db.__tablename__, id=1, age=24)
    assert db._select(db.__tablename__, "name, age")[0] == (None, 24)

    db._update(db.__tablename__, "id = 1", age=16)
    assert db._select(db.__tablename__, "name, age")[0] == (None, 16)

    db._delete(db.__tablename__, "id = 1")
    assert db._select(db.__tablename__) == []