Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processing #1

Merged
merged 5 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ jobs:

- name: Ensure the project builds
run: cargo build
test:
needs: install
runs-on: ubuntu-latest
steps:
- name: Use cashed cargo
uses: actions/cache@v3
with:
path: ~/.cargo
key: ${{ runner.os }}-rust-${{ hashFiles('rust-toolchain.toml') }}

- name: Checkout the source code
uses: actions/checkout@v3

- name: Ensure the project builds
run: cargo test -- --test-threads=1
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license = "GPL-3.0-only"

[workspace]
members = [
"bin/processor",
"bin/server",
"bin/tracker",
"routes",
Expand Down
13 changes: 13 additions & 0 deletions bin/processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "processor"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4"
shared = { path = "../../shared" }
env_logger = "0.10.1"
polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.1.0" }
types = { path = "../../types" }
84 changes: 84 additions & 0 deletions bin/processor/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// This file is part of RegionX.
//
// RegionX is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// RegionX is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with RegionX. If not, see <https://www.gnu.org/licenses/>.

use shared::{
config::config,
consumption::{delete_consumption, get_consumption, write_batch_consumption},
registry::registered_paras,
};
use std::collections::BTreeMap;
use types::WeightConsumption;

const LOG_TARGET: &str = "processor";

fn main() {
env_logger::init();

let outputs = config().outputs;
let paras = registered_paras();

paras.iter().for_each(|para| {
let mut processed = BTreeMap::new();

log::info!(
target: LOG_TARGET,
"{}-{} - Processing consumption.",
para.relay_chain,
para.para_id,
);

(0..outputs).for_each(|output_index| {
let consumption = if let Ok(data) = get_consumption(para.clone(), Some(output_index)) {
data
} else {
log::error!(
target: LOG_TARGET,
"{}-{} - Failed to get consumption.",
para.relay_chain,
para.para_id,
);
vec![]
};

consumption.into_iter().for_each(|data| {
processed.entry(data.block_number).or_insert(data);
});
});

let processed: Vec<WeightConsumption> = processed.values().cloned().collect();

log::info!(
target: LOG_TARGET,
"{}-{} - Writing processed consumption. Total blocks tracked: {}",
para.relay_chain,
para.para_id,
processed.len()
);

if let Err(e) = write_batch_consumption(para.clone(), processed) {
log::error!(
target: LOG_TARGET,
"{}-{} - Failed to write batch consumption: {:?}",
para.relay_chain,
para.para_id,
e,
);

return;
}

(0..outputs).for_each(|output_index| delete_consumption(para.clone(), output_index));
});
}
24 changes: 17 additions & 7 deletions bin/tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,31 @@ async fn track_weight_consumption(para: Parachain, rpc_index: usize) {
let Some(rpc) = para.rpcs.get(rpc_index) else {
log::error!(
target: LOG_TARGET,
"Parachain {}-{} doesn't have an rpc with index: {}",
"{}-{} - doesn't have an rpc with index: {}",
para.relay_chain, para.para_id, rpc_index,
);
return;
};

log::info!("Starting to track consumption for: {}-{}", para.relay_chain, para.para_id);
log::info!("{}-{} - Starting to track consumption.", para.relay_chain, para.para_id);
let result = OnlineClient::<PolkadotConfig>::from_url(rpc).await;

if let Ok(api) = result {
if let Err(err) = track_blocks(api, para, rpc_index).await {
if let Err(err) = track_blocks(api, para.clone(), rpc_index).await {
log::error!(
target: LOG_TARGET,
"Failed to track new block: {:?}",
"{}-{} - Failed to track new block: {:?}",
para.relay_chain,
para.para_id,
err
);
}
} else {
log::error!(
target: LOG_TARGET,
"Failed to create online client: {:?}",
"{}-{} - Failed to create online client: {:?}",
para.relay_chain,
para.para_id,
result
);
}
Expand All @@ -110,7 +114,13 @@ async fn track_blocks(
para: Parachain,
rpc_index: usize,
) -> Result<(), Box<dyn std::error::Error>> {
log::info!("Subsciribing to finalized blocks for: {}", para.para_id);
log::info!(
target: LOG_TARGET,
"{}-{} - Subsciribing to finalized blocks",
para.relay_chain,
para.para_id
);

let mut blocks_sub = api
.blocks()
.subscribe_finalized()
Expand All @@ -136,7 +146,7 @@ async fn note_new_block(
let timestamp = timestamp_at(api.clone(), block.hash()).await?;
let consumption = weight_consumption(api, block_number, timestamp).await?;

write_consumption(para, consumption, rpc_index)?;
write_consumption(para, consumption, Some(rpc_index))?;

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
output_directory = "out/out"
output_directory = "out/"
registry = "registry.json"
chaindata = "chaindata.json"
outputs = 2

#[payment_info]
#rpc_url = "wss://rococo-rpc.polkadot.io"
Expand Down
56 changes: 55 additions & 1 deletion registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,60 @@
],
"para_id": 2000,
"relay_chain": "Polkadot",
"expiry_timestamp": 1706728108
"expiry_timestamp": 1709740246
},
{
"name": "Polkadot",
"rpcs": [
"wss://rpc-polkadot.luckyfriday.io",
"wss://polkadot-rpc.dwellir.com"
],
"para_id": 0,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Moonbeam",
"rpcs": [
"wss://moonbeam-rpc.dwellir.com",
"wss://wss.api.moonbeam.network",
"wss://1rpc.io/glmr",
"wss://moonbeam.unitedbloc.com"
],
"para_id": 2004,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Astar",
"rpcs": [
"wss://astar.public.curie.radiumblock.co/ws",
"wss://rpc.astar.network",
"wss://astar-rpc.dwellir.com",
"wss://1rpc.io/astr"
],
"para_id": 2006,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "HydraDX",
"rpcs": [
"wss://hydradx-rpc.dwellir.com",
"wss://rpc.hydradx.cloud"
],
"para_id": 2034,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Zeitgeist",
"rpcs": [
"wss://zeitgeist-rpc.dwellir.com",
"wss://main.rpc.zeitgeist.pm/ws"
],
"para_id": 2092,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
}
]
1 change: 1 addition & 0 deletions routes/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ output_directory = "mock-out"
registry = "mock-parachains.json"
chaindata = "../chaindata.json"
free_mode = true
outputs = 1

[payment_info]
rpc_url = "wss://rococo-rpc.polkadot.io"
Expand Down
Empty file added routes/mock-out/out/placeholder
Empty file.
2 changes: 1 addition & 1 deletion routes/src/consumption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn consumption(
let (start, end) = (start.unwrap_or_default(), end.unwrap_or(Timestamp::MAX));

// By default query the consumption that was collected from rpc index 0.
let weight_consumptions: Vec<WeightConsumption> = get_consumption(para, 0)
let weight_consumptions: Vec<WeightConsumption> = get_consumption(para, None)
.map_err(|_| Error::ConsumptionDataNotFound)?
.into_iter()
.filter(|consumption| consumption.timestamp >= start && consumption.timestamp <= end)
Expand Down
6 changes: 4 additions & 2 deletions routes/src/extend_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn extend_subscription(data: Json<ExtendSubscriptionData>) -> Result<(

log::info!(
target: LOG_TARGET,
"Attempting to extend subscription for para: {}:{}",
"{}-{} - Attempting to extend subscription for para",
relay_chain, para_id
);

Expand Down Expand Up @@ -76,7 +76,9 @@ pub async fn extend_subscription(data: Json<ExtendSubscriptionData>) -> Result<(
if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
"Failed to extend subscription for para: {:?}",
"{}-{} Failed to extend subscription for para: {:?}",
para.relay_chain,
para.para_id,
err
);
}
Expand Down
8 changes: 5 additions & 3 deletions routes/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn register_para(registration_data: Json<RegistrationData>) -> Result<

log::info!(
target: LOG_TARGET,
"Attempting to register para: {}:{}",
"{}-{} - Attempting to register para",
relay_chain, para_id
);

Expand All @@ -71,12 +71,14 @@ pub async fn register_para(registration_data: Json<RegistrationData>) -> Result<

para.expiry_timestamp = current_timestamp() + subscription_duration;

paras.push(para);
paras.push(para.clone());

if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
"Failed to register para: {:?}",
"{}-{} - Failed to register para: {:?}",
para.relay_chain,
para.para_id,
err
);
}
Expand Down
2 changes: 1 addition & 1 deletion routes/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl MockEnvironment {

for (para, weight_consumptions) in &mock.weight_consumptions {
weight_consumptions.iter().for_each(|consumption| {
write_consumption(para.clone(), consumption.clone(), 0)
write_consumption(para.clone(), consumption.clone(), None)
.expect("Failed to write conusumption data");
});
}
Expand Down
9 changes: 1 addition & 8 deletions scripts/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
TRACKER_LOGS_0="logs/tracker-logs-0.out"
TRACKER_LOGS_1="logs/tracker-logs-1.out"

WATCHDOG_LOGS_0="logs/watchdog-logs-0.out"
WATCHDOG_LOGS_1="logs/watchdog-logs-1.out"

TRACKER="./target/release/tracker"
WATCHDOG="scripts/watchdog.sh"

reset_env() {
PIDS=$(pgrep -f "$TRACKER|$WATCHDOG")
PIDS=$(pgrep -f "$TRACKER")

if [ -z "$PIDS" ]; then
echo "Process not found."
Expand All @@ -27,6 +23,3 @@ reset_env
# start the tracker again
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 0' > $TRACKER_LOGS_0 2>&1 &
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 1' > $TRACKER_LOGS_1 2>&1 &
# start the watchdog
nohup sh -c 'scripts/watchdog.sh "$1"' _ 0 > $WATCHDOG_LOGS_0 2>&1 &
nohup sh -c 'scripts/watchdog.sh "$1"' _ 1 > $WATCHDOG_LOGS_1 2>&1 &
3 changes: 3 additions & 0 deletions scripts/process.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

sh -c 'RUST_LOG=INFO ./target/release/processor' >> logs/processor.out 2>&1
Empty file modified scripts/reset_env.sh
100644 → 100755
Empty file.
Loading
Loading