"""This module contains collector bundles and tools.
A collector bundle is an object that contains the definition of
the collector state, a factory function to create a collector, and a
formatter function that will extract, and format the collector state from
a :class:`~logging.LogRecord`.
In addition to the bundles in this module, you can create custom bundles. A
collector is generic in the type of its state, which can be of an arbitrary
type. A collector factory can be defined to have arbitrary parameters that
depend on how the collector is initialized. A collector formatter has a
fixed signature as described in :class:`Formatter`.
"""
from __future__ import annotations
import dataclasses
import logging
import time
import uuid
from collections import defaultdict
from collections.abc import Mapping
from typing import NamedTuple, Protocol
import tremors.logger
[docs]
def get_state[T](typ: type[T], name: str, record: logging.LogRecord) -> T | None:
"""Get the collector state from a :class:`~logging.LogRecord`.
The function can be used by custom collector formatters to get the
collector state from a record.
Args:
typ: The collector state class.
name: The name of the collector.
record: The LogRecord the may contain the collector state.
Returns:
An object of type ``typ`` if the LogRecord contains such an object
for the collector. Otherwise None.
"""
tremors_extra = getattr(record, tremors.logger.EXTRA_KEY, None)
if isinstance(tremors_extra, Mapping):
state = tremors_extra.get(name)
if isinstance(state, typ):
return state
return None
[docs]
class CollectorBundle[T, U](NamedTuple):
"""A bundle to create, and work with a collector."""
state: type[T]
factory: U
formatter: Formatter
[docs]
class IndentifierState(NamedTuple):
    """The state of a collector created by :class:`IdentifierFactory`."""

    # Group ID of the logger the state was collected from.
    group_id: uuid.UUID
    # Parent of that logger, or None.
    parent: tremors.logger.Logger | None
    # Path of that logger as a tuple of segments.
    path: tuple[str, ...]
def _identifier_collect(
    state: IndentifierState | None, log_item: tremors.logger.LogItem
) -> IndentifierState | None:
    """Collect the identifier of the logger that produced ``log_item``.

    Args:
        state: The current collector state.
        log_item: The item being logged.

    Returns:
        A new state built from the item's logger when that logger has a
        group ID, and a path. Otherwise ``state`` unchanged.
    """
    source = log_item.logger
    if not (source and source.group_id and source.path):
        return state
    record = log_item.record
    if record:
        # Items that carry a standard record get an extra "std:<name>" segment.
        segments = (*source.path, f"std:{record.name}")
    else:
        segments = source.path
    return IndentifierState(group_id=source.group_id, parent=source.parent, path=segments)
[docs]
class IdentifierFactory(tremors.logger.CollectorFactory[IndentifierState | None]):
    """Factory for collectors that gather logger identifiers."""

    def __init__(
        self,
        name: str = "identifier",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
    ) -> None:
        """Store the settings shared by the collectors the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be
                inherited.
        """
        self._name, self._level, self._inherit = name, level, inherit

    def create(self) -> tremors.logger.Collector[IndentifierState | None]:
        """Create a collector.

        Returns:
            A new collector with a random ID, the factory's name, level, and
            inherit settings, and no state (``None``).
        """
        collector = tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=None,
            collect=_identifier_collect,
        )
        return collector
