#!/usr/bin/python
# -*- coding: utf-8 -*-
import dbApi
import time
import collections
import gc
import pyximport
pyximport.install()
import deduplicator.cyHamDb as hamDb
import traceback
import server.decorators


@server.decorators.Singleton
class TreeProxy(hamDb.BkHammingTree):
	pass

'''
The idea here is that we have a child-class of DbApi that makes it /look/ like we can query PostgreSQL
for phash-related things, and which simply intercepts all phash-search-related calls and looks them up
itself, in its internal data structures. Any calls that modify or update the phash data-sets should be
replayed onto the internal BK tree.

The tree is loaded from the database when the class is first instantiated, and then maintained as an
in-memory database for the life of the program. The BK-tree is a singleton, which should be thread-safe,
so sharing it across multiple connections should not be an issue.

The BK-tree being a singleton is done for memory reasons. With ~12M tree items, the BK-tree requires about
5 GB of memory, and takes about 45 seconds to load from PostgreSQL. As such, having a tree per connection
would be prohibitive for memory reasons, and each connection would be extremely slow to start.

Because any changes to the database are committed to Postgres immediately, and then the corresponding
update is simply performed on the BK tree, the BK tree should always be fully up to date with the contents
of the Postgres database, so it /shouldn't/ need to be reloaded periodically or anything (we'll see).

The tree reload facility is mostly intended for refreshing the tree when the db has been changed by
external tools, such as the hash scanner.
'''
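
# Rough usage sketch (illustrative only, not part of the original file). `some_phash`
# is a placeholder for a perceptual-hash value as stored in the table (handled as a
# plain integer below); all the methods referenced are defined further down in this
# module or on the dbApi.DbApi parent class.
#
#     db      = PhashDbApi()
#     ids     = db.getWithinDistance(some_phash, distance=2)           # set of dbIds
#     rows    = db.getWithinDistance_tree(some_phash, distance=2,
#                                         wantCols=['dbId', 'pHash'])
#     batched = db.searchPhashSet([some_phash], distance=2)            # {phash: set of dbIds}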


def hammingDistance(v1, v2):
	return hamDb.hamming_dist(v1, v2)
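
# Quick illustrative check (not in the original file): values that differ in exactly
# one bit should be at Hamming distance 1, so, assuming phashes are handled as plain
# integers as the wrapper above implies:
#     hammingDistance(0b1011, 0b0011) == 1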


class PhashDbApi(dbApi.DbApi):

	# Read in phash/dbId values in chunks of 50K rows
	streamChunkSize = 50000

	def __init__(self, noglobal=False):
		super().__init__()

		# if noglobal:
		# 	self.tree = hamDb.BkHammingTree()
		# else:
		# 	self.tree = TreeProxy.Instance()
		# assert self.tree is not None

		# Only load the tree if it's empty
		# with self.tree.updateLock.writer_context():
		# 	self.doLoad(silent=True)

	def forceReload(self):
		pass
		# with self.tree.writer_context():
		# 	self.log.warning("Forcing a reload of the tree from the database!")
		# 	self.log.warning("Dropping Tree")
		# 	self.tree.dropTree()
		# 	# Flush the dropped tree out of memory
		# 	collected = gc.collect()
		# 	self.log.info("GC collected %s items.", collected)
		# 	self.log.warning("Tree Dropped. Rebuilding")
		# 	self.unlocked_doLoad(silent=False)
		# 	self.log.warning("Tree Rebuilt")

	def unlocked_doLoad(self, silent=False):
		pass
		# print("DoLoad: ", self.tree, self.tree.nodes, self.tree.root)
		# assert self.tree.root is not None
		# if self.tree.nodes > 0:
		# 	if not silent:
		# 		self.log.error("Tree already built (%s nodes). Reloading will have no effect!", self.tree.nodes)
		# 		raise ValueError
		# 	return
		# cur = self.getStreamingCursor(wantCols=['dbId', 'pHash'], where=(self.table.phash != None))
		# loaded = 0
		# rows = cur.fetchmany(self.streamChunkSize)
		# while rows:
		# 	for dbId, pHash in rows:
		# 		if pHash != None:
		# 			self.tree.unlocked_insert(pHash, dbId)
		# 	loaded += len(rows)
		# 	self.log.info("Loaded %s phash data sets.", loaded)
		# 	rows = cur.fetchmany(self.streamChunkSize)
		# cur.close()

	# def insertIntoDb(self, *args, **kwargs):
	# 	super().insertIntoDb(*args, **kwargs)
	# 	if 'commit' in kwargs:
	# 		kwargs.pop("commit")
	# 	dbId, itemHash = self.getItem(wantCols=['dbId', 'pHash'], **kwargs)
	# 	# "0" is a valid hash value, so we have to explicitly check for None,
	# 	# rather than allowing type coercion
	# 	if itemHash != None:
	# 		with self.tree.writer_context():
	# 			self.tree.insert(itemHash, dbId)

	# def updateDbEntry(self, *args, **kwargs):
	# 	super().updateDbEntry(*args, **kwargs)
	# 	if 'commit' in kwargs:
	# 		kwargs.pop("commit")
	# 	# Reinsert every item that would be changed.
	# 	# Probably unnecessary.
	# 	ret = self.getItems(wantCols=['dbId', 'pHash'], **kwargs)
	# 	# "0" is a valid hash value, so we have to explicitly check for None,
	# 	# rather than allowing type coercion
	# 	if any([item for item in ret if item[1] != None]):
	# 		with self.tree.writer_context():
	# 			for dbId, itemHash in [item for item in ret if item[1] != None]:
	# 				self.tree.insert(itemHash, dbId)

	# def deleteDbRows(self, *args, **kwargs):
	# 	if kwargs:
	# 		ret = self.getItems(wantCols=['dbId', 'pHash'], **kwargs)
	# 	super().deleteDbRows(*args, **kwargs)
	# 	with self.tree.writer_context():
	# 		# If kwargs is not defined, deleteDbRows will error, so we don't
	# 		# care about the additional error of trying to iterate over
	# 		# the (then undefined) ret, since it won't happen.
	# 		for dbId, itemHash in [item for item in ret if item[1]]:
	# 			try:
	# 				self.tree.remove(itemHash, dbId)
	# 			except KeyError:
	# 				self.log.critical("Failure when deleting node?")
	# 				for line in traceback.format_exc().split("\n"):
	# 					self.log.critical(line)
	# 				self.log.critical("Ignoring error")

	def searchPhashSet(self, phash_list, distance):
		ret = {}
		# with self.tree.reader_context():
		for phash in phash_list:
			ret[phash] = self.getWithinDistance(phash, distance)

		return ret

	def getRandomPhashRows(self, sample_ratio=0.0005):
		with self.transaction() as cur:
			cur.execute("SELECT dbid, phash FROM {table} TABLESAMPLE SYSTEM(%s);".format(table=self.tableName), (sample_ratio, ))
			ret = cur.fetchall()

		# Drop rows where the phash column is null.
		ret = set([tmp for tmp in ret if tmp[1]])
		return ret

	def getWithinDistance(self, inPhash, distance=2):
		with self.transaction() as cur:
			start = time.time()
			# The `<@` operator is assumed to be provided by the hamming-distance
			# BK-tree index extension this schema relies on: it matches rows whose
			# phash is within `distance` bits of `inPhash`.
			cur.execute("SELECT dbid FROM {table} WHERE phash <@ (%s, %s);".format(table=self.tableName), (inPhash, distance))
			ret = cur.fetchall()
			stop = time.time()

			ret = set([tmp[0] for tmp in ret])
			self.log.info("Search for '%s', distance '%s'. Discovered %s match(es) in %s seconds", inPhash, distance, len(ret), stop - start)

			if len(ret) > 500:
				self.log.warning("Lots of return values!")
				cur.execute('''
					INSERT INTO high_incidence_hashes (phash, match_count, distance)
					VALUES (%s, %s, %s)
					ON CONFLICT DO NOTHING
					''', (inPhash, len(ret), distance))

		return ret

	def getIdsWithinDistance(self, inPhash, distance=2):
		# with self.tree.reader_context():
		ids = self.getWithinDistance(inPhash, distance)

		return set(ids)

	def getWithinDistance_tree(self, inPhash, distance=2, wantCols=None):
		ids = self.getIdsWithinDistance(inPhash, distance)

		ret = []
		for itemId in ids:
			itemRow = self.getItem(dbId=itemId, wantCols=wantCols)

			# Sometimes a row has been deleted without being removed from the tree.
			# If this has happened, getItem() will return an empty list.
			# Don't return that, if it happens.
			if not itemRow:
				self.log.info("Row deleted without updating tree")
			else:
				ret.append(itemRow)

		return ret
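

if __name__ == "__main__":
	# Minimal smoke-test sketch, not part of the original module. It assumes a
	# reachable database already configured through dbApi.DbApi and at least a
	# few rows with non-null phash values; it only exercises the read-side query
	# paths defined above.
	api = PhashDbApi()

	sample = api.getRandomPhashRows(sample_ratio=0.0005)
	print("Sampled %s (dbid, phash) rows" % len(sample))

	for dbid, phash in list(sample)[:3]:
		matches = api.getWithinDistance(phash, distance=2)
		print("Row %s (phash %s) has %s match(es) within distance 2" % (dbid, phash, len(matches)))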