Source code for tremors.collector

"""This module contains collector bundles and tools.

A collector bundle is an object that contains the definition of
the collector state, a factory function to create a collector, and a
formatter function that will extract, and format the collector state from
a :class:`~logging.LogRecord`.

In addition to the bundles in this module, you can create custom bundles. A
collector is generic in the type of its state, which can be of an arbitrary
type. A collector factory can be defined to have arbitrary parameters that
depend on how the collector is initialized. A collector formatter has a
fixed signature as described in :class:`Formatter`.
"""

from __future__ import annotations

import dataclasses
import logging
import time
import uuid
from collections import defaultdict
from collections.abc import Mapping
from typing import NamedTuple, Protocol

import tremors.logger


[docs] def get_state[T](typ: type[T], name: str, record: logging.LogRecord) -> T | None: """Get the collector state from a :class:`~logging.LogRecord`. The function can be used by custom collector formatters to get the collector state from a record. Args: typ: The collector state class. name: The name of the collector. record: The LogRecord the may contain the collector state. Returns: An object of type ``typ`` if the LogRecord contains such an object for the collector. Otherwise None. """ tremors_extra = getattr(record, tremors.logger.EXTRA_KEY, None) if isinstance(tremors_extra, Mapping): state = tremors_extra.get(name) if isinstance(state, typ): return state return None
class Formatter(Protocol):
    """Callback protocol for a collector formatter."""

    def __call__(
        self,
        record: logging.LogRecord,
        *,
        name: str = "",
        fmt: object = "",
        default: str = "",
    ) -> str:
        """Extract a collector state from a record, and format it.

        Args:
            record: A :class:`~logging.LogRecord` that may contain the collector state.
            name: The name of the collector.
            fmt: A format specification. Implementations commonly expect this to adhere
                to the :ref:`formatspec`. Because ``str(fmt)`` may be used to obtain the
                specification, ``fmt`` does not have to be a string; any object with an
                appropriate string representation works.
            default: A value that may be returned if the formatter could not obtain, or
                format the state.

        Returns:
            The state formatted as a string, or the default value.
        """
class CollectorBundle[T, U](NamedTuple):
    """A bundle to create, and work with a collector.

    The bundle is generic in ``T``, the collector state type, and ``U``, the
    type of the factory object.

    Attributes:
        state: The collector state class.
        factory: The object used to create collectors. The bundles in this
            module store a factory class here.
        formatter: A :class:`Formatter` that extracts the collector state from
            a :class:`~logging.LogRecord`, and formats it.
    """

    state: type[T]
    factory: U
    formatter: Formatter
class IndentifierState(NamedTuple):
    """The state of a collector created by :class:`IdentifierFactory`.

    NOTE(review): the class name is spelled ``Indentifier`` (sic). It is part of
    the module's public surface, so it is left unchanged here.

    Attributes:
        group_id: The group ID copied from the logger that produced the log item.
        parent: The ``parent`` of that logger, or None.
        path: The logger's path components. When the log item carries a standard
            record, a trailing ``std:<record name>`` component is appended.
    """

    group_id: uuid.UUID
    parent: tremors.logger.Logger | None
    path: tuple[str, ...]
def _identifier_collect(
    state: IndentifierState | None, log_item: tremors.logger.LogItem
) -> IndentifierState | None:
    """Build the identifier state for a log item.

    Returns a new :class:`IndentifierState` when the log item has a logger with
    a group ID and a non-empty path. Otherwise the previous ``state`` is
    returned unchanged.
    """
    source = log_item.logger
    if not (source and source.group_id and source.path):
        # Nothing to identify the item with; keep whatever was collected before.
        return state

    record = log_item.record
    if record:
        # Mark the standard logger that produced the record as the last path component.
        segments = (*source.path, f"std:{record.name}")
    else:
        segments = source.path
    return IndentifierState(group_id=source.group_id, parent=source.parent, path=segments)
class IdentifierFactory(tremors.logger.CollectorFactory[IndentifierState | None]):
    """Create collectors to gather logger identifiers."""

    def __init__(
        self,
        name: str = "identifier",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
    ) -> None:
        """Store the settings shared by every collector the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be inherited.
        """
        self._inherit = inherit
        self._level = level
        self._name = name

    def create(self) -> tremors.logger.Collector[IndentifierState | None]:
        """Create a collector with a fresh ID and no initial state."""
        collector = tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=None,
            collect=_identifier_collect,
        )
        return collector
