This repository has been archived by the owner on May 1, 2024. It is now read-only.

Add partitioning for JSON output. #814

Open · wants to merge 3 commits into base: master

Changes from all commits
36 changes: 27 additions & 9 deletions edx/analytics/tasks/warehouse/load_internal_reporting_events.py
@@ -10,6 +10,7 @@
 import datetime
 import json
 import logging
+import random
 import re
 from importlib import import_module
 
@@ -42,6 +43,8 @@
 
 EVENT_TABLE_NAME = 'event_records'
 
+NUM_SUBSETS = 10
+
 # Define pattern to extract a course_id from a string by looking
 # explicitly for a version string and two plus-delimiters.
 # TODO: avoid this hack by finding a more opaque-keys-respectful way.
@@ -454,6 +457,7 @@ def __init__(self, *args, **kwargs):
         self.record_class = getattr(local_module, self.event_record_type)
         if not self.record_class:
             raise ValueError("No event record class found: {}".format(self.event_record_type))
+        random.seed(self.event_record_type)
 
     def get_event_record_class(self):
         return self.record_class
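Note on the `random.seed` call above: seeding with the record type makes the subset draws reproducible for a given task type, since the same seed always yields the same sequence. A minimal sketch of that property (the seed string is hypothetical):

```python
import random

# The same seed produces the same sequence of subset draws.
random.seed('JsonEventRecord')  # hypothetical record-type name
first = [random.randint(0, 9) for _ in range(5)]

random.seed('JsonEventRecord')
second = [random.randint(0, 9) for _ in range(5)]

assert first == second  # identical draws for identical seeds
```

One side effect worth noting: every mapper process seeds with the same value, so an event's subset depends only on its position in that mapper's input, not on the event content.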
@@ -515,6 +519,8 @@ def multi_output_reducer(self, _key, values, output_file):
         """
         Write values to the appropriate file as determined by the key.
         """
+        bytes_written = 0
+        num_values_written = 0
         for value in values:
             # Assume that the value is a dict containing the relevant sparse data,
             # either raw or encoded in a json string.
@@ -527,23 +533,30 @@ def multi_output_reducer(self, _key, values, output_file):
             output_file.write('\n')
             # WARNING: This line ensures that Hadoop knows that our process is not sitting in an infinite loop.
             # Do not remove it.
-            self.incr_counter(self.counter_category_name, 'Raw Bytes Written', len(value) + 1)
+            bytes_written = bytes_written + len(value) + 1
+            num_values_written = num_values_written + 1
+            if num_values_written % 100 == 0:
+                self.incr_counter(self.counter_category_name, 'Raw Bytes Written', bytes_written)
+                bytes_written = 0
+                num_values_written = 0
+        if bytes_written > 0:
+            self.incr_counter(self.counter_category_name, 'Raw Bytes Written', bytes_written)
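The loop above now accumulates counts locally and flushes the 'Raw Bytes Written' counter once per 100 values, with a final flush after the loop, instead of calling `incr_counter` per record; in Hadoop streaming each counter update is reported through the job's status channel, so per-record updates add measurable overhead. A generic sketch of the same batching pattern (class and names hypothetical; `incr_counter` is assumed to take `(category, name, amount)`, matching the call sites above):

```python
class BatchedCounter(object):
    """Accumulate increments locally and flush a Hadoop-style counter in batches."""

    def __init__(self, incr_counter, category, name, batch_size=100):
        self.incr_counter = incr_counter  # assumed (category, name, amount) signature
        self.category = category
        self.name = name
        self.batch_size = batch_size
        self.pending = 0
        self.updates = 0

    def add(self, amount):
        self.pending += amount
        self.updates += 1
        if self.updates >= self.batch_size:
            self.flush()

    def flush(self):
        # Report whatever has accumulated, then reset.
        if self.pending > 0:
            self.incr_counter(self.category, self.name, self.pending)
        self.pending = 0
        self.updates = 0
```

The reducer would call `add(len(value) + 1)` per record and `flush()` once after the loop, which is exactly the shape of the inline version above.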

     def output_path_for_key(self, key):
         """
         Output based on date and project.
 
         Mix them together by date, but identify with different files for each project/environment.
 
-        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}.tsv
+        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}_{subset}.tsv
         """
-        date_received, project = key
+        date_received, project, subset = key
 
         return url_path_join(
             self.output_root,
             EVENT_TABLE_NAME,
             'dt={date}'.format(date=date_received),
-            '{project}.tsv'.format(project=project),
+            '{project}_{subset}.tsv'.format(project=project, subset=subset),
         )
 
     def extra_modules(self):
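The practical effect on this layout: each (date, project) pair now fans out to `NUM_SUBSETS` files instead of one, which bounds the size of any single output file. A hypothetical illustration (bucket, date, and project names invented):

```python
NUM_SUBSETS = 10

output_root = 's3://warehouse'   # hypothetical
date, project = '2016-05-01', 'prod'

expected = [
    '{root}/event_records/dt={date}/{project}_{subset:02d}.tsv'.format(
        root=output_root, date=date, project=project, subset=subset)
    for subset in range(NUM_SUBSETS)
]
# Ten files per project per day, where the old layout produced a single prod.tsv.
```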
@@ -920,7 +933,10 @@ def mapper(self, line):
         self.add_event_info(event_dict, event_mapping, event)
 
         record = self.get_event_record_class()(**event_dict)
-        key = (date_received, project_name)
+
+        # Spread data across multiple keys arbitrarily.
+        subset = "{subset:02d}".format(subset=random.randint(0, NUM_SUBSETS - 1))
+        key = (date_received, project_name, subset)
 
         self.incr_counter(self.counter_category_name, 'Output From Mapper', 1)
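With the subset appended to the key, records for a single (date, project) pair are spread across `NUM_SUBSETS` reduce keys, so the reduce work and the resulting files are split roughly evenly. Note that `random.randint` is inclusive at both ends, so the upper bound must be `NUM_SUBSETS - 1` to get exactly `NUM_SUBSETS` subsets. A standalone sketch of the key construction (function name hypothetical):

```python
import random

NUM_SUBSETS = 10

def make_key(date_received, project_name):
    # random.randint is inclusive on both ends, hence the - 1.
    # Zero-padding keeps file names sorting cleanly: 00..09.
    subset = "{subset:02d}".format(subset=random.randint(0, NUM_SUBSETS - 1))
    return (date_received, project_name, subset)

# e.g. ('2016-05-01', 'prod', '07') -> .../dt=2016-05-01/prod_07.tsv
```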

@@ -1282,7 +1298,9 @@ def mapper(self, line):
         self.add_calculated_event_entry(event_dict, 'org_id', org_id)
 
         record = self.get_event_record_class()(**event_dict)
-        key = (date_received, project_name)
+        # Spread data across multiple keys arbitrarily.
+        subset = "{subset:02d}".format(subset=random.randint(0, NUM_SUBSETS - 1))
+        key = (date_received, project_name, subset)
 
         self.incr_counter(self.counter_category_name, 'Output From Mapper', 1)

@@ -1351,14 +1369,14 @@ def output_path_for_key(self, key):
         """
         Output based on project.
 
-        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}.tsv,
+        Output is in the form {warehouse_path}/event_records/dt={CCYY-MM-DD}/{project}_{subset}.tsv,
         but output_root is assumed to be set externally to {warehouse_path}/event_records/dt={CCYY-MM-DD}.
         """
-        _date_received, project = key
+        _date_received, project, subset = key
 
         return url_path_join(
             self.output_root,
-            '{project}.tsv'.format(project=project),
+            '{project}_{subset}.tsv'.format(project=project, subset=subset),
         )
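For this variant, `output_root` is expected to already carry the `dt=` partition, so only the file name is appended. A hypothetical trace, assuming `url_path_join` joins its arguments with `/` (as its name suggests):

```python
# Values invented for illustration.
output_root = 's3://warehouse/event_records/dt=2016-05-01'
project, subset = 'prod', '03'

filename = '{project}_{subset}.tsv'.format(project=project, subset=subset)
path = '/'.join([output_root, filename])
assert path == 's3://warehouse/event_records/dt=2016-05-01/prod_03.tsv'
```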

