diff --git a/README.md b/README.md index 8e5e404..5b0603a 100644 --- a/README.md +++ b/README.md @@ -129,14 +129,14 @@ This will parse the my_program.json IDL file and generate the corresponding deco ### Implementing Processors ```rs -use carbon_core::account::{AccountDecoder, AccountMetadata, DecodedAccount}; +use carbon_core::account::{AccountDecoder, AccountMetadata, AccountProcessorInputType, DecodedAccount}; use crate::MyCustomAccountData; struct MyAccountProcessor; #[async_trait] impl Processor for MyAccountProcessor { - type InputType = (AccountMetadata, DecodedAccount); + type InputType = AccountProcessorInputType; async fn process( &mut self, diff --git a/crates/core/src/account.rs b/crates/core/src/account.rs index b423199..87f9451 100644 --- a/crates/core/src/account.rs +++ b/crates/core/src/account.rs @@ -112,6 +112,11 @@ pub trait AccountDecoder<'a> { ) -> Option>; } +/// The input type for the account processor. +/// +/// - `T`: The account type, as determined by the decoder. +pub type AccountProcessorInputType = (AccountMetadata, DecodedAccount); + /// A processing pipe that decodes and processes Solana account updates. /// /// `AccountPipe` combines an `AccountDecoder` and a `Processor` to manage account @@ -128,8 +133,7 @@ pub trait AccountDecoder<'a> { /// - `processor`: A `Processor` that handles the processing logic for decoded accounts. pub struct AccountPipe { pub decoder: Box AccountDecoder<'a, AccountType = T> + Send + Sync + 'static>, - pub processor: - Box)> + Send + Sync>, + pub processor: Box> + Send + Sync>, } /// A trait for processing account updates in the pipeline asynchronously. diff --git a/crates/core/src/instruction.rs b/crates/core/src/instruction.rs index 0df427b..96d1298 100644 --- a/crates/core/src/instruction.rs +++ b/crates/core/src/instruction.rs @@ -80,6 +80,15 @@ pub trait InstructionDecoder<'a> { ) -> Option>; } +/// The input type for the instruction processor. +/// +/// - `T`: The instruction type +pub type InstructionProcessorInputType = ( + InstructionMetadata, + DecodedInstruction, + Vec, +); + /// A processing pipeline for instructions, using a decoder and processor. /// /// The `InstructionPipe` structure enables the processing of decoded instructions, @@ -96,17 +105,8 @@ pub trait InstructionDecoder<'a> { pub struct InstructionPipe { pub decoder: Box InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static>, - pub processor: Box< - dyn Processor< - InputType = ( - InstructionMetadata, - DecodedInstruction, - Vec, - ), - > + Send - + Sync - + 'static, - >, + pub processor: + Box> + Send + Sync + 'static>, } /// An async trait for processing instructions within nested contexts. diff --git a/crates/core/src/pipeline.rs b/crates/core/src/pipeline.rs index c696a0c..aa663b8 100644 --- a/crates/core/src/pipeline.rs +++ b/crates/core/src/pipeline.rs @@ -38,19 +38,21 @@ //! - Proper metric collection and flushing are essential for monitoring pipeline performance, especially in production environments. use crate::{ - account::{AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, DecodedAccount}, + account::{ + AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType, + }, account_deletion::{AccountDeletionPipe, AccountDeletionPipes}, collection::InstructionDecoderCollection, datasource::{AccountDeletion, Datasource, Update, UpdateType}, error::{CarbonResult, Error}, instruction::{ - DecodedInstruction, InstructionDecoder, InstructionMetadata, InstructionPipe, - InstructionPipes, NestedInstruction, + InstructionDecoder, InstructionMetadata, InstructionPipe, InstructionPipes, + InstructionProcessorInputType, }, metrics::{Metrics, MetricsCollection}, processor::Processor, schema::TransactionSchema, - transaction::{TransactionMetadata, TransactionPipe, TransactionPipes}, + transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType}, transformers, }; use core::time; @@ -687,10 +689,7 @@ impl PipelineBuilder { pub fn account( mut self, decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static, - processor: impl Processor)> - + Send - + Sync - + 'static, + processor: impl Processor> + Send + Sync + 'static, ) -> Self { log::trace!( "account(self, decoder: {:?}, processor: {:?})", @@ -755,15 +754,7 @@ impl PipelineBuilder { pub fn instruction( mut self, decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static, - processor: impl Processor< - InputType = ( - InstructionMetadata, - DecodedInstruction, - Vec, - ), - > + Send - + Sync - + 'static, + processor: impl Processor> + Send + Sync + 'static, ) -> Self { log::trace!( "instruction(self, decoder: {:?}, processor: {:?})", @@ -797,13 +788,8 @@ impl PipelineBuilder { pub fn transaction( mut self, schema: Option>, - processor: impl Processor< - InputType = ( - TransactionMetadata, - Vec<(InstructionMetadata, DecodedInstruction)>, - Option, - ), - > + Send + processor: impl Processor> + + Send + Sync + 'static, ) -> Self diff --git a/crates/core/src/transaction.rs b/crates/core/src/transaction.rs index 5cf296f..2c62118 100644 --- a/crates/core/src/transaction.rs +++ b/crates/core/src/transaction.rs @@ -47,6 +47,16 @@ pub struct TransactionMetadata { pub fee_payer: Pubkey, } +/// The input type for the transaction processor. +/// +/// - `T`: The instruction type, implementing `InstructionDecoderCollection`. +/// - `U`: The output type for the matched data, if schema-matching, implementing `DeserializeOwned`. +pub type TransactionProcessorInputType = ( + TransactionMetadata, + Vec<(InstructionMetadata, DecodedInstruction)>, + Option, +); + /// A pipe for processing transactions based on a defined schema and processor. /// /// The `TransactionPipe` parses a transaction's instructions, optionally checks them against the schema, @@ -59,16 +69,7 @@ pub struct TransactionMetadata { /// - `U`: The output type for the matched data, if schema-matching, implementing `DeserializeOwned`. pub struct TransactionPipe { schema: Option>, - processor: Box< - dyn Processor< - InputType = ( - TransactionMetadata, - Vec<(InstructionMetadata, DecodedInstruction)>, - Option, - ), - > + Send - + Sync, - >, + processor: Box> + Send + Sync>, } /// Represents a parsed transaction, including its metadata and parsed instructions. @@ -91,13 +92,8 @@ impl TransactionPipe { pub fn new( schema: Option>, - processor: impl Processor< - InputType = ( - TransactionMetadata, - Vec<(InstructionMetadata, DecodedInstruction)>, - Option, - ), - > + Send + processor: impl Processor> + + Send + Sync + 'static, ) -> Self {