Skip to content

Commit

Permalink
Merge pull request #285 from powerapi-ng/fix/spawn-dispatcher-pickle-…
Browse files Browse the repository at this point in the history
…error

fix(dispatcher): Fix pickle error on formula factory when using spawn method
  • Loading branch information
gfieni authored Mar 18, 2024
2 parents d069837 + f9b5807 commit 4fdf318
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 85 deletions.
11 changes: 1 addition & 10 deletions src/powerapi/cli/binding_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,13 @@ def process_bindings(self):
# We look for the pusher on each dispatcher in order to replace it by
# the processor
for _, puller in self.pullers:

for current_filter in puller.state.report_filter.filters:
dispatcher = current_filter[1]

number_of_pushers = len(dispatcher.pusher)
pusher_updated = False

for index in range(number_of_pushers):
for index in range(len(dispatcher.pusher)):
if dispatcher.pusher[index] == pusher_actor:
dispatcher.pusher[index] = processor
pusher_updated = True
break

if pusher_updated:
dispatcher.update_state_formula_factory()

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
Expand Down
91 changes: 16 additions & 75 deletions src/powerapi/dispatcher/dispatcher_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,43 +68,28 @@ class DispatcherState(State):
<powerapi.dispatcher.dispatcher_actor.DispatcherState.formula_factory>`
"""

def __init__(self, actor, formula_factory, route_table):
def __init__(self, actor, pushers, route_table):
"""
:param func initial_behaviour: Function that define
the initial_behaviour
:param socket_interface: Communication interface of the actor
:type socket_interface: powerapi.SocketInterface
:param formula_factory: Factory for Formula creation.
:type formula_factory: func((formula_id) -> powerapi.Formula)
:param route_table: initialized route table
:type route_table: powerapi.dispatcher.state.RouteTable
:param actor: Dispatcher actor instance
:param pushers: List of pushers
:param route_table: Route table to use for reports
"""
State.__init__(self, actor)
super().__init__(actor)

#: (dict): Store the formula by id
self.formula_dict = {}

#: (utils.Tree): Tree store of the formula for faster
#: DispatchRule
self.formula_tree = Tree()

#: (func): Factory for formula creation
self.formula_factory = formula_factory

self.pushers = pushers
self.route_table = route_table

def add_formula(self, formula_id):
"""
Create a formula corresponding to the given formula id
and add it in memory
:param tuple formula_id: Define the key corresponding to
a specific Formula
Create a formula corresponding to the given formula id and add it in memory.
:param tuple formula_id: Define the key corresponding to a specific Formula
"""
formula = self.actor.formula_init_function(name=str((self.actor.name,) + formula_id), pushers=self.pushers)
self.supervisor.launch_actor(formula, start_message=False)

formula = self.formula_factory(formula_id)
self.formula_dict[formula_id] = formula
self.formula_tree.add(list(formula_id), formula)

Expand All @@ -131,29 +116,10 @@ def get_corresponding_formula(self, formula_id):
"""
return self.formula_tree.get(formula_id)

def get_all_formula(self):
"""
Get all the Formula created by the Dispatcher
:return: List of the Formula
:rtype: list((formula_id, Formula), ...)
"""
return self.formula_dict.items()

def set_formula_factory(self, formula_factory: Callable):
"""
Set the formula_factory function
:param Callable formula_factory: The new formula_factory
"""
self.formula_factory = formula_factory


class DispatcherActor(Actor):
"""
DispatcherActor class herited from Actor.
Route message to the corresponding Formula, and create new one
if no Formula exist for this message.
Dispatcher Actor.
"""

def __init__(self, name: str, formula_init_function: Callable, pushers: [], route_table: RouteTable,
Expand All @@ -163,50 +129,25 @@ def __init__(self, name: str, formula_init_function: Callable, pushers: [], rout
:param func formula_init_function: Function for creating Formula
:param route_table: initialized route table of the DispatcherActor
:param int level_logger: Define the level of the logger
:param bool timeout: Define the time in millisecond to wait for a
message before run timeout_handler
:param bool timeout: Define the time in millisecond to wait for a message before run timeout_handler
"""
Actor.__init__(self, name, level_logger, timeout)

# (func): Function for creating Formula
self.formula_init_function = formula_init_function

self.pushers = pushers

# (powerapi.DispatcherState): Actor state
self.state = DispatcherState(self, self._create_factory(pushers), route_table)
self.state = DispatcherState(self, pushers, route_table)

def setup(self):
"""
Check if there is a primary group by rule. Set define
StartMessage, PoisonPillMessage and Report handlers
Setup Dispatcher actor report handlers.
"""
Actor.setup(self)
super().setup()

if self.state.route_table.primary_dispatch_rule is None:
raise NoPrimaryDispatchRuleRuleException()

self.add_handler(Report, FormulaDispatcherReportHandler(self.state))
self.add_handler(PoisonPillMessage, DispatcherPoisonPillMessageHandler(self.state))
self.add_handler(StartMessage, StartHandler(self.state))

def _create_factory(self, pushers: []):
"""
Create the full Formula Factory
:return: Formula Factory
:rtype: func(formula_id) -> Formula
"""
formula_init_function = self.formula_init_function

def factory(formula_id):
formula = formula_init_function(name=str((self.name,) + formula_id), pushers=pushers)
self.state.supervisor.launch_actor(formula, start_message=False)
return formula

return factory

def update_state_formula_factory(self):
"""
Update the formula_factory function of the state by using the pusher list
"""
self.state.set_formula_factory(self._create_factory(self.pushers))

0 comments on commit 4fdf318

Please sign in to comment.