Skip to content

Commit

Permalink
docs: Add example and improve readme
Browse files Browse the repository at this point in the history
  • Loading branch information
pall-j committed Jan 9, 2025
1 parent 56d859c commit 4324067
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 13 deletions.
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,17 @@ process_data(
myjobpackage
├── __init__.py
├── entrypoint.py # Databricks Notebook
├── processing.py
└── tests
├── __init__.py
├── test_processing.py
└── data
└── tables
├── example_input.ndjson
├── expected_output.ndjson
└── schema
├── example_input.json
└── expected_output.json
└── processing.py
tests
├── __init__.py
├── test_processing.py
└── data
└── tables
├── example_input.ndjson
├── expected_output.ndjson
└── schema
├── example_input.json
└── expected_output.json
```

**Data Format**
Expand Down Expand Up @@ -265,6 +265,8 @@ def test_process_data(
`assertDataFrameEqual` (this can be adjusted using the `checkRowOrder`
parameter).

**ℹ️ For a complete example, please see the [example](https://github.com/datamole-ai/pysparkdt/blob/main/example) directory.**

**⚠️ Note on running tests in parallel**

With the setup above, the metastore is shared on the module scope.
Expand Down
Empty file.
23 changes: 23 additions & 0 deletions example/myjobpackage/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Databricks notebook source
# Entrypoint notebook: reads job parameters from Databricks widgets and runs
# the data-processing step. The `# COMMAND ----------` markers delimit
# notebook cells and must be preserved exactly.
import sys
from pathlib import Path

# Make `myjobpackage` importable when the notebook runs from inside the
# package directory: add the parent of the current working directory to the
# module search path. NOTE(review): assumes the notebook's CWD is the
# package directory itself — confirm against the workspace layout.
MODULE_DIR = Path.cwd().parent
sys.path.append(MODULE_DIR.as_posix())

# COMMAND ----------

from myjobpackage.processing import process_data

# COMMAND ----------

# Job parameters: names of the input and output tables, supplied via
# Databricks widgets. `dbutils` is injected by the Databricks runtime,
# hence the noqa for the undefined-name lint warning.
input_table = dbutils.widgets.get('input_table') # noqa: F821
output_table = dbutils.widgets.get('output_table') # noqa: F821

# COMMAND ----------

# Run the transformation. `spark` is the ambient SparkSession provided by
# the Databricks runtime (also undefined from the linter's point of view).
process_data(
    session=spark, # noqa: F821
    input_table=input_table,
    output_table=output_table,
)
File renamed without changes.
Empty file added example/tests/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions example/tests/data/tables/example_input.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "feature": 0.5876}
{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "feature": 0.42}
2 changes: 2 additions & 0 deletions example/tests/data/tables/expected_output.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "result": 58.76}
{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "result": 42}
30 changes: 30 additions & 0 deletions example/tests/data/tables/schema/example_input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"type": "struct",
"fields":
[
{
"name": "id",
"type": "long",
"nullable": false,
"metadata": {}
},
{
"name": "time_utc",
"type": "timestamp",
"nullable": false,
"metadata": {}
},
{
"name": "name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "feature",
"type": "double",
"nullable": true,
"metadata": {}
}
]
}
30 changes: 30 additions & 0 deletions example/tests/data/tables/schema/expected_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"type": "struct",
"fields":
[
{
"name": "id",
"type": "long",
"nullable": false,
"metadata": {}
},
{
"name": "time_utc",
"type": "timestamp",
"nullable": false,
"metadata": {}
},
{
"name": "name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "result",
"type": "double",
"nullable": true,
"metadata": {}
}
]
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import os

from myjobpackage.processing import process_data
from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual
from pytest import fixture

from pysparkdt import reinit_local_metastore, spark_base

from .processing import process_data

DATA_DIR = f'{os.path.dirname(__file__)}/data'
JSON_TABLES_DIR = f'{DATA_DIR}/tables'
TMP_DIR = f'{DATA_DIR}/tmp'
Expand Down
35 changes: 35 additions & 0 deletions tests/test_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os

from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual
from pytest import fixture

from example.myjobpackage.processing import process_data
from pysparkdt import reinit_local_metastore, spark_base

DATA_DIR = f'{os.path.dirname(__file__)}/data'
JSON_TABLES_DIR = f'{DATA_DIR}/tables'
TMP_DIR = f'{DATA_DIR}/tmp'
METASTORE_DIR = f'{TMP_DIR}/metastore'


@fixture(scope='module')
def spark():
    """Module-scoped Spark session backed by the local test metastore."""
    for session in spark_base(METASTORE_DIR):
        yield session


def test_process_data(
    spark: SparkSession,
):
    """End-to-end check: process_data transforms the example input table
    into an output table equal to the expected fixture (row order ignored
    by default; columns compared in sorted order so column order does not
    matter)."""
    # Recreate the Delta tables from the JSON/NDJSON fixtures.
    reinit_local_metastore(spark, JSON_TABLES_DIR)
    process_data(
        session=spark,
        input_table='example_input',
        output_table='output',
    )
    actual_df = spark.read.format('delta').table('output')
    expected_df = spark.read.format('delta').table('expected_output')
    actual_cols = sorted(actual_df.columns)
    expected_cols = sorted(expected_df.columns)
    assertDataFrameEqual(
        actual=actual_df.select(actual_cols),
        expected=expected_df.select(expected_cols),
    )

0 comments on commit 4324067

Please sign in to comment.