-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathyoutube.py
executable file
·169 lines (144 loc) · 5.81 KB
/
youtube.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
# if you don't import apiclient first, oath2 can't import urllib
from apiclient.discovery import build
from steve.util import (
get_from_config,
get_project_config,
save_json_files
)
import argparse
from pyconscrape import (
parse_title,
parse_speakers,
parse_speakers_and_description,
)
# Value stored in the 'state' field of every scraped item.
# NOTE(review): presumably the numeric id of the "draft" state expected by the
# consumer of the generated JSON -- confirm against that schema.
DRAFT = 2
class YouTubeScraper(object):
    """Collect video metadata from a YouTube channel or playlist.

    Video ids are listed page by page, the full video resources are then
    fetched, and each one is converted into a ``(filename, dict)`` pair.
    """

    def __init__(self, cfg, max_results=50):
        """Store the settings and build the API client.

        :param cfg: project configuration; ``api_service_name``,
            ``api_version`` and ``api_key`` are read from it through
            ``get_from_config(..., 'youtube')``.
        :param max_results: value sent as ``maxResults`` on every listing
            request (i.e. the page size).
        """
        self.max_results = max_results
        self.cfg = cfg
        self.svc = build(
            get_from_config(cfg, 'api_service_name', 'youtube'),
            get_from_config(cfg, 'api_version', 'youtube'),
            developerKey=get_from_config(cfg, 'api_key', 'youtube'),
        )

    def scrape_channel(self, channel_id):
        """Return ``(filename, dict)`` pairs for the videos of a channel."""
        video_ids = self.list_channel(channel_id)
        videos = self.list_videos(video_ids)
        return self.video_results_to_steve_data(videos)

    def scrape_playlist(self, playlist_id):
        """Return ``(filename, dict)`` pairs for the videos of a playlist."""
        video_ids = self.list_playlist(playlist_id)
        videos = self.list_videos(video_ids)
        return self.video_results_to_steve_data(videos)

    def _collect_pages(self, resource_factory, options, extract_id):
        """Run a paginated ``list`` call and gather the ids of every page.

        :param resource_factory: zero-argument callable returning the API
            resource to call ``list`` on (e.g. ``self.svc.search``).
        :param options: keyword arguments for ``list``; ``pageToken`` is
            added to this dict while following pages.
        :param extract_id: callable mapping one response item to a video id.
        :returns: a list with one list of video ids per fetched page.
        """
        response = resource_factory().list(**options).execute()
        pages = [[extract_id(item) for item in response.get('items', [])]]
        # Keep following the continuation token until the API stops
        # returning one.
        while 'nextPageToken' in response:
            options['pageToken'] = response['nextPageToken']
            # Single-argument print() behaves the same on Python 2 and 3.
            print('fetching next page {}'.format(options['pageToken']))
            response = resource_factory().list(**options).execute()
            pages.append(
                [extract_id(item) for item in response.get('items', [])]
            )
        return pages

    def list_playlist(self, playlist_id):
        """List the video ids of a playlist via ``playlistItems().list``.

        :returns: a list of pages, each page being a list of video ids.
        """
        options = {
            'part': 'id,snippet',
            'maxResults': self.max_results,
            'playlistId': playlist_id,
        }
        return self._collect_pages(
            self.svc.playlistItems,
            options,
            lambda item: item['snippet']['resourceId']['videoId'],
        )

    def list_channel(self, channel_id):
        """List the video ids of a channel via ``search().list``.

        The search is restricted with ``type='video'``.

        :returns: a list of pages, each page being a list of video ids.
        """
        options = {
            'channelId': channel_id,
            'maxResults': self.max_results,
            'part': 'id',
            'type': 'video',
        }
        return self._collect_pages(
            self.svc.search,
            options,
            lambda item: item['id']['videoId'],
        )

    def list_videos(self, video_pages):
        """Fetch the video resources for pages of video ids.

        One ``videos().list`` request is made per non-empty page, asking for
        the ``snippet``, ``player`` and ``status`` parts.

        :param video_pages: iterable of lists of video ids.
        :returns: a flat list of the returned video items.
        """
        videos = []
        for page in video_pages:
            if not page:
                # Nothing to look up; avoid sending a request with an
                # empty ``id`` filter.
                continue
            video_response = self.svc.videos().list(
                id=','.join(page),
                part='snippet,player,status'
            ).execute()
            videos.extend(video_response.get('items', []))
        return videos

    def video_results_to_steve_data(self, video_results):
        """Convert video items into ``(filename, dict)`` pairs.

        Items without an ``id`` are skipped. The filename has the form
        ``json/<video id>.json``.
        """
        data = []
        for v in video_results:
            # Check before converting: video_to_dict() indexes v['id'] and
            # would raise KeyError for an id-less item.
            if 'id' not in v:
                # should never happen
                continue
            fn = 'json/{}.json'.format(v['id'])
            data.append((fn, self.video_to_dict(v)))
        return data

    def video_to_dict(self, video):
        """Convert a youtube#video item to a python dict.

        The title is passed through ``parse_title``; the description is split
        by ``parse_speakers_and_description`` into ``'speakers'`` and
        ``'description'``, and the speakers are further processed by
        ``parse_speakers``.

        :param video: video item; must contain ``'id'``. ``'snippet'``,
            ``'status'`` and ``'player'`` are optional.
        :raises KeyError: if ``video`` has no ``'id'``.
        """
        snippet = video.get('snippet', {})
        status = video.get('status', {})
        player = video.get('player', {})
        thumbnail = snippet.get('thumbnails', {}).get('high', {})
        video_id = video['id']

        # NOTE: the language is fixed to 'English'. A config-driven lookup
        # (a 'language' option in the project config) was sketched here
        # before but was disabled; it is intentionally not re-enabled.

        title = parse_title(snippet.get('title', ''))
        parsed = parse_speakers_and_description(
            snippet.get('description', '')
        )
        speakers = parse_speakers(parsed['speakers'])

        return {
            'category': get_from_config(self.cfg, 'category'),
            'title': title,
            'description': parsed['description'],
            'copyright_text': status.get('license', ''),
            # Keep only the first 10 characters of the publish timestamp
            # (presumably the YYYY-MM-DD date part -- confirm).
            'recorded': snippet.get('publishedAt', '')[0:10],
            'thumbnail_url': thumbnail.get('url', ''),
            'embed': player.get('embedHtml', ''),
            'summary': '',
            'language': 'English',
            'state': DRAFT,
            'whiteboard': 'needs editing',
            'quality_notes': '',
            'slug': '',
            'speakers': speakers,
            'source_url': 'https://www.youtube.com/watch?v={}'.format(video_id)
        }
