From 948ca99939da084273f494a29ca7bcb1eefedbd9 Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Wed, 4 Jan 2023 17:26:08 -0700 Subject: [PATCH 1/9] WIP, ENH: add df->rec converter * add a per-record converter from DataFrame to darshan-util C record format that passes tests for single records (rows) with counters and fcounters (but not rank & id yet) * add a full df->multi-record converter; this one still leads to segfaults with the C inject func with stuff like: ``` ../darshan-logutils-accumulator.c:145: darshan_accumulator_inject: Assertion `rank < acc->job_nprocs' failed.` ``` * one of the two added tests passes, the other xfails, feel free to push in fixes * as discussed in meeting earlier today (Rob R. seems to agree), this likely shouldn't be a long-term solution--just temporary until we can hoist all analysis infrastructure to the Python level * some of the accumulator API prototypes are copied in here from other PRs for testing purposes only.. since the main purpose of doing something like this would be to allow filtering DataFrames and then passing the raw records to the C accum interface.. --- .../pydarshan/darshan/backend/api_def_c.py | 38 ++++++++++++ .../pydarshan/darshan/backend/cffi_backend.py | 61 +++++++++++++++++++ .../pydarshan/darshan/tests/test_cffi_misc.py | 59 ++++++++++++++++++ 3 files changed, 158 insertions(+) diff --git a/darshan-util/pydarshan/darshan/backend/api_def_c.py b/darshan-util/pydarshan/darshan/backend/api_def_c.py index 5fce60b5e..50ae9cae3 100644 --- a/darshan-util/pydarshan/darshan/backend/api_def_c.py +++ b/darshan-util/pydarshan/darshan/backend/api_def_c.py @@ -8,6 +8,30 @@ header = """/* from darshan-logutils.h */ + +struct darshan_file_category_counters { + int64_t count; /* number of files in this category */ + int64_t total_read_volume_bytes; /* total read traffic volume */ + int64_t total_write_volume_bytes;/* total write traffic volume */ + int64_t max_read_volume_bytes; /* maximum read traffic volume to 1 file */ + int64_t max_write_volume_bytes; /* maximum write traffic volume to 1 file */ + int64_t total_max_offset_bytes; /* summation of max_offsets */ + int64_t max_offset_bytes; /* largest max_offset */ + int64_t nprocs; /* how many procs accessed (-1 for "all") */ +}; + +struct darshan_derived_metrics { + int64_t total_bytes; + double unique_io_total_time_by_slowest; + double unique_rw_only_time_by_slowest; + double unique_md_only_time_by_slowest; + int unique_io_slowest_rank; + double shared_io_total_time_by_slowest; + double agg_perf_by_slowest; + double agg_time_by_slowest; + struct darshan_file_category_counters category_counters[7]; +}; + struct darshan_mnt_info { char mnt_type[3015]; @@ -23,6 +47,20 @@ int partial_flag; }; +/* opaque accumulator reference */ +struct darshan_accumulator_st; +typedef struct darshan_accumulator_st* darshan_accumulator; + +/* NOTE: darshan_module_id is technically an enum in the C API, but we'll + * just use an int for now (equivalent type) to avoid warnings from cffi + * that we have not defined explicit enum values. We don't need that + * functionality. + */ +int darshan_accumulator_create(int darshan_module_id, int64_t, darshan_accumulator*); +int darshan_accumulator_inject(darshan_accumulator, void*, int); +int darshan_accumulator_emit(darshan_accumulator, struct darshan_derived_metrics*, void* aggregation_record); +int darshan_accumulator_destroy(darshan_accumulator); + /* from darshan-log-format.h */ typedef uint64_t darshan_record_id; diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index 982fb2e86..557e2b694 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -657,3 +657,64 @@ def _log_get_heatmap_record(log): libdutil.darshan_free(buf[0]) return rec + + +def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): + mod_type = _structdefs[mod_name] + counters_n_cols = rec_dict["counters"].shape[1] + fcounters_n_cols = rec_dict["fcounters"].shape[1] + size = (counters_n_cols + + fcounters_n_cols - + # id and rank columns are duplicated + # in counters and fcounters + 2) + rbuf_orig = np.empty(shape=(size,), dtype=np.float64) + rbuf = ffi.cast(mod_type, ffi.from_buffer(rbuf_orig)) + rbuf[0] = ffi.from_buffer(rbuf_orig) + for counter_name, dtype, cols in zip(["counters", "fcounters"], + [np.int64, np.float64], + [counters_n_cols, fcounters_n_cols]): + df = rec_dict[counter_name] + record = df.iloc[rec_index_of_interest, ...] + data = np.ascontiguousarray(record[:cols - 2].to_numpy(), dtype=dtype) + if counter_name == "fcounters": + rbuf[0].fcounters = data.tolist() + else: + rbuf[0].counters = data.tolist() + #rbuf[0].base_rec.rank = np.int64(record.loc["rank"]) + #rbuf[0].base_rec.id = np.uint64(record.id) + return rbuf + + +def _df_to_rec_array(rec_dict, mod_name): + # same _df_to_rec, but first allocate a contiguous + # buffer into which we can add ALL the records + # in the pandas data structures + # TODO: looping like this in Python/pandas is + # inefficient for many reasons--we really should + # be doing the data analysis on the DataFrame + # long term rather than moving data backwards + # like this to the C layer + counters_df = rec_dict["counters"] + fcounters_df = rec_dict["fcounters"] + num_recs_counters = counters_df.shape[0] + num_recs_fcounters = fcounters_df.shape[0] + size = (counters_df.shape[1] + + fcounters_df.shape[1] - + # id and rank columns are duplicated + # in counters and fcounters + 2) + if num_recs_counters != num_recs_fcounters: + raise ValueError(f"{num_recs_counters} counter records " + f"but {num_recs_fcounters} fcounter records") + record_array = np.empty(shape=size * num_recs_counters, dtype=np.float64) + # NOTE: we may be able to slurp the data in as a single + # buffer rather than iterating, but moving data from Python back to C + # layer shouldn't be relied upon long-term anyway + pos = 0 + for i, rec_num in enumerate(range(num_recs_counters)): + record_array[pos:pos + size] = np.frombuffer(ffi.buffer(_df_to_rec(rec_dict=rec_dict, + mod_name=mod_name, + rec_index_of_interest=i))) + pos += size + return record_array diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index 4c95d52f2..f7ba8d772 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -9,6 +9,7 @@ from numpy.testing import assert_array_equal, assert_allclose import darshan import darshan.backend.cffi_backend as backend +from darshan.backend.cffi_backend import ffi, libdutil from darshan.log_utils import get_log_path def test_get_lib_version(): @@ -159,3 +160,61 @@ def test_log_get_generic_record(dtype): # make sure the returned key/column names agree assert actual_counter_names == expected_counter_names assert actual_fcounter_names == expected_fcounter_names + + +@pytest.mark.parametrize("log_name", [ + "imbalanced-io.darshan", + "e3sm_io_heatmap_only.darshan", + ]) +@pytest.mark.parametrize("module, index", [ + ("POSIX", 0), + ("POSIX", 3), + ("POSIX", 5), + # less records available for STDIO testing + # with these logs + ("STDIO", 0), + ]) +def test_df_to_rec(log_name, index, module): + # test for packing a dataframe into a C-style record + # this is perhaps nothing more than a "round-trip" test + log_path = get_log_path(log_name) + with darshan.DarshanReport(log_path, read_all=True) as report: + report.mod_read_all_records(module, dtype="pandas") + rec_dict = report.records[module][0] + # id and rank are not formally included in the reconsituted + # (f)counters "buffer" so truncate a bit on comparison + expected_fcounters = rec_dict["fcounters"].iloc[index, :-2] + expected_counters = rec_dict["counters"].iloc[index, :-2].astype(np.int64) + rbuf = backend._df_to_rec(rec_dict, module, index) + actual_fcounters = np.frombuffer(ffi.buffer(rbuf[0].fcounters)) + actual_counters = np.frombuffer(ffi.buffer(rbuf[0].counters), dtype=np.int64) + assert_allclose(actual_fcounters, expected_fcounters) + assert_allclose(actual_counters, expected_counters) + + +@pytest.mark.xfail(run=False, + reason="Segfault: darshan-logutils-accumulator.c:145: darshan_accumulator_inject: Assertion `rank < acc->job_nprocs' failed") +def test_reverse_record_array(): + # pack pandas DataFrame objects back into + # a contiguous buffer of several records + # and then use the darshan-util C lib accumulator + # on that record array, and compare the results + # with those discussed in gh-867 from Perl report + log_path = get_log_path("imbalanced-io.darshan") + with darshan.DarshanReport(log_path, read_all=True) as report: + report.mod_read_all_records("POSIX", dtype="pandas") + rec_dict = report.records["POSIX"][0] + record_array = backend._df_to_rec_array(rec_dict, "POSIX") + num_recs = rec_dict["fcounters"].shape[0] + + # need to deal with the low-level C stuff... + log_handle = backend.log_open(log_path) + jobrec = ffi.new("struct darshan_job *") + libdutil.darshan_log_get_job(log_handle['handle'], jobrec) + modules = backend.log_get_modules(log_handle) + darshan_accumulator = ffi.new("darshan_accumulator *") + r = libdutil.darshan_accumulator_create(modules["POSIX"]['idx'], + jobrec[0].nprocs, + darshan_accumulator) + record_array = ffi.from_buffer(record_array) + r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array[0], num_recs) From f730f3a4e37034324ec3407146964dc118c17186 Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Thu, 12 Jan 2023 15:48:00 -0700 Subject: [PATCH 2/9] MAINT: PR 886 revisions * ` _df_to_rec()` has been simplified and is far more robust/correct now that we use use a `recarray`; a separate function for multiple records was no longer needed so it was purged * cleanups in `test_df_to_rec()` and, add checks for id/rank as well now that they are properly re-packed * `test_reverse_record_array()` now demonstrates the Python filter df -> C accum workflow much more clearly; it still has some kind of memory issue/discrepancy with gh-867, but much closer now... --- .../pydarshan/darshan/backend/cffi_backend.py | 105 +++++++++--------- .../pydarshan/darshan/tests/test_cffi_misc.py | 62 +++++++++-- 2 files changed, 104 insertions(+), 63 deletions(-) diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index 557e2b694..d4fa0e2f3 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -660,61 +660,56 @@ def _log_get_heatmap_record(log): def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): + """ + Pack the DataFrames-format PyDarshan data back into + a C buffer of records that can be consumed by darshan-util + C code. + + Parameters + ---------- + rec_dict: dict + Dictionary containing the counter and fcounter dataframes. + + mod_name: str + Name of the darshan module. + + rec_index_of_interest: int or None + If ``None``, use all records in the dataframe. Otherwise, + repack only the the record at the provided integer index. + + Returns + ------- + buf: CFFI data object containing a buffer of record(s) or a single + record; when multiple records are present this will effectively + be a raw char array; with a single record, it should be cast + to a struct of the appropriate type + """ mod_type = _structdefs[mod_name] - counters_n_cols = rec_dict["counters"].shape[1] - fcounters_n_cols = rec_dict["fcounters"].shape[1] - size = (counters_n_cols + - fcounters_n_cols - - # id and rank columns are duplicated - # in counters and fcounters - 2) - rbuf_orig = np.empty(shape=(size,), dtype=np.float64) - rbuf = ffi.cast(mod_type, ffi.from_buffer(rbuf_orig)) - rbuf[0] = ffi.from_buffer(rbuf_orig) - for counter_name, dtype, cols in zip(["counters", "fcounters"], - [np.int64, np.float64], - [counters_n_cols, fcounters_n_cols]): - df = rec_dict[counter_name] - record = df.iloc[rec_index_of_interest, ...] - data = np.ascontiguousarray(record[:cols - 2].to_numpy(), dtype=dtype) - if counter_name == "fcounters": - rbuf[0].fcounters = data.tolist() - else: - rbuf[0].counters = data.tolist() - #rbuf[0].base_rec.rank = np.int64(record.loc["rank"]) - #rbuf[0].base_rec.id = np.uint64(record.id) - return rbuf - - -def _df_to_rec_array(rec_dict, mod_name): - # same _df_to_rec, but first allocate a contiguous - # buffer into which we can add ALL the records - # in the pandas data structures - # TODO: looping like this in Python/pandas is - # inefficient for many reasons--we really should - # be doing the data analysis on the DataFrame - # long term rather than moving data backwards - # like this to the C layer counters_df = rec_dict["counters"] fcounters_df = rec_dict["fcounters"] - num_recs_counters = counters_df.shape[0] - num_recs_fcounters = fcounters_df.shape[0] - size = (counters_df.shape[1] + - fcounters_df.shape[1] - - # id and rank columns are duplicated - # in counters and fcounters - 2) - if num_recs_counters != num_recs_fcounters: - raise ValueError(f"{num_recs_counters} counter records " - f"but {num_recs_fcounters} fcounter records") - record_array = np.empty(shape=size * num_recs_counters, dtype=np.float64) - # NOTE: we may be able to slurp the data in as a single - # buffer rather than iterating, but moving data from Python back to C - # layer shouldn't be relied upon long-term anyway - pos = 0 - for i, rec_num in enumerate(range(num_recs_counters)): - record_array[pos:pos + size] = np.frombuffer(ffi.buffer(_df_to_rec(rec_dict=rec_dict, - mod_name=mod_name, - rec_index_of_interest=i))) - pos += size - return record_array + counters_n_cols = counters_df.shape[1] + fcounters_n_cols = fcounters_df.shape[1] + if rec_index_of_interest is None: + num_recs = counters_df.shape[0] + rec_index_of_interest = ... + else: + num_recs = 1 + # id and rank columns are duplicated + # in counters and fcounters + rec_arr = np.recarray(shape=(num_recs), dtype=[("id", " 1: + rec_arr.id = counters_df.iloc[rec_index_of_interest, 0].to_numpy().reshape((num_recs, 1)) + rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1].to_numpy().reshape((num_recs, 1)) + buf = ffi.new("char[]", (counters_n_cols + fcounters_n_cols - 2) * num_recs * 8) + buf = rec_arr.tobytes() + else: + rec_arr.id = counters_df.iloc[rec_index_of_interest, 0] + rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1] + buf = ffi.new(mod_type) + buf[0] = ffi.from_buffer(rec_arr) + return buf diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index f7ba8d772..d17dedc4a 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -181,19 +181,28 @@ def test_df_to_rec(log_name, index, module): with darshan.DarshanReport(log_path, read_all=True) as report: report.mod_read_all_records(module, dtype="pandas") rec_dict = report.records[module][0] + # id and rank are not formally included in the reconsituted # (f)counters "buffer" so truncate a bit on comparison - expected_fcounters = rec_dict["fcounters"].iloc[index, :-2] - expected_counters = rec_dict["counters"].iloc[index, :-2].astype(np.int64) + expected_fcounters = rec_dict["fcounters"].iloc[index, 2:] + expected_counters = rec_dict["counters"].iloc[index, 2:].astype(np.int64) + expected_id = rec_dict["counters"].iloc[index, 0].astype(np.uint64) + expected_rank = rec_dict["counters"].iloc[index, 1] + + # retrive the "re-packed"/actual record data: rbuf = backend._df_to_rec(rec_dict, module, index) actual_fcounters = np.frombuffer(ffi.buffer(rbuf[0].fcounters)) actual_counters = np.frombuffer(ffi.buffer(rbuf[0].counters), dtype=np.int64) + actual_id = rbuf[0].base_rec.id + actual_rank = rbuf[0].base_rec.rank + + assert_allclose(actual_fcounters, expected_fcounters) assert_allclose(actual_counters, expected_counters) + assert actual_id == expected_id + assert actual_rank == expected_rank -@pytest.mark.xfail(run=False, - reason="Segfault: darshan-logutils-accumulator.c:145: darshan_accumulator_inject: Assertion `rank < acc->job_nprocs' failed") def test_reverse_record_array(): # pack pandas DataFrame objects back into # a contiguous buffer of several records @@ -204,10 +213,24 @@ def test_reverse_record_array(): with darshan.DarshanReport(log_path, read_all=True) as report: report.mod_read_all_records("POSIX", dtype="pandas") rec_dict = report.records["POSIX"][0] - record_array = backend._df_to_rec_array(rec_dict, "POSIX") + counters_df = rec_dict["counters"] + fcounters_df = rec_dict["fcounters"] + # gh-867 and the perl report filtered files that were + # only stat'd rather than opened, so demo the same filtering + # here at Python layer, then feed back to C accum stuff + fcounters_df = fcounters_df[counters_df["POSIX_OPENS"] > 0] + counters_df = counters_df[counters_df["POSIX_OPENS"] > 0] + rec_dict["counters"] = counters_df + rec_dict["fcounters"] = fcounters_df + record_array = backend._df_to_rec(rec_dict, "POSIX") num_recs = rec_dict["fcounters"].shape[0] + mod_type = backend._structdefs["POSIX"] + buf = ffi.new("void **") + rbuf = ffi.cast(mod_type, buf) - # need to deal with the low-level C stuff... + # need to deal with the low-level C stuff to set up + # accumulator infrastructure to receive the repacked + # records log_handle = backend.log_open(log_path) jobrec = ffi.new("struct darshan_job *") libdutil.darshan_log_get_job(log_handle['handle'], jobrec) @@ -216,5 +239,28 @@ def test_reverse_record_array(): r = libdutil.darshan_accumulator_create(modules["POSIX"]['idx'], jobrec[0].nprocs, darshan_accumulator) - record_array = ffi.from_buffer(record_array) - r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array[0], num_recs) + assert r == 0 + r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs) + assert r_i == 0 + darshan_derived_metrics = ffi.new("struct darshan_derived_metrics *") + r = libdutil.darshan_accumulator_emit(darshan_accumulator[0], + darshan_derived_metrics, + rbuf) + assert r == 0 + # the indices into category_counters are pretty opaque.. we should just + # move everything to Python eventually... (also to avoid all the junk above when filtering..) + # 0 = total + # 1 = RO + # 2 = WO + # 3 = R/W + expected_total_files = 28 # see gh-867 + expected_ro_files = 13 # see gh-867 + expected_wo_files = 12 # see gh-867 + actual_total_files = darshan_derived_metrics.category_counters[0].count + actual_ro_files = darshan_derived_metrics.category_counters[1].count + actual_wo_files = darshan_derived_metrics.category_counters[2].count + libdutil.darshan_free(buf[0]) + backend.log_close(log_handle) + assert actual_total_files == expected_total_files + assert actual_ro_files == expected_ro_files + assert actual_wo_files == expected_wo_files From 487623ffac49d5526da6f3446bd8c43d4b89e0a2 Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Mon, 16 Jan 2023 09:48:40 -0700 Subject: [PATCH 3/9] MAINT: PR 886 revisions * fix up `test_reverse_record_array` memory issues, parametrize it over scenarios that do/don't match Perl infra, and some cleanups to that test --- .../pydarshan/darshan/tests/test_cffi_misc.py | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index d17dedc4a..d2ddffa03 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -203,7 +203,15 @@ def test_df_to_rec(log_name, index, module): assert actual_rank == expected_rank -def test_reverse_record_array(): +@pytest.mark.parametrize("python_filter, expected_counts", [ + # whether to do an initial filtering + # of the DataFrame in Python before + # packing it back into C records + pytest.param(True, [28, 13, 12, 1], + marks=pytest.mark.xfail(reason="mismatch perl report")), + (False, [1026, 12, 2, 1]) # see gh-867 + ]) +def test_reverse_record_array(python_filter, expected_counts): # pack pandas DataFrame objects back into # a contiguous buffer of several records # and then use the darshan-util C lib accumulator @@ -215,18 +223,17 @@ def test_reverse_record_array(): rec_dict = report.records["POSIX"][0] counters_df = rec_dict["counters"] fcounters_df = rec_dict["fcounters"] - # gh-867 and the perl report filtered files that were - # only stat'd rather than opened, so demo the same filtering - # here at Python layer, then feed back to C accum stuff - fcounters_df = fcounters_df[counters_df["POSIX_OPENS"] > 0] - counters_df = counters_df[counters_df["POSIX_OPENS"] > 0] - rec_dict["counters"] = counters_df - rec_dict["fcounters"] = fcounters_df - record_array = backend._df_to_rec(rec_dict, "POSIX") + if python_filter: + # gh-867 and the perl report filtered files that were + # only stat'd rather than opened, so demo the same filtering + # here at Python layer, then feed back to C accum stuff + fcounters_df = fcounters_df[counters_df["POSIX_OPENS"] > 0] + counters_df = counters_df[counters_df["POSIX_OPENS"] > 0] + rec_dict["counters"] = counters_df + rec_dict["fcounters"] = fcounters_df num_recs = rec_dict["fcounters"].shape[0] - mod_type = backend._structdefs["POSIX"] - buf = ffi.new("void **") - rbuf = ffi.cast(mod_type, buf) + record_array = backend._df_to_rec(rec_dict, "POSIX") + rbuf = backend._df_to_rec(rec_dict, "POSIX", 0) # need to deal with the low-level C stuff to set up # accumulator infrastructure to receive the repacked @@ -245,22 +252,24 @@ def test_reverse_record_array(): darshan_derived_metrics = ffi.new("struct darshan_derived_metrics *") r = libdutil.darshan_accumulator_emit(darshan_accumulator[0], darshan_derived_metrics, - rbuf) + rbuf[0]) assert r == 0 + backend.log_close(log_handle) + # NOTE: freeing rbuf[0] manually can cause + # segfaults here... + # the indices into category_counters are pretty opaque.. we should just - # move everything to Python eventually... (also to avoid all the junk above when filtering..) + # move everything to Python "eventually"... (also to avoid all the junk above after filtering..) # 0 = total # 1 = RO # 2 = WO # 3 = R/W - expected_total_files = 28 # see gh-867 - expected_ro_files = 13 # see gh-867 - expected_wo_files = 12 # see gh-867 actual_total_files = darshan_derived_metrics.category_counters[0].count actual_ro_files = darshan_derived_metrics.category_counters[1].count actual_wo_files = darshan_derived_metrics.category_counters[2].count - libdutil.darshan_free(buf[0]) - backend.log_close(log_handle) - assert actual_total_files == expected_total_files - assert actual_ro_files == expected_ro_files - assert actual_wo_files == expected_wo_files + actual_rw_files = darshan_derived_metrics.category_counters[3].count + assert_array_equal([actual_total_files, + actual_ro_files, + actual_wo_files, + actual_rw_files], + expected_counts) From ac71ed54ae155b62ed70d06594bd2c45377a04fd Mon Sep 17 00:00:00 2001 From: Tyler Reddy Date: Mon, 16 Jan 2023 10:49:28 -0700 Subject: [PATCH 4/9] MAINT: PR 886 revisions * support older `pandas` versions with the slicing index in `_df_to_rec()` --- darshan-util/pydarshan/darshan/backend/cffi_backend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index d4fa0e2f3..2f3a6a44a 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -691,7 +691,9 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): fcounters_n_cols = fcounters_df.shape[1] if rec_index_of_interest is None: num_recs = counters_df.shape[0] - rec_index_of_interest = ... + # newer pandas versions can support ... + # but we use a slice for now + rec_index_of_interest = slice(0, counters_df.shape[0]) else: num_recs = 1 # id and rank columns are duplicated From 1b4945b5f72db4fa550e3c6a158e230739f46789 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Mon, 6 Feb 2023 17:00:14 -0600 Subject: [PATCH 5/9] add MPI-IO test cases --- darshan-util/pydarshan/darshan/tests/test_cffi_misc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index d2ddffa03..48e8879e3 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -170,6 +170,8 @@ def test_log_get_generic_record(dtype): ("POSIX", 0), ("POSIX", 3), ("POSIX", 5), + ("MPI-IO", 0), + ("MPI-IO", 2), # less records available for STDIO testing # with these logs ("STDIO", 0), From 72e7883182a4172a8641dfa147a6fda4f343bb63 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Tue, 7 Feb 2023 17:37:52 -0600 Subject: [PATCH 6/9] fixup test_reverse_record_array test * modified expected values for case where filtering is applied, and removed xfail tag - the expected values from the Perl summary report sum the derived metrics across the POSIX and STDIO modules, which is why the old expected values are larger than the values that are now passing * make sure to call accumulator_destroy to avoid mem leaks * cache nprocs/modules so we don't need to reopen the log --- .../pydarshan/darshan/tests/test_cffi_misc.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index 48e8879e3..2cd91c164 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -209,8 +209,7 @@ def test_df_to_rec(log_name, index, module): # whether to do an initial filtering # of the DataFrame in Python before # packing it back into C records - pytest.param(True, [28, 13, 12, 1], - marks=pytest.mark.xfail(reason="mismatch perl report")), + (True, [18, 12, 2, 1]), (False, [1026, 12, 2, 1]) # see gh-867 ]) def test_reverse_record_array(python_filter, expected_counts): @@ -221,6 +220,8 @@ def test_reverse_record_array(python_filter, expected_counts): # with those discussed in gh-867 from Perl report log_path = get_log_path("imbalanced-io.darshan") with darshan.DarshanReport(log_path, read_all=True) as report: + nprocs = report.metadata['job']['nprocs'] + modules = report.modules report.mod_read_all_records("POSIX", dtype="pandas") rec_dict = report.records["POSIX"][0] counters_df = rec_dict["counters"] @@ -240,13 +241,9 @@ def test_reverse_record_array(python_filter, expected_counts): # need to deal with the low-level C stuff to set up # accumulator infrastructure to receive the repacked # records - log_handle = backend.log_open(log_path) - jobrec = ffi.new("struct darshan_job *") - libdutil.darshan_log_get_job(log_handle['handle'], jobrec) - modules = backend.log_get_modules(log_handle) darshan_accumulator = ffi.new("darshan_accumulator *") r = libdutil.darshan_accumulator_create(modules["POSIX"]['idx'], - jobrec[0].nprocs, + nprocs, darshan_accumulator) assert r == 0 r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs) @@ -256,7 +253,8 @@ def test_reverse_record_array(python_filter, expected_counts): darshan_derived_metrics, rbuf[0]) assert r == 0 - backend.log_close(log_handle) + r = libdutil.darshan_accumulator_destroy(darshan_accumulator[0]) + assert r == 0 # NOTE: freeing rbuf[0] manually can cause # segfaults here... From 79991d7df875c332bd6e0c1b4ff8b0bed4e27ae3 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Wed, 8 Feb 2023 13:10:18 -0600 Subject: [PATCH 7/9] revisions for `_df_to_rec()` * always return a Python bytes object (return value from recarray tobytes() method) from _df_to_rec * adjust `test_df_to_rec()` to "cast" from returned bytes to a record pointer we can use to sanity check record fields * fix `test_reverse_record_array()` usage of 'summation_record' argument to C library `accumulator_emit()` routine - 'summation_record' is a pointer to a record structure for the corresponding module to write an aggregate record (i.e., a record accumulated from all injected records) - explicitly allocate a 'summation_record' and pass it to the emit routine, similar to what is done with the derived_metrics structure - remove unnecessary secondary call in to `_df_to_rec()` --- .../pydarshan/darshan/backend/cffi_backend.py | 5 +--- .../pydarshan/darshan/tests/test_cffi_misc.py | 29 +++++++++---------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index 2f3a6a44a..e6d928652 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -707,11 +707,8 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): if num_recs > 1: rec_arr.id = counters_df.iloc[rec_index_of_interest, 0].to_numpy().reshape((num_recs, 1)) rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1].to_numpy().reshape((num_recs, 1)) - buf = ffi.new("char[]", (counters_n_cols + fcounters_n_cols - 2) * num_recs * 8) - buf = rec_arr.tobytes() else: rec_arr.id = counters_df.iloc[rec_index_of_interest, 0] rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1] - buf = ffi.new(mod_type) - buf[0] = ffi.from_buffer(rec_arr) + buf = rec_arr.tobytes() return buf diff --git a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py index 2cd91c164..f309dcb99 100644 --- a/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py +++ b/darshan-util/pydarshan/darshan/tests/test_cffi_misc.py @@ -9,7 +9,7 @@ from numpy.testing import assert_array_equal, assert_allclose import darshan import darshan.backend.cffi_backend as backend -from darshan.backend.cffi_backend import ffi, libdutil +from darshan.backend.cffi_backend import ffi, libdutil, _structdefs from darshan.log_utils import get_log_path def test_get_lib_version(): @@ -193,10 +193,11 @@ def test_df_to_rec(log_name, index, module): # retrive the "re-packed"/actual record data: rbuf = backend._df_to_rec(rec_dict, module, index) - actual_fcounters = np.frombuffer(ffi.buffer(rbuf[0].fcounters)) - actual_counters = np.frombuffer(ffi.buffer(rbuf[0].counters), dtype=np.int64) - actual_id = rbuf[0].base_rec.id - actual_rank = rbuf[0].base_rec.rank + rec_buf = ffi.from_buffer(_structdefs[module].replace("**", "*"), rbuf) + actual_fcounters = np.frombuffer(ffi.buffer(rec_buf[0].fcounters)) + actual_counters = np.frombuffer(ffi.buffer(rec_buf[0].counters), dtype=np.int64) + actual_id = rec_buf[0].base_rec.id + actual_rank = rec_buf[0].base_rec.rank assert_allclose(actual_fcounters, expected_fcounters) @@ -236,7 +237,6 @@ def test_reverse_record_array(python_filter, expected_counts): rec_dict["fcounters"] = fcounters_df num_recs = rec_dict["fcounters"].shape[0] record_array = backend._df_to_rec(rec_dict, "POSIX") - rbuf = backend._df_to_rec(rec_dict, "POSIX", 0) # need to deal with the low-level C stuff to set up # accumulator infrastructure to receive the repacked @@ -248,15 +248,14 @@ def test_reverse_record_array(python_filter, expected_counts): assert r == 0 r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs) assert r_i == 0 - darshan_derived_metrics = ffi.new("struct darshan_derived_metrics *") + derived_metrics = ffi.new("struct darshan_derived_metrics *") + summation_record = ffi.new(_structdefs["POSIX"].replace("**", "*")) r = libdutil.darshan_accumulator_emit(darshan_accumulator[0], - darshan_derived_metrics, - rbuf[0]) + derived_metrics, + summation_record) assert r == 0 r = libdutil.darshan_accumulator_destroy(darshan_accumulator[0]) assert r == 0 - # NOTE: freeing rbuf[0] manually can cause - # segfaults here... # the indices into category_counters are pretty opaque.. we should just # move everything to Python "eventually"... (also to avoid all the junk above after filtering..) @@ -264,10 +263,10 @@ def test_reverse_record_array(python_filter, expected_counts): # 1 = RO # 2 = WO # 3 = R/W - actual_total_files = darshan_derived_metrics.category_counters[0].count - actual_ro_files = darshan_derived_metrics.category_counters[1].count - actual_wo_files = darshan_derived_metrics.category_counters[2].count - actual_rw_files = darshan_derived_metrics.category_counters[3].count + actual_total_files = derived_metrics.category_counters[0].count + actual_ro_files = derived_metrics.category_counters[1].count + actual_wo_files = derived_metrics.category_counters[2].count + actual_rw_files = derived_metrics.category_counters[3].count assert_array_equal([actual_total_files, actual_ro_files, actual_wo_files, From dfc64d7c937480e04f958d1c962ed9f0aee47b41 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Wed, 8 Feb 2023 13:33:39 -0600 Subject: [PATCH 8/9] remove unused var --- darshan-util/pydarshan/darshan/backend/cffi_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index e6d928652..d93e94f2c 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -684,7 +684,6 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): be a raw char array; with a single record, it should be cast to a struct of the appropriate type """ - mod_type = _structdefs[mod_name] counters_df = rec_dict["counters"] fcounters_df = rec_dict["fcounters"] counters_n_cols = counters_df.shape[1] From cd0143eaf3875154198cc890c6dc2c1ee50da667 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Wed, 8 Feb 2023 14:08:55 -0600 Subject: [PATCH 9/9] clarify docstring for `_df_to_rec()` --- darshan-util/pydarshan/darshan/backend/cffi_backend.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py index d93e94f2c..6e6d76f4c 100644 --- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py +++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py @@ -679,10 +679,7 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None): Returns ------- - buf: CFFI data object containing a buffer of record(s) or a single - record; when multiple records are present this will effectively - be a raw char array; with a single record, it should be cast - to a struct of the appropriate type + buf: Raw char array containing a buffer of record(s) or a single record. """ counters_df = rec_dict["counters"] fcounters_df = rec_dict["fcounters"]