Tremors

Tremors is a library for logging while collecting metrics. Tremors loggers are drop-in replacements for standard loggers, but they also have metrics collectors that run when messages are logged. The loggers are also context managers. The library maintains a hierarchy of nested contexts, where all logs and metrics are grouped together. You can create a new hierarchy at any time to group related logs.

Installation

pip install tremors

Usage

A function can be wrapped in a logger context with the logged decorator. If you call the function without a logger argument, one will automatically be injected into it.

import logging

from tremors import from_logged, logged


@logged
def fn(*, logger=from_logged):
    logger.info("hello")


logging.basicConfig(
    format="Tremors > %(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)
fn()

The context automatically logs entered and exited messages before and after each function call. By default, the logger uses the configured standard root logger to log the messages.

Tremors > INFO:root:entered: fn
Tremors > INFO:root:hello
Tremors > INFO:root:exited: fn

You may specify a standard logger by name for the Tremors logger to use as its underlying logger.

import tremors


@tremors.logged(logger_name=__name__)
def fn(*, logger: tremors.Logger = tremors.from_logged) -> None:
    logger.info("hello")


fn()

The messages are logged by the specified underlying logger. Based on our standard logging configuration, the messages propagate from the underlying logger to the standard root logger, which emits them.

Tremors > INFO:__main__:entered: fn
Tremors > INFO:__main__:hello
Tremors > INFO:__main__:exited: fn

Next let’s use a collector to measure the elapsed time since the function started each time a message is logged. When a message is logged, the logger runs the collector, and adds its updated state to the message’s LogRecord. We use a standard logging filter to inspect and modify the record before it is emitted. We format the collector state, then add the formatted state to the elapsed custom attribute of the record. Finally, we configure the root logger’s formatter to incorporate the elapsed attribute.

Note

The elapsed collector bundle included with Tremors has a factory for creating a collector. It also has a formatter that we use in flt to extract the state from the record, and format it. In flt we make a copy of the record, then modify and return the copy, instead of modifying the original record, so as not to have side effects on other loggers that may process the message. We also make sure to attach flt to the root logger’s handler, and not to the logger itself; messages that originate from descendant loggers will not go through logger filters when they are propagated, but they will go through handler filters before they are emitted.

import copy
import time

from tremors.collector import elapsed


def flt(record):
    own_record = copy.copy(record)
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    return own_record


@logged(elapsed.factory())
def fn(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The messages contain elapsed information, according to the formatter configuration, that is sourced from the record’s elapsed custom attribute.

0.000 INFO:root:entered: fn
0.000 INFO:root:sleeping for 1s...
1.000 INFO:root:exited: fn
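
The Note's point about attaching flt to the handler rather than to the logger can be checked with the standard library alone. The sketch below does not use Tremors; the logger names (demo_parent, demo_parent.child) and the two recording filters are hypothetical, chosen only to show how standard logging treats propagated records.

```python
import io
import logging

seen_by_logger_filter = []
seen_by_handler_filter = []


def logger_flt(record):
    # Attached to the parent logger itself.
    seen_by_logger_filter.append(record.name)
    return True


def handler_flt(record):
    # Attached to the parent logger's handler.
    seen_by_handler_filter.append(record.name)
    return True


parent = logging.getLogger("demo_parent")
parent.setLevel(logging.INFO)
parent.propagate = False  # keep the demo away from the real root logger
handler = logging.StreamHandler(io.StringIO())
parent.addHandler(handler)
parent.addFilter(logger_flt)
handler.addFilter(handler_flt)

child = logging.getLogger("demo_parent.child")
parent.info("from parent")
child.info("from child")  # propagates up to the parent's handler

print(seen_by_logger_filter)   # only the parent's own record
print(seen_by_handler_filter)  # both records
```

The record logged by the child reaches the parent's handler without passing through the parent's logger-level filter, so only the handler filter sees both records.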

A Logger can have any number of collectors. Here, in addition to the elapsed collector from the previous example, we add a counter collector. A collector has a level, and will only run if the message is being logged at that level or higher. Our counter level is ERROR. We can also control which custom record attribute has the formatted collector state via the collector’s name. This is useful if you have several collectors of the same kind on a single logger. Here, we name the counter errors, so record.errors will contain a formatted string with the running total number of errors that have been logged by a single function call. Finally, we can control the format of the counter state via the fmt argument of the counter’s formatter.

from tremors.collector import counter


def flt(record):
    own_record = copy.copy(record)
    errors = counter.formatter(
        own_record, name="errors", fmt="errors={count}"
    )
    own_record.errors = f"{errors} " if errors else ""
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    return own_record


@logged(
    elapsed.factory(), counter.factory(name="errors", level=logging.ERROR)
)
def fn(*, logger=from_logged):
    logger.info("hello")
    time.sleep(1)
    logger.error("uh-ho!")


logging.basicConfig(
    format="%(elapsed)s%(errors)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The messages contain information from both collectors.

0.000 errors=0 INFO:root:entered: fn
0.000 errors=0 INFO:root:hello
1.001 errors=1 ERROR:root:uh-ho!
1.001 errors=1 INFO:root:exited: fn

Passing a collector factory, as in the previous example, will result in a new counter collector being used each time the function is called. Let’s reuse the same collector to keep a tally of errors across all calls to the function by passing a collector instance that we get by calling the factory’s create method.

fn_errors = counter.factory(name="errors", level=logging.ERROR).create()


@logged(fn_errors)
def fn(*, logger=from_logged):
    logger.error("uh-ho!")


fn()
fn()

The error count doesn’t reset in the second function call.

errors=0 INFO:root:entered: fn
errors=1 ERROR:root:uh-ho!
errors=1 INFO:root:exited: fn
errors=1 INFO:root:entered: fn
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: fn
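
The difference between passing a factory and passing an instance can be sketched without Tremors. Everything below is a hypothetical stand-in (CounterSketch and call_once are not Tremors names): a factory yields a fresh counter per call, while a shared instance carries its count across calls.

```python
class CounterSketch:
    """A stand-in for a collector that only counts."""

    def __init__(self):
        self.count = 0


def call_once(source):
    # A class acts as the factory here: calling it creates a new counter.
    # An instance is not callable, so it is reused as-is.
    counter = source() if callable(source) else source
    counter.count += 1
    return counter.count


# Factory: each call starts from zero.
per_call = [call_once(CounterSketch), call_once(CounterSketch)]

# Instance: the tally continues across calls.
shared = CounterSketch()
across_calls = [call_once(shared), call_once(shared)]

print(per_call, across_calls)
```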

Another way we can tally the count across all function calls is to pass the same logger with each call.

from tremors import Logger


def fn(*, logger):
    logger.error("uh-ho!")


with Logger(
    counter.factory(name="errors", level=logging.ERROR), name="context"
) as logger:
    fn(logger=logger)
    fn(logger=logger)

We only get entered and exited messages for the context block. But the single logger used in both function calls maintains its state between calls.

errors=0 INFO:root:entered: context
errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: context

Collectors may be inherited by descendant loggers. Let’s count errors across nested loggers.

@logged(
    counter.factory(name="errors", level=logging.ERROR, inherit=True),
    enter_msg=False,
    exit_msg=False,
)
def parent(*, logger=from_logged):
    logger.error("uh-ho!")
    child()
    child()


@logged(enter_msg=False, exit_msg=False)
def child(*, logger=from_logged):
    logger.error("doh!")
    grandchild()


@logged(enter_msg=False, exit_msg=False)
def grandchild(*, logger=from_logged):
    logger.info("so far, so good")
    logger.error("spoke too soon!")


parent()

We’ve disabled the entered and exited messages with the enter_msg and exit_msg parameters. The parent counter is used in the child and grandchild functions.

errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:doh!
errors=2 INFO:root:so far, so good
errors=3 ERROR:root:spoke too soon!
errors=4 ERROR:root:doh!
errors=4 INFO:root:so far, so good
errors=5 ERROR:root:spoke too soon!

Asynchronous functions and methods may be decorated with async_logged.

import asyncio

from tremors import async_logged


@async_logged(elapsed.factory())
async def async_work_long(*, logger=from_logged):
    logger.info("long work starting")
    await asyncio.sleep(1)


@async_logged(elapsed.factory())
async def async_work_short(*, logger=from_logged):
    logger.info("short work starting")
    await asyncio.sleep(0.1)


@async_logged(elapsed.factory())
async def async_fn(*, logger=from_logged):
    coros = async_work_long(), async_work_short()
    logger.info("awaiting %d works", len(coros))
    await asyncio.gather(*coros)


logging.basicConfig(
    format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
asyncio.run(async_fn())

The two works run concurrently, so the short one exits first.

0.000 INFO:root:entered: async_fn
0.000 INFO:root:awaiting 2 works
0.000 INFO:root:entered: async_work_long
0.000 INFO:root:long work starting
0.000 INFO:root:entered: async_work_short
0.000 INFO:root:short work starting
0.101 INFO:root:exited: async_work_short
1.002 INFO:root:exited: async_work_long
1.003 INFO:root:exited: async_fn

We can capture standard logs, and run collectors on them, using enrich_std_record, which runs the current logger’s collectors with a LogItem that includes the standard log record. Collectors may inspect this record, and update their state accordingly. flt calls enrich_std_record to run the collectors on any standard records it receives.

For Tremors loggers, collectors run before a record is created, using the data that will be used to create the record. The resultant extras are then added to the record by the standard logging framework. For standard logs, on the other hand, the record already exists, so a filter or handler may run the current logger’s collectors on the record itself.

from tremors import enrich_std_record
from tremors.collector import identifier


def flt(record):
    own_record = copy.copy(record)
    enrich_std_record(own_record)
    elapsed_msg = elapsed.formatter(own_record)
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    ident_msg = identifier.formatter(own_record)
    own_record.identifier = f"{ident_msg} " if ident_msg else ""
    return own_record


@logged(elapsed.factory(), identifier.factory())
def fn(*, logger=from_logged):
    logger.info("attempting to capture a standard log")
    std()


def std():
    time.sleep(0.1)
    logging.getLogger().info("capture me")
    std2()


def std2():
    time.sleep(0.1)
    logging.getLogger().info("capture me too")


logging.basicConfig(
    format="%(elapsed)s%(identifier)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
    force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()

The identifier collector uses the record from the standard logs to append a path component identifying the standard logger. The elapsed collector doesn’t need any additional information from the record to mark the duration.

0.000 [p:... t:... g:...] fn INFO:root:entered: fn
0.000 [p:... t:... g:...] fn INFO:root:attempting to capture a standard log
0.101 [p:... t:... g:...] fn/std:root INFO:root:capture me
0.201 [p:... t:... g:...] fn/std:root INFO:root:capture me too
0.201 [p:... t:... g:...] fn INFO:root:exited: fn
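
Enriching standard records depends on some notion of a current logger. How Tremors tracks it is not described here; the sketch below assumes a contextvars.ContextVar holding a plain path string, and every name in it (current_path, enrich, ListHandler, ctx_path) is hypothetical. It mutates the record in place for brevity instead of copying it as flt does.

```python
import contextvars
import logging

# Hypothetical stand-in for "the current logger": just its path as a string.
current_path = contextvars.ContextVar("current_path", default=None)


def enrich(record):
    """Attach a path to standard records logged inside a context."""
    path = current_path.get()
    if path is not None and not hasattr(record, "ctx_path"):
        record.ctx_path = f"{path}/std:{record.name}"
    return True


class ListHandler(logging.Handler):
    """Collect the enriched paths instead of writing them anywhere."""

    def __init__(self):
        super().__init__()
        self.paths = []

    def emit(self, record):
        self.paths.append(getattr(record, "ctx_path", None))


handler = ListHandler()
handler.addFilter(enrich)
log = logging.getLogger("some_library_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

log.info("outside any context")    # no current path, so not enriched
token = current_path.set("fn")     # "enter" the fn context
log.info("inside the fn context")  # enriched with a std: path component
current_path.reset(token)          # "exit" the context

print(handler.paths)
```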

Let’s finish up with a full example that demonstrates how to turn log messages with collector state into structured JSON that can be persisted in a log store.

import copy
import logging
import time
import uuid

import pydantic
import tremors
from tremors import collector


class AppLogMessage(pydantic.BaseModel):
    """The model for app logs."""

    msg: str
    timestamp: float
    group_id: uuid.UUID | None
    path_list: list[str] | None = pydantic.Field(exclude=True)
    count: int | None
    t0: float | None = pydantic.Field(exclude=True)
    t: float | None = pydantic.Field(exclude=True)
    tt0: float | None = pydantic.Field(exclude=True)
    tt: float | None = pydantic.Field(exclude=True)

    @pydantic.computed_field
    @property
    def path(self) -> str | None:
        if self.path_list is None:
            return None
        return "/".join(self.path_list)

    @pydantic.computed_field
    @property
    def elapsed_ms(self) -> int | None:
        if self.t is None or self.t0 is None:
            return None
        return round((self.t - self.t0) / 1e6)

    @pydantic.computed_field
    @property
    def total_elapsed_ms(self) -> int | None:
        if self.tt is None or self.tt0 is None:
            return None
        return round((self.tt - self.tt0) / 1e6)


def flt(record: logging.LogRecord) -> logging.LogRecord:
    """Convert records with multiple collector states into structured logs.

    Also capture standard logs.
    """
    own_record = copy.copy(record)
    tremors.enrich_std_record(own_record)

    if not hasattr(own_record, tremors.EXTRA_KEY):
        return own_record

    if not hasattr(own_record, "message"):
        own_record.message = own_record.getMessage()

    ident_state = collector.get_state(
        collector.identifier.state, "identifier", own_record
    )
    count_state = collector.get_state(
        collector.enter_counter.state, "enter_counter", own_record
    )
    elapsed_state = collector.get_state(
        collector.elapsed.state, "elapsed", own_record
    )
    tot_elapsed_state = collector.get_state(
        collector.elapsed.state, "total_elapsed", own_record
    )
    group_id, path_list = (
        (ident_state.group_id, ident_state.path)
        if ident_state
        else (None, None)
    )
    count = count_state.count if count_state else None
    t0, t = (
        (elapsed_state.t0, elapsed_state.t)
        if elapsed_state
        else (None, None)
    )
    tt0, tt = (
        (tot_elapsed_state.t0, tot_elapsed_state.t)
        if tot_elapsed_state
        else (None, None)
    )
    try:
        msg = AppLogMessage(
            msg=own_record.message,
            timestamp=own_record.created,
            group_id=group_id,
            path_list=path_list,
            count=count,
            t0=t0,
            t=t,
            tt0=tt0,
            tt=tt,
        )
    except pydantic.ValidationError:
        return own_record
    else:
        own_record.app_log_message = msg
        return own_record


class TremorsHandler(logging.Handler):
    """A handler to persist structured logs in a store."""

    def emit(self, record: logging.LogRecord) -> None:
        """Queue a structured log for persistence to the log store.

        This is just a fake that only prints the JSON to stdout.
        """
        msg = getattr(record, "app_log_message", None)

        if not msg:
            return

        msg_json = msg.model_dump_json(indent=4)
        print(f"queue for log store >>> \033[0;34m{msg_json}\033[0m")  # noqa: T201


global_elapsed = collector.elapsed.factory(
    "total_elapsed", inherit=True
).create()
"""A global elapsed collector instance.

This collector can be added to all loggers to track the elapsed time since the
very first log.
"""

root_logged = tremors.logged(
    collector.identifier.factory(name="identifier", inherit=True),
    collector.enter_counter.factory(name="enter_counter"),
    collector.elapsed.factory(name="elapsed"),
    global_elapsed,
    is_root=True,
)
"""A canned decorator for creating root loggers."""

child_logged = tremors.logged(
    collector.enter_counter.factory(name="enter_counter"),
    collector.elapsed.factory(name="elapsed"),
)
"""A canned decorator for creating child loggers."""


@root_logged
def run_app(logger: tremors.Logger = tremors.from_logged) -> None:
    """The application entry point.

    This function has a root logger. The logger's identifiers collector will be
    inherited by all descendants, so they will all share a group_id, and a
    common path.
    """
    logger.info("app initialized")
    do_something()
    do_something_else()
    do_something_else()
    some_library()


@child_logged
def do_something(logger: tremors.Logger = tremors.from_logged) -> None:
    """This is a function you maintain, and can use a tremors logger.

    Its logger inherits its parent's identifiers collector, and the global
    elapsed collector.
    """
    logger.info("doing something")
    do_something_something()


@child_logged
def do_something_something(logger: tremors.Logger = tremors.from_logged) -> None:
    """This is another function you maintain, and can use a tremors logger.

    Its logger also inherits its parent's identifiers collector, and the global
    elapsed collector.
    """
    logger.info("doing something, something")
    time.sleep(0.1)


@root_logged
def do_something_else(logger: tremors.Logger = tremors.from_logged) -> None:
    """This is a function you maintain, and has a tremors root logger.

    Its logger gets its own identifier collector, along with a new group_id,
    and begins a new root path. All of its descendants will share this new
    group_id, and have paths starting from this logger's path. However, the
    logger also gets the shared global elapsed collector.
    """
    logger.info("doing something else")
    time.sleep(0.1)


def some_library() -> None:
    """This is a function you don't maintain that uses standard logging.

    As long as this function's logs flow to the standard root logger, its logs
    will be enriched by the current tremors logger's collectors, and structured
    logs will be persisted to the store. These logs will share the current
    logger's group_id, and have a path based on the current logger's path.
    """
    logging.getLogger("some_library").info("doing library stuff")
    time.sleep(0.1)
    another_library()


def another_library() -> None:
    """This is another function you don't maintain that uses standard logging.

    Its logs will be treated similarly to those of ``some_library``. However,
    path information for loggers between this function, and the current logger
    in the call stack will not be recorded. E.g., the path for this function's
    logs will be ``run_app/std:another_library``, and not
    ``run_app/std:some_library/std:another_library``.
    """
    logging.getLogger("another_library").info("doing other library stuff")
    time.sleep(0.1)


if __name__ == "__main__":
    handler = TremorsHandler()
    handler.addFilter(flt)
    logging.basicConfig(level=logging.INFO, handlers=(handler,), force=True)
    run_app()

You can see from the logs that records descended from the same root logger share a group_id, and that the path follows the call stack. The count increases by one each time a Tremors logged function is called, and a separate count is maintained for each function. The duration of each function is recorded, as well as the total time that has elapsed since the entry point started. Standard logs are also captured as if they were descended from the closest Tremors logged function up the call stack.

queue for log store >>> {
    "msg": "entered: run_app",
    "timestamp": 1772951375.6724658,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "app initialized",
    "timestamp": 1772951375.6726198,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "entered: do_something",
    "timestamp": 1772951375.6726973,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "doing something",
    "timestamp": 1772951375.6727448,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "entered: do_something_something",
    "timestamp": 1772951375.6727943,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "doing something, something",
    "timestamp": 1772951375.672832,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 0,
    "total_elapsed_ms": 0
}
queue for log store >>> {
    "msg": "exited: do_something_something",
    "timestamp": 1772951375.773108,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something/do_something_something",
    "elapsed_ms": 100,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "exited: do_something",
    "timestamp": 1772951375.7734556,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/do_something",
    "elapsed_ms": 101,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "entered: do_something_else",
    "timestamp": 1772951375.7737124,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "doing something else",
    "timestamp": 1772951375.7738376,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 101
}
queue for log store >>> {
    "msg": "exited: do_something_else",
    "timestamp": 1772951375.8741887,
    "group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
    "count": 1,
    "path": "do_something_else",
    "elapsed_ms": 100,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "entered: do_something_else",
    "timestamp": 1772951375.874693,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "doing something else",
    "timestamp": 1772951375.874823,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 0,
    "total_elapsed_ms": 202
}
queue for log store >>> {
    "msg": "exited: do_something_else",
    "timestamp": 1772951375.9751017,
    "group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
    "count": 2,
    "path": "do_something_else",
    "elapsed_ms": 100,
    "total_elapsed_ms": 303
}
queue for log store >>> {
    "msg": "doing library stuff",
    "timestamp": 1772951375.975387,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/std:some_library",
    "elapsed_ms": 303,
    "total_elapsed_ms": 303
}
queue for log store >>> {
    "msg": "doing other library stuff",
    "timestamp": 1772951376.0755894,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app/std:another_library",
    "elapsed_ms": 403,
    "total_elapsed_ms": 403
}
queue for log store >>> {
    "msg": "exited: run_app",
    "timestamp": 1772951376.176055,
    "group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
    "count": 1,
    "path": "run_app",
    "elapsed_ms": 504,
    "total_elapsed_ms": 504
}

See the collector module in the full documentation for how to define your own collectors and bundles.

API

logger Module

This module contains the Tremors Logger and Collector definitions.

A Tremors logger can be used as a drop-in replacement for a standard Logger. Collectors may be attached to a logger. Each time a message is logged, the collectors run if they are enabled for the message level. When a collector runs, it may update its state. A logger adds the current state of all of its collectors to any LogRecord that it produces.

A logger may also be used as a context manager. When used this way, loggers maintain a hierarchy. Each logger has a parent logger, unless it is the root logger. The root logger assigns a group_id that is shared by all loggers in the hierarchy. Each logger has a path from its root logger to itself. Hierarchies may be nested by explicitly creating a new root logger within an existing hierarchy. The loggers in the nested hierarchy will have a different group ID than the loggers in the containing hierarchy. The paths in the nested hierarchy will also start from the new root logger.

When a logger context is entered, or exited, a message is automatically logged at a specified level, and collectors will run if they are enabled for the message level. This allows us to define collectors that measure lifecycle information, such as the duration of a context, or how much memory was allocated at the beginning, and end of a context.

Each Tremors logger uses an underlying standard logger that may be specified. The underlying logger can be configured in the normal fashion. Underlying loggers may be shared by Tremors loggers. In fact, it is common for all Tremors loggers to use the standard root logger. The Tremors logger simply adds the states of its collectors to the records produced by its standard logger. These augmented records can then be processed by any Filter, Formatter, or Handler that knows about the extra record attributes.

A collector may be bundled with a formatter that can extract the collector state from a record, and format it. This formatter can be used in a filter, formatter, or handler that has been configured for the underlying logger. Tremors comes with many useful collector bundles, but you can also define custom collectors and bundles.
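
The idea of extra record attributes comes from standard logging, and can be seen without Tremors. This minimal sketch uses a hypothetical logger name (extra_demo) and a hand-written elapsed value; it shows only how keys passed via extra become attributes that a format string can reference.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(elapsed)s%(levelname)s:%(message)s")
)

log = logging.getLogger("extra_demo")
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo away from the real root logger
log.addHandler(handler)

# Each key of `extra` becomes an attribute on the resulting LogRecord.
log.info("hello", extra={"elapsed": "0.000 "})

output = stream.getvalue()
print(output, end="")
```

A record that lacks the elapsed attribute cannot be formatted with this format string, which is why the filters in the Usage section always set the attribute, falling back to an empty string.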

class tremors.logger.LogItem(logger: Logger | None, level: int, msg: object, args: tuple[object, ...] | Mapping[str, object] | None, exc_info: bool | tuple[type[BaseException], BaseException, TracebackType | None] | tuple[None, None, None] | BaseException | None, stack_info: bool, stacklevel: int | None, extra: Mapping[str, object] | None, record: logging.LogRecord | None)[source]

Bases: NamedTuple

The properties of a logged message.

class tremors.logger.Collector(id: uuid.UUID, name: str, level: int, inherit: bool, state: TState, collect: Callable[[TState, LogItem], TState])[source]

Bases: NamedTuple, Generic

The collector specification.

id

A unique identifier for the collector.

Type:

uuid.UUID

name

The name of the collector. When the collector runs for a logged message, the collector’s state is added to the LogRecord, and may be retrieved using this name.

Type:

str

level

The minimum level a logged message must be for the collector to run.

Type:

int

inherit

If True, the collector will also be added to descendant loggers, provided its name does not conflict with that of any other collector on the descendant loggers.

Type:

bool

state

The initial state of the collector. Each time the collector runs, it may update this state.

Type:

TState

collect

When the collector runs, this function is called with the current state, and the LogItem. It returns the new state.

Type:

collections.abc.Callable[[TState, tremors.logger.LogItem], TState]
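
The fields above can be illustrated with stand-in types. The sketch below does not import Tremors: LogItemSketch, CollectorSketch, count_one, and run are hypothetical names that mirror the documented fields, and run is only a guess at how a level check and a state update might fit together.

```python
import logging
import uuid
from typing import Callable, NamedTuple


class LogItemSketch(NamedTuple):
    """A reduced stand-in for LogItem with just two fields."""

    level: int
    msg: object


class CollectorSketch(NamedTuple):
    """A stand-in mirroring the documented Collector fields."""

    id: uuid.UUID
    name: str
    level: int
    inherit: bool
    state: int
    collect: Callable[[int, LogItemSketch], int]


def count_one(state, item):
    # Takes the current state and the log item, returns the new state.
    return state + 1


def run(collector, item):
    # Skip messages below the collector's level; otherwise update the state.
    if item.level < collector.level:
        return collector
    new_state = collector.collect(collector.state, item)
    return collector._replace(state=new_state)


errors = CollectorSketch(
    uuid.uuid4(), "errors", logging.ERROR, False, 0, count_one
)
errors = run(errors, LogItemSketch(logging.INFO, "hello"))    # ignored
errors = run(errors, LogItemSketch(logging.ERROR, "uh-ho!"))  # counted

print(errors.name, errors.state)
```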

class tremors.logger.CollectorFactory[source]

Bases: Generic

A factory that creates a collector.

abstractmethod create() Collector[TState][source]

Create a collector.

class tremors.logger.Logger(*collectors: Collector[Any] | CollectorFactory[Any], name: str, logger_name: str | None = None, ctx_level: int = 20, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False)[source]

Bases: LoggerAdapter[Logger]

The Tremors logger.

Parameters:
  • *collectors – Collectors or CollectorFactories. The collectors, or resultant collectors will be attached to the logger, and will be run for every logged message for which the collector is enabled.

  • name – The name of the logger. This name is logged in the default entered, and exited messages. This name is used to generate the path, but may be altered in the path to be unique in the hierarchy.

  • logger_name – The name of the underlying Logger that will be used to log messages. If None, the root logger will be used.

  • ctx_level – The level at which entered and exited messages will be logged.

  • enter_msg – If a non-empty string, this will be the logged entered message. If truthy, the default entered message will be logged. Otherwise, no entered message will be logged.

  • exit_msg – If a non-empty string, this will be the logged exited message. If truthy, the default exited message will be logged. Otherwise, no exited message will be logged.

  • is_root – If True, a root logger with no parent that starts a new hierarchy will be created.

property collectors: Mapping[str, Collector[Any]]

The logger collectors.

property group_id: UUID | None

The group ID for the hierarchy assigned by the root logger.

property name: str

The logger name.

This is not the name of the underlying Python logger. This is the name of the logger in the path if the logger has been entered. Otherwise, it is the name that the logger was initialized with.

property parent: Logger | None

The parent logger, or None if this is a root logger.

property path: tuple[str, ...] | None

The logger path.

The path is a sequence of logger names following the hierarchy of loggers from the root logger to this one. The final name in the path will be the name that the logger was initialized with if a logger with the same name has not been registered at that path index yet. Otherwise, an incremental number is appended to the initial name to get a unique name for that path index.

Example

If there are 2 loggers named “baz” at the third level of the hierarchy, their respective paths will be:

["foo", "bar", "baz"]
["foo", "bar", "baz2"]

register_path(logger: Logger) tuple[str, ...][source]

A logger can register a path for itself with its parent.

This method should not be called directly. The logger will automatically register itself with its parent.

collect(*identifiers: str | UUID, log_item: LogItem, extra_tremors: MutableMapping[str, object] | None = None) MutableMapping[str, object][source]

Run the specified collectors.

Parameters:
  • *identifiers – Collector names, or IDs to run.

  • log_item – The LogItem to pass to collectors.

  • extra_tremors – The initial EXTRA_KEY extras. This mapping is mutated, and the new collector states are added to it. If None, an empty dict will be used.

Returns:

The mutated extra_tremors, or a new dict with the collector states.

log(level: int, msg: object, *args: object, exc_info: bool | tuple[type[BaseException], BaseException, TracebackType | None] | tuple[None, None, None] | BaseException | None = None, stack_info: bool = False, stacklevel: int = 1, extra: Mapping[str, object] | None = None, **_kwargs: object) None[source]

Run collectors, and log a message at the given level.

The arguments are interpreted as for debug().

This method mutates, or replaces extra[EXTRA_KEY].

tremors.logger.enrich_std_record(record: LogRecord) None[source]

Enrich a record not created by a tremors logger.

This function mutates record if the record doesn’t have the tremors attribute, EXTRA_KEY, i.e., a standard logger produced the record. The current logger’s collectors are run on a LogItem derived from the record’s attributes. Then the results are stored on the record in the EXTRA_KEY attribute, so the record may subsequently be processed by collector formatters.

The function may be used in filters to capture, and treat standard logs, as if they were logged by the current logger.

If there is no current logger, the record won’t be modified.

decorator Module

This module contains decorators for using Tremors loggers.

The logged() decorator wraps a callable in a Tremors logger context, and injects that logger into the callable.

tremors.decorator.logged(fn_or_collector: Callable[P, TRet]) Callable[P, TRet][source]
tremors.decorator.logged(fn_or_collector: Collector[Any] | CollectorFactory[Any], *collectors: Collector[Any] | CollectorFactory[Any], name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, TRet]], Callable[P, TRet]]
tremors.decorator.logged(*, name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, TRet]], Callable[P, TRet]]

Inject a Logger into a callable.

The callable must have a keyword-only parameter logger. If the value for logger is the default value, from_logged, it is replaced by a logger with the specified collectors, name, and underlying logger. This logger is entered immediately before the callable runs, and exited immediately after. Otherwise, the supplied logger is used in the call. A supplied logger is not automatically entered or exited.

If fn_or_collector is a callable, e.g., the decorator was used like @logged, it is assumed that no collectors were specified, and the default collectors are used. Otherwise, it is assumed this is the first collector, e.g., the decorator was used like @logged(some_collector, ...).

*collectors is the rest of the collectors, with the first collector being fn_or_collector.

The rest of the arguments are passed to tremors.logger.Logger.

tremors.decorator.async_logged(fn_or_collector: Callable[P, Coroutine[object, object, TRet]]) Callable[P, Coroutine[object, object, TRet]][source]
tremors.decorator.async_logged(fn_or_collector: Collector[Any] | CollectorFactory[Any], *collectors: Collector[Any] | CollectorFactory[Any], name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, Coroutine[object, object, TRet]]], Callable[P, Coroutine[object, object, TRet]]]
tremors.decorator.async_logged(*, name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, Coroutine[object, object, TRet]]], Callable[P, Coroutine[object, object, TRet]]]

Inject a Logger into an async callable.

Apart from the async callable, the arguments are the same as for logged().

tremors.decorator.from_logged = <Logger root (WARNING)>

A sentinel logger value.

When used as the logger argument to a logged() callable, the callable is wrapped in a Logger context that is injected into the callable as the logger argument.
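The sentinel-default pattern behind logged() and from_logged can be sketched without Tremors. Everything named here (SENTINEL, DemoLogger, demo_logged) is hypothetical and exists only for this illustration. The sketch shows the two documented behaviours: a default value is replaced by a logger that is entered and exited around the call, and a supplied logger is passed through untouched.

```python
import functools

# Hypothetical sentinel and logger class, invented for this sketch.
SENTINEL = object()


class DemoLogger:
    def __init__(self, name):
        self.name = name
        self.events = []

    def __enter__(self):
        self.events.append("entered")
        return self

    def __exit__(self, *exc_info):
        self.events.append("exited")
        return False  # do not swallow exceptions

    def info(self, msg):
        self.events.append(msg)


def demo_logged(fn):
    """Inject a fresh DemoLogger when `logger` is left at its default."""

    @functools.wraps(fn)
    def wrapper(*args, logger=SENTINEL, **kwargs):
        if logger is SENTINEL:
            # Default seen: create a logger, and enter/exit it around the call.
            with DemoLogger(fn.__name__) as injected:
                return fn(*args, logger=injected, **kwargs)
        # A supplied logger is used as-is and is not entered or exited.
        return fn(*args, logger=logger, **kwargs)

    return wrapper


@demo_logged
def fn(*, logger=SENTINEL):
    logger.info("hello")
    return logger


injected = fn()
supplied = fn(logger=DemoLogger("mine"))
```

Comparing with `is` against a unique sentinel object lets the wrapper distinguish "no logger given" from any real logger a caller might pass.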

collector Module

This module contains collector bundles and tools.

A collector bundle is an object that contains the definition of the collector state, a factory function to create a collector, and a formatter function that will extract, and format the collector state from a LogRecord.

In addition to the bundles in this module, you can create custom bundles. A collector is generic in the type of its state, which can be of an arbitrary type. A collector factory can be defined to have arbitrary parameters that depend on how the collector is initialized. A collector formatter has a fixed signature as described in Formatter.

tremors.collector.get_state(typ: type[T], name: str, record: LogRecord) T | None[source]

Get the collector state from a LogRecord.

The function can be used by custom collector formatters to get the collector state from a record.

Parameters:
  • typ – The collector state class.

  • name – The name of the collector.

  • record – The LogRecord that may contain the collector state.

Returns:

An object of type typ if the LogRecord contains such an object for the collector. Otherwise None.

class tremors.collector.Formatter(*args, **kwargs)[source]

Bases: Protocol

Callback protocol for a collector formatter.

__call__(record: LogRecord, *, name: str = '', fmt: object = '', default: str = '') str[source]

Extract and format a collector state.

Parameters:
  • record – A LogRecord that may contain the collector state.

  • name – The name of the collector.

  • fmt – A format specification. Implementations commonly expect this to adhere to the Format specification mini-language. Then str(fmt) may be used to obtain the specification, so fmt does not have to be a string, but could be any object with an appropriate string representation.

  • default – A value that may be returned if the formatter could not obtain, or format the state.

Returns:

The state formatted as a string, or the default value.
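As a rough illustration of the signature, here is a function with the same shape as Formatter.__call__. It is a sketch under assumptions: the state is read from a hypothetical record attribute, DEMO_STATES, instead of through get_state(), and bytes_formatter with its size field is invented for this example.

```python
import logging

# Hypothetical record attribute holding {collector name: state}. The real
# library exposes collector states through get_state() instead.
DEMO_STATES = "demo_states"


def bytes_formatter(
    record: logging.LogRecord,
    *,
    name: str = "bytes",
    fmt: object = "{size} B",
    default: str = "",
) -> str:
    """Format a hypothetical byte-count state, or fall back to `default`."""
    state = getattr(record, DEMO_STATES, {}).get(name)
    if state is None:
        return default
    try:
        # str(fmt) lets callers pass any object with a suitable string form.
        return str(fmt).format(size=state)
    except (KeyError, IndexError, ValueError):
        return default


with_state = logging.makeLogRecord({"msg": "x", DEMO_STATES: {"bytes": 2048}})
without_state = logging.makeLogRecord({"msg": "y"})
```

Returning default both when the state is missing and when formatting fails keeps the formatter safe to call from a logging filter, where an exception would interrupt the logging call.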

class tremors.collector.CollectorBundle(state: type[T], factory: U, formatter: Formatter)[source]

Bases: NamedTuple, Generic

A bundle to create, and work with a collector.

class tremors.collector.IndentifierState(group_id: uuid.UUID, parent: tremors.logger.Logger | None, path: tuple[str, ...])[source]

Bases: NamedTuple

The state of a collector created by IdentifierFactory.

class tremors.collector.IdentifierFactory(name: str = 'identifier', *, level: int = 20, inherit: bool = False)[source]

Bases: CollectorFactory[IndentifierState | None]

Create collectors to gather logger identifiers.

create() Collector[IndentifierState | None][source]

Create a collector.

tremors.collector.identifier_formatter(record: LogRecord, *, name: str = 'identifier', fmt: object = '[p:{process} t:{thread} g:{group_id}] {path}', default: str = '') str[source]

Extract an IndentifierState from a record, and format it.

str(fmt) may contain any of the fields in this table. The formatted string is generated by replacing the fields with the values described in the table.

Field     Value
process   f"p:{process} " where process is the process ID if it’s available. Otherwise the empty string.
thread    f"t:{thread} " where thread is the thread ID if it’s available. Otherwise the empty string.
group_id  f"t:{group_id}" where group_id is the group ID.
path      The path represented as a POSIX-like path.

Returns:

The formatted string if the record contains a state for the collector. Otherwise default.

tremors.collector.identifier = (<class 'tremors.collector.IndentifierState'>, <class 'tremors.collector.IdentifierFactory'>, <function identifier_formatter>)

The identifier collector bundle.

Examples

With basicConfig() we configure the standard root logger with a stream handler that has a formatter that incorporates the record’s ident attribute in the message. We add the filter function flt to the handler. The filter uses the formatter from the identifier collector bundle to extract the collector state from a record, and format it. Then the filter returns a modified record with the ident attribute set to the formatted state.

Note

If we had added the filter to the logger instead of the handler, the filter would not be called for messages logged by descendant standard loggers. But by adding the filter to the handler, we ensure that it is called for all messages that the handler could emit.

import copy
import logging

from tremors import from_logged, logged
from tremors.collector import identifier


def flt(record):
    own_record = copy.copy(record)
    ident = identifier.formatter(own_record)
    own_record.ident = f"{ident} > " if ident else ""
    return own_record


@logged(identifier.factory())
def parent(*, logger=from_logged):
    logger.info("foo")
    child()


@logged(identifier.factory())
def child(*, logger=from_logged):
    logger.info("bar")


logging.basicConfig(
    format="%(ident)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
parent()

The logged messages contain the process ID, thread ID, group ID, and hierarchy as a path.

[p:... t:... g:...] parent > INFO:root:entered: parent
[p:... t:... g:...] parent > INFO:root:foo
[p:... t:... g:...] parent/child > INFO:root:entered: child
[p:... t:... g:...] parent/child > INFO:root:bar
[p:... t:... g:...] parent/child > INFO:root:exited: child
[p:... t:... g:...] parent > INFO:root:exited: parent

We can expand on the previous example by throwing a function that uses standard logging into the mix. child2 calls other, which could be a function that you can’t change, and doesn’t use Tremors loggers. By default, standard loggers propagate their messages, so descendant messages will eventually be processed by the root logger’s handler with our filter. The collector formatter will not be able to extract the state from a descendant record, and will return the default value, the empty string. Then record.ident will be the empty string, which will be interpolated into the formatted message.

Note

Since the formatter expects the record to have an ident attribute, the filter must always add this attribute to the record.

@logged(identifier.factory())
def child(*, logger=from_logged) -> None:
    logger.info("bar")
    other()


def other() -> None:
    logging.getLogger(__name__).info("baz")


parent()

The functions that use Tremors loggers have entered and exited messages, and their messages contain identifier information, as before. The other function, which logs with a standard logger, simply emits messages with empty ident sections.

[p:... t:... g:...] parent > INFO:root:entered: parent
[p:... t:... g:...] parent > INFO:root:foo
[p:... t:... g:...] parent/child > INFO:root:entered: child
[p:... t:... g:...] parent/child > INFO:root:bar
INFO:__main__:baz
[p:... t:... g:...] parent/child > INFO:root:exited: child
[p:... t:... g:...] parent > INFO:root:exited: parent
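
The note earlier in these examples, about adding the filter to the handler rather than to the logger, reflects standard library behaviour and can be checked without Tremors. In this sketch the logger names, the NullSink handler, and both filter functions are made up for illustration.

```python
import logging

seen_by_logger_filter = []
seen_by_handler_filter = []


def logger_filter(record):
    seen_by_logger_filter.append(record.name)
    return True


def handler_filter(record):
    seen_by_handler_filter.append(record.name)
    return True


class NullSink(logging.Handler):
    """A handler that discards records, so the sketch prints nothing."""

    def emit(self, record):
        pass


# Hypothetical logger names, isolated from the root logger's configuration.
parent_logger = logging.getLogger("demo_filters")
child_logger = logging.getLogger("demo_filters.child")
parent_logger.setLevel(logging.INFO)
parent_logger.propagate = False

sink = NullSink()
sink.addFilter(handler_filter)
parent_logger.addHandler(sink)
parent_logger.addFilter(logger_filter)

parent_logger.info("from parent")
child_logger.info("from child")  # propagates up to parent_logger's handler
```

After both calls, the logger-level filter has seen only the record logged directly on demo_filters, while the handler-level filter has seen both records.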

class tremors.collector.CounterState(count: int, step: int)[source]

Bases: object

The state of a collector created by the counter factory.

count: int
step: int

class tremors.collector.CounterFactory(name: str = 'counter', *, level: int = 20, inherit: bool = False, initial: int = 0, step: int = 1)[source]

Bases: CollectorFactory[CounterState]

Create collectors to count logging calls.

create() Collector[CounterState][source]

Create a collector.

tremors.collector.counter_formatter(record: LogRecord, *, name: str = 'counter', fmt: object = '{count}', default: str = '0') str[source]

Extract a count from a record, and format it.

str(fmt) may contain the field count. The formatted string is generated by replacing the field with the state’s count.

Returns:

The formatted string if the record contains a state for the collector. Otherwise default.

tremors.collector.counter = (<class 'tremors.collector.CounterState'>, <class 'tremors.collector.CounterFactory'>, <function counter_formatter>)

The counter collector bundle.

Examples

We configure standard logging in a similar manner to the examples in identifier, except here we use counter collectors.

import copy
import logging

from tremors import from_logged, logged
from tremors.collector import counter


def flt(record):
    own_record = copy.copy(record)
    count = counter.formatter(own_record, fmt="[{count}] ", default="")
    own_record.count = count
    return own_record


@logged(counter.factory())
def parent(*, logger=from_logged):
    logger.info("foo")
    child()


@logged(counter.factory())
def child(*, logger=from_logged):
    logger.info("bar")


logging.basicConfig(
    format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
parent()

The logged messages include the count, where the parent and child counts are distinct from each other because each logger uses its own counter collector.

[1] INFO:root:entered: parent
[2] INFO:root:foo
[1] INFO:root:entered: child
[2] INFO:root:bar
[3] INFO:root:exited: child
[3] INFO:root:exited: parent

Now let’s modify the previous example to have parent and child use a shared counter by passing a collector instance that we get from the factory’s create method.

shared_counter = counter.factory().create()


@logged(shared_counter)
def parent(*, logger=from_logged) -> None:
    logger.info("foo")
    child()


@logged(shared_counter)
def child(*, logger=from_logged) -> None:
    logger.info("bar")


parent()

Here the count is shared between the two functions.

[1] INFO:root:entered: parent
[2] INFO:root:foo
[3] INFO:root:entered: child
[4] INFO:root:bar
[5] INFO:root:exited: child
[6] INFO:root:exited: parent

class tremors.collector.EnterCounterState(cache_key: str | None, step: int)[source]

Bases: object

The state of a collector created by the enter counter factory.

cache_key: str | None
step: int
property count: int

Get the count from the cache.

class tremors.collector.EnterCounterFactory(name: str = 'enter_counter', *, cache_key: str | None = None, level: int = 20, inherit: bool = False, step: int = 1)[source]

Bases: CollectorFactory[EnterCounterState]

Create collectors to count when a logger is entered.

create() Collector[EnterCounterState][source]

Create a collector.

tremors.collector.enter_counter_formatter(record: LogRecord, *, name: str = 'enter_counter', fmt: object = '{enter_count}', default: str = '0') str[source]

Extract a count from a record, and format it.

str(fmt) may contain the field enter_count. The formatted string is generated by replacing the field with the state’s count.

Returns:

The formatted string if the record contains a state for the collector. Otherwise default.

tremors.collector.enter_counter = (<class 'tremors.collector.EnterCounterState'>, <class 'tremors.collector.EnterCounterFactory'>, <function enter_counter_formatter>)

The enter counter collector bundle.

Examples

We configure standard logging in a similar manner to the examples in counter, except here we use enter counter collectors, which will only count entered logs.

import copy
import logging

from tremors import from_logged, logged
from tremors.collector import enter_counter


def flt(record):
    own_record = copy.copy(record)
    count = enter_counter.formatter(
        own_record, fmt="[{enter_count}] ", default=""
    )
    own_record.count = count
    return own_record


@logged(enter_counter.factory())
def foo(run: int, *, logger=from_logged):
    logger.info("foo %d", run)


@logged(enter_counter.factory())
def bar(run: int, *, logger=from_logged):
    logger.info("bar %d", run)


logging.basicConfig(
    format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO
)
logging.root.handlers[0].addFilter(flt)
foo(1)
bar(1)
foo(2)
foo(3)
bar(2)

The logged messages include the total number of times each function has been called.

[1] INFO:root:entered: foo
[1] INFO:root:foo 1
[1] INFO:root:exited: foo
[1] INFO:root:entered: bar
[1] INFO:root:bar 1
[1] INFO:root:exited: bar
[2] INFO:root:entered: foo
[2] INFO:root:foo 2
[2] INFO:root:exited: foo
[3] INFO:root:entered: foo
[3] INFO:root:foo 3
[3] INFO:root:exited: foo
[2] INFO:root:entered: bar
[2] INFO:root:bar 2
[2] INFO:root:exited: bar

Let’s see what happens if a function doesn’t log the entered message.

@logged(enter_counter.factory(), enter_msg=False, exit_msg=False)
def baz(run: int, *, logger=from_logged):
    logger.info("baz %d", run)


foo(4)
baz(1)
foo(5)
foo(6)
baz(2)

The foo counter keeps counting up from the previous example, but the baz counter doesn’t increase at all because the baz logger doesn’t log entered messages.

[4] INFO:root:entered: foo
[4] INFO:root:foo 4
[4] INFO:root:exited: foo
[0] INFO:root:baz 1
[5] INFO:root:entered: foo
[5] INFO:root:foo 5
[5] INFO:root:exited: foo
[6] INFO:root:entered: foo
[6] INFO:root:foo 6
[6] INFO:root:exited: foo
[0] INFO:root:baz 2

class tremors.collector.ElapsedState(t0: int | None, t: int | None)[source]

Bases: object

The state of a collector created by ElapsedFactory.

t0: int | None
t: int | None

class tremors.collector.ElapsedFactory(name: str = 'elapsed', *, level: int = 20, inherit: bool = False)[source]

Bases: CollectorFactory[ElapsedState]

Create collectors to measure how much time has elapsed.

create() Collector[ElapsedState][source]

Create a collector.

tremors.collector.elapsed_formatter(record: LogRecord, *, name: str = 'elapsed', fmt: object = '{elapsed:.3f}', default: str = '') str[source]

Extract an ElapsedState from a record, and format it.

str(fmt) may contain the field elapsed. The formatted string is generated by replacing the field with the state’s elapsed time in seconds.

Returns:

The formatted string if the record contains a state for the collector. Otherwise default.
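To get a feel for what different fmt values produce, the snippet below applies a few format specifications to a made-up elapsed value using plain str.format. It assumes the formatter substitutes the elapsed field in that way, which the Formatter description suggests but does not guarantee. The value 1.00149 is hypothetical.

```python
# Hypothetical elapsed time in seconds, standing in for a collector's state.
elapsed = 1.00149

# The documented default specification: three decimal places.
default_style = "{elapsed:.3f}".format(elapsed=elapsed)

# One decimal place with a unit suffix.
coarse_style = "{elapsed:.1f}s".format(elapsed=elapsed)

# Right-aligned in a field of width 8, useful for lining up log columns.
padded_style = "{elapsed:>8.3f}".format(elapsed=elapsed)
```

Here default_style is "1.001", coarse_style is "1.0s", and padded_style is "   1.001".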

tremors.collector.elapsed = (<class 'tremors.collector.ElapsedState'>, <class 'tremors.collector.ElapsedFactory'>, <function elapsed_formatter>)

The elapsed collector bundle.

Examples

We configure standard logging in a similar manner to the examples in identifier, except here we use elapsed collectors together with identifier collectors that have a custom format to only show the path.

import copy
import logging
import time

from tremors import from_logged, logged
from tremors.collector import elapsed, identifier


def flt(record):
    own_record = copy.copy(record)
    ident_msg = identifier.formatter(own_record, fmt="{path}")
    elapsed_msg = elapsed.formatter(own_record)
    own_record.ident = f"{ident_msg} > " if ident_msg else ""
    own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
    return own_record


@logged(identifier.factory(), elapsed.factory())
def parent(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)
    child()


@logged(identifier.factory(), elapsed.factory())
def child(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)


logging.basicConfig(
    format="%(elapsed)s%(ident)s%(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)
logging.root.handlers[0].addFilter(flt)
parent()

The logged messages include the elapsed time since each elapsed collector started.

0.000 parent > INFO:root:entered: parent
0.000 parent > INFO:root:sleeping for 1s...
0.000 parent/child > INFO:root:entered: child
0.000 parent/child > INFO:root:sleeping for 1s...
1.001 parent/child > INFO:root:exited: child
2.001 parent > INFO:root:exited: parent

We can try the same thing again, only this time with shared collectors. The parent’s identifier and elapsed collectors will be inherited by its descendants.

@logged(identifier.factory(inherit=True), elapsed.factory(inherit=True))
def parent(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)
    child()


@logged
def child(*, logger=from_logged):
    logger.info("sleeping for 1s...")
    time.sleep(1)
    grandchild()


@logged
def grandchild(*, logger=from_logged):
    logger.info("sleeping for 0.5s...")
    time.sleep(0.5)


parent()

Notice that this time, the child elapsed time starts at 1s instead of resetting to 0s.

0.000 parent > INFO:root:entered: parent
0.000 parent > INFO:root:sleeping for 1s...
1.000 parent/child > INFO:root:entered: child
1.001 parent/child > INFO:root:sleeping for 1s...
2.001 parent/child/grandchild > INFO:root:entered: grandchild
2.001 parent/child/grandchild > INFO:root:sleeping for 0.5s...
2.502 parent/child/grandchild > INFO:root:exited: grandchild
2.502 parent/child > INFO:root:exited: child
2.502 parent > INFO:root:exited: parent