diff --git a/CHANGELOG.md b/CHANGELOG.md index 42ce0be0..4bdbfef6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.13.0 [unreleased] +### Bug Fixes +1. [#164](https://github.com/influxdata/influxdb-client-python/pull/170): Skip DataFrame rows without data - all fields are nan. + + ## 1.12.0 [2020-10-30] 1. [#163](https://github.com/influxdata/influxdb-client-python/pull/163): Added support for Python 3.9 diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index 952b5052..6abbddce 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -37,6 +37,14 @@ def _itertuples(data_frame): return zip(data_frame.index, *cols) +def _is_nan(x): + return x != x + + +def _any_not_nan(p, indexes): + return any(map(lambda inx: not _is_nan(p[inx]), indexes)) + + def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): """Serialize DataFrame into LineProtocols.""" from ...extras import pd, np @@ -61,6 +69,7 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): tags = [] fields = [] + fields_indexes = [] keys = [] if point_settings.defaultTags: @@ -73,14 +82,18 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): keys.append(key.translate(_ESCAPE_KEY)) key_format = f'{{keys[{index}]}}' + index_value = index + 1 if key in data_frame_tag_columns: - tags.append({'key': key, 'value': f"{key_format}={{str(p[{index + 1}]).translate(_ESCAPE_KEY)}}"}) + tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"}) elif issubclass(value.type, np.integer): - fields.append(f"{key_format}={{p[{index + 1}]}}i") + fields.append(f"{key_format}={{p[{index_value}]}}i") + fields_indexes.append(index_value) elif issubclass(value.type, (np.float, np.bool_)): - fields.append(f"{key_format}={{p[{index + 1}]}}") + fields.append(f"{key_format}={{p[{index_value}]}}") + fields_indexes.append(index_value) else: - fields.append(f"{key_format}=\"{{str(p[{index + 1}]).translate(_ESCAPE_STRING)}}\"") + fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"") + fields_indexes.append(index_value) tags.sort(key=lambda x: x['key']) tags = ','.join(map(lambda y: y['value'], tags)) @@ -100,7 +113,7 @@ def data_frame_to_list_of_points(data_frame, point_settings, **kwargs): if isnull.any(): rep = _replace(data_frame) lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p)) - for p in _itertuples(data_frame)) + for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame))) return list(lp) else: return list(map(f, _itertuples(data_frame))) diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 7b190462..63de0c82 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -99,9 +99,10 @@ def test_write_nan(self): data_frame = pd.DataFrame(data=[[3.1955, np.nan, 20.514305, np.nan], [5.7310, np.nan, 23.328710, np.nan], [np.nan, 3.138664, np.nan, 20.755026], - [5.7310, 5.139563, 23.328710, 19.791240]], + [5.7310, 5.139563, 23.328710, 19.791240], + [np.nan, np.nan, np.nan, np.nan]], index=[now, now + timedelta(minutes=30), now + timedelta(minutes=60), - now + timedelta(minutes=90)], + now + timedelta(minutes=90), now + timedelta(minutes=120)], columns=["actual_kw_price", "forecast_kw_price", "actual_general_use", "forecast_general_use"])