-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaptly_steps.py
167 lines (128 loc) · 4.91 KB
/
aptly_steps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from twisted.internet import defer
from twisted.internet import reactor
from buildbot import config
from buildbot.process.buildstep import FAILURE
from buildbot.process.buildstep import SUCCESS
from buildbot.process.buildstep import BuildStep
from buildbot.plugins import *
import json
# use the 'requests' lib: http://python-requests.org
try:
import txrequests
import requests
except ImportError:
txrequests = None
# Module-level txrequests session shared by every step in this module.
# It is created lazily by getSession(), can be replaced via setSession(),
# and is closed by closeSession().
#
# The Session object encapsulates a thread pool as well as state such as
# cookies and authentication. Because that state is global, it may pose
# problems for users: one step may obtain a cookie that is subsequently
# used by another step in a different build.
_session = None
def getSession():
    '''
    Return the shared module-level session, creating it on first use.

    When a new session has to be created, closeSession is registered to run
    before reactor shutdown.
    '''
    global _session
    # Fast path: a session already exists, hand it back untouched.
    if _session is not None:
        return _session
    _session = txrequests.Session()
    reactor.addSystemEventTrigger("before", "shutdown", closeSession)
    return _session
def setSession(session):
    '''
    Replace the shared module-level session with ``session``.

    The previous session, if any, is neither closed nor returned.
    '''
    global _session
    _session = session
def closeSession():
    '''
    Close the shared module-level session and forget it.

    Does nothing when no session is currently set.
    '''
    global _session
    if _session is None:
        return
    _session.close()
    _session = None
class AptlyUpdatePublishStep(steps.HTTPStep):
    '''
    HTTP step that PUTs an empty JSON object to
    ``<aptly_base_url>/api/publish/<endpoint>``.

    ``auth`` and any extra keyword arguments are forwarded unchanged to
    ``steps.HTTPStep``.
    '''

    def __init__(self, aptly_base_url, auth, endpoint, **kwargs):
        # Build the target URL once, then delegate everything to HTTPStep.
        publish_url = aptly_base_url + '/api/publish/' + endpoint
        super().__init__(publish_url, auth=auth, method='PUT', json={}, **kwargs)
class AptlyCopyPackageStep(BuildStep):
    '''
    Copy the packages matching a query from one aptly repository to another.

    The step performs two HTTP requests against ``aptly_base_url``:

    1. ``GET /api/repos/<from_repo>/packages`` with the query passed as the
       ``q`` parameter.
    2. ``POST /api/repos/<dest_repo>/packages`` with the decoded JSON body of
       the first response sent as ``PackageRefs``.

    The step finishes with FAILURE as soon as either request cannot be
    performed or answers with a status code >= 400, and with SUCCESS only
    when both requests succeed.
    '''

    name = 'AptlyCopyPackageStep'
    description = 'Copying Package'
    descriptionDone = 'Copied Package'
    renderables = ["aptly_base_url", "from_repo", "dest_repo", "package_query"]
    # Set to the shared module-level session when the step runs (see doCopy).
    session = None

    def __init__(self, aptly_base_url, auth, from_repo, dest_repo, package_query, **kwargs):
        '''
        Store the request parameters and forward ``kwargs`` to BuildStep.

        Reports a configuration error when txrequests could not be imported.
        '''
        if txrequests is None:
            config.error(
                "Need to install txrequests to use this step:\n\n pip3 install txrequests")
        self.aptly_base_url = aptly_base_url
        self.from_repo = from_repo
        self.dest_repo = dest_repo
        self.package_query = package_query
        self.auth = auth
        super().__init__(**kwargs)

    def start(self):
        '''
        Kick off the copy.

        Nothing is returned: completion is signalled through
        ``self.finished`` (called by doCopy) or ``self.failed`` (errback).
        '''
        d = self.doCopy()
        d.addErrback(self.failed)

    @defer.inlineCallbacks
    def doRequest(self, request_kwargs):
        '''
        Perform one HTTP request on the shared session and log it.

        ``request_kwargs`` is passed verbatim to ``self.session.request`` and
        must contain at least ``method`` and ``url``.

        Returns the response, or ``None`` when the request raised
        ``requests.exceptions.ConnectionError``. In that case the error is
        written to the step log and the caller is responsible for finishing
        the step, so that the step result is reported exactly once.
        '''
        log = self.getLog('log')
        log.addHeader('Performing %s request to %s\n' %
                      (request_kwargs['method'], request_kwargs['url']))
        if 'params' in request_kwargs:
            log.addHeader('Parameters:\n')
            for k, v in request_kwargs['params'].items():
                log.addHeader('\t%s: %s\n' % (k, v))
        try:
            r = yield self.session.request(**request_kwargs)
        except requests.exceptions.ConnectionError as e:
            log.addStderr(
                'An exception occurred while performing the request: %s' % e)
            return None
        if r.history:
            log.addStdout('\nRedirected %d times:\n\n' % len(r.history))
            for rr in r.history:
                self.log_response(rr)
                log.addStdout('=' * 60 + '\n')
        self.log_response(r)
        return r

    @defer.inlineCallbacks
    def doCopy(self):
        '''
        Search ``from_repo`` for the package query, then add the result to
        ``dest_repo``, finishing the step with the overall outcome.
        '''
        # create a new session if it doesn't exist
        self.session = getSession()
        log = self.addLog('log')

        search_kwargs = {
            'method': 'GET',
            'url': self.aptly_base_url + '/api/repos/' + self.from_repo + '/packages',
            'auth': self.auth,
            'params': {'q': self.package_query},
        }
        r_search = yield self.doRequest(search_kwargs)
        if r_search is None or r_search.status_code >= 400:
            # Stop here: without a successful search there is nothing to copy.
            self._finish(log, FAILURE)
            return

        copy_kwargs = {
            'method': 'POST',
            'url': self.aptly_base_url + '/api/repos/' + self.dest_repo + '/packages',
            'auth': self.auth,
            'json': {'PackageRefs': r_search.json()},
        }
        r_copy = yield self.doRequest(copy_kwargs)
        # The outcome of the step is decided by the copy request itself.
        if r_copy is None or r_copy.status_code >= 400:
            self._finish(log, FAILURE)
            return

        self._finish(log, SUCCESS)

    def _finish(self, log, result):
        '''Finish the step log, then report ``result`` as the step result.'''
        log.finish()
        self.finished(result)

    def log_response(self, response):
        '''
        Write the request headers, URL, status, response headers and body of
        ``response`` to the step log, masking the Authorization header.
        '''
        log = self.getLog('log')
        log.addHeader('Request Header:\n')
        for k, v in response.request.headers.items():
            if k == 'Authorization':
                v = '[not logging secrets]'
            log.addHeader('\t%s: %s\n' % (k, v))
        log.addStdout('URL: %s\n' % response.url)
        if response.status_code == requests.codes.ok:
            log.addStdout('Status: %s\n' % response.status_code)
        else:
            log.addStderr('Status: %s\n' % response.status_code)
        log.addHeader('Response Header:\n')
        for k, v in response.headers.items():
            log.addHeader('\t%s: %s\n' % (k, v))
        log.addStdout(' ------ Content ------\n%s' % response.text)
        # NOTE(review): a log named 'content' is added for every response
        # logged (redirects, search, copy) - confirm duplicate log names are
        # acceptable to Buildbot.
        self.addLog('content').addStdout(response.text)