From b1003dd7cdb72d3295c7c28721b17d9a1b57a9c0 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Fri, 15 Dec 2023 18:12:49 +0100 Subject: [PATCH 1/2] lisa.trace: Add TraceView(process_df=...) parameter FEATURE Allow creating a TraceView that applies some arbitrary post-processing on the event dataframes. This provides the basis for creating arbitrary views such as timestamp shifting, or even task-specific view of the trace. --- lisa/trace.py | 117 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 22 deletions(-) diff --git a/lisa/trace.py b/lisa/trace.py index b584a6c98b..d34c0c820f 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -2123,12 +2123,24 @@ def window(self): return (self.start, self.end) @abc.abstractmethod - def get_view(self, window, **kwargs): + def df_event(self, event, *args, **kwargs): """ - Get a view on a trace cropped time-wise to fit in ``window`` + Return the :class:`pandas.DataFrame` for the given ``event``. + """ + pass - :Variable keyword arguments: Forwarded to the contructor of the view. + @abc.abstractmethod + def get_view(self, *args, **kwargs): """ + Get a view on a trace cropped time-wise to fit in ``window`` and with + event dataframes post processed with ``process_df``. + + :Variable arguments: Forwarded to the constructor of the view. + """ + + @abc.abstractmethod + def _clear_cache(self): + pass def __getitem__(self, window): if not isinstance(window, slice): @@ -2207,8 +2219,13 @@ class TraceView(Loggable, TraceBase): anymore, apart as a data server for the view. :type clear_base_cache: bool - :param window: The time window to base this view on - :type window: tuple(float, float) + :param window: The time window to base this view on. If ``None``, the whole + base trace will be selected. + :type window: tuple(float, float) or None + + :param process_df: Function used to post process the event dataframes + returned by :meth:`TraceBase.df_event`. 
+ :type process_df: typing.Callable[[str, pandas.DataFrame], pandas.DataFrame] or None :Attributes: * ``base_trace``: The original :class`:`Trace` this view is based on. @@ -2241,22 +2258,35 @@ class TraceView(Loggable, TraceBase): mimics a regular :class:`Trace` using :func:`getattr`. """ - def __init__(self, trace, window, clear_base_cache=False): + def __init__(self, trace, window=None, clear_base_cache=False, process_df=None): super().__init__() self.base_trace = trace # evict all the non-raw dataframes from the base cache, as they are # unlikely to be used anymore. if clear_base_cache: - self.base_trace._cache.clear_all_events(raw=False) + self._clear_cache() - t_min, t_max = window + t_min, t_max = window or (None, None) self.start = t_min if t_min is not None else self.base_trace.start self.end = t_max if t_max is not None else self.base_trace.end + self._process_df = process_df or self._default_process_df + + @staticmethod + def _default_process_df(event, df): + return df @property def trace_state(self): - return (self.start, self.end, self.base_trace.trace_state) + f = self._process_df + return ( + self.start, + self.end, + # This likely will be a value that cannot be serialized to JSON if + # it was user-provided. This will prevent caching as it should. 
+ None if f == self._default_process_df else f, + self.base_trace.trace_state, + ) def __getattr__(self, name): return getattr(self.base_trace, name) @@ -2278,9 +2308,11 @@ def df_event(self, event, **kwargs): window = self.window kwargs['window'] = window - return self.base_trace.df_event(event, **kwargs) + df = self.base_trace.df_event(event, **kwargs) + return self._process_df(event, df) - def get_view(self, window, **kwargs): + def get_view(self, window=None, process_df=None, **kwargs): + window = window or (None, None) start = self.start end = self.end @@ -2290,7 +2322,19 @@ def get_view(self, window, **kwargs): if window[1]: end = min(end, window[1]) - return self.base_trace.get_view(window=(start, end), **kwargs) + process_df = process_df or self._default_process_df + def _process_df(event, df): + return process_df(event, self._process_df(event, df)) + + return TraceView( + self, + window=(start, end), + process_df=_process_df, + **kwargs + ) + + def _clear_cache(self): + self.base_trace._clear_cache() # One might be tempted to make that a subclass of collections.abc.Set: don't. # The problem is that Set expects new instances to be created by passing an @@ -2514,6 +2558,10 @@ def from_json_map(cls, mapping): return cls(nf=nf, fmt=fmt) +class _CannotWriteSwapEntry(Exception): + pass + + class _CacheDataSwapEntry: """ Entry in the data swap area of :class:`Trace`. @@ -2558,10 +2606,18 @@ def to_json_map(self): """ Return a mapping suitable for JSON serialization. 
""" + desc = self.cache_desc_nf.to_json_map() + try: + # Use json.dumps() here to fail early if the descriptor cannot be + # dumped to JSON + desc = json.dumps(desc) + except TypeError as e: + raise _CannotWriteSwapEntry(e) + return { 'version-token': VERSION_TOKEN, 'name': self.name, - 'desc': self.cache_desc_nf.to_json_map(), + 'encoded_desc': desc, } @classmethod @@ -2572,7 +2628,8 @@ def from_json_map(cls, mapping, written=False): if mapping['version-token'] != VERSION_TOKEN: raise TraceCacheSwapVersionError('Version token differ') - cache_desc_nf = _CacheDataDescNF.from_json_map(mapping['desc']) + desc = json.loads(mapping['encoded_desc']) + cache_desc_nf = _CacheDataDescNF.from_json_map(desc) name = mapping['name'] return cls(cache_desc_nf=cache_desc_nf, name=name, written=written) @@ -3072,10 +3129,20 @@ def log_error(e): else: # Update the swap entry on disk if write_meta: - swap_entry.to_path( - self._path_of_swap_entry(swap_entry) - ) - swap_entry.written = True + try: + swap_entry.to_path( + self._path_of_swap_entry(swap_entry) + ) + # We have a swap entry that cannot be written to the swap, + # probably because the descriptor includes something that + # cannot be serialized to JSON. + except _CannotWriteSwapEntry as e: + self.logger.debug(f'Could not write {cache_desc} to swap: {e}') + swap_entry.written = False + return + else: + swap_entry.written = True + self._swap_content[swap_entry.cache_desc_nf] = swap_entry # Assume that reading from the swap will take as much time as @@ -3338,12 +3405,12 @@ def write_swap(self, cache_desc, force=False, write_meta=True): if force or self._should_evict_to_swap(cache_desc, data): self._write_swap(cache_desc, data, write_meta) - def write_swap_all(self): + def write_swap_all(self, **kwargs): """ Attempt to write all cached data to the swap. 
""" for cache_desc in self._cache.keys(): - self.write_swap(cache_desc) + self.write_swap(cache_desc, **kwargs) def clear_event(self, event, raw=None): """ @@ -3723,6 +3790,9 @@ def __init__(self, # the Trace is almost fully initialized self.plat_info = plat_info.add_trace_src(self) + def _clear_cache(self): + self._cache.clear_all_events(raw=False) + @bothmethod def _resolve_namespaces(self_or_cls, namespaces=None): if not isinstance(self_or_cls, type): @@ -3915,6 +3985,9 @@ class TraceProxy(TraceBase): def get_view(self, *args, **kwargs): return self.base_trace.get_view(*args, **kwargs) + def _clear_cache(self): + self.base_trace._clear_cache() + def __getattr__(self, attr): try: base_trace = self.__dict__['base_trace'] @@ -4605,8 +4678,8 @@ def has_events(self, events, namespaces=None): else: return True - def get_view(self, window, **kwargs): - return TraceView(self, window, **kwargs) + def get_view(self, *args, **kwargs): + return TraceView(self, *args, **kwargs) @property def start(self): From 4b3b14f4dd1098adda287fce0d0695ecbc004a8d Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Fri, 15 Dec 2023 19:17:34 +0100 Subject: [PATCH 2/2] lisa.trace: Add TraceBase.with_time_offset() FEATURE Allow creating a TraceView that shifts all the timestamps by some user-defined amount. --- lisa/trace.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lisa/trace.py b/lisa/trace.py index d34c0c820f..f3c0720fe8 100644 --- a/lisa/trace.py +++ b/lisa/trace.py @@ -2142,6 +2142,25 @@ def get_view(self, *args, **kwargs): def _clear_cache(self): pass + def with_time_offset(self, offset): + """ + Get a view on the trace with time shifted by ``offset``. + + This can be convenient when trying to align multiple traces coming from + repetition of the same experiment. + + .. note:: Some analysis functions may not handle well negative + timestamps, so it might be a good idea to slice the trace to only + contain positive timestamps after this operation. 
+ """ + + def time_offset(event, df): + df = df.copy(deep=False) + df.index = df.index + offset + return df + + return self.get_view(process_df=time_offset) + def __getitem__(self, window): if not isinstance(window, slice): raise TypeError("Cropping window must be an instance of slice")