# Bundle the identifier state class, factory class, and formatter.
identifier = CollectorBundle(
    state=IndentifierState,
    factory=IdentifierFactory,
    formatter=identifier_formatter,
)
"""The identifier collector bundle.
Examples:
With :func:`~logging.basicConfig` we configure the standard root logger
with a stream handler that has a formatter that incorporates the record's
``ident`` attribute in the message. We add the filter function ``flt`` to
the handler. The filter uses the formatter from the identifier collector
bundle to extract the collector state from a record, and format it.
Then the filter returns a modified record with the ident attribute set
to the formatted state.
.. note::
If we had added the filter to the logger instead of the handler,
the filter would not be called for messages logged by descendant
standard loggers. But by adding the filter to the handler, we ensure
that it is called for all messages that the handler could emit.
..
Example 1
.. code-block:: python
import copy
import logging
from tremors import from_logged, logged
from tremors.collector import identifier
def flt(record):
own_record = copy.copy(record)
ident = identifier.formatter(own_record)
own_record.ident = f"{ident} > " if ident else ""
return own_record
@logged(identifier.factory())
def parent(*, logger=from_logged):
logger.info("foo")
child()
@logged(identifier.factory())
def child(*, logger=from_logged):
logger.info("bar")
logging.basicConfig(
format="%(ident)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
parent()
The logged messages contain the process ID, thread ID, group ID, and
hierarchy as a path.
.. code-block:: shell
[p:... t:... g:...] parent > INFO:root:entered: parent
[p:... t:... g:...] parent > INFO:root:foo
[p:... t:... g:...] parent/child > INFO:root:entered: child
[p:... t:... g:...] parent/child > INFO:root:bar
[p:... t:... g:...] parent/child > INFO:root:exited: child
[p:... t:... g:...] parent > INFO:root:exited: parent
We can expand on the previous example by throwing a function that uses
standard logging into the mix. ``child2`` calls ``other``, which could be a
function that you can't change, and doesn't use Tremors loggers. By
default, standard loggers propagate their messages, so descendant messages
will eventually be processed by the root logger's handler with our filter.
The collector formatter will not be able to extract the state from a
descendant record, and will return the default value, the empty string.
Then ``record.ident`` will be the empty string, which will be interpolated
into the formatted message.
.. note::
Since the formatter expects the record to have an ident attribute,
the filter must always add this attribute to the record.
..
Example 2
.. code-block:: python
@logged(identifier.factory())
def child(*, logger=from_logged) -> None:
logger.info("bar")
other()
def other() -> None:
logging.getLogger(__name__).info("baz")
parent()
The functions that use Tremors loggers have entered and exited messages,
and their messages contain identifier information, as before. The ``other``
function, which logs with a standard logger, simply emits messages with
empty ident sections.
.. code-block:: shell
[p:... t:... g:...] parent > INFO:root:entered: parent
[p:... t:... g:...] parent > INFO:root:foo
[p:... t:... g:...] parent/child > INFO:root:entered: child
[p:... t:... g:...] parent/child > INFO:root:bar
INFO:__main__:baz
[p:... t:... g:...] parent/child > INFO:root:exited: child
[p:... t:... g:...] parent > INFO:root:exited: parent
"""
[docs]
@dataclasses.dataclass
class CounterState:
    """The state of a collector created by the counter factory."""

    # The current value of the count.
    count: int
    # How much the count increases each time the collector runs.
    step: int
def _counter_collect(state: CounterState, _: tremors.logger.LogItem) -> CounterState:
    """Advance the count by one step.

    Args:
        state: The current collector state; it is updated in place.
        _: The item being logged. Unused.

    Returns:
        The same ``state`` object, with its count increased by its step.
    """
    state.count = state.count + state.step
    return state
[docs]
class CounterFactory(tremors.logger.CollectorFactory[CounterState]):
    """Factory for collectors that count logging calls."""

    def __init__(
        self,
        name: str = "counter",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
        initial: int = 0,
        step: int = 1,
    ) -> None:
        """Store the settings shared by the collectors the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be
                inherited.
            initial: The initial value of the count of collectors created by
                the factory.
            step: How much to increase the count by each time the collectors
                created by the factory run.
        """
        self._name, self._level, self._inherit = name, level, inherit
        self._initial, self._step = initial, step

    def create(self) -> tremors.logger.Collector[CounterState]:
        """Create a collector.

        Returns:
            A new collector with a random ID, the factory's name, level, and
            inherit settings, and its own state starting at the initial count.
        """
        collector = tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=CounterState(count=self._initial, step=self._step),
            collect=_counter_collect,
        )
        return collector
