Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull request update/250123 #531

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions diworker/diworker/importers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,16 @@ def _get_cloud_extras(self, info):
res['meta'][k] = val
return res

def update_cloud_account_config(self):
    """Store a newly determined CUR version in the cloud account config.

    Nothing is sent when the account has no config, when the stored config
    already holds a truthy ``cur_version``, or when the cloud adapter's
    config has no ``cur_version`` either. Otherwise the adapter's value is
    written into the stored config and the whole config is submitted via
    ``rest_cl.cloud_account_update``.
    """
    config = self.cloud_acc.get('config')
    # Without a config there is nothing to extend. Returning here avoids an
    # AttributeError on ``None.get`` below.
    if config is None or config.get('cur_version'):
        return
    cur_version = self.cloud_adapter.config.get('cur_version')
    if not cur_version:
        return
    # ``region_name`` is removed before the update is sent.
    # NOTE(review): presumably the update API does not accept it - confirm.
    # The cached dict is modified in place, so ``self.cloud_acc`` ends up
    # matching the submitted payload.
    config.pop('region_name', None)
    config['cur_version'] = cur_version
    self.rest_cl.cloud_account_update(
        self.cloud_acc_id, {'config': config})

def create_traffic_processing_tasks(self):
    """Create traffic processing tasks.

    Thin public wrapper: all work is delegated to
    ``self._create_traffic_processing_tasks()``; nothing is returned.
    """
    self._create_traffic_processing_tasks()

Expand Down
4 changes: 4 additions & 0 deletions diworker/diworker/importers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def import_report(self):

LOG.info('Updating import time')
self.update_cloud_import_time(int(time.time()))
self.update_cloud_account_config()
LOG.info('Import completed')

LOG.info('Processing alerts')
Expand All @@ -457,6 +458,9 @@ def update_cloud_import_attempt(self, ts, error=None):
{'last_import_attempt_at': ts,
'last_import_attempt_error': error})

def update_cloud_account_config(self):
    """Hook for writing config changes back to the cloud account.

    ``import_report`` calls this right after the import time is updated.
    This implementation intentionally does nothing; an importer that needs
    to persist config changes provides its own version (the AWS importer
    defines one).
    """

@staticmethod
def extract_tags(raw_tags):
tags = {}
Expand Down
83 changes: 64 additions & 19 deletions tools/cloud_adapter/clouds/aws.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone
import enum
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
import json
Expand Down Expand Up @@ -49,11 +50,11 @@
MAX_RESULTS = 1000
CSV_FORMAT_PATTERN = r'\.csv.(gz|zip)$'
PARQUET_FORMAT_PATTERN = r'\.snappy.parquet$'
GROUP_DATES_PATTERNS = [
'BILLING_PERIOD=[0-9]{4}-[0-9]{2}/',
'[0-9]{8}-[0-9]{8}/',
'year=[0-9]{4}/month=([1-9]|1[0-2])/'
]
GROUP_DATES_PATTERNS = {
2: ['BILLING_PERIOD=[0-9]{4}-[0-9]{2}/'],
1: ['[0-9]{8}-[0-9]{8}/',
'year=[0-9]{4}/month=([1-9]|1[0-2])/']
}


def _retry_on_error(exc):
Expand Down Expand Up @@ -495,25 +496,51 @@ def snapshot_chain_discovery_calls(self):
def rds_instance_discovery_calls(self):
    """Return the discovery calls for RDS instances: always an empty list."""
    no_calls = []
    return no_calls

@staticmethod
def _get_network_interfaces_attachments(client, eni_ids):
    """Map in-use network interfaces to whatever they are attached to.

    :param client: EC2 client exposing ``describe_network_interfaces``
    :param eni_ids: list of network interface ids to look up
    :return: dict ``{NetworkInterfaceId: owner}`` containing only the
        interfaces whose ``Status`` is not ``'available'``. ``owner`` is
        the attachment's ``InstanceId`` or, when that is missing, the
        interface ``Description`` (which may itself be empty or ``None``).
    """
    # An empty id list is not a filter: the request would describe every
    # interface in the region. Nothing was asked for, so skip the call.
    if not eni_ids:
        return {}
    described = client.describe_network_interfaces(
        NetworkInterfaceIds=eni_ids)
    return {
        eni['NetworkInterfaceId']: (
            eni.get('Attachment', {}).get('InstanceId')
            or eni.get('Description'))
        for eni in described['NetworkInterfaces']
        if eni.get('Status') != 'available'
    }

