Skip to content

Commit

Permalink
rows
Browse files Browse the repository at this point in the history
  • Loading branch information
muzarski committed Dec 4, 2024
1 parent 10a7c42 commit 96fa56e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 63 deletions.
2 changes: 1 addition & 1 deletion scylla-rust-wrapper/src/query_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum CassErrorResult {
Query(#[from] QueryError),
#[error(transparent)]
ResultMetadataLazyDeserialization(#[from] ResultMetadataAndRowsCountParseError),
#[error("Failed to deserialize rows: {0}")]
#[error("Failed to deserialize first row: {0}")]
Deserialization(#[from] DeserializationError),
}

Expand Down
142 changes: 80 additions & 62 deletions scylla-rust-wrapper/src/query_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::query_error::CassErrorResult;
use crate::query_result::Value::{CollectionValue, RegularValue};
use crate::types::*;
use crate::uuid::CassUuid;
use scylla::frame::response::result::{ColumnSpec, CqlValue, Row};
use scylla::deserialize::result::TypedRowIterator;
use scylla::frame::response::result::{ColumnSpec, CqlValue, DeserializedMetadataAndRawRows, Row};
use scylla::transport::query_result::{ColumnSpecs, IntoRowsResultError};
use scylla::transport::PagingStateResponse;
use scylla::QueryResult;
Expand All @@ -27,7 +28,8 @@ pub enum CassResultKind {
}

pub struct CassRowsResult {
pub rows: Vec<CassRow>,
pub raw_rows: DeserializedMetadataAndRawRows,
pub first_row: Option<CassRow>,
pub metadata: Arc<CassResultMetadata>,
}

Expand Down Expand Up @@ -58,21 +60,20 @@ impl CassResult {
))
});

// For now, let's eagerly deserialize rows into type-erased CqlValues.
// Lazy deserialization requires a non-trivial refactor that needs to be discussed.
let rows: Vec<Row> = rows_result
.rows::<Row>()
// SAFETY: this unwrap is safe, because `Row` always
// passes the typecheck, no matter the type of the columns.
let (raw_rows, tracing_id, _) = rows_result.into_inner();
let first_row = raw_rows
.rows_iter::<Row>()
.unwrap()
.collect::<Result<_, _>>()?;
let cass_rows = create_cass_rows_from_rows(rows, &metadata);
.next()
.transpose()?
.map(|row| CassRow::from_row_and_metadata(row, &metadata));

let cass_result = CassResult {
tracing_id: rows_result.tracing_id(),
tracing_id,
paging_state_response,
kind: CassResultKind::Rows(CassRowsResult {
rows: cass_rows,
raw_rows,
first_row,
metadata,
}),
};
Expand Down Expand Up @@ -155,16 +156,13 @@ impl FFI for CassRow {
type Ownership = OwnershipBorrowed;
}

pub fn create_cass_rows_from_rows(
rows: Vec<Row>,
metadata: &Arc<CassResultMetadata>,
) -> Vec<CassRow> {
rows.into_iter()
.map(|r| CassRow {
columns: create_cass_row_columns(r, metadata),
result_metadata: metadata.clone(),
})
.collect()
impl CassRow {
fn from_row_and_metadata(row: Row, metadata: &Arc<CassResultMetadata>) -> Self {
Self {
columns: create_cass_row_columns(row, metadata),
result_metadata: Arc::clone(metadata),
}
}
}

pub enum Value {
Expand Down Expand Up @@ -307,9 +305,15 @@ fn get_column_value(column: CqlValue, column_type: &Arc<CassDataType>) -> Value
}
}

