Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Add native executor to CI #2855

Merged
merged 7 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ jobs:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
pyarrow-version: [7.0.0, 16.0.0]
enable-native-executor: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
- daft-runner: ray
enable-native-executor: 1
- daft-runner: ray
pyarrow-version: 7.0.0
os: ubuntu-20.04
Expand Down Expand Up @@ -115,6 +118,7 @@ jobs:
CARGO_TARGET_DIR: ./target

DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}

- name: Build library and Test with pytest (Windows)
if: ${{ (runner.os == 'Windows') }}
Expand Down
6 changes: 6 additions & 0 deletions tests/cookbook/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
import pandas as pd
import pytest

from daft import context
from daft.datatype import DataType
from daft.expressions import col
from daft.udf import udf
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts):
"""Sums across an entire column for the entire table"""
Expand Down
6 changes: 6 additions & 0 deletions tests/cookbook/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

import pytest

from daft import context
from daft.expressions import col
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize(
"join_strategy", [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True
Expand Down
5 changes: 5 additions & 0 deletions tests/cookbook/test_pandas_cookbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
import pytest

import daft
from daft import context
from daft.datatype import DataType
from daft.expressions import col, lit
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Idioms: if-then
###
Expand Down
5 changes: 5 additions & 0 deletions tests/cookbook/test_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
from pyarrow import dataset as pads

import daft
from daft import context
from tests.conftest import assert_df_equals
from tests.cookbook.assets import COOKBOOK_DATA_CSV

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)


Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
import pytest

import daft
from daft import col
from daft import col, context
from daft.context import get_context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.utils import freeze
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
def test_agg_global(make_df, repartition_nparts):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_approx_count_distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import pytest

import daft
from daft import col
from daft import col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

TESTS = [
[[], 0],
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_approx_percentiles_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import pyarrow as pa
import pytest

from daft import col
from daft import col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_simple_concat(make_df):
df1 = make_df({"foo": [1, 2, 3]})
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
import pytest

import daft
from daft import context
from daft.api_annotations import APITypeError
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.utils import pyarrow_supports_fixed_shape_tensor
from tests.conftest import UuidType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())


Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_decimals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)

Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_distinct.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import pyarrow as pa
import pytest

from daft import context
from daft.datatype import DataType
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
def test_distinct_with_nulls(make_df, repartition_nparts):
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_explode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
import pyarrow as pa
import pytest

from daft import context
from daft.expressions import col

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize(
"data",
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


class MockException(Exception):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_joins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import pyarrow as pa
import pytest

from daft import col
from daft import col, context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from tests.utils import sort_arrow_table

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def skip_invalid_join_strategies(join_strategy, join_type):
if (join_strategy == "sort_merge" or join_strategy == "sort_merge_aligned_boundaries") and join_type != "inner":
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_map_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_monotonically_increasing_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

import pytest

from daft import context
from daft.datatype import DataType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_monotonically_increasing_id_single_partition(make_df) -> None:
data = {"a": [1, 2, 3, 4, 5]}
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_pivot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
def test_pivot(make_df, repartition_nparts):
Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_repr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
from PIL import Image

import daft
from daft import context
from tests.utils import ANSI_ESCAPE, TD_STYLE, TH_STYLE

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

ROW_DIVIDER_REGEX = re.compile(r"╭─+┬*─*╮|├╌+┼*╌+┤")
SHOWING_N_ROWS_REGEX = re.compile(r".*\(Showing first (\d+) of (\d+) rows\).*")
UNMATERIALIZED_REGEX = re.compile(r".*\(No data to display: Dataframe not materialized\).*")
Expand Down
7 changes: 7 additions & 0 deletions tests/dataframe/test_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import pytest

from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_sample_fraction(make_df, valid_data: list[dict[str, float]]) -> None:
df = make_df(valid_data)
Expand Down
5 changes: 5 additions & 0 deletions tests/dataframe/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import pyarrow as pa
import pytest

from daft import context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Validation tests
###
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_temporals.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
import pytz

import daft
from daft import DataType, col
from daft import DataType, col, context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)

Expand Down
6 changes: 6 additions & 0 deletions tests/dataframe/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import pytest

import daft
from daft import context

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def add_1(df):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_unpivot.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import pytest

from daft import col
from daft import col, context
from daft.datatype import DataType

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
def test_unpivot(make_df, n_partitions):
Expand Down
7 changes: 6 additions & 1 deletion tests/dataframe/test_wildcard.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import pytest

import daft
from daft import col
from daft import col, context
from daft.exceptions import DaftCoreException

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)


def test_wildcard_select():
df = daft.from_pydict(
Expand Down
1 change: 0 additions & 1 deletion tests/integration/iceberg/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="iceberg writes only supported if pyarrow >= 8.0.0")


import tenacity
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.table import Table
Expand Down
Loading
Loading