Skip to content

Commit

Permalink
feat(carbon-core): add type alias for processor input types
Browse files Browse the repository at this point in the history
  • Loading branch information
KellianDev committed Nov 20, 2024
1 parent 2f3b58f commit 383bc7c
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 56 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ This will parse the my_program.json IDL file and generate the corresponding deco
### Implementing Processors

```rs
use carbon_core::account::{AccountDecoder, AccountMetadata, DecodedAccount};
use carbon_core::account::{AccountDecoder, AccountMetadata, AccountProcessorInputType, DecodedAccount};
use crate::MyCustomAccountData;

struct MyAccountProcessor;

#[async_trait]
impl Processor for MyAccountProcessor {
type InputType = (AccountMetadata, DecodedAccount<MyCustomAccountData>);
type InputType = AccountProcessorInputType<MyCustomAccountData>;

async fn process(
&mut self,
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ pub trait AccountDecoder<'a> {
) -> Option<DecodedAccount<Self::AccountType>>;
}

/// The input type for the account processor.
///
/// - `T`: The account type, as determined by the decoder.
pub type AccountProcessorInputType<T> = (AccountMetadata, DecodedAccount<T>);

/// A processing pipe that decodes and processes Solana account updates.
///
/// `AccountPipe` combines an `AccountDecoder` and a `Processor` to manage account
Expand All @@ -128,8 +133,7 @@ pub trait AccountDecoder<'a> {
/// - `processor`: A `Processor` that handles the processing logic for decoded accounts.
pub struct AccountPipe<T: Send> {
pub decoder: Box<dyn for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static>,
pub processor:
Box<dyn Processor<InputType = (AccountMetadata, DecodedAccount<T>)> + Send + Sync>,
pub processor: Box<dyn Processor<InputType = AccountProcessorInputType<T>> + Send + Sync>,
}

/// A trait for processing account updates in the pipeline asynchronously.
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ pub trait InstructionDecoder<'a> {
) -> Option<DecodedInstruction<Self::InstructionType>>;
}

/// The input type for the instruction processor.
///
/// - `T`: The instruction type
pub type InstructionProcessorInputType<T> = (
InstructionMetadata,
DecodedInstruction<T>,
Vec<NestedInstruction>,
);

/// A processing pipeline for instructions, using a decoder and processor.
///
/// The `InstructionPipe` structure enables the processing of decoded instructions,
Expand All @@ -96,17 +105,8 @@ pub trait InstructionDecoder<'a> {
pub struct InstructionPipe<T: Send> {
pub decoder:
Box<dyn for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static>,
pub processor: Box<
dyn Processor<
InputType = (
InstructionMetadata,
DecodedInstruction<T>,
Vec<NestedInstruction>,
),
> + Send
+ Sync
+ 'static,
>,
pub processor:
Box<dyn Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static>,
}

/// An async trait for processing instructions within nested contexts.
Expand Down
34 changes: 10 additions & 24 deletions crates/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@
//! - Proper metric collection and flushing are essential for monitoring pipeline performance, especially in production environments.
use crate::{
account::{AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, DecodedAccount},
account::{
AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
},
account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
collection::InstructionDecoderCollection,
datasource::{AccountDeletion, Datasource, Update, UpdateType},
error::{CarbonResult, Error},
instruction::{
DecodedInstruction, InstructionDecoder, InstructionMetadata, InstructionPipe,
InstructionPipes, NestedInstruction,
InstructionDecoder, InstructionMetadata, InstructionPipe, InstructionPipes,
InstructionProcessorInputType,
},
metrics::{Metrics, MetricsCollection},
processor::Processor,
schema::TransactionSchema,
transaction::{TransactionMetadata, TransactionPipe, TransactionPipes},
transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
transformers,
};
use core::time;
Expand Down Expand Up @@ -687,10 +689,7 @@ impl PipelineBuilder {
pub fn account<T: Send + Sync + 'static>(
mut self,
decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
processor: impl Processor<InputType = (AccountMetadata, DecodedAccount<T>)>
+ Send
+ Sync
+ 'static,
processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self {
log::trace!(
"account(self, decoder: {:?}, processor: {:?})",
Expand Down Expand Up @@ -755,15 +754,7 @@ impl PipelineBuilder {
pub fn instruction<T: Send + Sync + 'static>(
mut self,
decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
processor: impl Processor<
InputType = (
InstructionMetadata,
DecodedInstruction<T>,
Vec<NestedInstruction>,
),
> + Send
+ Sync
+ 'static,
processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self {
log::trace!(
"instruction(self, decoder: {:?}, processor: {:?})",
Expand Down Expand Up @@ -797,13 +788,8 @@ impl PipelineBuilder {
pub fn transaction<T, U>(
mut self,
schema: Option<TransactionSchema<T>>,
processor: impl Processor<
InputType = (
TransactionMetadata,
Vec<(InstructionMetadata, DecodedInstruction<T>)>,
Option<U>,
),
> + Send
processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
+ Send
+ Sync
+ 'static,
) -> Self
Expand Down
30 changes: 13 additions & 17 deletions crates/core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ pub struct TransactionMetadata {
pub fee_payer: Pubkey,
}

/// The input type for the transaction processor.
///
/// - `T`: The instruction type, implementing `InstructionDecoderCollection`.
/// - `U`: The output type for the matched data, if schema-matching, implementing `DeserializeOwned`.
pub type TransactionProcessorInputType<T, U> = (
TransactionMetadata,
Vec<(InstructionMetadata, DecodedInstruction<T>)>,
Option<U>,
);

/// A pipe for processing transactions based on a defined schema and processor.
///
/// The `TransactionPipe` parses a transaction's instructions, optionally checks them against the schema,
Expand All @@ -59,16 +69,7 @@ pub struct TransactionMetadata {
/// - `U`: The output type for the matched data, if schema-matching, implementing `DeserializeOwned`.
pub struct TransactionPipe<T: InstructionDecoderCollection, U> {
schema: Option<TransactionSchema<T>>,
processor: Box<
dyn Processor<
InputType = (
TransactionMetadata,
Vec<(InstructionMetadata, DecodedInstruction<T>)>,
Option<U>,
),
> + Send
+ Sync,
>,
processor: Box<dyn Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync>,
}

/// Represents a parsed transaction, including its metadata and parsed instructions.
Expand All @@ -91,13 +92,8 @@ impl<T: InstructionDecoderCollection, U> TransactionPipe<T, U> {
pub fn new(
schema: Option<TransactionSchema<T>>,
processor: impl Processor<
InputType = (
TransactionMetadata,
Vec<(InstructionMetadata, DecodedInstruction<T>)>,
Option<U>,
),
> + Send
processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
+ Send
+ Sync
+ 'static,
) -> Self {
Expand Down

0 comments on commit 383bc7c

Please sign in to comment.