Skip to content

Commit

Permalink
MAINT: Link export functions to each objdata provider
Browse files Browse the repository at this point in the history
  • Loading branch information
tnatt committed Mar 9, 2025
1 parent 7a48f69 commit 6a71213
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 20 deletions.
27 changes: 21 additions & 6 deletions src/fmu/dataio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from io import BufferedIOBase, BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Final
from typing import Any, Callable, Final

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -73,6 +73,21 @@ def export_metadata_file(file: Path, metadata: dict) -> None:
logger.info("Yaml file on: %s", file)


def export_object_to_file(
file: Path | BytesIO,
object_export_function: Callable[[Path | BytesIO], None],
) -> None:
"""
Export a object to file or memory buffer using a provided export function.
"""

if isinstance(file, Path):
file.parent.mkdir(parents=True, exist_ok=True)

object_export_function(file)


# TODO: Remove this function when AggregatedData.export() is removed
def export_file(
obj: types.Inferrable,
file: Path | BytesIO,
Expand Down Expand Up @@ -159,21 +174,21 @@ def md5sum_stream(stream: BufferedIOBase) -> str:
return hash_md5.hexdigest()


def compute_md5(object_export_function: Callable[[Path | BytesIO], None]) -> str:
    """Compute an MD5 sum for an object via an in-memory export.

    Args:
        object_export_function: Callable that serializes the object to the
            file-like target it is given (here a ``BytesIO`` buffer).

    Returns:
        Hex digest of the exported byte stream.
    """
    memory_stream = BytesIO()
    object_export_function(memory_stream)
    return md5sum(memory_stream)


def compute_md5_using_temp_file(
    object_export_function: Callable[[Path | BytesIO], None],
) -> str:
    """Compute an MD5 sum using a temporary file.

    Fallback for exporters that cannot write to an in-memory stream: the
    object is exported to a real temporary file, which is hashed and then
    removed automatically when the context manager exits.
    """
    with NamedTemporaryFile(buffering=0, suffix=".tmp") as tf:
        logger.info("Compute MD5 sum for tmp file")
        tempfile = Path(tf.name)
        object_export_function(tempfile)
        return md5sum(tempfile)


Expand Down
4 changes: 2 additions & 2 deletions src/fmu/dataio/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ def _set_metadata(
objdata = objectdata_provider_factory(obj=obj, dataio=etemp)

try:
checksum_md5 = _utils.compute_md5(obj, objdata.extension)
checksum_md5 = _utils.compute_md5(objdata.export_to_file)
except Exception as e:
logger.debug(
f"Exception {e} occured when trying to compute md5 from memory stream "
f"for an object of type {type(obj)}. Will use tempfile instead."
)
checksum_md5 = _utils.compute_md5_using_temp_file(obj, objdata.extension)
checksum_md5 = _utils.compute_md5_using_temp_file(objdata.export_to_file)

template["tracklog"] = [fields.Tracklog.initialize()[0]]
template["file"] = {
Expand Down
10 changes: 6 additions & 4 deletions src/fmu/dataio/dataio.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
from ._models.fmu_results.standard_result import StandardResult
from ._utils import (
detect_inside_rms, # dataio_examples,
export_file,
export_metadata_file,
export_object_to_file,
prettyprint_dict,
read_metadata_from_file,
some_config_from_env,
Expand Down Expand Up @@ -859,7 +859,7 @@ def _export_without_metadata(self, obj: types.Inferrable) -> str:
).get_metadata()

assert filemeta.absolute_path is not None # for mypy
export_file(obj, file=filemeta.absolute_path, fmt=objdata.fmt)
export_object_to_file(filemeta.absolute_path, objdata.export_to_file)
return str(filemeta.absolute_path)

def _export_with_standard_result(
Expand All @@ -873,6 +873,7 @@ def _export_with_standard_result(
)

fmudata = self._get_fmu_provider() if self._fmurun else None
objdata = objectdata_provider_factory(obj, self)

metadata = generate_export_metadata(
obj=obj, dataio=self, fmudata=fmudata, standard_result=standard_result
Expand All @@ -881,7 +882,7 @@ def _export_with_standard_result(
outfile = Path(metadata["file"]["absolute_path"])
metafile = outfile.parent / f".{outfile.name}.yml"

export_file(obj, outfile, fmt=metadata["data"].get("format", ""))
export_object_to_file(outfile, objdata.export_to_file)
logger.info("Actual file is: %s", outfile)

export_metadata_file(metafile, metadata)
Expand Down Expand Up @@ -1001,7 +1002,8 @@ def export(
outfile = Path(metadata["file"]["absolute_path"])
metafile = outfile.parent / f".{outfile.name}.yml"

export_file(obj, outfile, fmt=metadata["data"].get("format", ""))
objdata = objectdata_provider_factory(obj, self)
export_object_to_file(outfile, objdata.export_to_file)
logger.info("Actual file is: %s", outfile)

export_metadata_file(metafile, metadata)
Expand Down
10 changes: 2 additions & 8 deletions src/fmu/dataio/providers/_filedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,13 @@ def _get_share_folders(self) -> Path:
def _compute_md5(self) -> str:
"""Compute an MD5 sum using a temporary file."""
try:
return compute_md5(
obj=self.obj,
file_suffix=self.objdata.extension,
fmt=self.objdata.fmt,
)
return compute_md5(self.objdata.export_to_file)
except Exception as e:
logger.debug(
f"Exception {e} occured when trying to compute md5 from memory stream "
f"for an object of type {type(self.obj)}. Will use tempfile instead."
)
return compute_md5_using_temp_file(
self.obj, self.objdata.extension, fmt=self.objdata.fmt
)
return compute_md5_using_temp_file(self.objdata.export_to_file)

def _add_filename_to_path(self, path: Path) -> Path:
stem = self._get_filestem()
Expand Down
7 changes: 7 additions & 0 deletions src/fmu/dataio/providers/objectdata/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
)

if TYPE_CHECKING:
from io import BytesIO
from pathlib import Path

from pydantic import BaseModel

from fmu.dataio._models.fmu_results.data import (
Expand Down Expand Up @@ -145,6 +148,10 @@ def layout(self) -> Layout:
def table_index(self) -> list[str] | None:
raise NotImplementedError

@abstractmethod
def export_to_file(self, file: Path | BytesIO) -> None:
    """Export the provider's object to a file path or in-memory buffer.

    Each concrete ObjectDataProvider implements the format-specific
    serialization for its object type.
    """
    raise NotImplementedError

@abstractmethod
def get_geometry(self) -> Geometry | None:
raise NotImplementedError
Expand Down
15 changes: 15 additions & 0 deletions src/fmu/dataio/providers/objectdata/_faultroom.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Final

from fmu.dataio._definitions import ExportFolder, ValidFormats
Expand All @@ -18,6 +20,8 @@
)

if TYPE_CHECKING:
from io import BytesIO

from fmu.dataio.readers import FaultRoomSurface

logger: Final = null_logger(__name__)
Expand Down Expand Up @@ -97,3 +101,14 @@ def get_spec(self) -> FaultRoomSurfaceSpecification:
properties=self.obj.properties,
name=self.obj.name,
)

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

serialized_json = json.dumps(self.obj.storage, indent=4)

if isinstance(file, Path):
with open(file, "w", encoding="utf-8") as stream:
stream.write(serialized_json)
else:
file.write(serialized_json.encode("utf-8"))
16 changes: 16 additions & 0 deletions src/fmu/dataio/providers/objectdata/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Final

import pandas as pd
Expand Down Expand Up @@ -114,6 +116,9 @@
)

if TYPE_CHECKING:
from io import BytesIO
from pathlib import Path

from fmu.dataio.dataio import ExportData
from fmu.dataio.types import Inferrable

Expand Down Expand Up @@ -214,3 +219,14 @@ def get_bbox(self) -> None:

def get_spec(self) -> None:
    """Derive data.spec for dict.

    Plain dictionaries carry no specification, so nothing is derived and
    None is returned implicitly.
    """

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

serialized_json = json.dumps(self.obj)

if isinstance(file, Path):
with open(file, "w", encoding="utf-8") as stream:
stream.write(serialized_json)
else:
file.write(serialized_json.encode("utf-8"))
20 changes: 20 additions & 0 deletions src/fmu/dataio/providers/objectdata/_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Final

import pyarrow as pa
import pyarrow.parquet as pq

from fmu.dataio._definitions import (
STANDARD_TABLE_INDEX_COLUMNS,
ExportFolder,
Expand All @@ -18,6 +21,9 @@
)

if TYPE_CHECKING:
from io import BytesIO
from pathlib import Path

import pandas as pd
import pyarrow

Expand Down Expand Up @@ -180,6 +186,15 @@ def get_spec(self) -> TableSpecification:
size=int(self.obj.size),
)

def export_to_file(self, file: Path | BytesIO) -> None:
    """Serialize the dataframe as CSV (no index) to a path or buffer."""

    logger.info(
        "Exporting dataframe to csv. Note: index columns will not be preserved "
        "unless calling 'reset_index()' on the dataframe."
    )
    # Keyword form of pandas' first positional parameter; behavior unchanged.
    self.obj.to_csv(path_or_buf=file, index=False)


@dataclass
class ArrowTableDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -229,3 +244,8 @@ def get_spec(self) -> TableSpecification:
num_rows=self.obj.num_rows,
size=self.obj.num_columns * self.obj.num_rows,
)

def export_to_file(self, file: Path | BytesIO) -> None:
    """Write the Arrow table as Parquet to a file path or in-memory buffer."""

    # pa.output_stream accepts both paths and buffers, so one code path covers both.
    sink = pa.output_stream(file)
    pq.write_table(self.obj, where=sink)
59 changes: 59 additions & 0 deletions src/fmu/dataio/providers/objectdata/_xtgeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from ._base import ObjectDataProvider

if TYPE_CHECKING:
from io import BytesIO
from pathlib import Path

import pandas as pd
import xtgeo

Expand Down Expand Up @@ -119,6 +122,11 @@ def get_spec(self) -> SurfaceSpecification:
undef=1.0e30,
)

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

self.obj.to_file(file, fformat="irap_binary")


@dataclass
class PolygonsDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -175,6 +183,26 @@ def get_spec(self) -> PolygonsSpecification:
).size
)

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