# Bundle the counter state class, factory class, and formatter.
counter = CollectorBundle(
    state=CounterState,
    factory=CounterFactory,
    formatter=counter_formatter,
)
"""The counter collector bundle.
Examples:
We configure standard logging in a similar manner to the examples in
:attr:`identifier`, except here we use counter collectors.
..
Example 1
.. code-block:: python
import copy
import logging
from tremors import from_logged, logged
from tremors.collector import counter
def flt(record):
own_record = copy.copy(record)
count = counter.formatter(own_record, fmt="[{count}] ", default="")
own_record.count = count
return own_record
@logged(counter.factory())
def parent(*, logger=from_logged):
logger.info("foo")
child()
@logged(counter.factory())
def child(*, logger=from_logged):
logger.info("bar")
logging.basicConfig(
format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
parent()
The logged messages include the count, where the ``parent`` and ``child``
counts are distinct from each other because each logger uses its own
counter collector.
.. code-block:: shell
[1] INFO:root:entered: parent
[2] INFO:root:foo
[1] INFO:root:entered: child
[2] INFO:root:bar
[3] INFO:root:exited: child
[3] INFO:root:exited: parent
Now let's modify the previous example to have ``parent`` and ``child`` use
a shared counter by passing a collector instance that we get from the
factory's ``create`` method.
..
Example 2
.. code-block:: python
shared_counter = counter.factory().create()
@logged(shared_counter)
def parent(*, logger=from_logged) -> None:
logger.info("foo")
child()
@logged(shared_counter)
def child(*, logger=from_logged) -> None:
logger.info("bar")
parent()
Here the count is shared between the two functions.
.. code-block:: shell
[1] INFO:root:entered: parent
[2] INFO:root:foo
[3] INFO:root:entered: child
[4] INFO:root:bar
[5] INFO:root:exited: child
[6] INFO:root:exited: parent
"""
# Module-level store of enter counts, keyed by cache key.
_enter_collect_cache: defaultdict[str, int] = defaultdict(int)


@dataclasses.dataclass
class EnterCounterState:
    """The state of a collector created by the enter counter factory.

    The count is not stored on the state itself. It is kept in a
    module-level cache under ``cache_key``, or under ``"__default__"`` when
    no cache key is set.
    """

    # Key of the count in the cache, or None when no key is set.
    cache_key: str | None
    # How much the count increases for each counted message.
    step: int

    @property
    def count(self) -> int:
        """Get the count from the cache.

        Returns:
            The count stored under the state's cache key, or 0 when nothing
            has been counted under that key.
        """
        # Use ``get`` instead of indexing: indexing a defaultdict inserts a
        # zero entry for a missing key, so a mere read would grow the cache.
        return _enter_collect_cache.get(self.cache_key or "__default__", 0)
def _enter_counter_collect(
    state: EnterCounterState, log_item: tremors.logger.LogItem
) -> EnterCounterState:
    """Count ``log_item`` if it is marked as an entered message.

    An item is counted when its extra has, under the Tremors extra key, a
    mapping with a truthy value for the enter key.

    Args:
        state: The current collector state. Its cache key is set to the
            name of the item's logger if it has none yet.
        log_item: The item being logged.

    Returns:
        The same ``state`` object.
    """
    extra = log_item.extra
    if not extra:
        return state
    tremors_extra = extra.get(tremors.logger.EXTRA_KEY, {})
    if not isinstance(tremors_extra, Mapping):
        return state
    if not tremors_extra.get(tremors.logger.ENTER_KEY):
        return state
    if not state.cache_key and log_item.logger:
        state.cache_key = log_item.logger.name
    # Without a cache key, and a logger, the count goes under a shared key.
    key = state.cache_key or "__default__"
    _enter_collect_cache[key] += state.step
    return state
