From 96fa56e7498980a68319516f6f071cbc440bd4c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Tue, 3 Dec 2024 03:46:40 +0100 Subject: [PATCH] rows --- scylla-rust-wrapper/src/query_error.rs | 2 +- scylla-rust-wrapper/src/query_result.rs | 142 +++++++++++++----------- 2 files changed, 81 insertions(+), 63 deletions(-) diff --git a/scylla-rust-wrapper/src/query_error.rs b/scylla-rust-wrapper/src/query_error.rs index db11315a..4c5b99fc 100644 --- a/scylla-rust-wrapper/src/query_error.rs +++ b/scylla-rust-wrapper/src/query_error.rs @@ -15,7 +15,7 @@ pub enum CassErrorResult { Query(#[from] QueryError), #[error(transparent)] ResultMetadataLazyDeserialization(#[from] ResultMetadataAndRowsCountParseError), - #[error("Failed to deserialize rows: {0}")] + #[error("Failed to deserialize first row: {0}")] Deserialization(#[from] DeserializationError), } diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 05414dd7..cce1aae9 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -12,7 +12,8 @@ use crate::query_error::CassErrorResult; use crate::query_result::Value::{CollectionValue, RegularValue}; use crate::types::*; use crate::uuid::CassUuid; -use scylla::frame::response::result::{ColumnSpec, CqlValue, Row}; +use scylla::deserialize::result::TypedRowIterator; +use scylla::frame::response::result::{ColumnSpec, CqlValue, DeserializedMetadataAndRawRows, Row}; use scylla::transport::query_result::{ColumnSpecs, IntoRowsResultError}; use scylla::transport::PagingStateResponse; use scylla::QueryResult; @@ -27,7 +28,8 @@ pub enum CassResultKind { } pub struct CassRowsResult { - pub rows: Vec, + pub raw_rows: DeserializedMetadataAndRawRows, + pub first_row: Option, pub metadata: Arc, } @@ -58,21 +60,20 @@ impl CassResult { )) }); - // For now, let's eagerly deserialize rows into type-erased CqlValues. - // Lazy deserialization requires a non-trivial refactor that needs to be discussed. - let rows: Vec = rows_result - .rows::() - // SAFETY: this unwrap is safe, because `Row` always - // passes the typecheck, no matter the type of the columns. + let (raw_rows, tracing_id, _) = rows_result.into_inner(); + let first_row = raw_rows + .rows_iter::() .unwrap() - .collect::>()?; - let cass_rows = create_cass_rows_from_rows(rows, &metadata); + .next() + .transpose()? + .map(|row| CassRow::from_row_and_metadata(row, &metadata)); let cass_result = CassResult { - tracing_id: rows_result.tracing_id(), + tracing_id, paging_state_response, kind: CassResultKind::Rows(CassRowsResult { - rows: cass_rows, + raw_rows, + first_row, metadata, }), }; @@ -155,16 +156,13 @@ impl FFI for CassRow { type Ownership = OwnershipBorrowed; } -pub fn create_cass_rows_from_rows( - rows: Vec, - metadata: &Arc, -) -> Vec { - rows.into_iter() - .map(|r| CassRow { - columns: create_cass_row_columns(r, metadata), - result_metadata: metadata.clone(), - }) - .collect() +impl CassRow { + fn from_row_and_metadata(row: Row, metadata: &Arc) -> Self { + Self { + columns: create_cass_row_columns(row, metadata), + result_metadata: Arc::clone(metadata), + } + } } pub enum Value { @@ -307,9 +305,15 @@ fn get_column_value(column: CqlValue, column_type: &Arc) -> Value } } -pub struct CassResultIterator { - result: Arc, - position: Option, +pub struct CassRowsResultIterator { + iterator: TypedRowIterator<'static, 'static, Row>, + result_metadata: Arc, + current_row: Option, +} + +pub enum CassResultIterator { + NonRows, + Rows(CassRowsResultIterator), } pub struct CassRowIterator { @@ -391,16 +395,21 @@ pub unsafe extern "C" fn cass_iterator_next( match &mut iter { CassIterator::CassResultIterator(result_iterator) => { - let new_pos: usize = result_iterator.position.map_or(0, |prev_pos| prev_pos + 1); + let CassResultIterator::Rows(rows_result_iterator) = result_iterator else { + return false as cass_bool_t; + }; - result_iterator.position = Some(new_pos); + let new_row = rows_result_iterator + .iterator + .next() + .and_then(Result::ok) + .map(|row| { + CassRow::from_row_and_metadata(row, &rows_result_iterator.result_metadata) + }); - match &result_iterator.result.kind { - CassResultKind::Rows(rows_result) => { - (new_pos < rows_result.rows.len()) as cass_bool_t - } - CassResultKind::NonRows => false as cass_bool_t, - } + rows_result_iterator.current_row = new_row; + + rows_result_iterator.current_row.is_some() as cass_bool_t } CassIterator::CassRowIterator(row_iterator) => { let new_pos: usize = row_iterator.position.map_or(0, |prev_pos| prev_pos + 1); @@ -493,21 +502,15 @@ pub unsafe extern "C" fn cass_iterator_get_row( // Defined only for result iterator, for other types should return null if let CassIterator::CassResultIterator(result_iterator) = iter { - let iter_position = match result_iterator.position { - Some(pos) => pos, - None => return RefFFI::null(), - }; - - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result_iterator.result.kind else { + let CassResultIterator::Rows(rows_result_iterator) = result_iterator else { return RefFFI::null(); }; - let row: &CassRow = match rows.get(iter_position) { - Some(row) => row, - None => return RefFFI::null(), - }; - - return RefFFI::as_ptr(row); + return rows_result_iterator + .current_row + .as_ref() + .map(RefFFI::as_ptr) + .unwrap_or(RefFFI::null()); } RefFFI::null() @@ -862,11 +865,17 @@ pub unsafe extern "C" fn cass_iterator_get_materialized_view_meta( pub unsafe extern "C" fn cass_iterator_from_result( result: CassSharedPtr, ) -> CassExclusiveMutPtr { - let result_from_raw = ArcFFI::cloned_from_ptr(result).unwrap(); - - let iterator = CassResultIterator { - result: result_from_raw, - position: None, + let result_from_raw = ArcFFI::as_ref(result).unwrap(); + + let iterator = match &result_from_raw.kind { + CassResultKind::NonRows => CassResultIterator::NonRows, + CassResultKind::Rows(cass_rows_result) => { + CassResultIterator::Rows(CassRowsResultIterator { + iterator: cass_rows_result.raw_rows.rows_iter().unwrap(), + result_metadata: Arc::clone(&cass_rows_result.metadata), + current_row: None, + }) + } }; BoxFFI::into_ptr(Box::new(CassIterator::CassResultIterator(iterator))) @@ -1593,11 +1602,11 @@ pub unsafe extern "C" fn cass_value_secondary_sub_type( pub unsafe extern "C" fn cass_result_row_count(result_raw: CassSharedPtr) -> size_t { let result = ArcFFI::as_ref(&result_raw).unwrap(); - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else { + let CassResultKind::Rows(CassRowsResult { raw_rows, .. }) = &result.kind else { return 0; }; - rows.len() as size_t + raw_rows.rows_count() as size_t } #[no_mangle] @@ -1617,11 +1626,14 @@ pub unsafe extern "C" fn cass_result_first_row( ) -> CassBorrowedPtr { let result = ArcFFI::as_ref(&result_raw).unwrap(); - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else { + let CassResultKind::Rows(CassRowsResult { first_row, .. }) = &result.kind else { return RefFFI::null(); }; - rows.first().map(RefFFI::as_ptr).unwrap_or(RefFFI::null()) + first_row + .as_ref() + .map(RefFFI::as_ptr) + .unwrap_or(RefFFI::null()) } #[no_mangle] @@ -1661,7 +1673,9 @@ mod tests { use std::{ffi::c_char, ptr::addr_of_mut, sync::Arc}; use scylla::{ - frame::response::result::{ColumnSpec, ColumnType, CqlValue, Row, TableSpec}, + frame::response::result::{ + ColumnSpec, ColumnType, CqlValue, DeserializedMetadataAndRawRows, Row, TableSpec, + }, transport::PagingStateResponse, }; @@ -1676,8 +1690,8 @@ mod tests { }; use super::{ - cass_result_column_count, cass_result_column_type, create_cass_rows_from_rows, CassResult, - CassResultKind, CassResultMetadata, CassRowsResult, CassSharedPtr, + cass_result_column_count, cass_result_column_type, CassResult, CassResultKind, + CassResultMetadata, CassRow, CassRowsResult, CassSharedPtr, }; fn col_spec(name: &'static str, typ: ColumnType<'static>) -> ColumnSpec<'static> { @@ -1697,8 +1711,8 @@ mod tests { ), ])); - let rows = create_cass_rows_from_rows( - vec![Row { + let first_row = Some(CassRow::from_row_and_metadata( + Row { columns: vec![ Some(CqlValue::BigInt(42)), None, @@ -1708,14 +1722,18 @@ mod tests { CqlValue::Float(9999.9999), ])), ], - }], + }, &metadata, - ); + )); CassResult { tracing_id: None, paging_state_response: PagingStateResponse::NoMorePages, - kind: CassResultKind::Rows(CassRowsResult { rows, metadata }), + kind: CassResultKind::Rows(CassRowsResult { + raw_rows: DeserializedMetadataAndRawRows::mock_empty(), + first_row, + metadata, + }), } }