-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathparser.py
155 lines (126 loc) · 5.02 KB
/
parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
""" Feed parser """
from asyncio import get_event_loop, set_event_loop_policy
from config import (DATABASE_NAME, MONGO_SERVER, REDIS_NAMESPACE, get_profile,
log)
from datetime import datetime
from hashlib import sha1
from time import mktime
from traceback import format_exc
from bs4 import BeautifulSoup
from common import connect_redis, dequeue, enqueue, safe_id, publish
from feedparser import parse as feed_parse
from gensim import corpora, models
from langdetect import detect
from langkit import extract_keywords, tokenize
from motor.motor_asyncio import AsyncIOMotorClient
from uvloop import EventLoopPolicy
def get_entry_content(entry):
    """Select the best content from an entry.

    Candidates are the items of ``entry['content']`` followed by
    ``entry.summary_detail`` when the entry has one. The first candidate
    whose ``type`` mentions ``html`` wins; otherwise the value of the first
    candidate is used.

    Args:
        entry: Parsed feed entry supporting ``get``, ``in`` and attribute
            access.

    Returns:
        The selected content value, or ``''`` when there are no candidates.
    """
    # Work on a copy: appending to the list returned by ``entry.get`` would
    # mutate the entry itself and grow its 'content' on every call.
    candidates = list(entry.get('content') or [])
    if 'summary_detail' in entry:
        candidates.append(entry.summary_detail)

    for candidate in candidates:
        # speedparser doesn't set 'type', so fall back to an empty string
        if 'html' in (getattr(candidate, 'type', None) or ''):
            return candidate.value

    if candidates:
        first = candidates[0]
        try:
            return first.value
        except AttributeError:  # speedparser does this differently
            return first['value']
    return ''
def get_entry_date(entry):
    """Pick a timestamp for an entry.

    The ``modified_parsed``, ``issued_parsed`` and ``created_parsed`` fields
    are consulted in that order; the first truthy one is converted with
    ``mktime``. When none is set the current time is returned.
    """
    parsed_times = (entry.get(prefix + '_parsed', None)
                    for prefix in ('modified', 'issued', 'created'))
    stamp = next((value for value in parsed_times if value), None)
    if stamp is None:
        return datetime.now()
    return datetime.fromtimestamp(mktime(stamp))
def get_entry_id(entry):
    """Get a useful id from a feed entry.

    Preference order: the entry's own ``id`` (its first value when the id is
    a dict), a SHA-1 of the entry content, the entry link, and finally a
    SHA-1 of the title.

    Args:
        entry: Parsed feed entry supporting ``in`` and attribute access.

    Returns:
        The chosen identifier, or ``None`` when the entry offers none of the
        above.
    """
    if 'id' in entry and entry.id:
        if isinstance(entry.id, dict):
            # dict views cannot be indexed on Python 3; take the first value
            return next(iter(entry.id.values()))
        return entry.id

    content = get_entry_content(entry)
    if content:
        return sha1(content.encode('utf-8')).hexdigest()
    if 'link' in entry:
        return entry.link
    if 'title' in entry:
        return sha1(entry.title.encode('utf-8')).hexdigest()
    return None
def get_plaintext(html):
    """Scrub out tags and extract plaintext.

    Args:
        html: Markup to clean up.

    Returns:
        The text of the document with ``<script>`` and ``<style>`` elements
        removed.
    """
    # Name the parser explicitly: without it bs4 emits a warning and uses
    # whichever parser happens to be installed, so output may differ between
    # machines.
    soup = BeautifulSoup(html, 'html.parser')
    for element in soup(["script", "style"]):
        element.extract()
    return soup.get_text()
def lda(tokens, num_topics=3, passes=20):
    """Perform Latent Dirichlet Allocation.

    Args:
        tokens: Tokenised documents, in the form accepted by
            ``corpora.Dictionary``. Iterated twice, so it must be
            re-iterable (e.g. a list of token lists).
        num_topics: Number of topics to extract.
        passes: Number of training passes over the corpus.

    Returns:
        The trained LDA model.
    """
    dictionary = corpora.Dictionary(tokens)
    corpus = [dictionary.doc2bow(document) for document in tokens]
    # Only ``corpora`` and ``models`` are imported from gensim, so the model
    # class has to be reached through ``models`` (a bare ``gensim`` name is
    # undefined in this module).
    return models.LdaModel(corpus, num_topics=num_topics,
                           id2word=dictionary, passes=passes)