[docs]
class EnterCounterFactory(tremors.logger.CollectorFactory[EnterCounterState]):
    """Factory for collectors that count when a logger is entered."""

    def __init__(
        self,
        name: str = "enter_counter",
        *,
        cache_key: str | None = None,
        level: int = logging.INFO,
        inherit: bool = False,
        step: int = 1,
    ) -> None:
        """Store the settings shared by the collectors the factory creates.

        Args:
            name: The name of collectors created by the factory.
            cache_key: The key the count for the created collector will be
                stored under in the cache. If None, the collector uses the
                name of the logger of the first entered message it counts,
                or ``"__default__"`` if that message has no logger.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be
                inherited.
            step: How much to increase the count by each time the collectors
                created by the factory count an entered message.
        """
        self._name, self._cache_key = name, cache_key
        self._level, self._inherit, self._step = level, inherit, step

    def create(self) -> tremors.logger.Collector[EnterCounterState]:
        """Create a collector.

        Returns:
            A new collector with a random ID, the factory's name, level, and
            inherit settings, and its own state holding the cache key, and
            step.
        """
        collector = tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=EnterCounterState(cache_key=self._cache_key, step=self._step),
            collect=_enter_counter_collect,
        )
        return collector
# Bundle the enter counter state class, factory class, and formatter.
enter_counter = CollectorBundle(
    state=EnterCounterState,
    factory=EnterCounterFactory,
    formatter=enter_counter_formatter,
)
"""The enter counter collector bundle.
Examples:
We configure standard logging in a similar manner to the examples in
:attr:`counter`, except here we use enter counter collectors, which will
only count entered logs.
..
Example 1
.. code-block:: python
import copy
import logging
from tremors import from_logged, logged
from tremors.collector import enter_counter
def flt(record):
own_record = copy.copy(record)
count = enter_counter.formatter(
own_record, fmt="[{enter_count}] ", default=""
)
own_record.count = count
return own_record
@logged(enter_counter.factory())
def foo(run: int, *, logger=from_logged):
logger.info("foo %d", run)
@logged(enter_counter.factory())
def bar(run: int, *, logger=from_logged):
logger.info("bar %d", run)
logging.basicConfig(
format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
foo(1)
bar(1)
foo(2)
foo(3)
bar(2)
The logged messages include the total number of times each function has
been called.
.. code-block:: shell
[1] INFO:root:entered: foo
[1] INFO:root:foo 1
[1] INFO:root:exited: foo
[1] INFO:root:entered: bar
[1] INFO:root:bar 1
[1] INFO:root:exited: bar
[2] INFO:root:entered: foo
[2] INFO:root:foo 2
[2] INFO:root:exited: foo
[3] INFO:root:entered: foo
[3] INFO:root:foo 3
[3] INFO:root:exited: foo
[2] INFO:root:entered: bar
[2] INFO:root:bar 2
[2] INFO:root:exited: bar
Let's see what happens if a function doesn't log the entered message.
..
Example 2
.. code-block:: python
@logged(enter_counter.factory(), enter_msg=False, exit_msg=False)
def baz(run: int, *, logger=from_logged):
logger.info("baz %d", run)
foo(4)
baz(1)
foo(5)
foo(6)
baz(2)
The ``foo`` counter keeps counting up from the previous example, but the
``baz`` counter doesn't increase at all because the ``baz`` logger doesn't
log entered messages.
.. code-block:: shell
[4] INFO:root:entered: foo
[4] INFO:root:foo 4
[4] INFO:root:exited: foo
[0] INFO:root:baz 1
[5] INFO:root:entered: foo
[5] INFO:root:foo 5
[5] INFO:root:exited: foo
[6] INFO:root:entered: foo
[6] INFO:root:foo 6
[6] INFO:root:exited: foo
[0] INFO:root:baz 2
"""
[docs]
@dataclasses.dataclass
class ElapsedState:
    """The state of a collector created by :class:`ElapsedFactory`."""

    # The first recorded time in nanoseconds, or None if nothing was recorded.
    t0: int | None
    # The most recent recorded time in nanoseconds, or None likewise.
    t: int | None
def _collect_elapsed(state: ElapsedState, _: tremors.logger.LogItem) -> ElapsedState:
    """Record the current time on the state.

    Args:
        state: The current collector state; it is updated in place.
        _: The item being logged. Unused.

    Returns:
        The same ``state`` object. ``t`` is always set to the current
        ``time.perf_counter_ns()`` reading; ``t0`` is set to it only if it
        was still None.
    """
    now = time.perf_counter_ns()
    state.t = now
    if state.t0 is None:
        state.t0 = now
    return state
