Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TraceBase.with_time_offset() #2155

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 114 additions & 22 deletions lisa/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -2123,12 +2123,43 @@ def window(self):
return (self.start, self.end)

@abc.abstractmethod
def df_event(self, event, *args, **kwargs):
    """
    Return the :class:`pandas.DataFrame` for the given ``event``.

    :param event: Name of the trace event to get the dataframe of.
    :type event: str

    :Variable arguments: Forwarded to the implementation in subclasses.
    """

@abc.abstractmethod
def get_view(self, *args, **kwargs):
    """
    Get a view on a trace cropped time-wise to fit in ``window`` and with
    event dataframes post processed with ``process_df``.

    :Variable arguments: Forwarded to the constructor of the view.
    """

@abc.abstractmethod
def _clear_cache(self):
    """
    Clear the cached event dataframes held by this trace.
    """
    pass
def with_time_offset(self, offset):
    """
    Get a view on the trace with time shifted by ``offset``.

    :param offset: Amount of time added to the index of every event
        dataframe of the view.

    This can be convenient when trying to align multiple traces coming from
    repetitions of the same experiment.

    .. note:: Some analysis functions may not handle well negative
        timestamps, so it might be a good idea to slice the trace to only
        contain positive timestamps after this operation.
    """
    def time_offset(event, df):
        # Shallow copy so that shifting the index does not mutate the
        # dataframe cached by the base trace.
        df = df.copy(deep=False)
        df.index = df.index + offset
        return df

    return self.get_view(process_df=time_offset)

def __getitem__(self, window):
if not isinstance(window, slice):
Expand Down Expand Up @@ -2207,8 +2238,13 @@ class TraceView(Loggable, TraceBase):
anymore, apart as a data server for the view.
:type clear_base_cache: bool

:param window: The time window to base this view on
:type window: tuple(float, float)
:param window: The time window to base this view on. If ``None``, the whole
base trace will be selected.
:type window: tuple(float, float) or None

:param process_df: Function used to post process the event dataframes
returned by :meth:`TraceBase.df_event`.
:type process_df: typing.Callable[[str, pandas.DataFrame], pandas.DataFrame] or None

