WIP, ENH: add df->rec converter #886

Merged
38 changes: 38 additions & 0 deletions darshan-util/pydarshan/darshan/backend/api_def_c.py
@@ -8,6 +8,30 @@


header = """/* from darshan-logutils.h */

struct darshan_file_category_counters {
    int64_t count;                   /* number of files in this category */
    int64_t total_read_volume_bytes; /* total read traffic volume */
    int64_t total_write_volume_bytes;/* total write traffic volume */
    int64_t max_read_volume_bytes;   /* maximum read traffic volume to 1 file */
    int64_t max_write_volume_bytes;  /* maximum write traffic volume to 1 file */
    int64_t total_max_offset_bytes;  /* summation of max_offsets */
    int64_t max_offset_bytes;        /* largest max_offset */
    int64_t nprocs;                  /* how many procs accessed (-1 for "all") */
};

struct darshan_derived_metrics {
    int64_t total_bytes;
    double unique_io_total_time_by_slowest;
    double unique_rw_only_time_by_slowest;
    double unique_md_only_time_by_slowest;
    int unique_io_slowest_rank;
    double shared_io_total_time_by_slowest;
    double agg_perf_by_slowest;
    double agg_time_by_slowest;
    struct darshan_file_category_counters category_counters[7];
};

struct darshan_mnt_info
{
    char mnt_type[3015];
@@ -23,6 +47,20 @@
    int partial_flag;
};

/* opaque accumulator reference */
struct darshan_accumulator_st;
typedef struct darshan_accumulator_st* darshan_accumulator;

/* NOTE: darshan_module_id is technically an enum in the C API, but we'll
 * just use an int for now (equivalent type) to avoid warnings from cffi
 * that we have not defined explicit enum values. We don't need that
 * functionality.
 */
int darshan_accumulator_create(int darshan_module_id, int64_t, darshan_accumulator*);
int darshan_accumulator_inject(darshan_accumulator, void*, int);
int darshan_accumulator_emit(darshan_accumulator, struct darshan_derived_metrics*, void* aggregation_record);
int darshan_accumulator_destroy(darshan_accumulator);

/* from darshan-log-format.h */
typedef uint64_t darshan_record_id;

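For orientation (not part of this diff), the accumulator entry points declared above follow a create -> inject -> emit -> destroy lifecycle. A minimal sketch of driving that lifecycle from Python, reusing the `ffi`/`libdutil`/`_structdefs` handles that `darshan.backend.cffi_backend` already exposes; the module index, nprocs, and packed record buffer are assumed to come from an open report, as in the new tests below:

from darshan.backend.cffi_backend import ffi, libdutil, _structdefs

def accumulate(mod_idx, nprocs, record_buf, num_recs, mod_name="POSIX"):
    # create an accumulator for the given module
    acc = ffi.new("darshan_accumulator *")
    assert libdutil.darshan_accumulator_create(mod_idx, nprocs, acc) == 0
    # inject a contiguous buffer of records (e.g. produced by _df_to_rec below)
    assert libdutil.darshan_accumulator_inject(acc[0], record_buf, num_recs) == 0
    # emit the derived metrics along with a single aggregate record
    metrics = ffi.new("struct darshan_derived_metrics *")
    agg_rec = ffi.new(_structdefs[mod_name].replace("**", "*"))
    assert libdutil.darshan_accumulator_emit(acc[0], metrics, agg_rec) == 0
    assert libdutil.darshan_accumulator_destroy(acc[0]) == 0
    return metrics, agg_rec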
51 changes: 51 additions & 0 deletions darshan-util/pydarshan/darshan/backend/cffi_backend.py
@@ -657,3 +657,54 @@ def _log_get_heatmap_record(log):
    libdutil.darshan_free(buf[0])

    return rec


def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None):
    """
    Pack the DataFrames-format PyDarshan data back into
    a C buffer of records that can be consumed by darshan-util
    C code.

    Parameters
    ----------
    rec_dict: dict
        Dictionary containing the counter and fcounter dataframes.

    mod_name: str
        Name of the darshan module.

    rec_index_of_interest: int or None
        If ``None``, use all records in the dataframe. Otherwise,
        repack only the record at the provided integer index.

    Returns
    -------
    buf: Raw char array containing the packed record(s).
    """
    counters_df = rec_dict["counters"]
    fcounters_df = rec_dict["fcounters"]
    counters_n_cols = counters_df.shape[1]
    fcounters_n_cols = fcounters_df.shape[1]
    if rec_index_of_interest is None:
        num_recs = counters_df.shape[0]
        # newer pandas versions can support ...
        # but we use a slice for now
        rec_index_of_interest = slice(0, counters_df.shape[0])
    else:
        num_recs = 1
    # id and rank columns are duplicated in the counters and
    # fcounters dataframes; the recarray layout below mirrors the
    # C record structs: a base record (id, rank) followed by the
    # integer counters and floating-point fcounters
    rec_arr = np.recarray(shape=(num_recs,), dtype=[("id", "<u8", (1,)),
                                                    ("rank", "<i8", (1,)),
                                                    ("counters", "<i8", (counters_n_cols - 2,)),
                                                    ("fcounters", "<f8", (fcounters_n_cols - 2,))])
    rec_arr.fcounters = fcounters_df.iloc[rec_index_of_interest, 2:].to_numpy()
    rec_arr.counters = counters_df.iloc[rec_index_of_interest, 2:].to_numpy()
    if num_recs > 1:
        rec_arr.id = counters_df.iloc[rec_index_of_interest, 0].to_numpy().reshape((num_recs, 1))
        rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1].to_numpy().reshape((num_recs, 1))
    else:
        rec_arr.id = counters_df.iloc[rec_index_of_interest, 0]
        rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1]
    buf = rec_arr.tobytes()
    return buf
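A quick usage sketch for the converter (hypothetical log path; the access pattern mirrors the new tests below) — pack every POSIX record and view the resulting buffer through cffi:

import darshan
import darshan.backend.cffi_backend as backend
from darshan.backend.cffi_backend import ffi, _structdefs

# hypothetical log path; any log with POSIX records should work
with darshan.DarshanReport("example.darshan", read_all=True) as report:
    report.mod_read_all_records("POSIX", dtype="pandas")
    rec_dict = report.records["POSIX"][0]

    # omit the index argument to pack every record in the dataframes
    rbuf = backend._df_to_rec(rec_dict, "POSIX")
    # _structdefs entries are double-pointer type strings, so drop one
    # "*" to view the buffer as a flat array of record structs
    recs = ffi.from_buffer(_structdefs["POSIX"].replace("**", "*"), rbuf)
    for i in range(rec_dict["counters"].shape[0]):
        print(recs[i].base_rec.id, recs[i].base_rec.rank)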
113 changes: 113 additions & 0 deletions darshan-util/pydarshan/darshan/tests/test_cffi_misc.py
@@ -9,6 +9,7 @@
from numpy.testing import assert_array_equal, assert_allclose
import darshan
import darshan.backend.cffi_backend as backend
from darshan.backend.cffi_backend import ffi, libdutil, _structdefs
from darshan.log_utils import get_log_path

def test_get_lib_version():
@@ -159,3 +160,115 @@ def test_log_get_generic_record(dtype):
    # make sure the returned key/column names agree
    assert actual_counter_names == expected_counter_names
    assert actual_fcounter_names == expected_fcounter_names


@pytest.mark.parametrize("log_name", [
    "imbalanced-io.darshan",
    "e3sm_io_heatmap_only.darshan",
    ])
@pytest.mark.parametrize("module, index", [
    ("POSIX", 0),
    ("POSIX", 3),
    ("POSIX", 5),
    ("MPI-IO", 0),
    ("MPI-IO", 2),
    # fewer records are available for STDIO testing
    # with these logs
    ("STDIO", 0),
    ])
def test_df_to_rec(log_name, index, module):
    # test for packing a dataframe into a C-style record;
    # this is perhaps nothing more than a "round-trip" test
    log_path = get_log_path(log_name)
    with darshan.DarshanReport(log_path, read_all=True) as report:
        report.mod_read_all_records(module, dtype="pandas")
        rec_dict = report.records[module][0]

        # id and rank are not formally included in the reconstituted
        # (f)counters "buffer" so truncate a bit on comparison
        expected_fcounters = rec_dict["fcounters"].iloc[index, 2:]
        expected_counters = rec_dict["counters"].iloc[index, 2:].astype(np.int64)
        expected_id = rec_dict["counters"].iloc[index, 0].astype(np.uint64)
        expected_rank = rec_dict["counters"].iloc[index, 1]

        # retrieve the "re-packed"/actual record data;
        # _structdefs entries are double-pointer type strings, so drop
        # one "*" to view the buffer as a flat record array
        rbuf = backend._df_to_rec(rec_dict, module, index)
        rec_buf = ffi.from_buffer(_structdefs[module].replace("**", "*"), rbuf)
        actual_fcounters = np.frombuffer(ffi.buffer(rec_buf[0].fcounters))
        actual_counters = np.frombuffer(ffi.buffer(rec_buf[0].counters), dtype=np.int64)
        actual_id = rec_buf[0].base_rec.id
        actual_rank = rec_buf[0].base_rec.rank

        assert_allclose(actual_fcounters, expected_fcounters)
        assert_allclose(actual_counters, expected_counters)
        assert actual_id == expected_id
        assert actual_rank == expected_rank


@pytest.mark.parametrize("python_filter, expected_counts", [
    # whether to do an initial filtering
    # of the DataFrame in Python before
    # packing it back into C records
    (True, [18, 12, 2, 1]),
    (False, [1026, 12, 2, 1])  # see gh-867
    ])
def test_reverse_record_array(python_filter, expected_counts):
    # pack pandas DataFrame objects back into
    # a contiguous buffer of several records,
    # then use the darshan-util C library accumulator
    # on that record array, and compare the results
    # with those discussed in gh-867 from the Perl report
    log_path = get_log_path("imbalanced-io.darshan")
    with darshan.DarshanReport(log_path, read_all=True) as report:
        nprocs = report.metadata['job']['nprocs']
        modules = report.modules
        report.mod_read_all_records("POSIX", dtype="pandas")
        rec_dict = report.records["POSIX"][0]
        counters_df = rec_dict["counters"]
        fcounters_df = rec_dict["fcounters"]
        if python_filter:
            # gh-867 and the Perl report filtered out files that were
            # only stat'd rather than opened, so apply the same filtering
            # here at the Python layer before feeding the records back
            # to the C accumulator interface
            fcounters_df = fcounters_df[counters_df["POSIX_OPENS"] > 0]
            counters_df = counters_df[counters_df["POSIX_OPENS"] > 0]
            rec_dict["counters"] = counters_df
            rec_dict["fcounters"] = fcounters_df
        num_recs = rec_dict["fcounters"].shape[0]
        record_array = backend._df_to_rec(rec_dict, "POSIX")

        # set up the low-level C accumulator infrastructure to
        # receive the repacked records
        darshan_accumulator = ffi.new("darshan_accumulator *")
        r = libdutil.darshan_accumulator_create(modules["POSIX"]['idx'],
                                                nprocs,
                                                darshan_accumulator)
        assert r == 0
        r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs)
        assert r_i == 0
        derived_metrics = ffi.new("struct darshan_derived_metrics *")
        summation_record = ffi.new(_structdefs["POSIX"].replace("**", "*"))
        r = libdutil.darshan_accumulator_emit(darshan_accumulator[0],
                                              derived_metrics,
                                              summation_record)
        assert r == 0
        r = libdutil.darshan_accumulator_destroy(darshan_accumulator[0])
        assert r == 0

        # the indices into category_counters are pretty opaque; we should
        # move this accounting to Python eventually (and avoid the
        # low-level setup above after filtering)
        # 0 = total
        # 1 = RO
        # 2 = WO
        # 3 = R/W
        actual_total_files = derived_metrics.category_counters[0].count
        actual_ro_files = derived_metrics.category_counters[1].count
        actual_wo_files = derived_metrics.category_counters[2].count
        actual_rw_files = derived_metrics.category_counters[3].count
        assert_array_equal([actual_total_files,
                            actual_ro_files,
                            actual_wo_files,
                            actual_rw_files],
                           expected_counts)
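For reference, a sketch of the full category_counters index mapping. Indices 0-3 are confirmed by the expected counts in the test above; entries 4-6 are an assumption based on darshan-util's enum darshan_file_category and are not exercised here:

# assumed mapping for darshan_derived_metrics.category_counters;
# only indices 0-3 are verified by the test above
FILE_CATEGORIES = {
    0: "all files",
    1: "read-only files",
    2: "write-only files",
    3: "read/write files",
    4: "unique files",            # assumed: accessed by a single process
    5: "partially shared files",  # assumed: shared by a subset of ranks
    6: "shared files",            # assumed: shared by all ranks
}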