def identifier_formatter(
    record: logging.LogRecord,
    *,
    name: str = "identifier",
    fmt: object = "[p:{process} t:{thread} g:{group_id}] {path}",
    default: str = "",
) -> str:
    """Extract an :class:`IndentifierState` from a record, and format it.

    ``str(fmt)`` may contain any of the fields in this table. The formatted
    string is generated by replacing the fields with the values described in
    the table. Labels such as ``p:``, ``t:`` and ``g:`` are not part of the
    values; they come from the format specification itself, as in the default
    ``fmt``.

    ======== =====
    Field    Value
    ======== =====
    process  The record's process ID if it's available. Otherwise the empty string.
    thread   The record's thread ID if it's available. Otherwise the empty string.
    group_id The group ID.
    path     The path components joined with ``/``, like a POSIX path.
    ======== =====

    Args:
        record: A :class:`~logging.LogRecord` that may contain the collector state.
        name: The name of the collector whose state is looked up.
        fmt: The format specification; ``str(fmt)`` is used as the template.
        default: The value returned when the record has no state for the collector.

    Returns:
        The formatted string if the record contains a state for the collector.
        Otherwise ``default``.
    """
    state = get_state(IndentifierState, name, record)
    if state is None:
        return default

    # LogRecord leaves these as None when the IDs are not collected.
    process = "" if record.process is None else record.process
    thread = "" if record.thread is None else record.thread
    return str(fmt).format(
        process=process,
        thread=thread,
        group_id=state.group_id,
        path="/".join(state.path),
    )
# Groups the identifier state class, factory class, and formatter under one name.
identifier = CollectorBundle(
    state=IndentifierState, factory=IdentifierFactory, formatter=identifier_formatter
)
"""The identifier collector bundle.

Examples:
    With :func:`~logging.basicConfig` we configure the standard root logger with a stream
    handler that has a formatter that incorporates the record's ``ident`` attribute in the
    message. We add the filter function ``flt`` to the handler. The filter uses the formatter
    from the identifier collector bundle to extract the collector state from a record, and
    format it. Then the filter returns a modified record with the ident attribute set to the
    formatted state.

    .. note::
        If we had added the filter to the logger instead of the handler, the filter would not
        be called for messages logged by descendant standard loggers. But by adding the filter
        to the handler, we ensure that it is called for all messages that the handler could
        emit.

    .. Example 1
    .. code-block:: python

        import copy
        import logging

        from tremors import from_logged, logged
        from tremors.collector import identifier

        def flt(record):
            own_record = copy.copy(record)
            ident = identifier.formatter(own_record)
            own_record.ident = f"{ident} > " if ident else ""
            return own_record

        @logged(identifier.factory())
        def parent(*, logger=from_logged):
            logger.info("foo")
            child()

        @logged(identifier.factory())
        def child(*, logger=from_logged):
            logger.info("bar")

        logging.basicConfig(
            format="%(ident)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
        )
        logging.root.handlers[0].addFilter(flt)
        parent()

    The logged messages contain the process ID, thread ID, group ID, and hierarchy as a path.

    .. code-block:: shell

        [p:... t:... g:...] parent > INFO:root:entered: parent
        [p:... t:... g:...] parent > INFO:root:foo
        [p:... t:... g:...] parent/child > INFO:root:entered: child
        [p:... t:... g:...] parent/child > INFO:root:bar
        [p:... t:... g:...] parent/child > INFO:root:exited: child
        [p:... t:... g:...] parent > INFO:root:exited: parent

    We can expand on the previous example by throwing a function that uses standard logging
    into the mix. ``child2`` calls ``other``, which could be a function that you can't change,
    and doesn't use Tremors loggers. By default, standard loggers propagate their messages, so
    descendant messages will eventually be processed by the root logger's handler with our
    filter. The collector formatter will not be able to extract the state from a descendant
    record, and will return the default value, the empty string. Then ``record.ident`` will be
    the empty string, which will be interpolated into the formatted message.

    .. note::
        Since the formatter expects the record to have an ident attribute, the filter must
        always add this attribute to the record.

    .. Example 2
    .. code-block:: python

        @logged(identifier.factory())
        def child(*, logger=from_logged) -> None:
            logger.info("bar")
            other()

        def other() -> None:
            logging.getLogger(__name__).info("baz")

        parent()

    The functions that use Tremors loggers have entered and exited messages, and their
    messages contain identifier information, as before. The ``other`` function, which logs
    with a standard logger simply emits messages with empty ident sections.

    .. code-block:: shell

        [p:... t:... g:...] parent > INFO:root:entered: parent
        [p:... t:... g:...] parent > INFO:root:foo
        [p:... t:... g:...] parent/child > INFO:root:entered: child
        [p:... t:... g:...] parent/child > INFO:root:bar
        INFO:__main__:baz
        [p:... t:... g:...] parent/child > INFO:root:exited: child
        [p:... t:... g:...] parent > INFO:root:exited: parent
"""
@dataclasses.dataclass
class CounterState:
    """The state of a collector created by the counter factory.

    Attributes:
        count: The current count. The counter collector adds ``step`` to it each
            time it runs.
        step: The amount added to ``count`` on each run.
    """

    count: int
    step: int