if self.extension == ".pol":
self.obj.to_file(file)

elif self.extension == ".csv":
obj = self.obj.copy() # to not modify incoming instance!
if "xtgeo" not in self.fmt:
obj.xname = "X"
obj.yname = "Y"
obj.zname = "Z"
df = obj.get_dataframe(copy=False).rename(
columns={obj.pname: "ID"},
)
else:
df = obj.get_dataframe(copy=False)

df.to_csv(file, index=False)


@dataclass
class PointsDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -236,6 +264,22 @@ def get_spec(self) -> PointSpecification:
size=int(df.size),
)

def export_to_file(self, file: Path | BytesIO) -> None:
    """Export the points to a file path or in-memory buffer.

    ``.pol`` triggers the native xtgeo writer; ``.csv`` writes a dataframe,
    renaming coordinate columns to X/Y/Z unless an xtgeo-flavoured format
    was requested. Other extensions fall through silently.
    """

    # NOTE(review): the ".pol" check looks copy-pasted from
    # PolygonsDataProvider — points conventionally use ".poi" for
    # irap_ascii. Confirm self.extension can actually be ".pol" here;
    # otherwise this branch is dead and ascii points are never exported.
    if self.extension == ".pol":
        self.obj.to_file(file)

    elif self.extension == ".csv":
        obj = self.obj.copy()  # to not modify incoming instance!
        if "xtgeo" not in self.fmt:
            obj.xname = "X"
            obj.yname = "Y"
            obj.zname = "Z"

        df = obj.get_dataframe(copy=False)
        df.to_csv(file, index=False)


@dataclass
class CubeDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -319,6 +363,11 @@ def get_spec(self) -> CubeSpecification:
undef=npfloat_to_float(required["undef"]),
)

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

self.obj.to_file(file, fformat="segy")


@dataclass
class CPGridDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -404,6 +453,11 @@ def _get_zonation(self) -> list[ZoneDefinition]:
key=lambda x: x.min_layer_idx,
)

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

self.obj.to_file(file, fformat="roff")


@dataclass
class CPGridPropertyDataProvider(ObjectDataProvider):
Expand Down Expand Up @@ -457,3 +511,8 @@ def get_geometry(self) -> Geometry | None:
lack_of_geometry_warn()

return Geometry(name=name, relative_path=relpath) if name and relpath else None

def export_to_file(self, file: Path | BytesIO) -> None:
"""Export the object to file or memory buffer"""

self.obj.to_file(file, fformat="roff")

0 comments on commit 6a71213

Please sign in to comment.