From 5e0a5134551e7fcd7b8f9cd535ab8186afaf8089 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Tue, 24 Sep 2024 13:38:14 -0700 Subject: [PATCH] Add new python feature to make Python support conditional (#748) --- .github/workflows/binaries.yml | 40 +++- .github/workflows/ci.yml | 2 +- Cargo.lock | 1 + .../arroyo-udf/arroyo-udf-python/Cargo.toml | 5 +- .../arroyo-udf/arroyo-udf-python/src/lib.rs | 190 ++---------------- .../arroyo-udf-python/src/threaded.rs | 3 +- .../arroyo-udf/arroyo-udf-python/src/types.rs | 173 ++++++++++++++++ crates/arroyo/Cargo.toml | 2 + docker/Dockerfile | 2 +- 9 files changed, 233 insertions(+), 185 deletions(-) create mode 100644 crates/arroyo-udf/arroyo-udf-python/src/types.rs diff --git a/.github/workflows/binaries.yml b/.github/workflows/binaries.yml index 0772e3ea0..108cbab72 100644 --- a/.github/workflows/binaries.yml +++ b/.github/workflows/binaries.yml @@ -19,8 +19,8 @@ jobs: matrix: # see https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories config: - - { runner: buildjet-32vcpu-ubuntu-2204, protoc: linux-x86_64, artifact: linux-x86_64 } - - { runner: buildjet-32vcpu-ubuntu-2204-arm, protoc: linux-aarch_64, artifact: linux-arm64 } + - { runner: buildjet-32vcpu-ubuntu-2204, protoc: linux-x86_64, pyarch: x86_64, artifact: linux-x86_64 } + - { runner: buildjet-32vcpu-ubuntu-2204-arm, protoc: linux-aarch_64, pyarch: aarch64, artifact: linux-arm64 } runs-on: ${{ matrix.config.runner }} services: postgres: @@ -49,20 +49,34 @@ jobs: - name: Update rust run: | rustup update + - name: Install Python 3.12 + run: | + curl -OL https://github.com/indygreg/python-build-standalone/releases/download/20240814/cpython-3.12.5+20240814-${{ matrix.config.pyarch }}-unknown-linux-gnu-install_only.tar.gz + tar xvfz cpython*.tar.gz + sudo cp -r python/bin/* /usr/local/bin/ + sudo cp -r python/include/* /usr/local/include/ + sudo cp -r python/lib/* /usr/local/lib/ + sudo cp -r python/share/* /usr/local/share/ + sudo ldconfig + - name: Run DB migrations run: | cargo install --debug refinery_cli --version $REFINERY_VERSION refinery migrate -e REFINERY_CONFIG -p crates/arroyo-api/migrations - name: Run frontend build run: cd webui && pnpm install && pnpm build - - name: Build Arroyo - run: cargo build --release --package arroyo && strip target/release/arroyo + - name: Create output directory + run: mkdir artifacts + - name: Build Arroyo with Python + run: cargo build --features python --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo-python + - name: Build Arroyo without Python + run: cargo build --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo - uses: actions/upload-artifact@v4 with: name: arroyo-${{ matrix.config.artifact }} - path: target/release/arroyo + path: artifacts/* if-no-files-found: error - + macos: strategy: fail-fast: true @@ -70,7 +84,7 @@ jobs: # see https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories config: - { runner: macos-13, protoc: osx-x86_64, artifact: macos-x86_64 } - - { runner: macos-14, protoc: osx-aarch_64, artifact: macos-m1 } + - { runner: macos-14-xlarge, protoc: osx-aarch_64, artifact: macos-m1 } runs-on: ${{ matrix.config.runner }} steps: - name: Check out @@ -79,6 +93,8 @@ jobs: uses: pnpm/action-setup@v4 with: version: 9.7.1 + - name: Install Python 3.12 via homebrew + run: brew install python@3.12 - name: Install protoc compiler run: | wget https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/protoc-$PROTOC_VERSION-${{ matrix.config.protoc }}.zip @@ -94,10 +110,14 @@ jobs: refinery migrate -e REFINERY_CONFIG -p crates/arroyo-api/migrations - name: Run frontend build run: cd webui && pnpm install && pnpm build - - name: Build Arroyo - run: cargo build --release --package arroyo && strip target/release/arroyo + - name: Create output directory + run: mkdir artifacts + - name: Build Arroyo with Python + run: PYO3_PYTHON=/opt/homebrew/opt/python@3.12/Frameworks/Python.framework/Versions/3.12/Python cargo build --features python --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo-python + - name: Build Arroyo without Python + run: cargo build --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo - uses: actions/upload-artifact@v4 with: name: arroyo-${{ matrix.config.artifact }} - path: target/release/arroyo + path: artifacts/* if-no-files-found: error diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c69ccf4ad..1da4bc8a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,7 +94,7 @@ jobs: - name: Build run: cargo build --all-features - name: Run Clippy - run: cargo clippy --all-targets --workspace -- -D warnings + run: cargo clippy --all-features --all-targets --workspace -- -D warnings - name: Test run: cargo nextest run -E 'kind(lib)' --all-features - name: Integ postgres diff --git a/Cargo.lock b/Cargo.lock index afc6b4291..dbbba1ae9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -430,6 +430,7 @@ dependencies = [ "arroyo-server-common", "arroyo-storage", "arroyo-types", + "arroyo-udf-python", "arroyo-worker", "async-trait", "clap", diff --git a/crates/arroyo-udf/arroyo-udf-python/Cargo.toml b/crates/arroyo-udf/arroyo-udf-python/Cargo.toml index 3acf4282c..460ff8648 100644 --- a/crates/arroyo-udf/arroyo-udf-python/Cargo.toml +++ b/crates/arroyo-udf/arroyo-udf-python/Cargo.toml @@ -3,11 +3,14 @@ name = "arroyo-udf-python" version = "0.2.0" edition = "2021" +[features] +python-enabled = ["pyo3"] + [dependencies] arroyo-udf-common = { path = "../arroyo-udf-common" } arrow = { workspace = true, features = ["ffi"] } datafusion = { workspace = true } -pyo3 = { version = "0.21"} +pyo3 = { version = "0.21", optional = true} anyhow = "1" tokio = { version = "1", features = ["full"] } itertools = "0.13.0" \ No newline at end of file diff --git a/crates/arroyo-udf/arroyo-udf-python/src/lib.rs b/crates/arroyo-udf/arroyo-udf-python/src/lib.rs index 3f4f3a316..3dbc901d7 100644 --- a/crates/arroyo-udf/arroyo-udf-python/src/lib.rs +++ b/crates/arroyo-udf/arroyo-udf-python/src/lib.rs @@ -1,30 +1,36 @@ +#[cfg(feature = "python-enabled")] mod interpreter; +#[cfg(feature = "python-enabled")] mod pyarrow; +#[cfg(feature = "python-enabled")] mod threaded; +#[cfg(feature = "python-enabled")] +mod types; -use crate::threaded::ThreadedUdfInterpreter; -use anyhow::{anyhow, bail}; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::DataType; use arroyo_udf_common::parse::NullableType; use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature}; -use pyo3::prelude::*; -use pyo3::types::{PyDict, PyString, PyTuple}; -use pyo3::{Bound, PyAny}; use std::any::Any; use std::fmt::Debug; use std::sync::mpsc::{Receiver, SyncSender}; use std::sync::{Arc, Mutex}; +#[cfg(not(feature = "python-enabled"))] +const NOT_ENABLED_ERROR: &str = + "Python is not enabled in this build of Arroyo. See https://doc.arroyo.dev/udfs/python/udfs \ + for more information on how to obtain a Python-enabled build."; + +#[cfg(feature = "python-enabled")] const UDF_PY_LIB: &str = include_str!("../python/arroyo_udf.py"); #[derive(Debug)] pub struct PythonUDF { pub name: Arc, - pub task_tx: SyncSender>, - pub result_rx: Arc>>>, + pub(crate) task_tx: SyncSender>, + pub(crate) result_rx: Arc>>>, pub definition: Arc, pub signature: Arc, pub arg_types: Arc>, @@ -88,175 +94,17 @@ impl ScalarUDFImpl for PythonUDF { } } -fn extract_type_info(udf: &Bound) -> anyhow::Result<(Vec, NullableType)> { - let attr = udf.getattr("__annotations__")?; - let annotations: &Bound = attr.downcast().map_err(|e| { - anyhow!( - "__annotations__ object is not a dictionary: {}", - e.to_string() - ) - })?; - - // Iterate over annotations dictionary - let (ok, err): (Vec<_>, Vec<_>) = annotations - .iter() - .map(|(k, v)| { - python_type_to_arrow( - k.downcast::().unwrap().to_str().unwrap(), - &v, - false, - ) - }) - .partition(|e| e.is_ok()); - - if !err.is_empty() { - bail!( - "Could not register Python UDF: {}", - err.into_iter() - .map(|t| t.unwrap_err().to_string()) - .collect::>() - .join(", ") - ); - } - - let mut result: Vec<_> = ok.into_iter().map(|t| t.unwrap()).collect(); - - let ret = result - .pop() - .ok_or_else(|| anyhow!("No return type defined for function"))?; - - Ok((result, ret)) -} - impl PythonUDF { + #[allow(unused)] pub async fn parse(body: impl Into) -> anyhow::Result { - ThreadedUdfInterpreter::new(Arc::new(body.into())).await - } -} - -fn python_type_to_arrow( - var_name: &str, - py_type: &Bound, - nullable: bool, -) -> anyhow::Result { - let name = py_type - .getattr("__name__") - .map_err(|e| anyhow!("Could not get name of type for argument {var_name}: {e}"))? - .downcast::() - .map_err(|_| anyhow!("Argument type was not a string"))? - .to_string(); - - if name == "Optional" { - return python_type_to_arrow( - var_name, - &py_type - .getattr("__args__") - .map_err(|_| anyhow!("Optional type does not have arguments"))? - .downcast::() - .map_err(|e| anyhow!("__args__ is not a tuple: {e}"))? - .get_item(0)?, - true, - ); - } - - let data_type = match name.as_str() { - "int" => DataType::Int64, - "float" => DataType::Float64, - "str" => DataType::Utf8, - "bool" => DataType::Boolean, - "list" => bail!("lists are not yet supported"), - other => bail!("Unsupported Python type: {}", other), - }; - - Ok(NullableType::new(data_type, nullable)) -} - -#[cfg(test)] -mod test { - use crate::PythonUDF; - use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, TypeSignature}; - use std::sync::Arc; - - #[tokio::test] - async fn test() { - let udf = r#" -from arroyo_udf import udf - -@udf -def my_add(x: int, y: float) -> float: - return float(x) + y -"#; - - let udf = PythonUDF::parse(udf).await.unwrap(); - assert_eq!(udf.name.as_str(), "my_add"); - if let datafusion::logical_expr::TypeSignature::OneOf(args) = &udf.signature.type_signature + #[cfg(feature = "python-enabled")] { - let ts: Vec<_> = args - .iter() - .map(|e| { - if let TypeSignature::Exact(v) = e { - v - } else { - panic!( - "expected inner typesignature sto be exact, but found {:?}", - e - ) - } - }) - .collect(); - - use arrow::datatypes::DataType::*; - - assert_eq!( - ts, - vec![ - &vec![Int8, Float32], - &vec![Int8, Float64], - &vec![Int16, Float32], - &vec![Int16, Float64], - &vec![Int32, Float32], - &vec![Int32, Float64], - &vec![Int64, Float32], - &vec![Int64, Float64], - &vec![UInt8, Float32], - &vec![UInt8, Float64], - &vec![UInt16, Float32], - &vec![UInt16, Float64], - &vec![UInt32, Float32], - &vec![UInt32, Float64], - &vec![UInt64, Float32], - &vec![UInt64, Float64] - ] - ); - } else { - panic!("Expected oneof type signature"); + crate::threaded::ThreadedUdfInterpreter::new(Arc::new(body.into())).await } - assert_eq!( - udf.return_type.data_type, - arrow::datatypes::DataType::Float64 - ); - assert!(!udf.return_type.nullable); - - let data = vec![ - ColumnarValue::Array(Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3]))), - ColumnarValue::Array(Arc::new(arrow::array::Float64Array::from(vec![ - 1.0, 2.0, 3.0, - ]))), - ]; - - let result = udf.invoke(&data).unwrap(); - if let ColumnarValue::Array(a) = result { - let a = a - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(a.len(), 3); - assert_eq!(a.value(0), 2.0); - assert_eq!(a.value(1), 4.0); - assert_eq!(a.value(2), 6.0); - } else { - panic!("Expected array result"); + #[cfg(not(feature = "python-enabled"))] + { + anyhow::bail!(NOT_ENABLED_ERROR) } } } diff --git a/crates/arroyo-udf/arroyo-udf-python/src/threaded.rs b/crates/arroyo-udf/arroyo-udf-python/src/threaded.rs index a47fdfd06..23765be59 100644 --- a/crates/arroyo-udf/arroyo-udf-python/src/threaded.rs +++ b/crates/arroyo-udf/arroyo-udf-python/src/threaded.rs @@ -1,6 +1,7 @@ use crate::interpreter::SubInterpreter; use crate::pyarrow::Converter; -use crate::{extract_type_info, PythonUDF, UDF_PY_LIB}; +use crate::types::extract_type_info; +use crate::{PythonUDF, UDF_PY_LIB}; use anyhow::anyhow; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::DataType; diff --git a/crates/arroyo-udf/arroyo-udf-python/src/types.rs b/crates/arroyo-udf/arroyo-udf-python/src/types.rs new file mode 100644 index 000000000..609aadcbd --- /dev/null +++ b/crates/arroyo-udf/arroyo-udf-python/src/types.rs @@ -0,0 +1,173 @@ +use anyhow::{anyhow, bail}; +use arrow::datatypes::DataType; +use arroyo_udf_common::parse::NullableType; +use pyo3::prelude::{PyAnyMethods, PyDictMethods, PyStringMethods, PyTupleMethods}; +use pyo3::types::{PyDict, PyString, PyTuple}; +use pyo3::{Bound, PyAny}; + +pub fn extract_type_info(udf: &Bound) -> anyhow::Result<(Vec, NullableType)> { + let attr = udf.getattr("__annotations__")?; + let annotations: &Bound = attr.downcast().map_err(|e| { + anyhow!( + "__annotations__ object is not a dictionary: {}", + e.to_string() + ) + })?; + + // Iterate over annotations dictionary + let (ok, err): (Vec<_>, Vec<_>) = annotations + .iter() + .map(|(k, v)| { + python_type_to_arrow( + k.downcast::().unwrap().to_str().unwrap(), + &v, + false, + ) + }) + .partition(|e| e.is_ok()); + + if !err.is_empty() { + bail!( + "Could not register Python UDF: {}", + err.into_iter() + .map(|t| t.unwrap_err().to_string()) + .collect::>() + .join(", ") + ); + } + + let mut result: Vec<_> = ok.into_iter().map(|t| t.unwrap()).collect(); + + let ret = result + .pop() + .ok_or_else(|| anyhow!("No return type defined for function"))?; + + Ok((result, ret)) +} + +fn python_type_to_arrow( + var_name: &str, + py_type: &Bound, + nullable: bool, +) -> anyhow::Result { + let name = py_type + .getattr("__name__") + .map_err(|e| anyhow!("Could not get name of type for argument {var_name}: {e}"))? + .downcast::() + .map_err(|_| anyhow!("Argument type was not a string"))? + .to_string(); + + if name == "Optional" { + return python_type_to_arrow( + var_name, + &py_type + .getattr("__args__") + .map_err(|_| anyhow!("Optional type does not have arguments"))? + .downcast::() + .map_err(|e| anyhow!("__args__ is not a tuple: {e}"))? + .get_item(0)?, + true, + ); + } + + let data_type = match name.as_str() { + "int" => DataType::Int64, + "float" => DataType::Float64, + "str" => DataType::Utf8, + "bool" => DataType::Boolean, + "list" => bail!("lists are not yet supported"), + other => bail!("Unsupported Python type: {}", other), + }; + + Ok(NullableType::new(data_type, nullable)) +} + +#[cfg(test)] +mod test { + use crate::PythonUDF; + use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, TypeSignature}; + use std::sync::Arc; + + #[tokio::test] + async fn test() { + let udf = r#" +from arroyo_udf import udf + +@udf +def my_add(x: int, y: float) -> float: + return float(x) + y +"#; + + let udf = PythonUDF::parse(udf).await.unwrap(); + assert_eq!(udf.name.as_str(), "my_add"); + if let datafusion::logical_expr::TypeSignature::OneOf(args) = &udf.signature.type_signature + { + let ts: Vec<_> = args + .iter() + .map(|e| { + if let TypeSignature::Exact(v) = e { + v + } else { + panic!( + "expected inner typesignature sto be exact, but found {:?}", + e + ) + } + }) + .collect(); + + use arrow::datatypes::DataType::*; + + assert_eq!( + ts, + vec![ + &vec![Int8, Float32], + &vec![Int8, Float64], + &vec![Int16, Float32], + &vec![Int16, Float64], + &vec![Int32, Float32], + &vec![Int32, Float64], + &vec![Int64, Float32], + &vec![Int64, Float64], + &vec![UInt8, Float32], + &vec![UInt8, Float64], + &vec![UInt16, Float32], + &vec![UInt16, Float64], + &vec![UInt32, Float32], + &vec![UInt32, Float64], + &vec![UInt64, Float32], + &vec![UInt64, Float64] + ] + ); + } else { + panic!("Expected oneof type signature"); + } + + assert_eq!( + udf.return_type.data_type, + arrow::datatypes::DataType::Float64 + ); + assert!(!udf.return_type.nullable); + + let data = vec![ + ColumnarValue::Array(Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3]))), + ColumnarValue::Array(Arc::new(arrow::array::Float64Array::from(vec![ + 1.0, 2.0, 3.0, + ]))), + ]; + + let result = udf.invoke(&data).unwrap(); + if let ColumnarValue::Array(a) = result { + let a = a + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(a.len(), 3); + assert_eq!(a.value(0), 2.0); + assert_eq!(a.value(1), 4.0); + assert_eq!(a.value(2), 6.0); + } else { + panic!("Expected array result"); + } + } +} diff --git a/crates/arroyo/Cargo.toml b/crates/arroyo/Cargo.toml index 8513a0d82..08b77b4c7 100644 --- a/crates/arroyo/Cargo.toml +++ b/crates/arroyo/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [features] profiling = ["tikv-jemallocator/profiling"] +python = ["arroyo-udf-python/python-enabled"] [dependencies] arroyo-types = { path ="../arroyo-types" } @@ -18,6 +19,7 @@ arroyo-node = { path = "../arroyo-node" } arroyo-rpc = { path = "../arroyo-rpc" } arroyo-openapi = { path ="../arroyo-openapi" } arroyo-storage = { path = "../arroyo-storage" } +arroyo-udf-python = { path = "../arroyo-udf/arroyo-udf-python" } clap = { version = "4", features = ["derive"] } tokio = { version = "1", features = ["full"] } diff --git a/docker/Dockerfile b/docker/Dockerfile index 1f9e41a5e..58f4594ac 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -46,7 +46,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry,id=${TARGETPLATFORM}-${P sudo -u postgres psql -c "CREATE USER arroyo WITH PASSWORD 'arroyo' SUPERUSER;" && \ sudo -u postgres createdb arroyo && \ refinery migrate -c refinery.toml -p crates/arroyo-api/migrations && \ - CARGO_NET_GIT_FETCH_WITH_CLI=true CARGO_PROFILE_RELEASE_DEBUG=false cargo build --profile ${PROFILE} --bin arroyo && \ + CARGO_NET_GIT_FETCH_WITH_CLI=true CARGO_PROFILE_RELEASE_DEBUG=false cargo build --features python --profile ${PROFILE} --bin arroyo && \ mv target/*/arroyo /arroyo