def _counter_collect(state: CounterState, _: tremors.logger.LogItem) -> CounterState:
    """Advance the count by one step, and return the same (mutated) state.

    The log item is ignored; every call counts.
    """
    state.count = state.count + state.step
    return state
class CounterFactory(tremors.logger.CollectorFactory[CounterState]):
    """Create collectors to count logging calls."""

    def __init__(
        self,
        name: str = "counter",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
        initial: int = 0,
        step: int = 1,
    ) -> None:
        """Store the settings shared by every collector the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be inherited.
            initial: The initial value of the count of collectors created by the factory.
            step: How much to increase the count by each time the collectors created by
                the factory run.
        """
        self._step = step
        self._initial = initial
        self._inherit = inherit
        self._level = level
        self._name = name

    def create(self) -> tremors.logger.Collector[CounterState]:
        """Create a collector with a fresh ID and its own counter state."""
        # Each collector gets its own state object, so counts are not shared.
        initial_state = CounterState(count=self._initial, step=self._step)
        return tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=initial_state,
            collect=_counter_collect,
        )
def counter_formatter(
    record: logging.LogRecord,
    *,
    name: str = "counter",
    fmt: object = "{count}",
    default: str = "0",
) -> str:
    """Extract a count from a record, and format it.

    ``str(fmt)`` may contain the field ``count``. The formatted string is
    generated by replacing the field with the state's count.

    Returns:
        The formatted string if the record contains a state for the collector.
        Otherwise ``default``.
    """
    found = get_state(CounterState, name, record)
    return default if found is None else str(fmt).format(count=found.count)
# Groups the counter state class, factory class, and formatter under one name.
counter = CollectorBundle(state=CounterState, factory=CounterFactory, formatter=counter_formatter)
"""The counter collector bundle.

Examples:
    We configure standard logging in a similar manner to the examples in :attr:`identifier`,
    except here we use counter collectors.

    .. Example 1
    .. code-block:: python

        import copy
        import logging

        from tremors import from_logged, logged
        from tremors.collector import counter

        def flt(record):
            own_record = copy.copy(record)
            count = counter.formatter(own_record, fmt="[{count}] ", default="")
            own_record.count = count
            return own_record

        @logged(counter.factory())
        def parent(*, logger=from_logged):
            logger.info("foo")
            child()

        @logged(counter.factory())
        def child(*, logger=from_logged):
            logger.info("bar")

        logging.basicConfig(
            format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
        )
        logging.root.handlers[0].addFilter(flt)
        parent()

    The logged messages include the count, where the ``parent`` and ``child`` counts are
    distinct from each other because each logger uses its own counter collector.

    .. code-block:: shell

        [1] INFO:root:entered: parent
        [2] INFO:root:foo
        [1] INFO:root:entered: child
        [2] INFO:root:bar
        [3] INFO:root:exited: child
        [3] INFO:root:exited: parent

    Now let's modify the previous example to have ``parent`` and ``child`` use a shared
    counter by passing a collector instance that we get from the factory's ``create`` method.

    .. Example 2
    .. code-block:: python

        shared_counter = counter.factory().create()

        @logged(shared_counter)
        def parent(*, logger=from_logged) -> None:
            logger.info("foo")
            child()

        @logged(shared_counter)
        def child(*, logger=from_logged) -> None:
            logger.info("bar")

        parent()

    Here the count is shared between the two functions.

    .. code-block:: shell

        [1] INFO:root:entered: parent
        [2] INFO:root:foo
        [3] INFO:root:entered: child
        [4] INFO:root:bar
        [5] INFO:root:exited: child
        [6] INFO:root:exited: parent
"""

