Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatch incoming messages #832

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions runtime-sdk/modules/contracts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,5 +793,6 @@ impl<Cfg: Config> module::MigrationHandler for Module<Cfg> {
}

impl<Cfg: Config> module::TransactionHandler for Module<Cfg> {}
impl<Cfg: Config> module::IncomingMessageHandler for Module<Cfg> {}
impl<Cfg: Config> module::BlockHandler for Module<Cfg> {}
impl<Cfg: Config> module::InvariantHandler for Module<Cfg> {}
2 changes: 2 additions & 0 deletions runtime-sdk/modules/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,8 @@ impl<Cfg: Config> module::TransactionHandler for Module<Cfg> {
}
}

impl<Cfg: Config> module::IncomingMessageHandler for Module<Cfg> {}

impl<Cfg: Config> module::BlockHandler for Module<Cfg> {
fn end_block<C: Context>(ctx: &mut C) {
// Update the list of historic block hashes.
Expand Down
20 changes: 20 additions & 0 deletions runtime-sdk/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ pub struct RuntimeBatchContext<'a, R: runtime::Runtime, S: NestedStore> {
max_messages: u32,
/// Emitted messages.
messages: Vec<(roothash::Message, MessageEventHookInvocation)>,
/// Number of processed incoming messages.
in_msgs_processed: usize,

/// Per-context values.
values: BTreeMap<&'static str, Box<dyn Any>>,
Expand Down Expand Up @@ -490,6 +492,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> {
block_etags: EventTags::new(),
max_messages,
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
}
Expand Down Expand Up @@ -521,10 +524,25 @@ impl<'a, R: runtime::Runtime, S: NestedStore> RuntimeBatchContext<'a, R, S> {
block_etags: EventTags::new(),
max_messages: ctx.max_messages,
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
}
}

// Load how many roothash messages that this runtime handled in this round. This becomes valid
// after the dispatcher finishes executing incoming blocks, and it doesn't get updated during
// the execution of those incoming messages. The dispatcher calls this in order to report back
// to the node how many messages it got through.
pub fn get_in_msgs_processed(&self) -> usize {
self.in_msgs_processed
}

// Save how many roothash incoming messages that this runtime handled in this round. This is
// for the dispatcher to call, and modules shouldn't need to use this.
pub fn set_in_msgs_processed(&mut self, count: usize) {
self.in_msgs_processed = count;
}
}

impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a, R, S> {
Expand Down Expand Up @@ -649,6 +667,7 @@ impl<'a, R: runtime::Runtime, S: NestedStore> Context for RuntimeBatchContext<'a
_ => remaining_messages,
},
messages: Vec::new(),
in_msgs_processed: self.in_msgs_processed,
values: BTreeMap::new(),
_runtime: PhantomData,
};
Expand Down Expand Up @@ -893,6 +912,7 @@ impl<'round, 'store, R: runtime::Runtime, S: Store> Context
_ => remaining_messages,
},
messages: Vec::new(),
in_msgs_processed: 0,
values: BTreeMap::new(),
_runtime: PhantomData,
};
Expand Down
160 changes: 152 additions & 8 deletions runtime-sdk/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use anyhow::anyhow;
use slog::error;
use slog::{error, warn};
use thiserror::Error;

use oasis_core_runtime::{
Expand All @@ -31,14 +31,17 @@ use crate::{
error::{Error as _, RuntimeError},
event::IntoTags,
keymanager::{KeyManagerClient, KeyManagerError},
module::{self, BlockHandler, MethodHandler, TransactionHandler},
module::{self, BlockHandler, IncomingMessageHandler, MethodHandler, TransactionHandler},
modules,
modules::core::API as _,
runtime::Runtime,
schedule_control::ScheduleControlHost,
storage::{self, NestedStore, Prefix},
types,
types::transaction::{AuthProof, Transaction},
types::{
in_msg::IncomingMessageData,
transaction::{AuthProof, Transaction},
},
};

/// Unique module name.
Expand Down Expand Up @@ -169,6 +172,16 @@ impl<R: Runtime> Dispatcher<R> {
}
}

