-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathremote.py
93 lines (74 loc) · 2.93 KB
/
remote.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import ipaddress
import os
import re
import yaml
from paramiko import SSHClient, AutoAddPolicy
configFilename = "deploy_config.yaml"
def try_read_config_file():
if os.path.isfile(configFilename):
with open(configFilename, "r") as stream:
try:
return yaml.safe_load(stream)
except:
return None
def write_config_file(config):
with open(configFilename, 'w', encoding='utf8') as outfile:
yaml.dump(config, outfile, default_flow_style=False, allow_unicode=True)
def get_destination_from_config(config, machineName):
if machineName is None and 'default_machine' in config:
machineName = config['default_machine']
machine = config['machines'].get(machineName)
if machine is not None:
username = machine['username']
address = machine['address']
port = machine['port']
return username, address, port
else:
raise Exception(f'Machine {machineName} was not found')
def try_parse_destination_string(destination):
regexResult = re.findall(r'(.+)@([a-zA-Z0-9.]+)\:?([0-9]+)?', destination)
if len(regexResult) != 1:
raise Exception(f"Invalid format for destination argument. Expected user@address[:port], got {destination}")
destinationElements = regexResult[0]
username = destinationElements[0]
address = destinationElements[1]
try:
port = int(destinationElements[2])
except:
port = 22
return username, address, port
def try_retrieve_destination(destination, machineName):
config = try_read_config_file()
if destination is not None:
try:
return try_parse_destination_string(destination)
except Exception as e:
raise e
elif config is not None:
try:
return get_destination_from_config(config, machineName)
except Exception as e:
raise e
def open_ssh(address, port, username, autoCopyKey = False):
ssh = SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(AutoAddPolicy())
try:
ssh.connect(address, port=port, username=username, key_filename=f'{os.path.expanduser("~")}/.ssh/id_rsa')
return ssh
except:
if not autoCopyKey: return None
keyDir = f'{os.path.expanduser("~")}/.ssh'
if not os.path.exists(f"{keyDir}/id_rsa"): os.system(f"ssh-keygen -m PEM -N '' -f {keyDir}/id_rsa")
os.system(f"cat {keyDir}/id_rsa.pub | ssh -p {port} {username}@{address} 'cat >> .ssh/authorized_keys'")
try:
ssh.connect(address, port=port, username=username, key_filename=f'{os.path.expanduser("~")}/.ssh/id_rsa')
return ssh
except:
return None
def close_ssh(ssh):
ssh.close()
def exec_command_and_print_output(ssh, command):
ssh_stdout = ssh.exec_command(command, get_pty=True)[1]
for line in iter(ssh_stdout.readline, ""):
print(line, end="")