# Module-level tally used by the enter counter collectors, keyed by cache key.
# Being a defaultdict(int), a key that has not been seen yet reads as 0.
_enter_collect_cache = defaultdict[str, int](int)
@dataclasses.dataclass
class EnterCounterState:
    """The state of a collector created by the enter counter factory.

    The count itself is not stored on the state. It lives in the module-level
    cache, and the state only records which cache entry to read.

    Attributes:
        cache_key: The key the count is stored under in the cache. While it is
            None (or empty), the shared ``"__default__"`` entry is used.
        step: The amount added to the cached count for each entered message.
    """

    cache_key: str | None
    step: int

    @property
    def count(self) -> int:
        """Get the count from the cache, or 0 if nothing has been counted yet."""
        # Use .get() rather than indexing: indexing a defaultdict inserts the
        # missing key, so merely reading the count would grow the shared cache.
        return _enter_collect_cache.get(self.cache_key or "__default__", 0)
def _enter_counter_collect(
    state: EnterCounterState, log_item: tremors.logger.LogItem
) -> EnterCounterState:
    """Count a log item if it is flagged as an entered message.

    Items whose Tremors extra mapping has no truthy enter flag leave the cache
    untouched. The same ``state`` object is always returned.
    """
    extra = log_item.extra
    if not extra:
        return state

    tremors_extra = extra.get(tremors.logger.EXTRA_KEY, {})
    if not isinstance(tremors_extra, Mapping):
        return state
    if not tremors_extra.get(tremors.logger.ENTER_KEY):
        return state

    # Without an explicit key, adopt the logger's name the first time we count.
    if not state.cache_key and log_item.logger:
        state.cache_key = log_item.logger.name
    bucket = state.cache_key or "__default__"
    _enter_collect_cache[bucket] += state.step
    return state
class EnterCounterFactory(tremors.logger.CollectorFactory[EnterCounterState]):
    """Create collectors to count when a logger is entered."""

    def __init__(
        self,
        name: str = "enter_counter",
        *,
        cache_key: str | None = None,
        level: int = logging.INFO,
        inherit: bool = False,
        step: int = 1,
    ) -> None:
        """Initialize the state to create collectors.

        Args:
            name: The name of collectors created by the factory.
            cache_key: The key the count for the created collector will be stored under
                in the cache. If None, the collector adopts the name of the logger that
                logs the first entered message it counts. Until a key is set, the count
                is read from, and stored under, a shared default key.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be inherited.
            step: How much to increase the count by each time a collector created by the
                factory collects an entered message.
        """
        self._name = name
        self._cache_key = cache_key
        self._level = level
        self._inherit = inherit
        self._step = step

    def create(self) -> tremors.logger.Collector[EnterCounterState]:
        """Create a collector with a fresh ID and its own state object."""
        return tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            # The state only selects the cache entry; the count lives in the module cache.
            state=EnterCounterState(cache_key=self._cache_key, step=self._step),
            collect=_enter_counter_collect,
        )
def enter_counter_formatter(
    record: logging.LogRecord,
    *,
    name: str = "enter_counter",
    fmt: object = "{enter_count}",
    default: str = "0",
) -> str:
    """Extract an enter count from a record, and format it.

    ``str(fmt)`` may contain the field ``enter_count``. The formatted string is
    generated by replacing the field with the state's count.

    Returns:
        The formatted string if the record contains a state for the collector.
        Otherwise ``default``.
    """
    found = get_state(EnterCounterState, name, record)
    return default if found is None else str(fmt).format(enter_count=found.count)
