Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ClpKeyValuePairStreamHandler which supports logging dictionary-type log events into CLP's key-value pair IR format. #46

Merged
merged 26 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
51636a1
Implement kv pair handler
LinZhihao-723 Nov 28, 2024
e03b6d7
Add unit tests
LinZhihao-723 Nov 29, 2024
5a51940
Fix handlers
LinZhihao-723 Nov 29, 2024
919a15a
Add empty test
LinZhihao-723 Nov 29, 2024
a1ed017
Merge oss-main
LinZhihao-723 Nov 29, 2024
5a53fe5
Add loglevel timeout test cases
LinZhihao-723 Nov 30, 2024
048531d
Skip tests for macos
LinZhihao-723 Nov 30, 2024
ec36f20
Add tests for bad timeout dicts
LinZhihao-723 Nov 30, 2024
d414d65
Fix capitalization after :
LinZhihao-723 Dec 3, 2024
06608a7
Missing these two...
LinZhihao-723 Dec 3, 2024
096c3ce
Update `auto_generated_kv_pairs_utils` based on the review comments.
LinZhihao-723 Feb 3, 2025
e854fa1
Apply suggestions from code review
LinZhihao-723 Feb 3, 2025
b310993
Apply code review comments
LinZhihao-723 Feb 7, 2025
b039278
Apply black
LinZhihao-723 Feb 7, 2025
715961b
Fix utc offset calculation.
LinZhihao-723 Feb 8, 2025
5d8cd33
Use io_open
LinZhihao-723 Feb 8, 2025
f270b39
Apply code review comments
LinZhihao-723 Feb 8, 2025
1bcf7d7
Empty test
LinZhihao-723 Feb 8, 2025
2f03629
Merge branch 'oss-main' into kv-pair-handler
LinZhihao-723 Feb 8, 2025
fe77d91
Apply suggestions from code review
LinZhihao-723 Feb 16, 2025
5ab34d4
Fix version specifier
LinZhihao-723 Feb 16, 2025
9f96dda
Fix linter...
LinZhihao-723 Feb 16, 2025
47d3af7
Update 's docstring
LinZhihao-723 Feb 16, 2025
306fba6
Update src/clp_logging/handlers.py
LinZhihao-723 Feb 17, 2025
1ea69de
Apply code review comments
LinZhihao-723 Feb 17, 2025
cb8be0a
Apply code review comment for missed renaming
LinZhihao-723 Feb 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
requires-python = ">=3.7"
dependencies = [
"backports.zoneinfo >= 0.2.1; python_version < '3.9'",
"clp-ffi-py >= 0.0.11",
"clp-ffi-py >= 0.0.14",
"typing-extensions >= 3.7.4",
"tzlocal == 5.1; python_version < '3.8'",
"tzlocal >= 5.2; python_version >= '3.8'",
Expand Down
68 changes: 68 additions & 0 deletions src/clp_logging/auto_generated_kv_pairs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging
from typing import Any, Dict

from clp_logging.utils import Timestamp

# Keys used in the auto-generated key-value pairs of a log event. Each top-level key maps to a
# nested dictionary whose entries are addressed by the sub-keys listed right below it.

# Timestamp of the log event.
TIMESTAMP_KEY: str = "timestamp"
# Sub-key for the Unix timestamp, in milliseconds.
TIMESTAMP_UNIX_MILLISECS_KEY: str = "unix_millisecs"
# Sub-key for the timezone's offset from UTC, in seconds.
TIMESTAMP_UTC_OFFSET_SECS_KEY: str = "utc_offset_secs"

# Log level of the log event.
LEVEL_KEY: str = "level"
# Sub-key for the numeric log level (`logging.LogRecord.levelno`).
LEVEL_NUM_KEY: str = "num"
# Sub-key for the log level's name (`logging.LogRecord.levelname`).
LEVEL_NAME_KEY: str = "name"

# Source location from which the log event was emitted.
SOURCE_LOCATION_KEY: str = "source_location"
# Sub-key for the source file's path (`logging.LogRecord.pathname`).
SOURCE_LOCATION_PATH_KEY: str = "path"
# Sub-key for the line number within the source file (`logging.LogRecord.lineno`).
SOURCE_LOCATION_LINE_KEY: str = "line"


class AutoGeneratedKeyValuePairsBuffer:
    """
    Holds one pre-shaped dictionary of auto-generated key-value pairs that is
    refilled for every log event.

    Reusing a single dictionary avoids creating new dictionaries for each log
    event. As a consequence, the dictionary returned by `generate` is
    overwritten by the next call to `generate`.
    """

    def __init__(self) -> None:
        # Every field starts out as `None` and is filled in by `generate`.
        self._buf: Dict[str, Any] = {
            TIMESTAMP_KEY: dict.fromkeys(
                (TIMESTAMP_UNIX_MILLISECS_KEY, TIMESTAMP_UTC_OFFSET_SECS_KEY)
            ),
            LEVEL_KEY: dict.fromkeys((LEVEL_NUM_KEY, LEVEL_NAME_KEY)),
            SOURCE_LOCATION_KEY: dict.fromkeys(
                (SOURCE_LOCATION_PATH_KEY, SOURCE_LOCATION_LINE_KEY)
            ),
        }

    def generate(self, ts: Timestamp, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Fills the underlying buffer with the metadata of the given log event.

        :param ts: The timestamp assigned to the log event.
        :param record: The LogRecord containing metadata for the log event.
        :return: The underlying buffer, populated with the auto-generated
            key-value pairs of this log event.
        """
        timestamp_kv_pairs: Dict[str, Any] = self._buf[TIMESTAMP_KEY]
        timestamp_kv_pairs[TIMESTAMP_UNIX_MILLISECS_KEY] = ts.get_unix_ts()
        timestamp_kv_pairs[TIMESTAMP_UTC_OFFSET_SECS_KEY] = ts.get_utc_offset()

        # NOTE: Only a subset of the metadata carried by `record` is captured: the log level and
        # the source location. Everything else in `record` is intentionally left out.
        level_kv_pairs: Dict[str, Any] = self._buf[LEVEL_KEY]
        level_kv_pairs[LEVEL_NUM_KEY] = record.levelno
        level_kv_pairs[LEVEL_NAME_KEY] = record.levelname

        source_location_kv_pairs: Dict[str, Any] = self._buf[SOURCE_LOCATION_KEY]
        source_location_kv_pairs[SOURCE_LOCATION_PATH_KEY] = record.pathname
        source_location_kv_pairs[SOURCE_LOCATION_LINE_KEY] = record.lineno

        return self._buf
183 changes: 178 additions & 5 deletions src/clp_logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
import socket
import sys
import time
import warnings
from abc import ABCMeta, abstractmethod
from contextlib import nullcontext
from math import floor
from pathlib import Path
from queue import Empty, Queue
from signal import SIGINT, signal, SIGTERM
from threading import Thread, Timer
from types import FrameType
from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union

import tzlocal
from clp_ffi_py.ir import FourByteEncoder
from clp_ffi_py.ir import FourByteEncoder, Serializer
from clp_ffi_py.utils import serialize_dict_to_msgpack
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor

from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer
from clp_logging.protocol import (
BYTE_ORDER,
EOF_CHAR,
Expand All @@ -25,12 +29,15 @@
UINT_MAX,
ULONG_MAX,
)
from clp_logging.utils import Timestamp

# TODO: lock writes to zstream if GIL ever goes away
# Note: no need to quote "Queue[Tuple[int, bytes]]" in python 3.9

DEFAULT_LOG_FORMAT: str = " %(levelname)s %(name)s %(message)s"
WARN_PREFIX: str = " [WARN][clp_logging]"
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs"
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs"


def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]:
Expand Down Expand Up @@ -129,9 +136,9 @@ def _write(self, loglevel: int, msg: str) -> None:
# override
def emit(self, record: logging.LogRecord) -> None:
"""
Override `logging.Handler.emit` in base class to ensure
`logging.Handler.handleError` is always called and avoid requiring a
`logging.LogRecord` to call internal writing functions.
Implements `logging.Handler.emit` to ensure
`logging.Handler.handleError` is always called and so derived classes
only need to implement `_write` instead of implementing this method.
"""
msg: str = self.format(record) + "\n"
try:
Expand Down Expand Up @@ -792,3 +799,169 @@ def __init__(
super().__init__(
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout
)


class ClpKeyValuePairStreamHandler(logging.Handler):
    """
    A custom logging handler that serializes key-value pair log events into the
    CLP key-value pair IR format.

    Differences from `logging.StreamHandler`:

    - Log events (`logging.LogRecord`) should contain the key-value pairs that a user wants to log
      as a Python dictionary.
    - As a result, the key-value pairs will not be formatted into a string before being written.
    - The key-value pairs will be serialized into the CLP key-value pair IR format before writing to
      the stream.

    Key-value pairs in the log event must abide by the following rules:
    - Keys must be of type `str`.
    - Values must be one of the following types:
        - Primitives: `int`, `float`, `str`, `bool`, or `None`.
        - Arrays, where each array:
            - may contain primitive values, dictionaries, or nested arrays.
            - can be empty.
        - Dictionaries, where each dictionary:
            - must adhere to the aforementioned rules for keys and values.
            - can be empty.

    :param stream: A writable byte output stream to which the handler will write the serialized IR
        byte sequences.
    :param enable_compression: Whether to compress the serialized IR byte sequences using Zstandard.
    """

    def __init__(
        self,
        stream: IO[bytes],
        enable_compression: bool = True,
    ) -> None:
        super().__init__()

        self._enable_compression: bool = enable_compression
        # `None` means "closed"; see `_is_closed`.
        self._serializer: Optional[Serializer] = None
        self._formatter: Optional[logging.Formatter] = None
        self._ostream: IO[bytes] = stream

        self._auto_gen_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = (
            AutoGeneratedKeyValuePairsBuffer()
        )

        self._init_new_serializer(stream)

    # override
    def setFormatter(self, fmt: Optional[logging.Formatter]) -> None:
        """
        Stores the given formatter and warns that formatters aren't supported.

        The formatter is only recorded in `_formatter`; this class doesn't apply it to log events.
        Passing `None` is a no-op and doesn't emit a warning.

        :param fmt: The formatter to record, or `None`.
        """
        if fmt is None:
            return
        warnings.warn(
            f"{self.__class__.__name__} doesn't currently support Formatters",
            category=RuntimeWarning,
            # Attribute the warning to the caller of `setFormatter` rather than to this method.
            stacklevel=2,
        )
        self._formatter = fmt

    # override
    def emit(self, record: logging.LogRecord) -> None:
        """
        Implements `logging.Handler.emit` to encode the given record into CLP's
        IR format before it's written to the underlying stream.

        Any exception raised while writing is routed to `logging.Handler.handleError`.

        :param record: The log event to serialize.
        """
        try:
            self._write(record)
        except Exception:
            self.handleError(record)

    # override
    def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]:
        """
        Sets the instance's stream to the given value, if it's different from
        the current value. The old stream is flushed before the new stream is
        set.

        NOTE: The old stream will also be closed by this method.

        :param stream: A writable byte output stream to which the handler will write the serialized
            IR byte sequences.
        :return: The old stream if the stream was changed, or `None` if it wasn't.
        """

        # NOTE: This function is implemented by mirroring CPython's implementation.

        if stream is self._ostream:
            return None

        old_stream: IO[bytes] = self._ostream
        with self.lock if self.lock else nullcontext():
            # TODO: The following call will close the old stream whereas `logging.StreamHandler`'s
            # implementation will only flush the stream without closing it. To support
            # `logging.StreamHandler`'s behaviour, we need `clp_ffi_py.ir.Serializer` to allow
            # closing the serializer without closing the underlying output stream.
            self._init_new_serializer(stream)
            self._ostream = stream
        return old_stream

    # override
    def close(self) -> None:
        """
        Closes the serializer (and thereby the underlying output stream) and then the handler
        itself. Calling this method on an already closed handler is a no-op.
        """
        if self._is_closed():
            return
        try:
            self._close_serializer()
        finally:
            # Always run the base-class cleanup, even if closing the serializer fails; otherwise
            # the handler would stay registered in `logging`'s internal bookkeeping.
            super().close()

    def _is_closed(self) -> bool:
        """
        :return: Whether the handler has no open serializer.
        """
        return self._serializer is None

    def _close_serializer(self) -> None:
        """
        Closes the current serializer if it's open.

        NOTE: The underlying output stream will also be closed.
        """
        if self._is_closed():
            return
        assert self._serializer is not None
        self._serializer.close()
        self._serializer = None

    def _init_new_serializer(self, stream: IO[bytes]) -> None:
        """
        Initializes a new serializer that will write to the given stream.

        Any serializer that is currently open is closed first.

        :param stream: The stream that the underlying serializer will write to.
        """
        self._close_serializer()
        self._serializer = Serializer(
            ZstdCompressor().stream_writer(stream) if self._enable_compression else stream
        )

    def _write(self, record: logging.LogRecord) -> None:
        """
        Writes the log event into the underlying serializer.

        :param record: The log event to serialize.
        :raise RuntimeError: If the handler has been already closed.
        :raise TypeError: If `record.msg` is not a Python dictionary.
        """
        if self._is_closed():
            raise RuntimeError("Stream already closed.")

        if not isinstance(record.msg, dict):
            raise TypeError("`record.msg` must be a Python dictionary.")

        self._serialize_kv_pair_log_event(
            self._auto_gen_kv_pairs_buf.generate(Timestamp.now(), record), record.msg
        )

    def _serialize_kv_pair_log_event(
        self, auto_gen_kv_pairs: Dict[str, Any], user_gen_kv_pairs: Dict[str, Any]
    ) -> None:
        """
        Serializes the given key-value pairs as one log event using the underlying serializer.

        :param auto_gen_kv_pairs: A dict of auto-generated kv-pairs.
        :param user_gen_kv_pairs: A dict of user-generated kv-pairs.
        :raise RuntimeError: If the handler has been already closed.
        """
        if self._is_closed():
            raise RuntimeError("Stream already closed.")
        assert self._serializer is not None
        self._serializer.serialize_log_event_from_msgpack_map(
            serialize_dict_to_msgpack(auto_gen_kv_pairs),
            serialize_dict_to_msgpack(user_gen_kv_pairs),
        )
44 changes: 44 additions & 0 deletions src/clp_logging/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

import time
from math import floor


class Timestamp:
    """
    A timestamp represented as a Unix timestamp and a timezone offset from UTC.
    """

    @classmethod
    def now(cls) -> "Timestamp":
        """
        Alternate constructor that captures the current time.

        Defined as a class method so that calling it on a subclass returns an
        instance of that subclass.

        :return: A `Timestamp` instance representing the current time.
        """
        ts: float = time.time()
        return cls(
            unix_ts=floor(ts * 1000),
            # Use the local timezone's offset that is in effect at `ts`.
            utc_offset=time.localtime(ts).tm_gmtoff,
        )

    def __init__(self, unix_ts: int, utc_offset: int) -> None:
        """
        Initializes a `Timestamp` instance with the given time.

        :param unix_ts: Unix timestamp in milliseconds.
        :param utc_offset: The number of seconds the timezone is ahead of
            (positive) or behind (negative) UTC.
        """
        self._utc_offset: int = utc_offset
        self._unix_ts: int = unix_ts

    def __repr__(self) -> str:
        """
        :return: A debugging representation showing both stored fields.
        """
        return (
            f"{type(self).__name__}"
            f"(unix_ts={self._unix_ts!r}, utc_offset={self._utc_offset!r})"
        )

    def get_unix_ts(self) -> int:
        """
        :return: The Unix timestamp in milliseconds.
        """
        return self._unix_ts

    def get_utc_offset(self) -> int:
        """
        :return: The number of seconds the timezone is ahead of (positive) or behind (negative) UTC.
        """
        return self._utc_offset
9 changes: 8 additions & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import unittest
from typing import Iterable, Optional, Union

from tests.test_handlers import TestCLPBase, TestCLPSegmentStreamingBase
from tests.test_handlers import (
TestCLPBase,
TestClpKeyValuePairLoggingBase,
TestCLPSegmentStreamingBase,
)


def add_tests(suite: unittest.TestSuite, loader: unittest.TestLoader, test_class: type) -> None:
Expand Down Expand Up @@ -35,4 +39,7 @@ def load_tests(
for seg_test_class in TestCLPSegmentStreamingBase.__subclasses__():
add_tests(suite, loader, seg_test_class)

for kv_pair_handler_test_class in TestClpKeyValuePairLoggingBase.__subclasses__():
add_tests(suite, loader, kv_pair_handler_test_class)

return suite
Loading
Loading