def discover_region_ip_addresses(self, region):
instance_map = {}
ec2 = self.session.client('ec2', region)
described_ip_addresses = ec2.describe_addresses()
eni_ids = []
instance_ids = []
for address in described_ip_addresses['Addresses']:
eni_id = address.get('NetworkInterfaceId')
instance_id = address.get('InstanceId')
if instance_id:
instance_ids.append(instance_id)
elif eni_id:
eni_ids.append(eni_id)
if instance_ids:
described_instances = ec2.describe_instances(InstanceIds=instance_ids)
described_instances = ec2.describe_instances(
InstanceIds=instance_ids)
for reservation in described_instances['Reservations']:
for instance in reservation['Instances']:
instance_map.update({instance['InstanceId']: instance.get('State', {}).get('Name')})
instance_map.update({instance['InstanceId']: instance.get(
'State', {}).get('Name')})
eni_instance_map = self._get_network_interfaces_attachments(
ec2, eni_ids)
for address in described_ip_addresses['Addresses']:
available = True
instance_id = address.get('InstanceId')
eni_id = address.get('NetworkInterfaceId')
if instance_id:
available = instance_map.get(instance_id) in ['stopped', 'terminated']
available = instance_map.get(instance_id) == 'terminated'
elif eni_id:
instance_id = eni_instance_map.get(eni_id)
if instance_id:
available = False
ip_resource = IpAddressResource(
cloud_account_id=self.cloud_account_id,
organization_id=self.organization_id,
Expand Down Expand Up @@ -557,7 +584,8 @@ def _collect_s3_objects(self, bucket_name, prefix, report_name):
path = r['Key']
day = path.split(common_prefix)[1].split(report_name)[0]
if day:
for rgx in GROUP_DATES_PATTERNS:
for rgx in [el for el_l in GROUP_DATES_PATTERNS.values()
for el in el_l]:
if re.search(rgx, day):
day = re.sub(rgx, '', day)
break
Expand All @@ -584,25 +612,42 @@ def get_report_files(self):

if not reports:
raise ReportFilesNotFoundException(
'Report files for report {} not found in bucket {}'.format(
report_name, bucket_name))
'Report files for report {} not found in bucket {}. Please '
'check your CUR version and existence of report files in the '
'bucket'.format(report_name, bucket_name))
return reports

def find_reports_by_format(self, s3_objects, format_pattern):
reports = {}
reports = defaultdict(list)
group_dates_patterns = None
cur_version = self.config.get('cur_version')
if cur_version:
group_dates_patterns = GROUP_DATES_PATTERNS.get(cur_version)
if not group_dates_patterns:
group_dates_patterns = GROUP_DATES_PATTERNS[2] + GROUP_DATES_PATTERNS[1]

group_part = None
try:
for report in [f for f in s3_objects['Contents']
if re.search(format_pattern, f['Key'])]:
for group_part in GROUP_DATES_PATTERNS:
report_candidates = [f for f in s3_objects['Contents']
if re.search(format_pattern, f['Key'])]
for group_pattern in group_dates_patterns:
if any(re.search(group_pattern, report['Key'])
for report in report_candidates):
group_part = group_pattern
if not cur_version:
version = [k for k, v in GROUP_DATES_PATTERNS.items()
if group_part in v][0]
LOG.info('Detected CUR version: %s', version)
self.config['cur_version'] = version
break
if group_part:
for report in report_candidates:
group = re.search(group_part, report['Key'])
if group:
common_group = self._group_to_daterange(group.group(0))
if common_group not in reports:
reports[common_group] = []
reports[common_group].append(report)
break
except KeyError:
reports = {}
pass
return reports

@staticmethod
Expand Down
Loading