Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expand ListingSchemaProvider to support register and deregister table #3150

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
59 changes: 42 additions & 17 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::open_table_with_storage_options;
use crate::storage::*;
use crate::table::builder::ensure_table_uri;
use crate::DeltaTableBuilder;
use crate::{storage::*, DeltaTable};

const DELTA_LOG_FOLDER: &str = "_delta_log";

Expand All @@ -36,7 +36,7 @@ pub struct ListingSchemaProvider {
/// Underlying object store
store: Arc<dyn ObjectStore>,
/// A map of table names to a fully quilfied storage location
tables: DashMap<String, String>,
tables: DashMap<String, Arc<dyn TableProvider>>,
/// Options used to create underlying object stores
storage_options: StorageOptions,
}
Expand Down Expand Up @@ -73,6 +73,7 @@ impl ListingSchemaProvider {
parent = p;
}
}

for table in tables.into_iter() {
let table_name = normalize_table_name(table)?;
let table_path = table
Expand All @@ -81,9 +82,33 @@ impl ListingSchemaProvider {
.to_string();
if !self.table_exist(&table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
self.tables.insert(table_name.to_string(), table_url);
let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url)
.with_storage_options(self.storage_options.0.clone())
.build()
else {
continue;
};
let _ = self.register_table(table_name, Arc::new(delta_table));
}
}
Ok(())
}

/// Tables are not initialized in Eddytor but have a reference setup. To initialize the delta
/// table, the `load()` function must be called on the delta table. This function helps with
/// that and ensures the DashMap is updated
pub async fn load_delta(&self, table_name: &str) -> datafusion::common::Result<()> {
if let Some(mut table) = self.tables.get_mut(&table_name.to_string()) {
if let Some(delta_table) = table.value().as_any().downcast_ref::<DeltaTable>() {
// If table has not yet been loaded, we remove it from the tables map and add it again
if delta_table.state.is_none() {
let mut delta_table = delta_table.clone();
delta_table.load().await?;
*table = Arc::from(delta_table);
}
}
}

Ok(())
}
}
Expand Down Expand Up @@ -112,31 +137,31 @@ impl SchemaProvider for ListingSchemaProvider {
}

async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
let Some(provider) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
Ok(Some(provider))
}

fn register_table(
&self,
_name: String,
_table: Arc<dyn TableProvider>,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
if !self.table_exist(name.as_str()) {
self.tables.insert(name, table.clone());
}
Ok(Some(table))
}

fn deregister_table(
&self,
_name: &str,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support deregistering tables".to_owned(),
))
if let Some(table) = self.tables.remove(name) {
return Ok(Some(table.1));
}
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
Expand Down
Loading