feat(iceberg): Adds support for read_iceberg with metadata_location to Daft-SQL #3701

Merged · 4 commits · Jan 17, 2025
3 changes: 3 additions & 0 deletions .gitignore
@@ -43,3 +43,6 @@ log/

# helix editor
.helix

# uv
uv.lock
12 changes: 9 additions & 3 deletions daft/io/_iceberg.py
@@ -1,6 +1,6 @@
# isort: dont-add-import: from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from daft import context
from daft.api_annotations import PublicAPI
@@ -53,7 +53,7 @@ def get_first_property_value(*property_names: str) -> Optional[Any]:

@PublicAPI
def read_iceberg(
table: "pyiceberg.table.Table",
table: Union[str, "pyiceberg.table.Table"],
snapshot_id: Optional[int] = None,
io_config: Optional["IOConfig"] = None,
) -> DataFrame:
@@ -75,15 +75,21 @@ def read_iceberg(
official project for Python.

Args:
table (pyiceberg.table.Table): `PyIceberg Table <https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table>`__ created using the PyIceberg library
table (str or pyiceberg.table.Table): `PyIceberg Table <https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table>`__ created using the PyIceberg library
snapshot_id (int, optional): Snapshot ID of the table to query
io_config (IOConfig, optional): A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in `table` are ignored.

Returns:
DataFrame: a DataFrame with the schema converted from the specified Iceberg table
"""
import pyiceberg

from daft.iceberg.iceberg_scan import IcebergScanOperator

# support for read_iceberg('path/to/metadata.json')
if isinstance(table, str):
table = pyiceberg.table.StaticTable.from_metadata(metadata_location=table)

io_config = (
_convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config
)
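
For context, a minimal sketch of what the new string overload enables (the metadata path and bucket here are hypothetical):

```python
import daft

# Before this change, callers had to construct the pyiceberg table themselves:
#   from pyiceberg.table import StaticTable
#   df = daft.read_iceberg(StaticTable.from_metadata("s3://bucket/db/tbl/metadata/v3.metadata.json"))
#
# Now a metadata location can be passed directly; the StaticTable is built internally:
df = daft.read_iceberg("s3://bucket/db/tbl/metadata/v3.metadata.json")
df.show()
```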
43 changes: 43 additions & 0 deletions src/daft-scan/src/builder.rs
@@ -399,3 +399,46 @@ pub fn delta_scan<T: IntoGlobPath>(
) -> DaftResult<LogicalPlanBuilder> {
panic!("Delta Lake scan requires the 'python' feature to be enabled.")
}

/// Creates a logical scan operator from a Python IcebergScanOperator.
/// Example:
/// ```python
/// iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location)
/// iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config)
/// ```
#[cfg(feature = "python")]
pub fn iceberg_scan<T: AsRef<str>>(
metadata_location: T,
snapshot_id: Option<usize>,
io_config: Option<IOConfig>,
) -> DaftResult<LogicalPlanBuilder> {
use pyo3::IntoPyObjectExt;
let storage_config: StorageConfig = io_config.unwrap_or_default().into();
let scan_operator = Python::with_gil(|py| -> DaftResult<ScanOperatorHandle> {
// iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location)
let iceberg_table_module = PyModule::import(py, "pyiceberg.table")?;
let iceberg_static_table = iceberg_table_module.getattr("StaticTable")?;
let iceberg_table =
iceberg_static_table.call_method1("from_metadata", (metadata_location.as_ref(),))?;
// iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config)
let iceberg_scan_module = PyModule::import(py, "daft.iceberg.iceberg_scan")?;
let iceberg_scan_class = iceberg_scan_module.getattr("IcebergScanOperator")?;
let iceberg_scan = iceberg_scan_class
.call1((iceberg_table, snapshot_id, storage_config))?
.into_py_any(py)?;
Ok(ScanOperatorHandle::from_python_scan_operator(
iceberg_scan,
py,
)?)
})?;
LogicalPlanBuilder::table_scan(scan_operator.into(), None)
}

#[cfg(not(feature = "python"))]
pub fn iceberg_scan<T: AsRef<str>>(
metadata_location: T,
snapshot_id: Option<usize>,
io_config: Option<IOConfig>,
) -> DaftResult<LogicalPlanBuilder> {
panic!("Iceberg scan requires the 'python' feature to be enabled.")
}
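
The `Python::with_gil` body above is equivalent to the following public-API sketch (the metadata path is hypothetical; `daft.read_iceberg` drives the same `IcebergScanOperator`):

```python
import daft
from pyiceberg.table import StaticTable

# Equivalent of what `iceberg_scan` does under the GIL: build a StaticTable
# from the metadata location, then wrap it in an Iceberg scan.
metadata_location = "s3://bucket/db/tbl/metadata/v3.metadata.json"  # hypothetical path
table = StaticTable.from_metadata(metadata_location)
df = daft.read_iceberg(table)  # same scan the SQL `read_iceberg(...)` plans
```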
6 changes: 6 additions & 0 deletions src/daft-scan/src/storage_config.rs
@@ -65,6 +65,12 @@ impl Default for StorageConfig {
}
}

impl From<IOConfig> for StorageConfig {
fn from(io_config: IOConfig) -> Self {
Self::new_internal(true, Some(io_config))
}
}

#[cfg(feature = "python")]
#[pymethods]
impl StorageConfig {
1 change: 1 addition & 0 deletions src/daft-sql/src/functions.rs
@@ -92,6 +92,7 @@ pub trait SQLFunction: Send + Sync {
.collect::<SQLPlannerResult<Vec<_>>>()
}

// TODO(cleanup): make the argument order consistent with SQLTableFunction.
fn to_expr(&self, inputs: &[FunctionArg], planner: &SQLPlanner) -> SQLPlannerResult<ExprRef>;

/// Produce the docstrings for this SQL function, parametrized by an alias which is the function name to invoke this in SQL
56 changes: 11 additions & 45 deletions src/daft-sql/src/table_provider/mod.rs
@@ -1,11 +1,16 @@
pub mod read_csv;
pub mod read_json;
pub mod read_parquet;
mod read_csv;
mod read_deltalake;
mod read_iceberg;
mod read_json;
mod read_parquet;

use std::{collections::HashMap, sync::Arc};

use daft_logical_plan::LogicalPlanBuilder;
use once_cell::sync::Lazy;
use read_csv::ReadCsvFunction;
use read_deltalake::ReadDeltalakeFunction;
use read_iceberg::SqlReadIceberg;
use read_json::ReadJsonFunction;
use read_parquet::ReadParquetFunction;
use sqlparser::ast::TableFunctionArgs;
@@ -20,10 +25,10 @@ use crate::{
pub(crate) static SQL_TABLE_FUNCTIONS: Lazy<SQLTableFunctions> = Lazy::new(|| {
let mut functions = SQLTableFunctions::new();
functions.add_fn("read_csv", ReadCsvFunction);
functions.add_fn("read_deltalake", ReadDeltalakeFunction);
functions.add_fn("read_iceberg", SqlReadIceberg);
functions.add_fn("read_json", ReadJsonFunction);
functions.add_fn("read_parquet", ReadParquetFunction);
#[cfg(feature = "python")]
functions.add_fn("read_deltalake", ReadDeltalakeFunction);
functions
});

@@ -70,50 +75,11 @@ impl<'a> SQLPlanner<'a> {
}
}

// TODO(cleanup): switch the parameter order and rename to `to_logical_plan` for consistency with SQLFunction.
pub(crate) trait SQLTableFunction: Send + Sync {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder>;
}

pub struct ReadDeltalakeFunction;

#[cfg(feature = "python")]
impl SQLTableFunction for ReadDeltalakeFunction {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
let (uri, io_config) = match args.args.as_slice() {
[uri] => (uri, None),
[uri, io_config] => {
let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?;
let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?;

(uri, io_config)
}
_ => unsupported_sql_err!("Expected one or two arguments"),
};
let uri = planner.plan_function_arg(uri)?;

let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else {
unsupported_sql_err!("Expected a string literal for the first argument");
};

daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from)
}
}

#[cfg(not(feature = "python"))]
impl SQLTableFunction for ReadDeltalakeFunction {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
unsupported_sql_err!("`read_deltalake` function is not supported. Enable the `python` feature to use this function.")
}
}
44 changes: 44 additions & 0 deletions src/daft-sql/src/table_provider/read_deltalake.rs
@@ -0,0 +1,44 @@
use daft_logical_plan::LogicalPlanBuilder;
use sqlparser::ast::TableFunctionArgs;

use super::{expr_to_iocfg, SQLTableFunction};
use crate::{error::SQLPlannerResult, unsupported_sql_err, SQLPlanner};

pub(super) struct ReadDeltalakeFunction;

#[cfg(feature = "python")]
impl SQLTableFunction for ReadDeltalakeFunction {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
let (uri, io_config) = match args.args.as_slice() {
[uri] => (uri, None),
[uri, io_config] => {
let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?;
let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?;
(uri, io_config)
}
_ => unsupported_sql_err!("Expected one or two arguments"),
};
let uri = planner.plan_function_arg(uri)?;

let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else {
unsupported_sql_err!("Expected a string literal for the first argument");
};

daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from)
}
}

#[cfg(not(feature = "python"))]
impl SQLTableFunction for ReadDeltalakeFunction {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
unsupported_sql_err!("`read_deltalake` function is not supported. Enable the `python` feature to use this function.")
}
}
74 changes: 74 additions & 0 deletions src/daft-sql/src/table_provider/read_iceberg.rs
@@ -0,0 +1,74 @@
use common_io_config::IOConfig;
use daft_logical_plan::LogicalPlanBuilder;
use sqlparser::ast::TableFunctionArgs;

use super::SQLTableFunction;
use crate::{
error::{PlannerError, SQLPlannerResult},
functions::{self, SQLFunctionArguments},
SQLPlanner,
};

/// The Daft-SQL `read_iceberg` table-valued function.
pub(super) struct SqlReadIceberg;

/// The Daft-SQL `read_iceberg` table-valued function arguments.
struct SqlReadIcebergArgs {
metadata_location: String,
snapshot_id: Option<usize>,
io_config: Option<IOConfig>,
}

impl SqlReadIcebergArgs {
/// Like `TryFrom<SQLFunctionArguments>`, but converts from `TableFunctionArgs` directly and takes the planner.
fn try_from(planner: &SQLPlanner, args: &TableFunctionArgs) -> SQLPlannerResult<Self> {
planner.plan_function_args(&args.args, &["snapshot_id", "io_config"], 1)
}
}

impl TryFrom<SQLFunctionArguments> for SqlReadIcebergArgs {
type Error = PlannerError;

/// This is required to use `planner.plan_function_args`
fn try_from(args: SQLFunctionArguments) -> Result<Self, Self::Error> {
let metadata_location: String = args
.try_get_positional(0)?
.expect("read_iceberg requires a path");
let snapshot_id: Option<usize> = args.try_get_named("snapshot_id")?;
let io_config: Option<IOConfig> = functions::args::parse_io_config(&args)?.into();
Ok(Self {
metadata_location,
snapshot_id,
io_config,
})
}
}

/// Translates the `read_iceberg` table-valued function to a logical scan operator.
#[cfg(feature = "python")]
impl SQLTableFunction for SqlReadIceberg {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
let args = SqlReadIcebergArgs::try_from(planner, args)?;
Ok(daft_scan::builder::iceberg_scan(
args.metadata_location,
args.snapshot_id,
args.io_config,
)?)
}
}

/// Translates the `read_iceberg` table-valued function to a logical scan operator (errors when the `python` feature is disabled).
#[cfg(not(feature = "python"))]
impl SQLTableFunction for SqlReadIceberg {
fn plan(
&self,
planner: &SQLPlanner,
args: &TableFunctionArgs,
) -> SQLPlannerResult<LogicalPlanBuilder> {
crate::unsupported_sql_err!("`read_iceberg` function is not supported. Enable the `python` feature to use this function.")
}
}
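
A usage sketch for the argument handling above: one required positional `metadata_location`, plus optional named `snapshot_id` and `io_config`. The named-argument syntax and the snapshot id below are assumptions modeled on Daft-SQL's other table functions; the paths are hypothetical.

```python
import daft

# Positional metadata location only:
daft.sql("SELECT * FROM read_iceberg('s3://bucket/db/tbl/metadata/v3.metadata.json')").show()

# With the optional named argument parsed by SqlReadIcebergArgs
# (named-argument syntax assumed; snapshot id hypothetical):
daft.sql(
    "SELECT * FROM read_iceberg("
    "'s3://bucket/db/tbl/metadata/v3.metadata.json', "
    "snapshot_id => 1234567890)"
).show()
```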
20 changes: 20 additions & 0 deletions tests/sql/test_table_functions/test_read_iceberg.py
@@ -0,0 +1,20 @@
import pytest

import daft


@pytest.mark.skip(
"invoke manually via `uv run tests/sql/test_table_functions/test_read_iceberg.py <metadata_location>`"
)
def test_read_iceberg(metadata_location):
df = daft.sql(f"SELECT * FROM read_iceberg('{metadata_location}')")
print(df.collect())


if __name__ == "__main__":
import sys

if len(sys.argv) < 2:
print("usage: test_read_iceberg.py <metadata_location>")
sys.exit(1)
test_read_iceberg(metadata_location=sys.argv[1])