/// Decode a roothash incoming message's data field.
pub fn decode_in_msg(
in_msg: &roothash::IncomingMessage,
) -> Result<types::in_msg::IncomingMessageData, modules::core::Error> {
let data: types::in_msg::IncomingMessageData = cbor::from_slice(&in_msg.data)
.map_err(|e| modules::core::Error::MalformedIncomingMessageData(in_msg.id, e.into()))?;
data.validate_basic()?;
Ok(data)
}

/// Run the dispatch steps inside a transaction context. This includes the before call hooks,
/// the call itself and after call hooks. The after call hooks are called regardless if the call
/// succeeds or not.
Expand Down Expand Up @@ -406,6 +419,52 @@ impl<R: Runtime> Dispatcher<R> {
}
}

/// Execute the given roothash incoming message. This includes executing the embedded
/// transaction if there is one.
pub fn execute_in_msg<C: BatchContext>(
ctx: &mut C,
in_msg: &roothash::IncomingMessage,
data: &IncomingMessageData,
tx: &Option<Transaction>,
) -> Result<(), RuntimeError> {
R::Modules::execute_in_msg(ctx, in_msg, data, tx)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that returning an error here will cause a batch abort.

if let Some(tx) = tx {
let tx_size = match data
.tx
.as_ref()
.unwrap_or_else(|| panic!("incoming message {} has tx but no data.tx", in_msg.id))
.len()
.try_into()
{
Ok(tx_size) => tx_size,
Err(err) => {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction too large"; "id" => in_msg.id, "err" => ?err);
return Ok(());
}
};
// Use the ID as index.
let index = in_msg.id.try_into().unwrap();
// todo: put result tags in block
Self::execute_tx(ctx, tx_size, tx.clone(), index)?;
}
Ok(())
}

/// Prefetch prefixes for the given roothash incoming message. This includes prefetching the
/// prefixes for the embedded transaction if there is one.
pub fn prefetch_in_msg(
prefixes: &mut BTreeSet<Prefix>,
in_msg: &roothash::IncomingMessage,
data: &IncomingMessageData,
tx: &Option<Transaction>,
) -> Result<(), RuntimeError> {
R::Modules::prefetch_in_msg(prefixes, in_msg, data, tx)?;
if let Some(tx) = tx {
Self::prefetch_tx(prefixes, tx.clone())?;
}
Ok(())
}