:Attributes:
* ``base_trace``: The original :class`:`Trace` this view is based on.
Expand Down Expand Up @@ -2241,22 +2277,35 @@ class TraceView(Loggable, TraceBase):
mimics a regular :class:`Trace` using :func:`getattr`.
"""

def __init__(self, trace, window=None, clear_base_cache=False, process_df=None):
    """
    See the class docstring for the meaning of ``window``,
    ``clear_base_cache`` and ``process_df``.
    """
    super().__init__()
    self.base_trace = trace

    # Evict all the non-raw dataframes from the base cache, as they are
    # unlikely to be used anymore.
    if clear_base_cache:
        self._clear_cache()

    # A None window selects the whole base trace.
    t_min, t_max = window or (None, None)
    self.start = t_min if t_min is not None else self.base_trace.start
    self.end = t_max if t_max is not None else self.base_trace.end
    # Fall back to the identity post-processing callback.
    self._process_df = process_df or self._default_process_df

@staticmethod
def _default_process_df(event, df):
    """
    Default ``process_df``: return ``df`` unmodified.
    """
    return df

@property
def trace_state(self):
    """
    State tuple used as part of the cache key for this view.
    """
    f = self._process_df
    return (
        self.start,
        self.end,
        # This likely will be a value that cannot be serialized to JSON if
        # it was user-provided. This will prevent caching as it should.
        # Identity check ("is") rather than "==": we only want to detect
        # the default callback itself, without invoking a user-provided
        # callable's arbitrary __eq__.
        None if f is self._default_process_df else f,
        self.base_trace.trace_state,
    )

def __getattr__(self, name):
return getattr(self.base_trace, name)
Expand All @@ -2278,9 +2327,11 @@ def df_event(self, event, **kwargs):
window = self.window
kwargs['window'] = window

return self.base_trace.df_event(event, **kwargs)
df = self.base_trace.df_event(event, **kwargs)
return self._process_df(event, df)

def get_view(self, window=None, process_df=None, **kwargs):
    """
    Get a new :class:`TraceView` stacked on top of this one.

    :param window: Optional window, intersected with the window of this
        view so the result can only be narrower.
    :type window: tuple(float, float) or None

    :param process_df: Optional post-processing callback, composed with
        the one already attached to this view.
    :type process_df: typing.Callable[[str, pandas.DataFrame], pandas.DataFrame] or None

    :Variable keyword arguments: Forwarded to :class:`TraceView`.
    """
    window = window or (None, None)
    start = self.start
    end = self.end

    # Explicit None checks so that a 0 timestamp boundary is honored,
    # which matters once timestamps can be negative (see
    # TraceBase.with_time_offset()).
    if window[0] is not None:
        start = max(start, window[0])

    if window[1] is not None:
        end = min(end, window[1])

    process_df = process_df or self._default_process_df
    def _process_df(event, df):
        # Apply this view's post-processing first, then the new layer's.
        return process_df(event, self._process_df(event, df))

    return TraceView(
        self,
        window=(start, end),
        process_df=_process_df,
        **kwargs
    )

def _clear_cache(self):
    # A view holds no cache of its own: delegate to the base trace.
    self.base_trace._clear_cache()

# One might be tempted to make that a subclass of collections.abc.Set: don't.
# The problem is that Set expects new instances to be created by passing an
Expand Down Expand Up @@ -2514,6 +2577,10 @@ def from_json_map(cls, mapping):
return cls(nf=nf, fmt=fmt)


class _CannotWriteSwapEntry(Exception):
    """
    Raised when a swap entry cannot be written, e.g. because its
    descriptor cannot be serialized to JSON.
    """
    pass


class _CacheDataSwapEntry:
"""
Entry in the data swap area of :class:`Trace`.
Expand Down Expand Up @@ -2558,10 +2625,18 @@ def to_json_map(self):
"""
Return a mapping suitable for JSON serialization.
"""
desc = self.cache_desc_nf.to_json_map()
try:
# Use json.dumps() here to fail early if the descriptor cannot be
# dumped to JSON
desc = json.dumps(desc)
except TypeError as e:
raise _CannotWriteSwapEntry(e)

return {
'version-token': VERSION_TOKEN,
'name': self.name,
'desc': self.cache_desc_nf.to_json_map(),
'encoded_desc': desc,
}

@classmethod
Expand All @@ -2572,7 +2647,8 @@ def from_json_map(cls, mapping, written=False):
if mapping['version-token'] != VERSION_TOKEN:
raise TraceCacheSwapVersionError('Version token differ')

cache_desc_nf = _CacheDataDescNF.from_json_map(mapping['desc'])
desc = json.loads(mapping['encoded_desc'])
cache_desc_nf = _CacheDataDescNF.from_json_map(desc)
name = mapping['name']
return cls(cache_desc_nf=cache_desc_nf, name=name, written=written)

Expand Down Expand Up @@ -3072,10 +3148,20 @@ def log_error(e):
else:
# Update the swap entry on disk
if write_meta:
swap_entry.to_path(
self._path_of_swap_entry(swap_entry)
)
swap_entry.written = True
try:
swap_entry.to_path(
self._path_of_swap_entry(swap_entry)
)
# We have a swap entry that cannot be written to the swap,
# probably because the descriptor includes something that
# cannot be serialized to JSON.
except _CannotWriteSwapEntry as e:
self.logger.debug(f'Could not write {cache_desc} to swap: {e}')
swap_entry.written = False
return
else:
swap_entry.written = True

self._swap_content[swap_entry.cache_desc_nf] = swap_entry

# Assume that reading from the swap will take as much time as
Expand Down Expand Up @@ -3338,12 +3424,12 @@ def write_swap(self, cache_desc, force=False, write_meta=True):
if force or self._should_evict_to_swap(cache_desc, data):
self._write_swap(cache_desc, data, write_meta)

def write_swap_all(self, **kwargs):
    """
    Attempt to write all cached data to the swap.

    :Variable keyword arguments: Forwarded to :meth:`write_swap`.
    """
    # Iterate over a snapshot of the keys, as write_swap() may end up
    # mutating the cache (e.g. when evicting data to the swap).
    for cache_desc in list(self._cache.keys()):
        self.write_swap(cache_desc, **kwargs)

def clear_event(self, event, raw=None):
"""
Expand Down Expand Up @@ -3723,6 +3809,9 @@ def __init__(self,
# the Trace is almost fully initialized
self.plat_info = plat_info.add_trace_src(self)

def _clear_cache(self):
    # Evict all the non-raw event dataframes from this trace's cache; raw
    # dataframes (raw=True) are deliberately kept.
    self._cache.clear_all_events(raw=False)

@bothmethod
def _resolve_namespaces(self_or_cls, namespaces=None):
if not isinstance(self_or_cls, type):
Expand Down Expand Up @@ -3915,6 +4004,9 @@ class TraceProxy(TraceBase):
def get_view(self, *args, **kwargs):
    # Plain delegation: the proxy exposes views of the proxied trace.
    return self.base_trace.get_view(*args, **kwargs)

def _clear_cache(self):
    # Delegate cache management to the proxied trace.
    self.base_trace._clear_cache()

def __getattr__(self, attr):
try:
base_trace = self.__dict__['base_trace']
Expand Down Expand Up @@ -4605,8 +4697,8 @@ def has_events(self, events, namespaces=None):
else:
return True

def get_view(self, *args, **kwargs):
    """
    Get a :class:`TraceView` on this trace.

    :Variable arguments: Forwarded to the constructor of
        :class:`TraceView`.
    """
    return TraceView(self, *args, **kwargs)

@property
def start(self):
Expand Down