diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_events.py b/edx/analytics/tasks/warehouse/load_internal_reporting_events.py
index df53039153..ed47db2086 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_events.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_events.py
@@ -10,6 +10,7 @@
 import datetime
 import json
 import logging
+import random
 import re
 
 from importlib import import_module
@@ -42,6 +43,8 @@
 
 EVENT_TABLE_NAME = 'event_records'
 
+NUM_SUBSETS = 10
+
 # Define pattern to extract a course_id from a string by looking
 # explicitly for a version string and two plus-delimiters.
 # TODO: avoid this hack by finding a more opaque-keys-respectful way.
@@ -454,6 +457,7 @@ def __init__(self, *args, **kwargs):
         self.record_class = getattr(local_module, self.event_record_type)
         if not self.record_class:
             raise ValueError("No event record class found: {}".format(self.event_record_type))
+        random.seed(self.event_record_type)
 
     def get_event_record_class(self):
         return self.record_class
@@ -515,6 +519,8 @@ def multi_output_reducer(self, _key, values, output_file):
         """
         Write values to the appropriate file as determined by the key.
         """
+        bytes_written = 0
+        num_values_written = 0
         for value in values:
             # Assume that the value is a dict containing the relevant sparse data,
             # either raw or encoded in a json string.
@@ -527,7 +533,14 @@
             output_file.write('\n')
             # WARNING: This line ensures that Hadoop knows that our process is not sitting in an infinite loop.
             # Do not remove it.
-            self.incr_counter(self.counter_category_name, 'Raw Bytes Written', len(value) + 1)
+            bytes_written = bytes_written + len(value) + 1
+            num_values_written = num_values_written + 1
+            if num_values_written % 100 == 0:
+                self.incr_counter(self.counter_category_name, 'Raw Bytes Written', bytes_written)
+                bytes_written = 0
+                num_values_written = 0
+        if bytes_written > 0:
+            self.incr_counter(self.counter_category_name, 'Raw Bytes Written', bytes_written)
 
     def output_path_for_key(self, key):
         """
@@ -535,15 +548,15 @@ def output_path_for_key(self, key):
 
         Mix them together by date, but identify with different files for each project/environment.
 
-        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}.tsv
+        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}_{subset}.tsv
         """
-        date_received, project = key
+        date_received, project, subset = key
 
         return url_path_join(
             self.output_root,
             EVENT_TABLE_NAME,
             'dt={date}'.format(date=date_received),
-            '{project}.tsv'.format(project=project),
+            '{project}_{subset}.tsv'.format(project=project, subset=subset),
         )
 
     def extra_modules(self):
@@ -920,7 +933,10 @@ def mapper(self, line):
             self.add_event_info(event_dict, event_mapping, event)
 
             record = self.get_event_record_class()(**event_dict)
-            key = (date_received, project_name)
+
+            # spread data across multiple keys arbitrarily.
+            subset = "{subset:02d}".format(subset=random.randint(0, NUM_SUBSETS - 1))
+            key = (date_received, project_name, subset)
 
             self.incr_counter(self.counter_category_name, 'Output From Mapper', 1)
 
@@ -1282,7 +1298,9 @@ def mapper(self, line):
             self.add_calculated_event_entry(event_dict, 'org_id', org_id)
 
         record = self.get_event_record_class()(**event_dict)
-        key = (date_received, project_name)
+        # spread data across multiple keys arbitrarily.
+        subset = "{subset:02d}".format(subset=random.randint(0, NUM_SUBSETS - 1))
+        key = (date_received, project_name, subset)
 
         self.incr_counter(self.counter_category_name, 'Output From Mapper', 1)
 
@@ -1351,14 +1369,14 @@ def output_path_for_key(self, key):
         """
         Output based on project.
 
-        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}.tsv,
+        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}_{subset}.tsv,
         but output_root is assumed to be set externally to {warehouse_path}/event_records/dt={CCYY-MM-DD}.
         """
-        _date_received, project = key
+        _date_received, project, subset = key
 
         return url_path_join(
             self.output_root,
-            '{project}.tsv'.format(project=project),
+            '{project}_{subset}.tsv'.format(project=project, subset=subset),
        )