
Commit

Merge pull request #557 from elfenpiff/iox2-555-health-monitoring-example

[#555] health monitoring example
elfenpiff authored Dec 19, 2024
2 parents dc18bff + b0484ea commit fcde364
Showing 12 changed files with 554 additions and 2 deletions.
1 change: 1 addition & 0 deletions doc/release-notes/iceoryx2-unreleased.md
@@ -24,6 +24,7 @@
* Support dynamic data with reallocation for publish-subscribe communication [#532](https://github.com/eclipse-iceoryx/iceoryx2/issues/532)
* Add benchmark for iceoryx2 queues [#535](https://github.com/eclipse-iceoryx/iceoryx2/issues/535)
* Add auto event emission for create, drop and dead notifiers [#550](https://github.com/eclipse-iceoryx/iceoryx2/issues/550)
* Introduce health monitoring example [#555](https://github.com/eclipse-iceoryx/iceoryx2/issues/555)

### Bugfixes

18 changes: 18 additions & 0 deletions examples/Cargo.toml
@@ -63,6 +63,24 @@ path = "rust/event_based_communication/publisher.rs"
name = "event_based_comm_subscriber"
path = "rust/event_based_communication/subscriber.rs"

# health monitoring

[[example]]
name = "health_monitoring_publisher_1"
path = "rust/health_monitoring/publisher_1.rs"

[[example]]
name = "health_monitoring_publisher_2"
path = "rust/health_monitoring/publisher_2.rs"

[[example]]
name = "health_monitoring_subscriber"
path = "rust/health_monitoring/subscriber.rs"

[[example]]
name = "health_monitoring_central_daemon"
path = "rust/health_monitoring/central_daemon.rs"

# publish_subscribe

[[example]]
3 changes: 2 additions & 1 deletion examples/README.md
@@ -79,7 +79,8 @@ These types are demonstrated in the complex data types example.
| event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. |
| event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
| event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. |
| health monitoring | [Rust](rust/health_monitoring) | A central daemon creates the communication resources and monitors all nodes. When the central daemon crashes, other nodes can take over and use the decentralized API to monitor the nodes. |
| publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). |
| publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. |
| publish subscribe with user header | [C](c/publish_subscribe_with_user_header) [C++](cxx/publish_subscribe_with_user_header) [Rust](rust/publish_subscribe_with_user_header) | Add a user header to the payload (samples) to transfer additional information. |
| service attributes | [C++](cxx/service_attributes) [Rust](rust/service_attributes) | Creates a service with custom attributes that are available to every endpoint. If the attributes are not compatible the service will not open. |
24 changes: 24 additions & 0 deletions examples/rust/_examples_common/lib.rs
@@ -17,3 +17,27 @@ mod transmission_data;
pub use custom_header::CustomHeader;
pub use pubsub_event::PubSubEvent;
pub use transmission_data::TransmissionData;

use iceoryx2::{
prelude::*,
service::port_factory::{event, publish_subscribe},
};

pub type ServiceTuple = (
event::PortFactory<ipc::Service>,
publish_subscribe::PortFactory<ipc::Service, u64, ()>,
);

pub fn open_service(
node: &Node<ipc::Service>,
service_name: &ServiceName,
) -> Result<ServiceTuple, Box<dyn std::error::Error>> {
let service_pubsub = node
.service_builder(service_name)
.publish_subscribe::<u64>()
.open()?;

let service_event = node.service_builder(service_name).event().open()?;

Ok((service_event, service_pubsub))
}
2 changes: 2 additions & 0 deletions examples/rust/_examples_common/pubsub_event.rs
@@ -20,6 +20,7 @@ pub enum PubSubEvent {
SentSample = 4,
ReceivedSample = 5,
SentHistory = 6,
ProcessDied = 7,
Unknown,
}

@@ -39,6 +40,7 @@ impl From<EventId> for PubSubEvent {
4 => PubSubEvent::SentSample,
5 => PubSubEvent::ReceivedSample,
6 => PubSubEvent::SentHistory,
7 => PubSubEvent::ProcessDied,
_ => PubSubEvent::Unknown,
}
}
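The diff above maps an `EventId` to a `PubSubEvent`; the reverse direction is just the numeric discriminant cast. A self-contained sketch of the round trip, using mock stand-ins (in the example, `EventId` comes from iceoryx2 and the full enum lives in `pubsub_event.rs`; only a subset of variants is reproduced here):

```rust
// Mock stand-in so the sketch compiles on its own; the real EventId
// is provided by iceoryx2.
#[derive(Debug, Clone, Copy, PartialEq)]
struct EventId(usize);

impl EventId {
    fn new(value: usize) -> Self {
        EventId(value)
    }
    fn as_value(&self) -> usize {
        self.0
    }
}

// Subset of the PubSubEvent enum from pubsub_event.rs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PubSubEvent {
    SentSample = 4,
    ReceivedSample = 5,
    SentHistory = 6,
    ProcessDied = 7,
    Unknown,
}

// Forward direction, as in the diff: EventId -> PubSubEvent.
impl From<EventId> for PubSubEvent {
    fn from(event_id: EventId) -> Self {
        match event_id.as_value() {
            4 => PubSubEvent::SentSample,
            5 => PubSubEvent::ReceivedSample,
            6 => PubSubEvent::SentHistory,
            7 => PubSubEvent::ProcessDied,
            _ => PubSubEvent::Unknown,
        }
    }
}

// Reverse direction: the enum discriminant is the event id.
impl From<PubSubEvent> for EventId {
    fn from(event: PubSubEvent) -> Self {
        EventId::new(event as usize)
    }
}

fn main() {
    let id: EventId = PubSubEvent::ProcessDied.into();
    assert_eq!(id.as_value(), 7);
    assert_eq!(PubSubEvent::from(id), PubSubEvent::ProcessDied);
    println!("round trip ok");
}
```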
57 changes: 57 additions & 0 deletions examples/rust/health_monitoring/BUILD.bazel
@@ -0,0 +1,57 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

load("@rules_rust//rust:defs.bzl", "rust_binary")

rust_binary(
name = "publisher_1",
srcs = [
"publisher_1.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "publisher_2",
srcs = [
"publisher_2.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "subscriber",
srcs = [
"subscriber.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)

rust_binary(
name = "central_daemon",
srcs = [
"central_daemon.rs",
],
deps = [
"//iceoryx2:iceoryx2",
"//examples/rust:examples-common",
],
)
120 changes: 120 additions & 0 deletions examples/rust/health_monitoring/README.md
@@ -0,0 +1,120 @@
# Health Monitoring

This example demonstrates how to create a robust system using iceoryx2.
A central daemon pre-creates all communication resources to ensure that every
required resource, such as memory, is available as soon as the application
starts.
Additionally, the subscriber is immediately informed if one of the processes
it depends on has crashed. Even if the central daemon itself crashes,
communication can continue without any restrictions. Thanks to the
decentralized API of iceoryx2, the subscriber can take over the role of the
central daemon and continue monitoring all processes.

The communication must also be reliable, and we expect publishers to provide
updates at regular intervals. If a publisher misses a deadline, we want to be
informed immediately. This situation can occur if the system is under heavy
load or if a process has crashed.
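In the example itself, iceoryx2's WaitSet drives the deadline handling; stripped of the transport, the core deadline decision can be sketched with only `std` (the helper name and timing values are illustrative, not part of the example):

```rust
use std::time::{Duration, Instant};

// Illustrative helper: a publisher violates its contract when the time since
// its last received sample exceeds the agreed deadline.
fn deadline_missed(last_sample: Instant, deadline: Duration) -> bool {
    last_sample.elapsed() > deadline
}

fn main() {
    let deadline = Duration::from_millis(200);
    // Pretend the last sample arrived 500 ms ago.
    let last_sample = Instant::now() - Duration::from_millis(500);
    if deadline_missed(last_sample, deadline) {
        println!("publisher missed its deadline!");
    }
}
```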

This example is more advanced and consists of four components:

* `central_daemon` - Must run first. It creates all communication resources and
monitors all nodes/processes.
* `publisher_1` - Sends data at a specific frequency on `service_1`.
* `publisher_2` - Sends data at a specific frequency on `service_2`.
* `subscriber` - Connects to `service_1` and `service_2` and expects new samples
within a specific time. If no sample arrives, it proactively checks for dead
nodes.

```ascii
+----------------+ creates ...........................
| central_daemon | ----------> : communication resources :
+----------------+ ...........................
| ^
| opens |
| +-----------------+--------------+
| | | |
| +-------------+ +-------------+ +------------+
| | publisher_1 | | publisher_2 | | subscriber |
| +-------------+ +-------------+ +------------+
| ^ ^ ^
|   monitors         |                 |                  |
+-------------+-------------------+-----------------+
```

## Running The Example

> [!CAUTION]
> Every payload you transmit with iceoryx2 must be compatible with shared
> memory. Specifically, it must:
>
> * be self-contained: no heap, no pointers to external sources
> * have a uniform memory representation -> `#[repr(C)]`
> * not use pointers to manage its internal structure
>
> Data types like `String` or `Vec` will cause undefined behavior and may
> result in segmentation faults. We provide alternative data types that are
> compatible with shared memory. See the
> [complex data type example](../complex_data_types) for guidance on how to
> use them.

For this example, you need to open five separate terminals.

## Terminal 1: Central Daemon - Create All Communication Resources

Run the central daemon, which sets up all communication resources and monitors
processes.

```sh
cargo run --example health_monitoring_central_daemon
```

## Terminal 2: Publisher 1

Run the first publisher, which sends data on `service_1`.

```sh
cargo run --example health_monitoring_publisher_1
```

## Terminal 3: Publisher 2

Run the second publisher, which sends data on `service_2`.

```sh
cargo run --example health_monitoring_publisher_2
```

## Terminal 4: Subscriber

Run the subscriber, which listens to both `service_1` and `service_2`.

```sh
cargo run --example health_monitoring_subscriber
```

## Terminal 5: Simulate Process Crashes

Send a `SIGKILL` signal to `publisher_1` to simulate a fatal crash. This
ensures that the process is unable to clean up any resources.

```sh
killall -9 health_monitoring_publisher_1
```

After running this command:

1. The `central_daemon` will detect that the process has crashed and print:
```ascii
detected dead node: Some(NodeName { value: "publisher 1" })
```
The event service is configured to emit a `PubSubEvent::ProcessDied` event when a
process is identified as dead.

2. On the `subscriber` side, you will see the message:
```ascii
ServiceName { value: "service_1" }: process died!
```

3. Since `publisher_1` is no longer sending messages, the subscriber will also
regularly print another message indicating that `service_1` has violated
the contract because no new samples are being received.
95 changes: 95 additions & 0 deletions examples/rust/health_monitoring/central_daemon.rs
@@ -0,0 +1,95 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use core::time::Duration;
use examples_common::PubSubEvent;
use iceoryx2::{node::NodeView, prelude::*};

const CYCLE_TIME: Duration = Duration::from_millis(100);

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_name_1 = ServiceName::new("service_1")?;
let service_name_2 = ServiceName::new("service_2")?;

let node = NodeBuilder::new()
.name(&"watchdog and resource creator".try_into()?)
.create::<ipc::Service>()?;

// The central daemon is responsible for creating all services beforehand; the other
// processes just open the communication resources and start communicating.
let _service_pubsub_1 = node
.service_builder(&service_name_1)
.publish_subscribe::<u64>()
// We use open_or_create here so that, in case of a crash of the central daemon, it can
// be restarted.
.open_or_create()?;

let _service_event_1 = node
.service_builder(&service_name_1)
.event()
// Whenever a new notifier is created, the PublisherConnected event is emitted. This
// makes sense since, in this example, a notifier is always created after a new
// publisher was created.
// The task of the notifier/event is to inform and wake up other processes when certain
// system events have happened.
.notifier_created_event(PubSubEvent::PublisherConnected.into())
.notifier_dropped_event(PubSubEvent::PublisherDisconnected.into())
// This event is emitted when either the central daemon or a decentralized process detects
// a dead node and has cleaned up all of its stale resources successfully.
.notifier_dead_event(PubSubEvent::ProcessDied.into())
.open_or_create()?;

let _service_pubsub_2 = node
.service_builder(&service_name_2)
.publish_subscribe::<u64>()
.open_or_create()?;

let _service_event_2 = node
.service_builder(&service_name_2)
.event()
.notifier_created_event(PubSubEvent::PublisherConnected.into())
.notifier_dropped_event(PubSubEvent::PublisherDisconnected.into())
.notifier_dead_event(PubSubEvent::ProcessDied.into())
.open_or_create()?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let _cycle_guard = waitset.attach_interval(CYCLE_TIME);

println!("Central daemon up and running.");
waitset.wait_and_process(|_| {
// The only task of our central daemon is to monitor all running nodes and clean up their
// resources if a process has died.
//
// Since we added the notifier_dead_event to the service, all listeners that are waiting
// on a service where one participant has died will be woken up and receive
// the PubSubEvent::ProcessDied event.
find_and_cleanup_dead_nodes();
CallbackProgression::Continue
})?;

Ok(())
}

fn find_and_cleanup_dead_nodes() {
Node::<ipc::Service>::list(Config::global_config(), |node_state| {
if let NodeState::Dead(state) = node_state {
println!(
"detected dead node: {:?}",
state.details().as_ref().map(|v| v.name())
);
state.remove_stale_resources().expect("failed to remove stale resources of dead node");
}

CallbackProgression::Continue
})
.expect("");
}
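The listing-and-filtering done by `find_and_cleanup_dead_nodes` boils down to selecting the dead entries from the node list. A self-contained sketch with mock types (iceoryx2's real `NodeState` carries alive/dead views with optional details, plus the `remove_stale_resources` step):

```rust
// Mock stand-in for iceoryx2's NodeState, reduced to what the filter needs.
#[derive(Debug)]
enum NodeState {
    Alive(String),
    Dead(String),
}

// Collect the names of all dead nodes, mirroring the daemon's monitoring pass.
fn find_dead_nodes(states: &[NodeState]) -> Vec<&str> {
    states
        .iter()
        .filter_map(|state| match state {
            NodeState::Dead(name) => Some(name.as_str()),
            NodeState::Alive(_) => None,
        })
        .collect()
}

fn main() {
    let states = vec![
        NodeState::Alive("subscriber".to_string()),
        NodeState::Dead("publisher 1".to_string()),
    ];
    for name in find_dead_nodes(&states) {
        // In the real daemon, this is where remove_stale_resources() runs.
        println!("detected dead node: {name}");
    }
}
```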
