forked from sroze/kafka-tunnel
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathInstance.py
80 lines (67 loc) · 2.91 KB
/
Instance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import boto3
import socket
class Instance:
    """A named network endpoint: a label plus the IP address and port to reach it."""

    def __init__(self, name, ip, port):
        """Store *name*, *ip* and *port* unchanged as public attributes."""
        self.name, self.ip, self.port = name, ip, port
class RetrieveInstanceIPs:
    """Base class for strategies that look up the instances of a service.

    ``getIps`` is a placeholder here; subclasses are expected to override it.
    """

    def getIps(self, service, port):
        """Always raise ``NotImplementedError``; subclasses must provide the lookup.

        Args:
            service: Name of the service to look up (unused here).
            port: Port of the service (unused here).

        Raises:
            NotImplementedError: Always, because the base class has no lookup logic.
        """
        message = "Subclass must implement abstract method"
        raise NotImplementedError(message)
class ManualInstances(RetrieveInstanceIPs):
    """Build instances from an explicit, comma-separated list of IP addresses."""

    # NOTE: this override takes an extra *ips* argument compared with the
    # ``getIps(service, port)`` signature declared on the base class.
    def getIps(self, service, ips, port):
        """Return one ``Instance`` per address listed in *ips*.

        Args:
            service: Name assigned to every returned instance.
            ips: Comma-separated addresses, e.g. ``"10.0.0.1, 10.0.0.2"``.
            port: Port assigned to every returned instance.

        Returns:
            A list of ``Instance`` objects in input order. Whitespace around
            each address is removed and blank entries are skipped, so an empty
            string (or a trailing comma) does not produce an instance with an
            empty address.
        """
        # str.split(',') keeps padding around each item and yields '' for an
        # empty input or a stray comma; neither is a usable address.
        addresses = (raw.strip() for raw in ips.split(','))
        return [
            Instance(name=service, ip=address, port=port)
            for address in addresses
            if address
        ]
class AWSEC2Instances(RetrieveInstanceIPs):
    """Look up EC2 instances by their ``Name`` tag and expose their private IPs."""

    def __init__(self, profile, region):
        """Create a boto3 session bound to the given profile and region.

        Args:
            profile: AWS profile name passed to ``boto3.Session``.
            region: AWS region name passed to ``boto3.Session``.
        """
        self.profile = profile
        self.region = region
        self.session = boto3.Session(profile_name=profile, region_name=region)

    def getIps(self, service, port):
        """Return one ``Instance`` per private IP found for *service*.

        Args:
            service: Value of the ``Name`` tag to search for; also used as the
                name of every returned instance.
            port: Port assigned to every returned instance.

        Returns:
            A list of ``Instance`` objects, one per private IP address.
        """
        return [
            Instance(name=service, ip=ec2_ip, port=port)
            for ec2_ip in self.req_aws_ips(service, self.region)
        ]

    def req_aws_ips(self, service, region):
        """Return the private IPs of EC2 instances whose ``Name`` tag is *service*.

        Args:
            service: Value the ``Name`` tag must have.
            region: Not used by the lookup; the client is created from
                ``self.session``. Kept so existing callers keep working.

        Returns:
            A list of private IP address strings. Instances that report no
            ``PrivateIpAddress`` are skipped.
        """
        name_filter = [{'Name': 'tag:Name', 'Values': [service]}]
        client = self.session.client('ec2')
        # Walk every page: a single describe_instances call may return only
        # part of the result set together with a NextToken.
        paginator = client.get_paginator('describe_instances')

        ips = []
        for page in paginator.paginate(Filters=name_filter):
            for reservation in page.get('Reservations', []):
                for instance in reservation.get('Instances', []):
                    ip = instance.get('PrivateIpAddress')
                    if ip is not None:
                        ips.append(ip)
        return ips
class AWSMSKInstances:
    """Resolve the broker and ZooKeeper endpoints of an Amazon MSK cluster."""

    # Keys of the GetBootstrapBrokers response, most preferred first. SASL/SCRAM
    # stays ahead of plaintext to preserve the historical choice when both are
    # present; the TLS and IAM strings are fallbacks for clusters that expose
    # neither of the first two.
    _BOOTSTRAP_KEYS = (
        'BootstrapBrokerStringSaslScram',
        'BootstrapBrokerString',
        'BootstrapBrokerStringTls',
        'BootstrapBrokerStringSaslIam',
    )

    def __init__(self, region):
        """Create a boto3 session for *region* using the default credentials."""
        self.region = region
        self.session = boto3.Session(region_name=region)

    def set_aws_profile(self, profile):
        """Replace the session with one that uses *profile* in the same region."""
        self.session = boto3.Session(profile_name=profile, region_name=self.session.region_name)

    @staticmethod
    def _pick_bootstrap_string(brokers_info):
        """Return the first non-empty bootstrap string, or raise ``ValueError``."""
        for key in AWSMSKInstances._BOOTSTRAP_KEYS:
            value = brokers_info.get(key)
            if value:
                return value
        raise ValueError(
            "No supported bootstrap broker string found in the "
            "GetBootstrapBrokers response (looked for: "
            + ", ".join(AWSMSKInstances._BOOTSTRAP_KEYS) + ")"
        )

    def get_instances(self, cluster_arn):
        """Return an ``Instance`` for every broker and ZooKeeper node of a cluster.

        Args:
            cluster_arn: ARN of the MSK cluster to inspect.

        Returns:
            A list of ``Instance`` objects whose name is the endpoint host
            name, whose ip is that host resolved with ``socket.gethostbyname``
            and whose port is the port string taken from the endpoint.

        Raises:
            ValueError: If the cluster exposes no supported bootstrap broker
                string, or an endpoint is not of the form ``host:port``.
        """
        client = self.session.client('kafka')
        cluster_info = client.describe_cluster(ClusterArn=cluster_arn)['ClusterInfo']
        brokers_info = client.get_bootstrap_brokers(ClusterArn=cluster_arn)

        bootstrap = self._pick_bootstrap_string(brokers_info)

        # Broker host names are rebuilt as b-1 .. b-N from the first bootstrap
        # entry instead of being read from the bootstrap string itself
        # (presumably because that string need not list every broker - confirm).
        # Everything after the first label, including ":port", is reused as-is.
        _, _, domain_and_port = bootstrap.split(',')[0].partition('.')
        endpoints = [
            "b-{}.{}".format(broker_id, domain_and_port)
            for broker_id in range(1, cluster_info['NumberOfBrokerNodes'] + 1)
        ]

        # The ZooKeeper connect string is optional in the response; only add
        # its endpoints when it is actually present.
        zookeeper = cluster_info.get('ZookeeperConnectString')
        if zookeeper:
            endpoints.extend(zookeeper.split(','))

        instances = []
        for endpoint in endpoints:
            endpoint = endpoint.strip()
            if not endpoint:
                continue
            host, port = endpoint.rsplit(':', 1)
            instances.append(Instance(host, socket.gethostbyname(host), port))
        return instances