[docs]
class ElapsedFactory(tremors.logger.CollectorFactory[ElapsedState]):
    """Factory for collectors that measure how much time has elapsed."""

    def __init__(
        self,
        name: str = "elapsed",
        *,
        level: int = logging.INFO,
        inherit: bool = False,
    ) -> None:
        """Store the settings shared by the collectors the factory creates.

        Args:
            name: The name of collectors created by the factory.
            level: The level of collectors created by the factory.
            inherit: If True, collectors created by the factory will be
                inherited.
        """
        self._name, self._level, self._inherit = name, level, inherit

    def create(self) -> tremors.logger.Collector[ElapsedState]:
        """Create a collector.

        Returns:
            A new collector with a random ID, the factory's name, level, and
            inherit settings, and its own state with no times recorded yet.
        """
        collector = tremors.logger.Collector(
            id=uuid.uuid4(),
            name=self._name,
            level=self._level,
            inherit=self._inherit,
            state=ElapsedState(t0=None, t=None),
            collect=_collect_elapsed,
        )
        return collector
# Bundle the elapsed state class, factory class, and formatter.
elapsed = CollectorBundle(
    state=ElapsedState,
    factory=ElapsedFactory,
    formatter=elapsed_formatter,
)
"""The elapsed collector bundle.
Examples:
We configure standard logging in a similar manner to the examples in
:attr:`identifier`, except here we use elapsed collectors, and identifier
collectors with a custom format to only show the path.
..
Example 1
.. code-block:: python
import copy
import logging
import time
from tremors import from_logged, logged
from tremors.collector import elapsed, identifier
def flt(record):
own_record = copy.copy(record)
ident_msg = identifier.formatter(own_record, fmt="{path}")
elapsed_msg = elapsed.formatter(own_record)
own_record.ident = f"{ident_msg} > " if ident_msg else ""
own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
return own_record
@logged(identifier.factory(), elapsed.factory())
def parent(*, logger=from_logged):
logger.info("sleeping for 1s...")
time.sleep(1)
child()
@logged(identifier.factory(), elapsed.factory())
def child(*, logger=from_logged):
logger.info("sleeping for 1s...")
time.sleep(1)
logging.basicConfig(
format="%(elapsed)s%(ident)s%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
)
logging.root.handlers[0].addFilter(flt)
parent()
The logged messages include the elapsed time since each elapsed collector
started.
.. code-block:: shell
0.000 parent > INFO:root:entered: parent
0.000 parent > INFO:root:sleeping for 1s...
0.000 parent/child > INFO:root:entered: child
0.000 parent/child > INFO:root:sleeping for 1s...
1.001 parent/child > INFO:root:exited: child
2.001 parent > INFO:root:exited: parent
We can try the same thing again, only this time with shared collectors.
The parent's identifier, and elapsed collectors will be inherited by its
descendants.
..
Example 2
.. code-block:: python
@logged(identifier.factory(inherit=True), elapsed.factory(inherit=True))
def parent(*, logger=from_logged):
logger.info("sleeping for 1s...")
time.sleep(1)
child()
@logged
def child(*, logger=from_logged):
logger.info("sleeping for 1s...")
time.sleep(1)
grandchild()
@logged
def grandchild(*, logger=from_logged):
logger.info("sleeping for 0.5s...")
time.sleep(0.5)
parent()
Notice that this time, the ``child`` elapsed time starts at 1s instead
of resetting to 0s.
.. code-block:: shell
0.000 parent > INFO:root:entered: parent
0.000 parent > INFO:root:sleeping for 1s...
1.000 parent/child > INFO:root:entered: child
1.001 parent/child > INFO:root:sleeping for 1s...
2.001 parent/child/grandchild > INFO:root:entered: grandchild
2.001 parent/child/grandchild > INFO:root:sleeping for 0.5s...
2.502 parent/child/grandchild > INFO:root:exited: grandchild
2.502 parent/child > INFO:root:exited: child
2.502 parent > INFO:root:exited: parent
"""