# Groups the enter counter state class, factory class, and formatter under one name.
enter_counter = CollectorBundle(
    state=EnterCounterState, factory=EnterCounterFactory, formatter=enter_counter_formatter
)
"""The enter counter collector bundle.

Examples:
    We configure standard logging in a similar manner to the examples in :attr:`counter`,
    except here we use enter counter collectors, which will only count entered logs.

    .. Example 1
    .. code-block:: python

        import copy
        import logging

        from tremors import from_logged, logged
        from tremors.collector import enter_counter

        def flt(record):
            own_record = copy.copy(record)
            count = enter_counter.formatter(
                own_record, fmt="[{enter_count}] ", default=""
            )
            own_record.count = count
            return own_record

        @logged(enter_counter.factory())
        def foo(run: int, *, logger=from_logged):
            logger.info("foo %d", run)

        @logged(enter_counter.factory())
        def bar(run: int, *, logger=from_logged):
            logger.info("bar %d", run)

        logging.basicConfig(
            format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
        )
        logging.root.handlers[0].addFilter(flt)
        foo(1)
        bar(1)
        foo(2)
        foo(3)
        bar(2)

    The logged messages include the total number of times each function has been called.

    .. code-block:: shell

        [1] INFO:root:entered: foo
        [1] INFO:root:foo 1
        [1] INFO:root:exited: foo
        [1] INFO:root:entered: bar
        [1] INFO:root:bar 1
        [1] INFO:root:exited: bar
        [2] INFO:root:entered: foo
        [2] INFO:root:foo 2
        [2] INFO:root:exited: foo
        [3] INFO:root:entered: foo
        [3] INFO:root:foo 3
        [3] INFO:root:exited: foo
        [2] INFO:root:entered: bar
        [2] INFO:root:bar 2
        [2] INFO:root:exited: bar

    Let's see what happens if a function doesn't log the entered message.

    .. Example 2
    .. code-block:: python

        @logged(enter_counter.factory(), enter_msg=False, exit_msg=False)
        def baz(run: int, *, logger=from_logged):
            logger.info("baz %d", run)

        foo(4)
        baz(1)
        foo(5)
        foo(6)
        baz(2)

    The ``foo`` counter keeps counting up from the previous example, but the ``baz`` counter
    doesn't increase at all because the ``baz`` logger doesn't log entered messages.

    .. code-block:: shell

        [4] INFO:root:entered: foo
        [4] INFO:root:foo 4
        [4] INFO:root:exited: foo
        [0] INFO:root:baz 1
        [5] INFO:root:entered: foo
        [5] INFO:root:foo 5
        [5] INFO:root:exited: foo
        [6] INFO:root:entered: foo
        [6] INFO:root:foo 6
        [6] INFO:root:exited: foo
        [0] INFO:root:baz 2
"""
@dataclasses.dataclass
class ElapsedState:
    """The state of a collector created by :class:`ElapsedFactory`.

    Both fields hold readings of :func:`time.perf_counter_ns`, in nanoseconds.

    Attributes:
        t0: The reading taken the first time the collector ran, or None if it
            has not run yet.
        t: The reading taken the most recent time the collector ran, or None if
            it has not run yet.
    """

    t0: int | None
    t: int | None
def _collect_elapsed(state: ElapsedState, _: tremors.logger.LogItem) -> ElapsedState:
    """Record the current time on the state, and return the same (mutated) state.

    The first call also fixes the starting point ``t0``. The log item is ignored.
    """
    now = time.perf_counter_ns()
    state.t = now
    if state.t0 is None:
        # First run: this reading becomes the reference point.
        state.t0 = now
    return state
class ElapsedFactory(tremors.logger.CollectorFactory[ElapsedState]):
    """Create collectors to measure how much time has elapsed."""

    def __init__(
        self,
        name: str = "elapsed",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
    ) -> None:
        """Store the settings shared by every collector the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be inherited.
        """
        self._inherit = inherit
        self._level = level
        self._name = name

    def create(self) -> tremors.logger.Collector[ElapsedState]:
        """Create a collector with a fresh ID and an unstarted elapsed state."""
        # Both timestamps start unset; the collect function fills them in on first run.
        unstarted = ElapsedState(t0=None, t=None)
        return tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=unstarted,
            collect=_collect_elapsed,
        )
