Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Jan 16, 2025
1 parent 4fbd139 commit f8df09c
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 80 deletions.
2 changes: 1 addition & 1 deletion daft/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
```python
df = daft.from_pydict({"foo": [1, 2, 3]})
daft.catalog.register_named_table(
daft.catalog.register_table(
"my_table",
df,
)
Expand Down
31 changes: 14 additions & 17 deletions src/daft-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ pub mod global_catalog {

use lazy_static::lazy_static;

use crate::{DaftMetaCatalog, DataCatalog};
use crate::{DaftCatalog, DataCatalog};

lazy_static! {
pub(crate) static ref GLOBAL_DAFT_META_CATALOG: RwLock<DaftMetaCatalog> =
RwLock::new(DaftMetaCatalog::new_from_env());
pub(crate) static ref GLOBAL_DAFT_META_CATALOG: RwLock<DaftCatalog> =
RwLock::new(DaftCatalog::new_from_env());
}

/// Register a DataCatalog with the global DaftMetaCatalog
Expand Down Expand Up @@ -51,7 +51,7 @@ static DEFAULT_CATALOG_NAME: &str = "default";
/// Users of Daft can register various [`DataCatalog`] with Daft, enabling
/// discovery of tables across various [`DataCatalog`] implementations.
#[derive(Debug, Clone, Default)]
pub struct DaftMetaCatalog {
pub struct DaftCatalog {
/// Map of catalog names to the DataCatalog impls.
///
/// NOTE: The default catalog is always named "default"
Expand All @@ -61,11 +61,11 @@ pub struct DaftMetaCatalog {
named_tables: HashMap<String, LogicalPlanBuilder>,
}

impl DaftMetaCatalog {
impl DaftCatalog {
/// Create a `DaftMetaCatalog` from the current environment
pub fn new_from_env() -> Self {
// TODO: Parse a YAML file to produce the catalog
DaftMetaCatalog {
DaftCatalog {
data_catalogs: default::Default::default(),
named_tables: default::Default::default(),
}
Expand Down Expand Up @@ -96,7 +96,7 @@ impl DaftMetaCatalog {
}

/// Registers a LogicalPlan with a name in the DaftMetaCatalog
pub fn register_named_table(
pub fn register_table(
&mut self,
name: &str,
view: impl Into<LogicalPlanBuilder>,
Expand All @@ -110,9 +110,8 @@ impl DaftMetaCatalog {
Ok(())
}

/// Check if a named table is registered in the DaftMetaCatalog
///
pub fn contains_named_table(&self, name: &str) -> bool {
/// Check if a named table is registered in the DaftCatalog
pub fn contains_table(&self, name: &str) -> bool {
self.named_tables.contains_key(name)
}

Check warning on line 116 in src/daft-catalog/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-catalog/src/lib.rs#L114-L116

Added lines #L114 - L116 were not covered by tests

Expand Down Expand Up @@ -201,26 +200,24 @@ mod tests {

#[test]
fn test_register_and_unregister_named_table() {
let mut catalog = DaftMetaCatalog::new_from_env();
let mut catalog = DaftCatalog::new_from_env();
let plan = LogicalPlanBuilder::from(mock_plan());

// Register a table
assert!(catalog
.register_named_table("test_table", plan.clone())
.is_ok());
assert!(catalog.register_table("test_table", plan.clone()).is_ok());

// Try to register a table with invalid name
assert!(catalog
.register_named_table("invalid name", plan.clone())
.register_table("invalid name", plan.clone())
.is_err());
}

#[test]
fn test_read_registered_table() {
let mut catalog = DaftMetaCatalog::new_from_env();
let mut catalog = DaftCatalog::new_from_env();
let plan = LogicalPlanBuilder::from(mock_plan());

catalog.register_named_table("test_table", plan).unwrap();
catalog.register_table("test_table", plan).unwrap();

assert!(catalog.read_table("test_table").is_ok());
assert!(catalog.read_table("non_existent_table").is_err());
Expand Down
2 changes: 1 addition & 1 deletion src/daft-catalog/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn py_register_table(
global_catalog::GLOBAL_DAFT_META_CATALOG
.write()
.unwrap()
.register_named_table(table_identifier, logical_plan.builder.clone())?;
.register_table(table_identifier, logical_plan.builder.clone())?;

Check warning on line 64 in src/daft-catalog/src/python.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-catalog/src/python.rs#L64

Added line #L64 was not covered by tests
Ok(table_identifier.to_string())
}

Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,14 @@ impl Session {

{
let catalog = self.catalog.read().unwrap();
if !replace && catalog.contains_named_table(&name) {
if !replace && catalog.contains_table(&name) {
return Err(Status::internal("Dataframe view already exists"));

Check warning on line 269 in src/daft-connect/src/execute.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/execute.rs#L269

Added line #L269 was not covered by tests
}
}

let mut catalog = self.catalog.write().unwrap();

catalog.register_named_table(&name, input).map_err(|e| {
catalog.register_table(&name, input).map_err(|e| {
Status::internal(textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"))

Check warning on line 276 in src/daft-connect/src/execute.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/execute.rs#L276

Added line #L276 was not covered by tests
})?;

Expand Down
6 changes: 3 additions & 3 deletions src/daft-connect/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::{Arc, RwLock},
};

use daft_catalog::DaftMetaCatalog;
use daft_catalog::DaftCatalog;
use daft_micropartition::partitioning::InMemoryPartitionSetCache;
use uuid::Uuid;

Expand All @@ -19,7 +19,7 @@ pub struct Session {
/// MicroPartitionSet associated with this session
/// this will be filled up as the user runs queries
pub(crate) psets: Arc<InMemoryPartitionSetCache>,
pub(crate) catalog: Arc<RwLock<DaftMetaCatalog>>,
pub(crate) catalog: Arc<RwLock<DaftCatalog>>,
}

impl Session {
Expand All @@ -39,7 +39,7 @@ impl Session {
id,
server_side_session_id,
psets: Arc::new(InMemoryPartitionSetCache::empty()),
catalog: Arc::new(RwLock::new(DaftMetaCatalog::default())),
catalog: Arc::new(RwLock::new(DaftCatalog::default())),
}
}

Expand Down
43 changes: 0 additions & 43 deletions src/daft-sql/src/catalog.rs

This file was deleted.

10 changes: 5 additions & 5 deletions src/daft-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
mod tests {
use std::sync::Arc;

use daft_catalog::DaftMetaCatalog;
use daft_catalog::DaftCatalog;
use daft_core::prelude::*;
use daft_dsl::{col, lit, Expr, OuterReferenceColumn, Subquery};
use daft_logical_plan::{
Expand Down Expand Up @@ -112,11 +112,11 @@ mod tests {

#[fixture]
fn planner() -> SQLPlanner<'static> {
let mut catalog = DaftMetaCatalog::default();
let mut catalog = DaftCatalog::default();

catalog.register_named_table("tbl1", tbl_1());
catalog.register_named_table("tbl2", tbl_2());
catalog.register_named_table("tbl3", tbl_3());
catalog.register_table("tbl1", tbl_1());
catalog.register_table("tbl2", tbl_2());
catalog.register_table("tbl3", tbl_3());

SQLPlanner::new(catalog)
}
Expand Down
8 changes: 4 additions & 4 deletions src/daft-sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use common_error::{DaftError, DaftResult};
use daft_algebra::boolean::combine_conjunction;
use daft_catalog::DaftMetaCatalog;
use daft_catalog::DaftCatalog;
use daft_core::prelude::*;
use daft_dsl::{
col,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl Relation {
/// Context that is shared across a query and its subqueries
#[derive(Default)]
struct PlannerContext {
catalog: DaftMetaCatalog,
catalog: DaftCatalog,
cte_map: HashMap<String, Relation>,
}

Expand All @@ -92,7 +92,7 @@ pub struct SQLPlanner<'a> {
}

impl<'a> SQLPlanner<'a> {
pub fn new(catalog: DaftMetaCatalog) -> Self {
pub fn new(catalog: DaftCatalog) -> Self {
let context = Rc::new(RefCell::new(PlannerContext {
catalog,
..Default::default()
Expand Down Expand Up @@ -138,7 +138,7 @@ impl<'a> SQLPlanner<'a> {
Ref::map(self.context.borrow(), |i| &i.cte_map)
}

fn catalog(&self) -> Ref<'_, DaftMetaCatalog> {
fn catalog(&self) -> Ref<'_, DaftCatalog> {
Ref::map(self.context.borrow(), |i| &i.catalog)
}

Expand Down
8 changes: 4 additions & 4 deletions src/daft-sql/src/python.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use common_daft_config::PyDaftPlanningConfig;
use daft_catalog::DaftMetaCatalog;
use daft_catalog::DaftCatalog;
use daft_dsl::python::PyExpr;
use daft_logical_plan::{LogicalPlanBuilder, PyLogicalPlanBuilder};
use pyo3::prelude::*;
Expand Down Expand Up @@ -69,7 +69,7 @@ pub fn list_sql_functions() -> Vec<SQLFunctionStub> {
#[pyclass(module = "daft.daft")]
#[derive(Debug, Clone)]
pub struct PyCatalog {
catalog: DaftMetaCatalog,
catalog: DaftCatalog,
}

#[pymethods]
Expand All @@ -78,7 +78,7 @@ impl PyCatalog {
#[staticmethod]
pub fn new() -> Self {
Self {
catalog: DaftMetaCatalog::default(),
catalog: DaftCatalog::default(),
}
}

Expand All @@ -89,7 +89,7 @@ impl PyCatalog {
dataframe: &mut PyLogicalPlanBuilder,
) -> PyResult<()> {
let plan = dataframe.builder.build();
self.catalog.register_named_table(name, plan)?;
self.catalog.register_table(name, plan)?;
Ok(())
}

Expand Down

0 comments on commit f8df09c

Please sign in to comment.