2gb parquet file takes 100s to process, even on second attempt (on main) #13785

Closed
TheBuilderJR opened this issue Dec 14, 2024 · 13 comments
Labels
bug Something isn't working

Comments

@TheBuilderJR

TheBuilderJR commented Dec 14, 2024

Describe the bug

Based on the published benchmarks, I expected to see improvements, but I haven't seen any. Statistics are turned on in my Parquet files, and in theory the optimizations from the last few releases should be kicking in, but they don't seem to be. Is there a guide on how to debug this? Are the main optimizations used in the benchmarks still hidden behind feature flags? If so, is there a guide on how to turn these flags on to optimize for performance?

To Reproduce

Create a 2 GB file (15M rows) of random data, run SELECT * FROM table ORDER BY timestamp twice, and see that both runs take over 100s.

Expected behavior

Maybe the first run is slow, but I expected the second run to at least be faster. Ideally the first run would also use the file statistics to run faster.

Additional context

This is consistent with past versions, but I upgraded across 3 major versions in one go and expected some sort of noticeable improvement.

@TheBuilderJR TheBuilderJR added the bug Something isn't working label Dec 14, 2024
@TheBuilderJR TheBuilderJR changed the title 8b file takes 100s to process, even on second attempt (on main) 2gb file takes 100s to process, even on second attempt (on main) Dec 14, 2024
@TheBuilderJR TheBuilderJR changed the title 2gb file takes 100s to process, even on second attempt (on main) 2gb parquet file takes 100s to process, even on second attempt (on main) Dec 14, 2024
@Dandandan
Contributor

Dandandan commented Dec 14, 2024

Hi @TheBuilderJR thanks for opening the issue.

Is there a way we could reproduce your results?
Did you compare performance to other engines (e.g. Spark, DuckDB)?

Let me try to address some of it:

SELECT * FROM table ORDER by timestamp two times, see both times take over 100s

This is an expensive query because it has to:

  1. Scan all data / all columns (because it has no column selection, no WHERE or LIMIT)
  2. Order the entire dataset based on timestamp

But I expected second time to at least be faster

DataFusion is a stateless query engine by default and doesn't cache anything, so the second run is usually not much faster than the first.

Ideally first time also utilizes the file statistics to run faster.

I don't think statistics could be used for a query like this.
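
To illustrate the point about statistics, here is a minimal sketch of min/max pruning. It is not DataFusion's implementation; `RowGroupStats` and `row_groups_to_scan` are hypothetical names, and the timestamps are plain integers for simplicity. Statistics let an engine skip row groups that cannot match a filter, but a query with no filter gives it nothing to skip.

```rust
/// Min/max statistics for one row group (hypothetical, simplified type).
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min_ts: i64,
    max_ts: i64,
}

/// Returns the indices of row groups that may contain rows with
/// `lo <= timestamp <= hi`. `None` means there is no bound on that side.
fn row_groups_to_scan(
    stats: &[RowGroupStats],
    lo: Option<i64>,
    hi: Option<i64>,
) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter(|(_, s)| {
            // A row group can be skipped only if its range lies entirely
            // outside the requested range.
            let above_lo = lo.map_or(true, |lo| s.max_ts >= lo);
            let below_hi = hi.map_or(true, |hi| s.min_ts <= hi);
            above_lo && below_hi
        })
        .map(|(i, _)| i)
        .collect()
}
```

With three row groups covering 0..=99, 100..=199 and 200..=299, a filter on 120..=150 keeps only the middle row group, while calling it with `None, None` (the situation for SELECT * ... ORDER BY timestamp) keeps all three.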

I upgraded 3 major version bumps in one go and expected some sort of noticeable improvement.

Recent versions mostly improve aggregation (GROUP BY) performance, so you would notice this in queries that use GROUP BY. The query in the example depends heavily on the Parquet scan and ORDER BY being fast, and probably won't see much of an improvement.

@TheBuilderJR
Author

Thanks @Dandandan! Alas, the data is proprietary, but I think if you just inserted random data with a timestamp, the performance on an M3 MacBook Air would probably be similar.

This is an expensive query because it has to: ... Order the entire dataset based on timestamp

Is there no way to order by certain fields by default? I know ClickHouse uses this to skip lots of unnecessary processing. Can we do the same?

@alamb
Contributor

alamb commented Dec 16, 2024

Is there no way we can default order by certain fields? I know clickhouse uses this to skip lots of unnecessary processing. Can we do the same?

You can use the WITH ORDER clause of CREATE EXTERNAL TABLE if your data is already ordered:

https://datafusion.apache.org/user-guide/sql/ddl.html#create-external-table

For example

CREATE EXTERNAL TABLE test (
    c1  VARCHAR NOT NULL,
    c2  INT NOT NULL,
    c3  SMALLINT NOT NULL,
    c4  SMALLINT NOT NULL,
    c5  INT NOT NULL,
    c6  BIGINT NOT NULL,
    c7  SMALLINT NOT NULL,
    c8  INT NOT NULL,
    c9  BIGINT NOT NULL,
    c10 VARCHAR NOT NULL,
    c11 FLOAT NOT NULL,
    c12 DOUBLE NOT NULL,
    c13 VARCHAR NOT NULL
)
STORED AS CSV
-- this line tells DataFusion the data in the file is already ordered by (c2 ASC)
WITH ORDER (c2 ASC)
LOCATION '/path/to/aggregate_test_100.csv'
OPTIONS ('has_header' 'true');
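
Note that WITH ORDER is a declaration about the file, and the DataFusion documentation cautions that results may be incorrect if the data is not actually sorted that way. A minimal sketch of a sanity check to run on a column's values before declaring the order (the helper name `is_sorted_asc` is hypothetical and not part of DataFusion):

```rust
/// Returns true if `values` is in non-decreasing (ASC) order.
/// An empty or single-element slice counts as sorted.
fn is_sorted_asc<T: PartialOrd>(values: &[T]) -> bool {
    // Compare each adjacent pair; any descending pair breaks the order.
    values.windows(2).all(|pair| pair[0] <= pair[1])
}
```

For example, `is_sorted_asc(&[1, 2, 2, 5])` holds, while `is_sorted_asc(&[3, 1, 2])` does not.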

@TheBuilderJR
Author

Thanks @alamb. Is there a way to do this via code? This is currently how I write my parquet files via datafusion:

df.write_parquet(
    file_path.to_str().ok_or(anyhow!("Invalid file path"))?,
    datafusion::dataframe::DataFrameWriteOptions::default().with_single_file_output(true),
    None,
).await?;

@alamb
Contributor

alamb commented Dec 19, 2024

Thanks @alamb. Is there a way to do this via code? This is currently how I write my parquet files via datafusion:

I think you can do it programmatically by creating a ListingTable as shown here: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html#example

Then set file_sort_order on the ListingOptions: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html#structfield.file_sort_order

@TheBuilderJR
Author

@alamb oh I meant on the write path. I don't see anything in the dataframe.write_parquet API https://docs.rs/datafusion/latest/datafusion/config/struct.TableParquetOptions.html

@alamb
Contributor

alamb commented Dec 20, 2024

@alamb oh I meant on the write path. I don't see anything in the dataframe.write_parquet API https://docs.rs/datafusion/latest/datafusion/config/struct.TableParquetOptions.html

Perhaps https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.sort_by would work?

@Dandandan
Contributor

Dandandan commented Dec 20, 2024

Hmm... it looks like DataFrameWriteOptions is missing an order by / sort by option like the one available in SQL.

@alamb
Contributor

alamb commented Dec 20, 2024

Hmm... it looks like DataFrameWriteOptions is missing an order by / sort by like available in SQL.

@TheBuilderJR if you are willing to file a ticket for that feature, I suspect someone would implement it pretty quickly.

@zhuqi-lucas
Contributor

I created an issue for the improvement and will create a PR soon, thanks.
#13873

@TheBuilderJR
Author

@alamb @zhuqi-lucas

Thank you for the quick turnaround. I've rebased on top of your changes, but I still seem to see query times grow as the data size grows for a relatively simple ordered query:

SELECT * FROM revenue_logs ORDER BY timestamp_utc ASC LIMIT 15

Here is the code for my read path

let config = ListingTableConfig::new_with_multi_paths(
    paths_str
        .into_iter()
        .map(|p| ListingTableUrl::parse(&p))
        // Collect into Result<Vec<ListingTableUrl>, _> and propagate errors
        .collect::<Result<Vec<_>, _>>()?,
)
.with_schema(Arc::new(schema.clone()))
.infer(&ctx.state())
.await?;

// Declare that the files are already sorted by timestamp_utc (ascending, nulls first)
let config = ListingTableConfig {
    options: Some(ListingOptions {
        file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
        ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
    }),
    ..config
};

let listing_table = ListingTable::try_new(config)?;
ctx.register_table(table_name, Arc::new(listing_table))?;

Here is the code for my write path

df.clone()
    .write_parquet(
        file_path.to_str().ok_or(anyhow!("Invalid file path"))?,
        datafusion::dataframe::DataFrameWriteOptions::default()
            .with_single_file_output(true)
            // Sort by timestamp_utc (ascending, nulls first) before writing
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

Is this expected? I would have imagined the cost should be constant since you can use the sort constraint to always scan a constant number of rows.

@TheBuilderJR
Author

Nvm this seems to work. It plateaus at a certain point. Thanks everyone!
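
A rough sketch of why the cost of ORDER BY ... LIMIT can plateau once every file is known to be sorted: a merge only needs the head of each file plus one refill per emitted row. This is plain Rust over in-memory vectors, loosely analogous to a sort-preserving merge and not DataFusion's actual implementation; `top_k_from_sorted` is a hypothetical helper, and each `Vec<i64>` stands in for one file's timestamp column.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Returns the `k` smallest timestamps across several inputs that are each
/// already sorted ascending, plus the number of rows actually read.
fn top_k_from_sorted(files: &[Vec<i64>], k: usize) -> (Vec<i64>, usize) {
    // Min-heap of (timestamp, file index, position within that file).
    let mut heap = BinaryHeap::new();
    let mut rows_read = 0;

    // Seed the heap with the first row of every non-empty file.
    for (file_idx, file) in files.iter().enumerate() {
        if let Some(&ts) = file.first() {
            heap.push(Reverse((ts, file_idx, 0usize)));
            rows_read += 1;
        }
    }

    let mut out = Vec::with_capacity(k);
    while out.len() < k {
        match heap.pop() {
            Some(Reverse((ts, file_idx, pos))) => {
                out.push(ts);
                // Refill from the file the emitted row came from.
                if let Some(&next) = files[file_idx].get(pos + 1) {
                    heap.push(Reverse((next, file_idx, pos + 1)));
                    rows_read += 1;
                }
            }
            // All inputs are exhausted before reaching k rows.
            None => break,
        }
    }
    (out, rows_read)
}
```

The number of rows read is bounded by the number of files plus `k`, regardless of how many rows each file holds. In this sketch the cost still grows with the number of files, but not with their size.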

@zhuqi-lucas
Contributor

zhuqi-lucas commented Dec 28, 2024

Thank you @TheBuilderJR for checking the merged PR. Further improvements will be included in:

#13891

After that, we won't need to add the sort order to the read options manually. When a query orders by a column that was written in sorted order, the sort order info will be loaded automatically from the Parquet metadata.

The optimizer will then use that info to optimize the query.