async def parse(database, feed, redis):
    """Parse a feed into its constituent entries.

    Every entry found in ``feed['raw']`` is announced on the ``ui`` channel
    and upserted into ``database.entries`` under ``safe_id(entry.link)``.

    Args:
        database: Database handle exposing an ``entries`` collection.
        feed: Feed document; its ``_id`` and ``raw`` fields are read.
        redis: Connection passed on to ``publish``.
    """
    result = feed_parse(feed['raw'])
    if not result.entries:
        log.info('%s: No valid entries', feed['_id'])
        return
    log.info('%s: %d entries', feed['_id'], len(result.entries))

    # TODO: turn this into a bulk insert
    for entry in result.entries:
        # Isolate failures per entry so that one bad item (e.g. a missing
        # link/title or a language-detection error) does not discard the
        # remaining entries of the feed.
        try:
            await _store_entry(database, entry, redis)
        except Exception:
            log.error(format_exc())


async def _store_entry(database, entry, redis):
    """Analyse a single entry, publish a 'new_entry' event and upsert it."""
    log.debug(entry.link)
    when = get_entry_date(entry)
    body = get_entry_content(entry)
    plaintext = entry.title + " " + get_plaintext(body)
    lang = detect(plaintext)
    try:
        keywords = extract_keywords(plaintext, lang, scores=True)[:10]
        tokens = list(set(tokenize(plaintext, lang)))
    except (KeyError, TypeError):
        # keyword/token extraction is best effort; store the entry without
        keywords = None
        tokens = None
    await publish(redis, 'ui', {'event': 'new_entry', 'url': entry.link})
    await database.entries.update_one(
        {'_id': safe_id(entry.link)},
        {'$set': {"date": when,
                  "title": entry.title,
                  "body": body,
                  "plaintext": plaintext,
                  "lang": lang,
                  "keywords": keywords,
                  "tokens": tokens,
                  "url": entry.link}},
        upsert=True)
async def item_handler(database):
    """Break down feeds into individual items.

    Repeatedly takes a job from the ``parser`` queue, loads the feed document
    named by the job's ``_id`` and hands it to ``parse``. Errors raised while
    handling a job are logged and the loop carries on; ``KeyboardInterrupt``
    ends the loop, after which the ``item_count`` status field is updated.

    Args:
        database: Database handle exposing ``feeds`` and ``items``
            collections.
    """
    redis = await connect_redis()
    try:
        log.info("Beginning run.")
        while True:
            try:
                job = await dequeue(redis, 'parser')
                log.debug(job)
                feed = await database.feeds.find_one({'_id': job['_id']})
                if feed:
                    await parse(database, feed, redis)
            except Exception:
                # keep the worker alive; the traceback goes to the log
                log.error(format_exc())
            except KeyboardInterrupt:
                break
        # NOTE(review): this counts ``database.items`` while parse() writes
        # to ``database.entries`` -- confirm which collection is intended.
        await redis.hset(REDIS_NAMESPACE + 'status', 'item_count',
                         await database.items.count())
    finally:
        # Release the connection even when the loop is left through an
        # exception that is not handled above (e.g. task cancellation).
        redis.close()
        await redis.wait_closed()
def main():
    """Entry point: install the uvloop policy and run the item handler.

    The event loop is closed once ``item_handler`` finishes or raises.
    """
    set_event_loop_policy(EventLoopPolicy())
    client = AsyncIOMotorClient(MONGO_SERVER)
    database = client[DATABASE_NAME]
    event_loop = get_event_loop()
    try:
        worker = item_handler(database)
        event_loop.run_until_complete(worker)
    finally:
        event_loop.close()
# Start the worker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()