fn handle_last_round_messages<C: Context>(ctx: &mut C) -> Result<(), modules::core::Error> {
let message_events = ctx.runtime_round_results().messages.clone();

Expand Down Expand Up @@ -521,6 +580,8 @@ impl<R: Runtime> Dispatcher<R> {
// Run end block hooks.
R::Modules::end_block(&mut ctx);

let in_msgs_count = ctx.get_in_msgs_processed();

// Commit the context and retrieve the emitted messages.
let (block_tags, messages) = ctx.commit();
let (messages, handlers) = messages.into_iter().unzip();
Expand All @@ -534,7 +595,7 @@ impl<R: Runtime> Dispatcher<R> {
block_tags: block_tags.into_tags(),
batch_weight_limits: None,
tx_reject_hashes: vec![],
in_msgs_count: 0, // TODO: Support processing incoming messages.
in_msgs_count,
})
}
}
Expand All @@ -544,16 +605,30 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
&self,
rt_ctx: transaction::Context<'_>,
batch: &TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
self.execute_batch_common(
rt_ctx,
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
// If prefetch limit is set enable prefetch.
let prefetch_enabled = R::PREFETCH_LIMIT > 0;

let mut txs = Vec::with_capacity(batch.len());
let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
let mut in_msgs_parsed = Vec::with_capacity(in_msgs.len());
for in_msg in in_msgs {
let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err);
IncomingMessageData::noop()
});
let tx = data.tx.as_ref().and_then(|tx| Self::decode_tx(ctx, tx).map_err(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed"; "id" => in_msg.id, "err" => ?err);
}).ok());
if prefetch_enabled {
Self::prefetch_in_msg(&mut prefixes, in_msg, &data, &tx)?;
}
in_msgs_parsed.push((in_msg, data, tx));
}
let mut txs = Vec::with_capacity(batch.len());
for tx in batch.iter() {
let tx_size = tx.len().try_into().map_err(|_| {
Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
Expand All @@ -576,6 +651,12 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
.prefetch_prefixes(prefixes.into_iter().collect(), R::PREFETCH_LIMIT);
}

// Execute incoming messages.
for (in_msg, data, tx) in in_msgs_parsed {
Self::execute_in_msg(ctx, in_msg, &data, &tx)?;
}
ctx.set_in_msgs_processed(in_msgs.len());

// Execute the batch.
let mut results = Vec::with_capacity(batch.len());
for (index, (tx_size, tx)) in txs.into_iter().enumerate() {
Expand All @@ -591,19 +672,81 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
&self,
rt_ctx: transaction::Context<'_>,
batch: &mut TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
let cfg = R::SCHEDULE_CONTROL;
let mut tx_reject_hashes = Vec::new();

let mut result = self.execute_batch_common(
rt_ctx,
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
let mut new_batch = Vec::new();

// Execute incoming messages.
let in_msg_txs = Vec::new(); // todo: more efficient way to do this
let in_msgs_gas_limit = R::Core::remaining_in_msgs_gas(ctx);
let mut in_msgs_processed = 0usize;
for in_msg in in_msgs {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a certain per-block gas limit reserved for incoming messages and we should only include so many. Additionally we could also include a per-msg gas limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a per-block gas limit

let data = Self::decode_in_msg(in_msg).unwrap_or_else(|err| {
warn!(ctx.get_logger("dispatcher"), "incoming message data malformed"; "id" => in_msg.id, "err" => ?err);
IncomingMessageData::noop()
});
let tx = match data.tx.as_ref() {
Some(tx) => {
if new_batch.len() >= cfg.max_tx_count {
// This next message has a transaction, but we'll exceed the
// maximum transaction count, so leave it for the next round and
// stop.
break;
}
let remaining_gas = R::Core::remaining_in_msgs_gas(ctx);
if remaining_gas < cfg.min_remaining_gas {
// This next message has a transaction, but we won't have
// enough gas to execute it, so leave it for the next
// round and stop.
break;
}
match Self::decode_tx(ctx, tx) {
Ok(tx) => {
if tx.auth_info.fee.gas > in_msgs_gas_limit {
// The transaction is too large to execute under our
// current parameters, so skip over it.
warn!(ctx.get_logger("dispatcher"), "incoming message transaction fee gas exceeds round gas limit";
"id" => in_msg.id,
"tx_gas" => tx.auth_info.fee.gas,
"in_msgs_gas_limit" => in_msgs_gas_limit,
);
// Actually don't skip the message entirely, just don't
// execute the transaction.
None
} else if tx.auth_info.fee.gas > remaining_gas {
// The transaction is too large to execute in this round,
// so leave it for the next round and stop.
break;
} else {
Some(tx)
}
}
Err(err) => {
warn!(ctx.get_logger("dispatcher"), "incoming message transaction malformed";
"id" => in_msg.id,
"err" => ?err,
);
None
}
}
}
None => None,
};
Self::execute_in_msg(ctx, in_msg, &data, &tx)?;
in_msgs_processed += 1;
}
ctx.set_in_msgs_processed(in_msgs_processed);

// Schedule and execute the batch.
//
// The idea is to keep scheduling transactions as long as we have some space
// available in the block as determined by gas use.
let mut new_batch = Vec::new();
let mut results = Vec::with_capacity(batch.len());
let mut requested_batch_len = cfg.initial_batch_size;
'batch: loop {
Expand All @@ -612,6 +755,7 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));

for raw_tx in batch.drain(..) {
// todo: skip copies of incoming message txs
// If we don't have enough gas for processing even the cheapest transaction
// we are done. Same if we reached the runtime-imposed maximum tx count.
let remaining_gas = R::Core::remaining_batch_gas(ctx);
Expand Down
Loading