def main():
    """Command-line entry point: scrape a channel or playlist to JSON files.

    ``-c`` and ``-p`` are boolean switches; the actual channel / playlist id
    is read from the project configuration (``channel_id`` / ``playlist_id``
    via ``get_from_config(..., 'youtube')``). When both switches are given
    the channel takes precedence.
    """
    parser = argparse.ArgumentParser()
    # type=int so a value given on the command line is validated and has the
    # same type as the integer default.
    parser.add_argument("--maxresults", type=int, default=50,
                        help="Max results")
    parser.add_argument("-c", "--channel", action='store_true',
                        help="Scrape the YouTube channel set as channel_id "
                             "in the project config")
    parser.add_argument("-p", "--playlist", action='store_true',
                        help="Scrape the YouTube playlist set as playlist_id "
                             "in the project config")
    args = parser.parse_args()

    if not (args.channel or args.playlist):
        # Bail out before loading the config or building the API client.
        print("nothing to do. no channel or playlist requested")
        return

    cfg = get_project_config()
    scraper = YouTubeScraper(cfg, max_results=args.maxresults)
    if args.channel:
        channel_id = get_from_config(cfg, 'channel_id', 'youtube')
        print("scraping channel {}".format(channel_id))
        data = scraper.scrape_channel(channel_id)
    else:
        playlist_id = get_from_config(cfg, 'playlist_id', 'youtube')
        print("scraping playlist {}".format(playlist_id))
        data = scraper.scrape_playlist(playlist_id)
    save_json_files(cfg, data)


if __name__ == '__main__':
    main()