-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpackages.py
70 lines (62 loc) · 2.09 KB
/
packages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# -*- coding: utf-8 -*-
import pip
import sys
import socket
import multiprocessing
if sys.version_info < (3, 0):
from xmlrpclib import ServerProxy
from itertools import izip_longest as zip_longest
else:
from xmlrpc.client import ServerProxy
from itertools import zip_longest
from functools import reduce
def check_package(args):
dist, status_callback = args
pypi = ServerProxy("https://pypi.python.org/pypi")
available = None
callback = lambda status: status_callback(dist, available, status)
try:
possible_package_names = [dist.project_name,
dist.project_name.capitalize(),
dist.project_name.replace("-", "_")]
available = reduce(lambda a, b: a if a is not None else b, map(pypi.package_releases, possible_package_names))
if not available:
callback(Packages.FAIL)
elif available[0] != dist.version:
callback(Packages.UPDATED)
else:
callback(Packages.OK)
except socket.timeout:
callback(Packages.FAIL)
except KeyboardInterrupt:
return False
class Packages(object):
OK = 1
FAIL = 2
UPDATED = 4
def __init__(self, callback):
self.updated = 0
self.status_callback = callback
@classmethod
def use_multiprocessing(cls):
return sys.platform != "win32"
@classmethod
def create_counter(cls):
if cls.use_multiprocessing():
return multiprocessing.Manager().list()
else:
return list()
def check_for_updates(self):
self.updated = 0
socket.setdefaulttimeout(5.0)
dists = pip.get_installed_distributions()
map_params = (check_package, zip_longest(dists, [], fillvalue=self.status_callback))
if self.use_multiprocessing():
pypi_pool = multiprocessing.Pool()
pypi_pool_map = pypi_pool.map_async(*map_params)
try:
pypi_pool_map.get(0xFFFF)
except KeyboardInterrupt:
print("Aborted")
else:
map(*map_params)