Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure IbusMsg #33

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ chrono = { version = "0.4", features = ["serde"] }
convert_case = "0.6"
criterion = "0.4"
crossbeam-channel = "0.5"
derive_more = { version="1.0.0", features = ["from"] }
derive-new = "0.5"
enum-as-inner = "0.6"
fletcher = "1.0"
Expand Down
49 changes: 27 additions & 22 deletions holo-bfd/src/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use holo_protocol::{
InstanceChannelsTx, InstanceShared, MessageReceiver, ProtocolInstance,
};
use holo_utils::bfd::{PathType, State};
use holo_utils::ibus::IbusMsg;
use holo_utils::ibus::{BfdSessionMsg, IbusMsg, InterfaceMsg};
use holo_utils::ip::AddressFamily;
use holo_utils::protocol::Protocol;
use holo_utils::task::Task;
Expand Down Expand Up @@ -144,7 +144,7 @@ impl ProtocolInstance for Master {

async fn init(&mut self) {
// Request information about all interfaces.
let _ = self.tx.ibus.send(IbusMsg::InterfaceDump);
let _ = self.tx.ibus.send(InterfaceMsg::Dump.into());
}

async fn process_ibus_msg(&mut self, msg: IbusMsg) {
Expand Down Expand Up @@ -229,27 +229,32 @@ async fn process_ibus_msg(
msg: IbusMsg,
) -> Result<(), Error> {
match msg {
// BFD peer registration.
IbusMsg::BfdSessionReg {
client_id,
sess_key,
client_config,
} => events::process_client_peer_reg(
master,
sess_key,
client_id,
client_config,
)?,
// BFD peer unregistration.
IbusMsg::BfdSessionUnreg {
sess_key,
client_id,
} => events::process_client_peer_unreg(master, sess_key, client_id)?,
// Interface update notification.
IbusMsg::InterfaceUpd(msg) => {
southbound::process_iface_update(master, msg);
// BFD Session
IbusMsg::BfdSession(bfd_msg) => match bfd_msg {
BfdSessionMsg::Registration {
sess_key,
client_id,
client_config,
} => events::process_client_peer_reg(
master,
sess_key,
client_id,
client_config,
)?,
BfdSessionMsg::Unregistration {
sess_key,
client_id,
} => {
events::process_client_peer_unreg(master, sess_key, client_id)?
}
_ => {}
},

// Interface
IbusMsg::Interface(InterfaceMsg::Update(msg)) => {
southbound::process_iface_update(master, msg)
}
// Ignore other events.

_ => {}
}

Expand Down
6 changes: 3 additions & 3 deletions holo-bfd/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use generational_arena::{Arena, Index};
use holo_northbound::yang::control_plane_protocol::bfd;
use holo_protocol::InstanceChannelsTx;
use holo_utils::bfd::{ClientCfg, ClientId, SessionKey, State};
use holo_utils::ibus::IbusMsg;
use holo_utils::ibus::BfdSessionMsg;
use holo_utils::ip::{IpAddrExt, IpAddrKind};
use holo_utils::socket::{UdpSocket, TTL_MAX};
use holo_utils::task::{IntervalTask, TimeoutTask};
Expand Down Expand Up @@ -141,11 +141,11 @@ impl Session {

// Notify protocol clients about the state transition if necessary.
if self.should_notify_clients(old_state) && !self.clients.is_empty() {
let msg = IbusMsg::BfdStateUpd {
let msg = BfdSessionMsg::Update {
sess_key: self.key.clone(),
state,
};
let _ = tx.ibus.send(msg);
let _ = tx.ibus.send(msg.into());
}

// Send YANG notification.
Expand Down
66 changes: 41 additions & 25 deletions holo-bgp/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use holo_protocol::{
InstanceChannelsTx, InstanceShared, MessageReceiver, ProtocolInstance,
};
use holo_utils::bgp::AfiSafi;
use holo_utils::ibus::IbusMsg;
use holo_utils::ibus::{
IbusMsg, NexthopMsg, PolicyMsg, RouteRedistributeMsg, RouterIdMsg,
};
use holo_utils::ip::AddressFamily;
use holo_utils::policy::PolicyType;
use holo_utils::protocol::Protocol;
Expand Down Expand Up @@ -447,36 +449,50 @@ async fn process_ibus_msg(
msg: IbusMsg,
) -> Result<(), Error> {
match msg {
IbusMsg::NexthopUpd { addr, metric } => {
// Nexthop
IbusMsg::Nexthop(NexthopMsg::Update { addr, metric }) => {
// Nexthop tracking update notification.
southbound::rx::process_nht_update(instance, addr, metric);
}
IbusMsg::RouterIdUpdate(router_id) => {

// Router ID
IbusMsg::RouterId(RouterIdMsg::Update(router_id)) => {
// Router ID update notification.
southbound::rx::process_router_id_update(instance, router_id).await;
}
IbusMsg::PolicyMatchSetsUpd(match_sets) => {
// Update the local copy of the policy match sets.
instance.shared.policy_match_sets = match_sets;
}
IbusMsg::PolicyUpd(policy) => {
// Update the local copy of the policy definition.
instance
.shared
.policies
.insert(policy.name.clone(), policy.clone());
}
IbusMsg::PolicyDel(policy_name) => {
// Remove the local copy of the policy definition.
instance.shared.policies.remove(&policy_name);
}
IbusMsg::RouteRedistributeAdd(msg) => {
// Route redistribute update notification.
southbound::rx::process_route_add(instance, msg);
}
IbusMsg::RouteRedistributeDel(msg) => {
// Route redistribute delete notification.
southbound::rx::process_route_del(instance, msg);

// policy
IbusMsg::Policy(policy_msg) => match policy_msg {
PolicyMsg::MatchSetsUpdate(match_sets) => {
// Update the local copy of the policy match sets.
instance.shared.policy_match_sets = match_sets;
}
PolicyMsg::Update(policy) => {
// Update the local copy of the policy definition.
instance
.shared
.policies
.insert(policy.name.clone(), policy.clone());
}
PolicyMsg::Delete(policy_name) => {
// Remove the local copy of the policy definition.
instance.shared.policies.remove(&policy_name);
}
},

// route redistribute
IbusMsg::RouteRedistribute(route_redistribute_msg) => {
match route_redistribute_msg {
RouteRedistributeMsg::Add(msg) => {
// Route redistribute update notification.
southbound::rx::process_route_add(instance, msg);
}
RouteRedistributeMsg::Delete(msg) => {
// Route redistribute delete notification.
southbound::rx::process_route_del(instance, msg);
}
_ => {}
}
}
// Ignore other events.
_ => {}
Expand Down
13 changes: 8 additions & 5 deletions holo-bgp/src/northbound/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use holo_northbound::configuration::{
};
use holo_northbound::yang::control_plane_protocol::bgp;
use holo_utils::bgp::AfiSafi;
use holo_utils::ibus::IbusMsg;
use holo_utils::ibus::RouteRedistributeMsg;
use holo_utils::ip::{AddressFamily, IpAddrKind};
use holo_utils::policy::{ApplyPolicyCfg, DefaultPolicyType};
use holo_utils::protocol::Protocol;
Expand Down Expand Up @@ -1363,10 +1363,13 @@ impl Provider for Instance {
}
}
Event::RedistributeRequest(protocol, af) => {
let _ = self.tx.ibus.send(IbusMsg::RouteRedistributeDump {
protocol,
af: Some(af),
});
let _ = self.tx.ibus.send(
RouteRedistributeMsg::Dump {
protocol,
af: Some(af),
}
.into(),
);
}
Event::RedistributeDelete(protocol, afi_safi) => {
let Some((mut instance, _)) = self.as_up() else {
Expand Down
26 changes: 12 additions & 14 deletions holo-bgp/src/southbound/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::collections::BTreeSet;
use std::net::IpAddr;

use holo_utils::ibus::{IbusMsg, IbusSender};
use holo_utils::ibus::{IbusSender, NexthopMsg, RouteIpMsg, RouterIdMsg};
use holo_utils::protocol::Protocol;
use holo_utils::southbound::{
Nexthop, RouteKeyMsg, RouteMsg, RouteOpaqueAttrs,
Expand All @@ -19,7 +19,7 @@ use crate::rib::LocalRoute;
// ===== global functions =====

pub(crate) fn router_id_query(ibus_tx: &IbusSender) {
let _ = ibus_tx.send(IbusMsg::RouterIdQuery);
let _ = ibus_tx.send(RouterIdMsg::Query.into());
}

pub(crate) fn route_install(
Expand All @@ -41,38 +41,36 @@ pub(crate) fn route_install(
.collect::<BTreeSet<_>>();

// Install route.
let msg = RouteMsg {
let msg = RouteIpMsg::Add(RouteMsg {
protocol: Protocol::BGP,
prefix: prefix.into(),
distance: distance.into(),
metric: route.attrs.base.value.med.unwrap_or(0),
tag: None,
opaque_attrs: RouteOpaqueAttrs::None,
nexthops: nexthops.clone(),
};
let msg = IbusMsg::RouteIpAdd(msg);
let _ = ibus_tx.send(msg);
});
let _ = ibus_tx.send(msg.into());
}

pub(crate) fn route_uninstall(
ibus_tx: &IbusSender,
prefix: impl Into<IpNetwork>,
) {
// Uninstall route.
let msg = RouteKeyMsg {
let msg = RouteIpMsg::Delete(RouteKeyMsg {
protocol: Protocol::BGP,
prefix: prefix.into(),
};
let msg = IbusMsg::RouteIpDel(msg);
let _ = ibus_tx.send(msg);
});
let _ = ibus_tx.send(msg.into());
}

pub(crate) fn nexthop_track(ibus_tx: &IbusSender, addr: IpAddr) {
let msg = IbusMsg::NexthopTrack(addr);
let _ = ibus_tx.send(msg);
let msg = NexthopMsg::Track(addr);
let _ = ibus_tx.send(msg.into());
}

pub(crate) fn nexthop_untrack(ibus_tx: &IbusSender, addr: IpAddr) {
let msg = IbusMsg::NexthopUntrack(addr);
let _ = ibus_tx.send(msg);
let msg = NexthopMsg::Untrack(addr);
let _ = ibus_tx.send(msg.into());
}
Loading
Loading