forked from AlmaLinux/mirrors
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmirrors_update.py
executable file
·356 lines (328 loc) · 11.3 KB
/
mirrors_update.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
#!/usr/bin/env python3
import logging
import os
import dateparser
import shutil
import socket
from collections import defaultdict
from pathlib import Path
from typing import Dict, AnyStr, List, Union
from geoip import geolite2, IPInfo, open_database
import requests
import yaml
# protocols usable for availability checks and for mirrorlist entries
REQUIRED_MIRROR_PROTOCOLS = ('https', 'http')
# every protocol which gets its own column in the mirrors table
ALL_MIRROR_PROTOCOLS = REQUIRED_MIRROR_PROTOCOLS + ('rsync',)

# architecture substituted for "$basearch" when a repo path is checked
DEFAULT_ARCH = 'x86_64'

# set User-Agent for python-requests
HEADERS = {'User-Agent': 'libdnf (AlmaLinux 8.3; generic; Linux.x86_64)'}

# the list of mirrors which should be always available
WHITELIST_MIRRORS = ('repo.almalinux.org',)

# path to the GeoIP database file used for the country lookup
# (the spelling of the identifier is kept, other code refers to it)
GEOPIP_DB = 'geoip_db.mmdb'

logging.basicConfig(level=logging.INFO)
def get_config(path_to_config: AnyStr = 'config.yml') -> Dict:
    """
    Load the mirrorlist config from a YAML file
    :param path_to_config: path to the YAML config file
    :return: the content of the file as parsed by yaml.safe_load
    """
    with open(path_to_config, mode='r') as stream:
        parsed_config = yaml.safe_load(stream)
    return parsed_config
def mirror_available(
    mirror_info: Dict[AnyStr, Union[Dict, AnyStr]],
    versions: List[AnyStr],
    repos: List[Dict[AnyStr, Union[Dict, AnyStr]]],
    timeout: float = 30.0,
) -> bool:
    """
    Check mirror availability

    The first address of the mirror whose protocol is listed in
    REQUIRED_MIRROR_PROTOCOLS is used. For every version/repo pair the file
    "repodata/repomd.xml" is requested (with "$basearch" in the repo path
    replaced by DEFAULT_ARCH); the first failed request makes the whole
    mirror unavailable.
    :param mirror_info: the dictionary which contains info about a mirror
                        (name, address, update frequency, sponsor info, email)
    :param versions: the list of versions which should be provided by a mirror
    :param repos: the list of repos which should be provided by a mirror
    :param timeout: seconds to wait for a mirror before a request is
                    treated as failed
    :return: True if every checked file was fetched successfully
    """
    mirror_name = mirror_info['name']
    logging.info('Checking mirror "%s"...', mirror_name)
    addresses = mirror_info['address']  # type: Dict[AnyStr, AnyStr]
    mirror_url = next(
        (
            address for protocol_type, address in addresses.items()
            if protocol_type in REQUIRED_MIRROR_PROTOCOLS
        ),
        None,
    )
    if mirror_url is None:
        logging.error(
            'Mirror "%s" has no one address with protocols "%s"',
            mirror_name,
            REQUIRED_MIRROR_PROTOCOLS,
        )
        return False
    base_url = mirror_url.rstrip('/')
    for version in versions:
        for repo_info in repos:
            repo_path = repo_info['path'].replace('$basearch', DEFAULT_ARCH)
            # URLs always use "/", so the parts are joined explicitly instead
            # of with os.path.join, which depends on the platform separator
            # and drops the mirror URL when a repo path starts with "/"
            url_parts = (
                base_url,
                str(version),
                repo_path.strip('/'),
                'repodata/repomd.xml',
            )
            check_url = '/'.join(part for part in url_parts if part)
            try:
                # without a timeout a single stalled mirror blocks the run
                response = requests.get(
                    check_url,
                    headers=HEADERS,
                    timeout=timeout,
                )
                response.raise_for_status()
            except requests.RequestException:
                logging.warning(
                    'Mirror "%s" is not available for version '
                    '"%s" and repo path "%s"',
                    mirror_name,
                    version,
                    repo_path,
                )
                return False
    logging.info(
        'Mirror "%s" is available',
        mirror_name,
    )
    return True