pub struct CassResultIterator {
result: Arc<CassResult>,
position: Option<usize>,
pub struct CassRowsResultIterator {
iterator: TypedRowIterator<'static, 'static, Row>,
result_metadata: Arc<CassResultMetadata>,
current_row: Option<CassRow>,
}

pub enum CassResultIterator {
NonRows,
Rows(CassRowsResultIterator),
}

pub struct CassRowIterator {
Expand Down Expand Up @@ -391,16 +395,21 @@ pub unsafe extern "C" fn cass_iterator_next(

match &mut iter {
CassIterator::CassResultIterator(result_iterator) => {
let new_pos: usize = result_iterator.position.map_or(0, |prev_pos| prev_pos + 1);
let CassResultIterator::Rows(rows_result_iterator) = result_iterator else {
return false as cass_bool_t;
};

result_iterator.position = Some(new_pos);
let new_row = rows_result_iterator
.iterator
.next()
.and_then(Result::ok)
.map(|row| {
CassRow::from_row_and_metadata(row, &rows_result_iterator.result_metadata)
});

match &result_iterator.result.kind {
CassResultKind::Rows(rows_result) => {
(new_pos < rows_result.rows.len()) as cass_bool_t
}
CassResultKind::NonRows => false as cass_bool_t,
}
rows_result_iterator.current_row = new_row;

rows_result_iterator.current_row.is_some() as cass_bool_t
}
CassIterator::CassRowIterator(row_iterator) => {
let new_pos: usize = row_iterator.position.map_or(0, |prev_pos| prev_pos + 1);
Expand Down Expand Up @@ -493,21 +502,15 @@ pub unsafe extern "C" fn cass_iterator_get_row(

// Defined only for result iterator, for other types should return null
if let CassIterator::CassResultIterator(result_iterator) = iter {
let iter_position = match result_iterator.position {
Some(pos) => pos,
None => return RefFFI::null(),
};

let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result_iterator.result.kind else {
let CassResultIterator::Rows(rows_result_iterator) = result_iterator else {
return RefFFI::null();
};

let row: &CassRow = match rows.get(iter_position) {
Some(row) => row,
None => return RefFFI::null(),
};

return RefFFI::as_ptr(row);
return rows_result_iterator
.current_row
.as_ref()
.map(RefFFI::as_ptr)
.unwrap_or(RefFFI::null());
}

RefFFI::null()
Expand Down Expand Up @@ -862,11 +865,17 @@ pub unsafe extern "C" fn cass_iterator_get_materialized_view_meta(
pub unsafe extern "C" fn cass_iterator_from_result(
result: CassSharedPtr<CassResult>,
) -> CassExclusiveMutPtr<CassIterator> {
let result_from_raw = ArcFFI::cloned_from_ptr(result).unwrap();

let iterator = CassResultIterator {
result: result_from_raw,
position: None,
let result_from_raw = ArcFFI::as_ref(result).unwrap();

let iterator = match &result_from_raw.kind {
CassResultKind::NonRows => CassResultIterator::NonRows,
CassResultKind::Rows(cass_rows_result) => {
CassResultIterator::Rows(CassRowsResultIterator {
iterator: cass_rows_result.raw_rows.rows_iter().unwrap(),
result_metadata: Arc::clone(&cass_rows_result.metadata),
current_row: None,
})
}
};

BoxFFI::into_ptr(Box::new(CassIterator::CassResultIterator(iterator)))
Expand Down Expand Up @@ -1593,11 +1602,11 @@ pub unsafe extern "C" fn cass_value_secondary_sub_type(
pub unsafe extern "C" fn cass_result_row_count(result_raw: CassSharedPtr<CassResult>) -> size_t {
let result = ArcFFI::as_ref(&result_raw).unwrap();

let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else {
let CassResultKind::Rows(CassRowsResult { raw_rows, .. }) = &result.kind else {
return 0;
};

rows.len() as size_t
raw_rows.rows_count() as size_t
}

#[no_mangle]
Expand All @@ -1617,11 +1626,14 @@ pub unsafe extern "C" fn cass_result_first_row(
) -> CassBorrowedPtr<CassRow> {
let result = ArcFFI::as_ref(&result_raw).unwrap();

let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else {
let CassResultKind::Rows(CassRowsResult { first_row, .. }) = &result.kind else {
return RefFFI::null();
};

rows.first().map(RefFFI::as_ptr).unwrap_or(RefFFI::null())
first_row
.as_ref()
.map(RefFFI::as_ptr)
.unwrap_or(RefFFI::null())
}

#[no_mangle]
Expand Down Expand Up @@ -1661,7 +1673,9 @@ mod tests {
use std::{ffi::c_char, ptr::addr_of_mut, sync::Arc};

use scylla::{
frame::response::result::{ColumnSpec, ColumnType, CqlValue, Row, TableSpec},
frame::response::result::{
ColumnSpec, ColumnType, CqlValue, DeserializedMetadataAndRawRows, Row, TableSpec,
},
transport::PagingStateResponse,
};

Expand All @@ -1676,8 +1690,8 @@ mod tests {
};

use super::{
cass_result_column_count, cass_result_column_type, create_cass_rows_from_rows, CassResult,
CassResultKind, CassResultMetadata, CassRowsResult, CassSharedPtr,
cass_result_column_count, cass_result_column_type, CassResult, CassResultKind,
CassResultMetadata, CassRow, CassRowsResult, CassSharedPtr,
};

fn col_spec(name: &'static str, typ: ColumnType<'static>) -> ColumnSpec<'static> {
Expand All @@ -1697,8 +1711,8 @@ mod tests {
),
]));

let rows = create_cass_rows_from_rows(
vec![Row {
let first_row = Some(CassRow::from_row_and_metadata(
Row {
columns: vec![
Some(CqlValue::BigInt(42)),
None,
Expand All @@ -1708,14 +1722,18 @@ mod tests {
CqlValue::Float(9999.9999),
])),
],
}],
},
&metadata,
);
));

CassResult {
tracing_id: None,
paging_state_response: PagingStateResponse::NoMorePages,
kind: CassResultKind::Rows(CassRowsResult { rows, metadata }),
kind: CassResultKind::Rows(CassRowsResult {
raw_rows: DeserializedMetadataAndRawRows::mock_empty(),
first_row,
metadata,
}),
}
}

Expand Down

0 comments on commit 96fa56e

Please sign in to comment.