def elapsed_formatter(
    record: logging.LogRecord,
    *,
    name: str = "elapsed",
    fmt: object = "{elapsed:.3f}",
    default: str = "",
) -> str:
    """Extract an :class:`ElapsedState` from a record, and format it.

    ``str(fmt)`` may contain the field ``elapsed``. The formatted string is
    generated by replacing the field with the state's elapsed time in seconds.

    Returns:
        The formatted string if the record contains a state for the collector,
        and the collector has run at least once. Otherwise ``default``.
    """
    found = get_state(ElapsedState, name, record)
    if found is None:
        return default

    start, latest = found.t0, found.t
    if start is None or latest is None:
        # The collector has not taken a reading yet.
        return default

    # Readings are in nanoseconds; report seconds.
    seconds = (latest - start) / 1e9
    return str(fmt).format(elapsed=seconds)
# Groups the elapsed state class, factory class, and formatter under one name.
elapsed = CollectorBundle(state=ElapsedState, factory=ElapsedFactory, formatter=elapsed_formatter)
"""The elapsed collector bundle.

Examples:
    We configure standard logging in a similar manner to the examples in :attr:`identifier`,
    except here we use elapsed collectors, and identifier collectors with a custom format to
    only show the path.

    .. Example 1
    .. code-block:: python

        import copy
        import logging
        import time

        from tremors import from_logged, logged
        from tremors.collector import elapsed, identifier

        def flt(record):
            own_record = copy.copy(record)
            ident_msg = identifier.formatter(own_record, fmt="{path}")
            elapsed_msg = elapsed.formatter(own_record)
            own_record.ident = f"{ident_msg} > " if ident_msg else ""
            own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
            return own_record

        @logged(identifier.factory(), elapsed.factory())
        def parent(*, logger=from_logged):
            logger.info("sleeping for 1s...")
            time.sleep(1)
            child()

        @logged(identifier.factory(), elapsed.factory())
        def child(*, logger=from_logged):
            logger.info("sleeping for 1s...")
            time.sleep(1)

        logging.basicConfig(
            format="%(elapsed)s%(ident)s%(levelname)s:%(name)s:%(message)s",
            level=logging.INFO,
        )
        logging.root.handlers[0].addFilter(flt)
        parent()

    The logged messages include the elapsed time since each elapsed collector started.

    .. code-block:: shell

        0.000 parent > INFO:root:entered: parent
        0.000 parent > INFO:root:sleeping for 1s...
        0.000 parent/child > INFO:root:entered: child
        0.000 parent/child > INFO:root:sleeping for 1s...
        1.001 parent/child > INFO:root:exited: child
        2.001 parent > INFO:root:exited: parent

    We can try the same thing again, only this time with shared collectors. The parent's
    identifier, and elapsed collectors will be inherited by its descendants.

    .. Example 2
    .. code-block:: python

        @logged(identifier.factory(inherit=True), elapsed.factory(inherit=True))
        def parent(*, logger=from_logged):
            logger.info("sleeping for 1s...")
            time.sleep(1)
            child()

        @logged
        def child(*, logger=from_logged):
            logger.info("sleeping for 1s...")
            time.sleep(1)
            grandchild()

        @logged
        def grandchild(*, logger=from_logged):
            logger.info("sleeping for 0.5s...")
            time.sleep(0.5)

        parent()

    Notice that this time, the ``child`` elapsed time starts at 1s instead of resetting to 0s.

    .. code-block:: shell

        0.000 parent > INFO:root:entered: parent
        0.000 parent > INFO:root:sleeping for 1s...
        1.000 parent/child > INFO:root:entered: child
        1.001 parent/child > INFO:root:sleeping for 1s...
        2.001 parent/child/grandchild > INFO:root:entered: grandchild
        2.001 parent/child/grandchild > INFO:root:sleeping for 0.5s...
        2.502 parent/child/grandchild > INFO:root:exited: grandchild
        2.502 parent/child > INFO:root:exited: child
        2.502 parent > INFO:root:exited: parent
"""