def set_repo_status(
    mirror_info: Dict[AnyStr, Union[Dict, AnyStr]],
    allowed_outdate: AnyStr,
    timeout: float = 30.0,
) -> None:
    """
    Set the status of a mirror

    Nothing is returned: the result is stored in mirror_info['status'] as
    "ok" or "expired". The mirror is "ok" only if its "TIME" file can be
    fetched and holds a timestamp newer than "now - allowed_outdate";
    every failure on the way marks the mirror as "expired".
    :param mirror_info: info about a mirror
    :param allowed_outdate: allowed mirror lag
    :param timeout: seconds to wait for a mirror before the request is
                    treated as failed
    """
    addresses = mirror_info['address']
    mirror_url = next(
        (
            address for protocol_type, address in addresses.items()
            if protocol_type in REQUIRED_MIRROR_PROTOCOLS
        ),
        None,
    )
    if mirror_url is None:
        logging.error(
            'Mirror "%s" has no one address with protocols "%s"',
            mirror_info['name'],
            REQUIRED_MIRROR_PROTOCOLS,
        )
        mirror_info['status'] = 'expired'
        return
    # URLs always use "/", so os.path.join is not used to build them
    timestamp_url = f"{mirror_url.rstrip('/')}/TIME"
    try:
        response = requests.get(
            url=timestamp_url,
            headers=HEADERS,
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.RequestException:
        logging.error(
            'Mirror "%s" has no timestamp file by url "%s"',
            mirror_info['name'],
            timestamp_url,
        )
        mirror_info['status'] = 'expired'
        return
    try:
        mirror_should_updated_at = dateparser.parse(
            f'now-{allowed_outdate} UTC'
        ).timestamp()
        mirror_last_updated = float(response.content)
    except (AttributeError, ValueError):
        # AttributeError: the allowed lag could not be parsed (presumably
        # dateparser.parse returned None)
        # ValueError: the TIME file does not contain a number
        logging.warning(
            'Cannot compare timestamp of mirror "%s" by url "%s" '
            'with allowed lag "%s"',
            mirror_info['name'],
            timestamp_url,
            allowed_outdate,
        )
        mirror_info['status'] = 'expired'
        return
    if mirror_last_updated > mirror_should_updated_at:
        mirror_info['status'] = 'ok'
    else:
        mirror_info['status'] = 'expired'
def get_verified_mirrors(
    mirrors_dir: AnyStr,
    versions: List[AnyStr],
    repos: List[Dict[AnyStr, Union[Dict, AnyStr]]],
    allowed_outdate: AnyStr
) -> List[Dict[AnyStr, Union[Dict, AnyStr]]]:
    """
    Loop through the list of mirrors and return the available ones

    Every "*.yml" file found recursively in mirrors_dir is treated as a
    mirror config. Configs which are not a mapping or have no "name" or
    "address" are skipped with an error. Mirrors listed in WHITELIST_MIRRORS
    are accepted with status "ok" without any check. Every other mirror
    must pass mirror_available() and then gets its status from
    set_repo_status(), so the result may contain mirrors with status
    "expired".
    :param mirrors_dir: path to the directory which contains
                        config files of mirrors
    :param versions: the list of versions which should be provided by mirrors
    :param repos: the list of repos which should be provided by mirrors
    :param allowed_outdate: allowed mirror lag
    :return: the list of dictionaries with info about accepted mirrors
    """
    result = []
    for config_path in Path(mirrors_dir).rglob('*.yml'):
        # the file is closed before the (slow) network checks start
        with open(str(config_path), 'r') as config_file:
            mirror_info = yaml.safe_load(config_file)
        # an empty file is parsed to None, a malformed one may be parsed
        # to a list or a scalar
        if not isinstance(mirror_info, dict):
            logging.error(
                'Mirror file "%s" is empty or is not a YAML mapping',
                config_path,
            )
            continue
        if 'name' not in mirror_info:
            logging.error(
                'Mirror file "%s" doesn\'t have name of the mirror',
                config_path,
            )
            continue
        if 'address' not in mirror_info:
            logging.error(
                'Mirror file "%s" doesn\'t have addresses of the mirror',
                config_path,
            )
            continue
        if mirror_info['name'] in WHITELIST_MIRRORS:
            mirror_info['status'] = 'ok'
            result.append(mirror_info)
            continue
        if mirror_available(
            mirror_info=mirror_info,
            versions=versions,
            repos=repos,
        ):
            set_repo_status(mirror_info, allowed_outdate)
            result.append(mirror_info)
    return result
def write_mirrors_to_mirrorslists(
    verified_mirrors: List[Dict[AnyStr, Union[Dict, AnyStr]]],
    versions: List[AnyStr],
    repos: List[Dict[AnyStr, Union[Dict, AnyStr]]],
    mirrorlist_dir: AnyStr,
) -> None:
    """
    Generate the following folder structure:
    mirrorlist -> <version1> -> <reponame1_mirrorlist>
                             -> <reponame2_mirrorlist>
               -> <version2> -> <reponame1_mirrorlist>

    Each mirrorlist file gets one line per mirror with status "ok":
    <mirror url>/<version>/<repo path>. Lines are appended, so existing
    files are extended rather than overwritten. Mirrors with another
    status, or without an address for any of REQUIRED_MIRROR_PROTOCOLS,
    are skipped.
    :param verified_mirrors: List of verified and not expired mirrors
    :param versions: the list of versions which should be provided by mirrors
    :param repos: the list of repos which should be provided by mirrors
    :param mirrorlist_dir: the directory which contains mirrorlist files
                           per an each version
    """
    for mirror_info in verified_mirrors:
        if mirror_info['status'] != 'ok':
            logging.warning(
                'Mirror "%s" is expired and isn\'t added to mirrorlist',
                mirror_info['name']
            )
            continue
        addresses = mirror_info['address']
        # the address doesn't depend on version or repo, so it is looked up
        # once per mirror
        mirror_url = next(
            (
                address for protocol_type, address in addresses.items()
                if protocol_type in REQUIRED_MIRROR_PROTOCOLS
            ),
            None,
        )
        if mirror_url is None:
            logging.error(
                'Mirror "%s" has no one address with protocols "%s"',
                mirror_info['name'],
                REQUIRED_MIRROR_PROTOCOLS,
            )
            continue
        base_url = mirror_url.rstrip('/')
        for version in versions:
            version_dir = os.path.join(
                mirrorlist_dir,
                str(version),
            )
            os.makedirs(version_dir, exist_ok=True)
            for repo_info in repos:
                # URLs always use "/", so os.path.join is used only for the
                # filesystem paths; a trailing "/" of the repo path is kept
                full_mirror_path = '/'.join((
                    base_url,
                    str(version),
                    repo_info['path'].lstrip('/'),
                ))
                mirrorlist_path = os.path.join(
                    version_dir,
                    repo_info['name'],
                )
                with open(mirrorlist_path, 'a') as mirrorlist_file:
                    mirrorlist_file.write(f'{full_mirror_path}\n')
def set_mirror_country(
    mirror_info: Dict[AnyStr, Union[Dict, AnyStr]],
) -> None:
    """
    Set country by IP of a mirror

    The name of the mirror is resolved as a host name and the resulting IP
    is looked up in the GeoIP database GEOPIP_DB. The English name of the
    country is stored in mirror_info['country']; "Unknown" is stored when
    the name can't be resolved or the database has no country for the IP.
    :param mirror_info: Dict with info about a mirror
    """
    mirror_name = mirror_info['name']
    logging.info('Set country for mirror "%s"', mirror_name)
    try:
        ip = socket.gethostbyname(mirror_name)
    except OSError:
        # socket.gaierror is a subclass of OSError; one unresolvable name
        # must not abort the processing of the other mirrors
        logging.warning(
            'Cannot resolve IP of mirror "%s"',
            mirror_name,
        )
        mirror_info['country'] = 'Unknown'
        return
    db = open_database(GEOPIP_DB)
    try:
        match = db.lookup(ip)  # type: IPInfo
        # the record is read while the database is still opened
        info = {} if match is None else match.get_info_dict()
    finally:
        # the database is opened on each call, so it is closed here
        db.close()
    try:
        mirror_info['country'] = info['country']['names']['en']
    except KeyError:
        mirror_info['country'] = 'Unknown'
def generate_mirrors_table(
    mirrors_table_path: AnyStr,
    verified_mirrors: List[Dict[AnyStr, Union[Dict, AnyStr]]],
) -> None:
    """
    Generates mirrors table from list verified mirrors

    A Markdown table header and one row per mirror are appended to the
    file. The columns are: name, sponsor, status, country and one link per
    protocol from ALL_MIRROR_PROTOCOLS. Each mirror dictionary is updated
    in place: "country" is set by set_mirror_country() and the rendered
    links are stored under "<protocol>_link" keys.
    :param mirrors_table_path: path to file with mirrors table
    :param verified_mirrors: list of verified mirrors
    """
    columns_names = (
        'Name',
        'Sponsor',
        'Status',
        'Country',
        *(
            protocol.upper() for protocol in ALL_MIRROR_PROTOCOLS
        ),
    )
    header_separator = f"| {' | '.join(['---'] * len(columns_names))} |"
    table_header = f"| {' | '.join(columns_names)} |\n{header_separator}"
    # link text per protocol; unknown protocols fall back to "Link"
    address_prefixes = defaultdict(lambda: 'Link')
    address_prefixes.update({
        'https': 'Mirror',
        'http': 'Mirror',
        'rsync': 'Link',
    })
    with open(mirrors_table_path, 'a') as mirrors_table_file:
        logging.info('Generate mirrors table')
        mirrors_table_file.write(f'{table_header}\n')
        for mirror_info in verified_mirrors:
            logging.info(
                'Adding mirror "%s" to mirrors table',
                mirror_info['name']
            )
            addresses = mirror_info['address']
            for protocol in ALL_MIRROR_PROTOCOLS:
                if protocol in addresses:
                    link = f'[{address_prefixes[protocol]}]' \
                           f'({addresses[protocol].strip("/")})'
                else:
                    # the cell stays empty for a protocol without address
                    link = ''
                mirror_info[f'{protocol}_link'] = link
            set_mirror_country(mirror_info)
            # sponsor info is not validated when a mirror config is loaded,
            # so a missing name or URL must not break the whole table
            sponsor = mirror_info.get('sponsor') or ''
            sponsor_url = mirror_info.get('sponsor_url')
            if sponsor_url:
                sponsor_cell = f'[{sponsor}]({sponsor_url})'
            else:
                sponsor_cell = str(sponsor)
            table_row = '|'.join((
                mirror_info['name'],
                sponsor_cell,
                mirror_info['status'],
                mirror_info['country'],
                *(
                    mirror_info[f'{protocol}_link'] for protocol
                    in ALL_MIRROR_PROTOCOLS
                ),
            ))
            mirrors_table_file.write(f'{table_row}\n')
def main():
    """
    Rebuild the mirrorlist files and the mirrors table

    The config is read with get_config(), the mirrors are verified and,
    if at least one mirror is accepted, the mirrorlist directory and the
    mirrors table are generated from scratch. The script exits with
    code 1 if no mirror is accepted.
    """
    config = get_config()
    versions = config['version']
    repos = config['repos']
    mirrors_table_path = config['mirrors_table']
    mirrorlist_dir = config['mirrorlist_dir']
    verified_mirrors = get_verified_mirrors(
        mirrors_dir=config['mirrors_dir'],
        versions=versions,
        repos=repos,
        allowed_outdate=config['allowed_outdate']
    )
    if not verified_mirrors:
        logging.error('No available and not expired mirrors found')
        # exit() is provided by the site module and may be absent,
        # SystemExit is always available
        raise SystemExit(1)
    # the previous mirrorlists are removed only when their replacement is
    # ready to be written, so they survive a failed or interrupted
    # verification
    shutil.rmtree(
        mirrorlist_dir,
        ignore_errors=True,
    )
    write_mirrors_to_mirrorslists(
        verified_mirrors=verified_mirrors,
        versions=versions,
        repos=repos,
        mirrorlist_dir=mirrorlist_dir,
    )
    # the table is written in append mode, so the old file is removed first
    if os.path.exists(mirrors_table_path):
        os.remove(mirrors_table_path)
    generate_mirrors_table(
        mirrors_table_path=mirrors_table_path,
        verified_mirrors=verified_mirrors,
    )


if __name__ == '__main__':
    main()