Tremors¶
Tremors is a library for logging while collecting metrics. Tremors loggers are drop-in replacements for standard loggers. But Tremors loggers have metrics collectors that run when messages are logged. The loggers are also context managers. The library maintains a hierarchy of nested contexts, where all logs and metrics are grouped together. You can create a new hierarchy at anytime to group related logs.
Installation¶
pip install tremors
Usage¶
A function can be wrapped in a logger context with the logged decorator. If
you call the function without a logger argument, one will automatically be
injected into it.
import logging
from tremors import from_logged, logged
@logged
def fn(*, logger=from_logged):
logger.info("hello")
logging.basicConfig(
format="Tremors > %(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
)
fn()
The context automatically logs entered, and exited messages before,
and after each function call. The logger uses the configured standard root
logger by default to log the messages.
Tremors > INFO:root:entered: fn
Tremors > INFO:root:hello
Tremors > INFO:root:exited: fn
You may specify a standard logger by name for the Tremors logger to use as its underlying logger.
@loggeds.logged(logger_name=__name__)
def fn((*, logger: tremors.Logger = tremors.from_logged) -> None:
logger.info("hello")
fn()
The messages are logged by the specified underlying logger. Based on our standard logging configuration, the messages propagate from the underlying logger to the standard root logger, which emits them.
Tremors > INFO:__main__:entered: fn
Tremors > INFO:__main__:hello
Tremors > INFO:__main__:exited: fn
Next let’s use a collector to measure the elapsed time since the function
started each time a message is logged. When a message is logged, the logger
runs the collector, and adds its updated state to the message’s LogRecord. We
use a standard logging filter to inspect and modify the record before it is
emitted. We format the collector state, then add the formatted state to the
elapsed custom attribute of the record. Finally, we configure the root
logger’s formatter to incorporate the elapsed attribute.
Note
The elapsed collector bundle included with Tremors has a factory for
creating a collector. It also has a formatter that we use in flt to
extract the state from the record, and format it. In flt we make a copy
of the record, then modify and return the copy, instead of modifying the
original record, so as not to have side effects on other loggers that may
process the message. We also make sure to attach flt to the root
logger’s handler, and not to the logger itself; messages that originate
from descendant loggers will not go through logger filters when they are
propagated, but they will go through handler filters before they are
emitted.
import copy
import time
from tremors.collector import elapsed
def flt(record):
own_record = copy.copy(record)
elapsed_msg = elapsed.formatter(own_record)
own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
return own_record
@logged(elapsed.factory())
def fn(*, logger=from_logged):
logger.info("sleeping for 1s...")
time.sleep(1)
logging.basicConfig(
format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()
The messages contain elapsed information, according to the formatter configuration, that is sourced from the record’s elapsed custom attribute.
0.000 INFO:root:entered: fn
0.000 INFO:root:sleeping for 1s...
1.000 INFO:root:exited: fn
A Logger can have any number of collectors. Here, in addition to the elapsed
collector from the previous example, we add a counter collector. A collector
has a level, and will only run if the message is being logged at that level or
higher. Our counter level is ERROR. We can also control which custom record
attribute has the formatted collector state via the collector’s name. This is
useful if you have multiple of the same collector on a single logger. Here,
we name the counter errors, so record.errors will contain a formatted
string with the running total number of errors that have been logged by a
single function call. Finally, we an control the format of the counter state
via the fmt argument of the counter’s formatter.
from tremors.collector import counter
def flt(record):
own_record = copy.copy(record)
errors = counter.formatter(
own_record, name="errors", fmt="errors={count}"
)
own_record.errors = f"{errors} " if errors else ""
elapsed_msg = elapsed.formatter(own_record)
own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
return own_record
@logged(
elapsed.factory(), counter.factory(name="errors", level=logging.ERROR)
)
def fn(*, logger=from_logged):
logger.info("hello")
time.sleep(1)
logger.error("uh-ho!")
logging.basicConfig(
format="%(elapsed)s%(errors)s%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()
The messages contain information from both collectors.
0.000 errors=0 INFO:root:entered: fn
0.000 errors=0 INFO:root:hello
1.001 errors=1 ERROR:root:uh-ho!
1.001 errors=1 INFO:root:exited: fn
Passing a collector factory, as in the previous example, will result in a new
counter collector being used each time the function is called. Let’s reuse the
same collector to keep a tally of errors across all calls to the function by
passing a collector instance that we get by calling the factory’s create
method.
fn_errors = counter.factory(name="errors", level=logging.ERROR).create()
@logged(fn_errors)
def fn(*, logger=from_logged):
logger.error("uh-ho!")
fn()
fn()
The error count doesn’t reset in the second function call.
errors=0 INFO:root:entered: fn
errors=1 ERROR:root:uh-ho!
errors=1 INFO:root:exited: fn
errors=1 INFO:root:entered: fn
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: fn
Another way we can tally the count across all function calls is to pass the same logger with each call.
from tremors import Logger
def fn(*, logger):
logger.error("uh-ho!")
with Logger(
counter.factory(name="errors", level=logging.ERROR), name="context"
) as logger:
fn(logger=logger)
fn(logger=logger)
We only get entered and exited messages for the context block. But the single logger used in both function calls maintains its state between calls.
errors=0 INFO:root:entered: context
errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:uh-ho!
errors=2 INFO:root:exited: context
Collectors may be inherited by descendant loggers. Let’s count errors across nested loggers.
@logged(
counter.factory(name="errors", level=logging.ERROR, inherit=True),
enter_msg=False,
exit_msg=False,
)
def parent(*, logger=from_logged):
logger.error("uh-ho!")
child()
child()
@logged(enter_msg=False, exit_msg=False)
def child(*, logger=from_logged):
logger.error("doh!")
grandchild()
@logged(enter_msg=False, exit_msg=False)
def grandchild(*, logger=from_logged):
logger.info("so far, so good")
logger.error("spoke too soon!")
parent()
We’ve disabled the entered and exited messages with the enter_msg and
exit_msg parameters. The parent counter is used in the child and
grandchild functions.
errors=1 ERROR:root:uh-ho!
errors=2 ERROR:root:doh!
errors=2 INFO:root:so far, so good
errors=3 ERROR:root:spoke too soon!
errors=4 ERROR:root:doh!
errors=4 INFO:root:so far, so good
errors=5 ERROR:root:spoke too soon!
Asynchronous functions and methods may be decorated with async_logged.
import asyncio
from tremors import async_logged
@async_logged(elapsed.factory())
async def async_work_long(*, logger=from_logged):
logger.info("long work starting")
await asyncio.sleep(1)
@async_logged(elapsed.factory())
async def async_work_short(*, logger=from_logged):
logger.info("short work starting")
await asyncio.sleep(0.1)
@async_logged(elapsed.factory())
async def async_fn(*, logger=from_logged):
coros = async_work_long(), async_work_short()
logger.info("awaiting %d works", len(coros))
await asyncio.gather(*coros)
logging.basicConfig(
format="%(elapsed)s%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
force=True,
)
logging.root.handlers[0].addFilter(flt)
asyncio.run(async_fn())
0.000 INFO:root:entered: async_fn
0.000 INFO:root:awaiting 2 works
0.000 INFO:root:entered: async_work_long
0.000 INFO:root:long work starting
0.000 INFO:root:entered: async_work_short
0.000 INFO:root:short work starting
0.101 INFO:root:exited: async_work_short
1.002 INFO:root:exited: async_work_long
1.003 INFO:root:exited: async_fn
We can capture, and run collectors on standard logs using
enrich_std_record, which runs the current logger’s collectors with a
LogItem that includes the standard log record. Collectors may inspect this
record, and update their state accordingly. flt calls enrich_std_record
to run the collectors on any standard records it receives.
For tremors loggers, collectors are run before a record is created with the data that will be used to create the record. The resultant extras are then added to the record by the standard logging framework. On the other hand, for standard logs, after the record is created, a filter or handler may run the current logger’s collectors on the record itself.
from tremors import enrich_std_record
from tremors.collector import identifier
def flt(record):
own_record = copy.copy(record)
enrich_std_record(own_record)
elapsed_msg = elapsed.formatter(own_record)
own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else ""
ident_msg = identifier.formatter(own_record)
own_record.identifier = f"{ident_msg} " if ident_msg else ""
return own_record
@logged(elapsed.factory(), identifier.factory())
def fn(*, logger=from_logged):
logger.info("attempting to capture a standard log")
std()
def std():
time.sleep(0.1)
logging.getLogger().info("capture me")
std2()
def std2():
time.sleep(0.1)
logging.getLogger().info("capture me too")
logging.basicConfig(
format="%(elapsed)s%(identifier)s%(levelname)s:%(name)s:%(message)s",
level=logging.INFO,
force=True,
)
logging.root.handlers[0].addFilter(flt)
fn()
The identifiers collector uses the record from the standard logs to append a path component identifying the standard logger. The elapsed collector doesn’t need any additional information from the record to mark the duration.
0.000 [p:... t:... g:...] fn INFO:root:entered: fn
0.000 [p:... t:... g:...] fn INFO:root:attempting to capture a standard log
0.101 [p:... t:... g:...] fn/std:root INFO:root:capture me
0.201 [p:... t:... g:...] fn/std:root INFO:root:capture me too
0.201 [p:... t:... g:...] fn INFO:root:exited: fn
Let’s finish up with a full example that demonstrates how to turn log messages with collector state into structured JSON that can be persisted in a log store.
import copy
import logging
import time
import uuid
import pydantic
import tremors
from tremors import collector
class AppLogMessage(pydantic.BaseModel):
"""The model for app logs."""
msg: str
timestamp: float
group_id: uuid.UUID | None
path_list: list[str] | None = pydantic.Field(exclude=True)
count: int | None
t0: float | None = pydantic.Field(exclude=True)
t: float | None = pydantic.Field(exclude=True)
tt0: float | None = pydantic.Field(exclude=True)
tt: float | None = pydantic.Field(exclude=True)
@pydantic.computed_field
@property
def path(self) -> str | None:
if self.path_list is None:
return None
return "/".join(self.path_list)
@pydantic.computed_field
@property
def elapsed_ms(self) -> int | None:
if self.t is None or self.t0 is None:
return None
return round((self.t - self.t0) / 1e6)
@pydantic.computed_field
@property
def total_elapsed_ms(self) -> int | None:
if self.tt is None or self.tt0 is None:
return None
return round((self.tt - self.tt0) / 1e6)
def flt(record: logging.LogRecord) -> logging.LogRecord:
"""Convert records with multiple collector states into structured logs.
Also capture standard logs.
"""
own_record = copy.copy(record)
tremors.enrich_std_record(own_record)
if not hasattr(own_record, tremors.EXTRA_KEY):
return own_record
if not hasattr(own_record, "message"):
own_record.message = own_record.getMessage()
ident_state = collector.get_state(
collector.identifier.state, "identifier", own_record
)
count_state = collector.get_state(
collector.enter_counter.state, "enter_counter", own_record
)
elapsed_state = collector.get_state(
collector.elapsed.state, "elapsed", own_record
)
tot_elapsed_state = collector.get_state(
collector.elapsed.state, "total_elapsed", own_record
)
group_id, path_list = (
(ident_state.group_id, ident_state.path)
if ident_state
else (None, None)
)
count = count_state.count if count_state else None
t0, t = (
(elapsed_state.t0, elapsed_state.t)
if elapsed_state
else (None, None)
)
tt0, tt = (
(tot_elapsed_state.t0, tot_elapsed_state.t)
if tot_elapsed_state
else (None, None)
)
try:
msg = AppLogMessage(
msg=own_record.message,
timestamp=own_record.created,
group_id=group_id,
path_list=path_list,
count=count,
t0=t0,
t=t,
tt0=tt0,
tt=tt,
)
except pydantic.ValidationError:
return own_record
else:
own_record.app_log_message = msg
return own_record
class TremorsHandler(logging.Handler):
"""A handler to persist structured logs in a store."""
def emit(self, record: logging.LogRecord) -> None:
"""Queue a structured log for persistence to the log store.
This is just a fake that only prints the JSON to stdout.
"""
msg = getattr(record, "app_log_message", None)
if not msg:
return
msg_json = msg.model_dump_json(indent=4)
print(f"queue for log store >>> \033[0;34m{msg_json}\033[0m") # noqa: T201
global_elapsed = collector.elapsed.factory(
"total_elapsed", inherit=True
).create()
"""A global elapsed collector instance.
This collector can be added to all loggers to track the elapsed time since the
very first log.
"""
root_logged = tremors.logged(
collector.identifier.factory(name="identifier", inherit=True),
collector.enter_counter.factory(name="enter_counter"),
collector.elapsed.factory(name="elapsed"),
global_elapsed,
is_root=True,
)
"""A canned decorator for creating root loggers."""
child_logged = tremors.logged(
collector.enter_counter.factory(name="enter_counter"),
collector.elapsed.factory(name="elapsed"),
)
"""A canned decorator for creating child loggers."""
@root_logged
def run_app(logger: tremors.Logger = tremors.from_logged) -> None:
"""The application entry point.
This function has a root logger. The logger's identifiers collector will be
inherited by all descendants, so they will all share a group_id, and a
common path.
"""
logger.info("app initialized")
do_something()
do_something_else()
do_something_else()
some_library()
@child_logged
def do_something(logger: tremors.Logger = tremors.from_logged) -> None:
"""This is a function you maintain, and can use a tremors logger.
Its logger inherits its parent's identifiers collector, and the global
elapsed collector.
"""
logger.info("doing something")
do_something_something()
@child_logged
def do_something_something(logger: tremors.Logger = tremors.from_logged):
"""This is another function you maintain, and can use a tremors logger.
Its logger also inherits its parent's identifiers collector, and the global
elapsed collector.
"""
logger.info("doing something, something")
time.sleep(0.1)
@root_logged
def do_something_else(logger: tremors.Logger = tremors.from_logged) -> None:
"""This is a function you maintain, and has a tremors root logger.
Its logger gets its own identifiers collector, along with a new group_id,
and begins a new root path. All of its descendants will share this new
group_id, have paths starting from this logger's path. However, the logger
also gets the shared global elapsed collector.
"""
logger.info("doing something else")
time.sleep(0.1)
def some_library() -> None:
"""This is a function you don't maintain that uses standard logging.
As long as this function's logs flow to the standard root logger, its logs
will be enriched by the current tremors logger's collectors, and structured
logs will be persisted to the store. These logs will share the current
logger's group_id, and have a path based on the current logger's path.
"""
logging.getLogger("some_library").info("doing library stuff")
time.sleep(0.1)
another_library()
def another_library() -> None:
"""This is another function you don't maintain that uses standard logging.
Its logs will be treated similarly to those of ``some_library``. However,
path information for loggers between this function, and the current logger
in the call stack will not be recorded. E.g., the path for this function's
logs will be ``run_app/std:another_library``, and not
``run_app/std:some_library/std:another_library``.
"""
logging.getLogger("another_library").info("doing other library stuff")
time.sleep(0.1)
if __name__ == "__main__":
handler = TremorsHandler()
handler.addFilter(flt)
logging.basicConfig(level=logging.INFO, handlers=(handler,), force=True)
run_app()
You can see from the logs that logs descended from the same root logger share a group_id, and the path follows the call stack. The count is increased once for each time a tremors logged function is called, and separate counts are maintained for each function. The duration of each function is recorded, as well as the total time that has elapsed since the entry point started. And standard logs are also captured as if they were descended from the closes tremors logged function up the call stack.
queue for log store >>> {
"msg": "entered: run_app",
"timestamp": 1772951375.6724658,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "app initialized",
"timestamp": 1772951375.6726198,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "entered: do_something",
"timestamp": 1772951375.6726973,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "doing something",
"timestamp": 1772951375.6727448,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "entered: do_something_something",
"timestamp": 1772951375.6727943,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something/do_something_something",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "doing something, something",
"timestamp": 1772951375.672832,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something/do_something_something",
"elapsed_ms": 0,
"total_elapsed_ms": 0
}
queue for log store >>> {
"msg": "exited: do_something_something",
"timestamp": 1772951375.773108,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something/do_something_something",
"elapsed_ms": 100,
"total_elapsed_ms": 101
}
queue for log store >>> {
"msg": "exited: do_something",
"timestamp": 1772951375.7734556,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/do_something",
"elapsed_ms": 101,
"total_elapsed_ms": 101
}
queue for log store >>> {
"msg": "entered: do_something_else",
"timestamp": 1772951375.7737124,
"group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
"count": 1,
"path": "do_something_else",
"elapsed_ms": 0,
"total_elapsed_ms": 101
}
queue for log store >>> {
"msg": "doing something else",
"timestamp": 1772951375.7738376,
"group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
"count": 1,
"path": "do_something_else",
"elapsed_ms": 0,
"total_elapsed_ms": 101
}
queue for log store >>> {
"msg": "exited: do_something_else",
"timestamp": 1772951375.8741887,
"group_id": "17a2ac21-f3cf-4384-bb85-0f49c644947b",
"count": 1,
"path": "do_something_else",
"elapsed_ms": 100,
"total_elapsed_ms": 202
}
queue for log store >>> {
"msg": "entered: do_something_else",
"timestamp": 1772951375.874693,
"group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
"count": 2,
"path": "do_something_else",
"elapsed_ms": 0,
"total_elapsed_ms": 202
}
queue for log store >>> {
"msg": "doing something else",
"timestamp": 1772951375.874823,
"group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
"count": 2,
"path": "do_something_else",
"elapsed_ms": 0,
"total_elapsed_ms": 202
}
queue for log store >>> {
"msg": "exited: do_something_else",
"timestamp": 1772951375.9751017,
"group_id": "45f2a590-2107-4e6d-8f64-8e0f5e462fc3",
"count": 2,
"path": "do_something_else",
"elapsed_ms": 100,
"total_elapsed_ms": 303
}
queue for log store >>> {
"msg": "doing library stuff",
"timestamp": 1772951375.975387,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/std:some_library",
"elapsed_ms": 303,
"total_elapsed_ms": 303
}
queue for log store >>> {
"msg": "doing other library stuff",
"timestamp": 1772951376.0755894,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app/std:another_library",
"elapsed_ms": 403,
"total_elapsed_ms": 403
}
queue for log store >>> {
"msg": "exited: run_app",
"timestamp": 1772951376.176055,
"group_id": "1a2433be-0a56-48af-8852-0f52a68f4284",
"count": 1,
"path": "run_app",
"elapsed_ms": 504,
"total_elapsed_ms": 504
}
See the collector module in the full documentation for how you can define your own collectors, and bundles.
API¶
logger Module¶
This module contains the Tremors Logger and Collector definition.
A Tremors logger can be used as a drop-in replacement for a standard
Logger. Collectors may be attached to a logger. Each time
a message is logged, the collectors run if they are enabled for the message
level. When a collector runs, it may update its state. A logger adds the
current state of all of its collectors to any LogRecord
that it produces.
A logger may also be used as a context manager. When used this way, loggers
maintain a hierarchy. Each logger has a parent logger, unless it is the root
logger. The root logger assigns a group_id that is shared
by all loggers in the hierarchy. Each logger has a path from
its root logger to itself. Hierarchies may be nested by explicitly creating
a new root logger within an existing hierarchy. The loggers in the nested
hierarchy will have a different group ID than the loggers in the containing
hierarchy. The paths in the nested hierarchy will also start from the new
root logger.
When a logger context is entered, or exited, a message is automatically logged at a specified level, and collectors will run if they are enabled for the message level. This allows us to define collectors that measure lifecycle information, such as the duration of a context, or how much memory was allocated at the beginning, and end of a context.
Each Tremors logger uses an underlying standard logger that may
be specified. The underlying logger can be configured in the normal
fashion. Underlying loggers may be shared by Tremors loggers. In fact,
it is common for all Tremors loggers to use the standard root logger. The
Tremors logger simply adds the states of its collectors to the records
produced by its standard logger. These augmented records can then be
processed by any Filter, Formatter,
or Handler that knows about the extra record attributes.
A collector may be bundled with a formatter that can extract the collector state from a record, and format it. This formatter can be used in a filter, formatter, or handler that has been configured for the underling logger. Tremors comes with many useful collector bundles. But you can also define custom collectors, and bundles.
- class tremors.logger.LogItem(logger: Logger | None, level: int, msg: object, args: tuple[object, ...] | Mapping[str, object] | None, exc_info: bool | tuple[type[BaseException], BaseException, TracebackType | None] | tuple[None, None, None] | BaseException | None, stack_info: bool, stacklevel: int | None, extra: Mapping[str, object] | None, record: logging.LogRecord | None)[source]¶
Bases:
NamedTupleThe properties of a logged message.
- class tremors.logger.Collector(id: uuid.UUID, name: str, level: int, inherit: bool, state: TState, collect: Callable[[TState, LogItem], TState])[source]¶
Bases:
NamedTuple,GenericThe collector specification.
- name¶
The name of the collector. When the collector runs for a logged message, the collector’s state is added to the
LogRecord, and may be retrieved using this name.- Type:
- inherit¶
If True, the collector will also be added to descendant loggers, provided its name does not conflict that of any of the other collectors on the descendant loggers.
- Type:
- state¶
The initial state of the collector. Each time the collector runs, it may update this state.
- Type:
TState
- collect¶
When the collector runs, this function is called with the current state, and the LogItem. It returns the new state.
- Type:
collections.abc.Callable[[TState, tremors.logger.LogItem], TState]
- class tremors.logger.Logger(*collectors: Collector[Any] | CollectorFactory[Any], name: str, logger_name: str | None = None, ctx_level: int = 20, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False)[source]¶
Bases:
LoggerAdapter[Logger]The Tremors logger.
- Parameters:
*collectors – Collectors or CollectorFactories. The collectors, or resultant collectors will be attached to the logger, and will be run for every logged message for which the collector is enabled.
name – The name of the logger. This name is logged in the default entered, and exited messages. This name is used to generate the path, but may be altered in the path to be unique in the hierarchy.
logger_name – The name of the underlying
Loggerthat will be used to log messages. If None, the root logger will be used.ctx_level – The level at which entered and exited messages will be logged.
enter_msg – If a non-empty string, this will be the logged entered message. If truthy, the default entered message will be logged. Otherwise, no entered message will be logged.
exit_msg – If a non-empty string, this will be the logged exited message. If truthy, the default exited message will be logged. Otherwise, no exited message will be logged.
is_root – If True, a root logger with no parent that starts a new hierarchy will be created.
- property name: str¶
The logger name.
This is not the name of the underlying Python logger. This is the name of the logger in the path if the logger has been entered. Otherwise, it is the
namethat the logger was initialized with.
- property path: tuple[str, ...] | None¶
The logger path.
The path is a sequence of logger names following the hierarchy of loggers from the root logger to this one. The final name in the path will be the name that the logger was initialized with if a logger with the same name has not been registered at that path index yet. Otherwise, an incremental number is appended to the initial name to get a unique name for that path index.
Example
If there are 2 loggers named “baz” at the third level of the hierarchy, their respecitve paths will be:
["foo", "bar", "baz"] ["foo", "bar", "baz2"]
- register_path(logger: Logger) tuple[str, ...][source]¶
A logger can register a path for itself with its parent.
This method should not be called directly. The logger will automatically register itself with its parent.
- collect(*identifiers: str | UUID, log_item: LogItem, extra_tremors: MutableMapping[str, object] | None = None) MutableMapping[str, object][source]¶
Run the specified collectors.
- Parameters:
*identifiers – Collector names, or IDs to run.
log_item – The LogItem to pass to collectors.
extra_tremors – The initial
EXTRA_KEYextras. This mapping is mutated, and the new collector states are added to it. If none, an empty dict will be used.
- Returns:
The mutated
extra_tremors, or a new dict with the collector states.
- log(level: int, msg: object, *args: object, exc_info: bool | tuple[type[BaseException], BaseException, TracebackType | None] | tuple[None, None, None] | BaseException | None = None, stack_info: bool = False, stacklevel: int = 1, extra: Mapping[str, object] | None = None, **_kwargs: object) None[source]¶
Run collectors, and log a message at the given level.
The arguments are interpreted as for
debug().This method mutates, or replaces
extra[EXTRA_KEY].
- tremors.logger.enrich_std_record(record: LogRecord) None[source]¶
Enrich a record not created by a tremors logger.
This function mutates
recordif the record doesn’t have the tremors attribute,EXTRA_KEY, i.e., a standard logger produced the record. The current logger’s collectors are run on a LogItem derived from the record’s attributes. Then the results are stored on the record in theEXTRA_KEYattribute, so the record may subsequently be processed by collector formatters.The function may be used in filters to capture, and treat standard logs, as if they were logged by the current logger.
If there is no current logger, the record won’t be modified.
decorator Module¶
This module contains decorators for using Tremors loggers.
The logged() decorator wraps a callable in a Tremors logger context,
and injects that logger into the callable.
- tremors.decorator.logged(fn_or_collector: Callable[P, TRet]) Callable[P, TRet][source]¶
- tremors.decorator.logged(fn_or_collector: Collector[Any] | CollectorFactory[Any], *collectors: Collector[Any] | CollectorFactory[Any], name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, TRet]], Callable[P, TRet]]
- tremors.decorator.logged(*, name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, TRet]], Callable[P, TRet]]
Inject a
Loggerinto a callable.The callable must have a keyword-only parameter
logger. If the value forloggeris the default value,from_logged, it is replaced by a logger with the specified collectors, name, and underlying logger. This logger is entered, and exited immediately before and after the callable runs. Otherwise, the supplied logger will be used in the call. A supplied logger will not be automatically entered, and exited.If
fn_or_collectoris a callable, e.g., the decorator was used like@logged, it is assumed that no collectors were specified, and the default collectors are used. Otherwise, it is assumed this is the first collector, e.g., the decorator was used like@logged(some_collector, ...).*collectorsis the rest of the collectors, with the first collector beingfn_or_collector.The rest of the arguments are passed to :class:~`tremors.logger.Logger`.
- tremors.decorator.async_logged(fn_or_collector: Callable[P, Coroutine[object, object, TRet]]) Callable[P, Coroutine[object, object, TRet]][source]¶
- tremors.decorator.async_logged(fn_or_collector: Collector[Any] | CollectorFactory[Any], *collectors: Collector[Any] | CollectorFactory[Any], name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, Coroutine[object, object, TRet]]], Callable[P, Coroutine[object, object, TRet]]]
- tremors.decorator.async_logged(*, name: str | None = None, logger_name: str | None = None, ctx_level: int = logging.INFO, enter_msg: str | bool = True, exit_msg: str | bool = True, is_root: bool = False) Callable[[Callable[P, Coroutine[object, object, TRet]]], Callable[P, Coroutine[object, object, TRet]]]
Inject a
Loggerinto a async callable.Apart for the async callable, the arguments are the same as for
logged().
- tremors.decorator.from_logged = <Logger root (WARNING)>¶
A sentinel logger value.
When used as the
loggerargument to alogged()callable, the callable is wrapped in aLoggercontext that is injected into the callable as theloggerargument.
collector Module¶
This module contains collector bundles and tools.
A collector bundle is an object that contains the definition of
the collector state, a factory function to create a collector, and a
formatter function that will extract, and format the collector state from
a LogRecord.
In addition to the bundles in this module, you can create custom bundles. A
collector is generic in the type of its state, which can be of an arbitrary
type. A collector factory can be defined to have arbitrary parameters that
depend on how the collector is initialized. A collector formatter has a
fixed signature as described in Formatter.
- tremors.collector.get_state(typ: type[T], name: str, record: LogRecord) T | None[source]¶
Get the collector state from a
LogRecord.The function can be used by custom collector formatters to get the collector state from a record.
- Parameters:
typ – The collector state class.
name – The name of the collector.
record – The LogRecord the may contain the collector state.
- Returns:
An object of type
typif the LogRecord contains such an object for the collector. Otherwise None.
- class tremors.collector.Formatter(*args, **kwargs)[source]¶
Bases:
ProtocolCallback proctocol for a collector formatter.
- __call__(record: LogRecord, *, name: str = '', fmt: object = '', default: str = '') str[source]¶
Extract and format a collector state.
- Parameters:
record – A
LogRecordthat may contain the collector state.name – The name of the collector.
fmt – A format specification. Implementations commonly expect this to adhere to the Format specification mini-language. Then
str(fmt)may be used to obtain the specification, sofmtdoes not have to be a string, but could be any object with an appropriate string representation.default – A value that may be returned if the formatter could not obtain, or format the state.
- Returns:
The state formatted as a string, or the default value.
- class tremors.collector.CollectorBundle(state: type[T], factory: U, formatter: Formatter)[source]¶
Bases:
NamedTuple,GenericA bundle to create, and work with a collector.
- class tremors.collector.IndentifierState(group_id: uuid.UUID, parent: tremors.logger.Logger | None, path: tuple[str, ...])[source]¶
Bases:
NamedTupleThe state of a collector created by
IdentifierFactory.
- class tremors.collector.IdentifierFactory(name: str = 'identifier', *, level: int = 20, inherit: bool = False)[source]¶
Bases:
CollectorFactory[IndentifierState|None]Create collectors to gather logger identifiers.
- create() Collector[IndentifierState | None][source]¶
Create a collector.
- tremors.collector.identifier_formatter(record: LogRecord, *, name: str = 'identifier', fmt: object = '[p:{process} t:{thread} g:{group_id}] {path}', default: str = '') str[source]¶
Extract an
IndentifierStatefrom a record, and format it.str(fmt)may contain any of the fields in this table. The formatted string is generated by replacing the fields with the values described in the table.Field
Value
process
f"p:{process} "whereprocessis the process ID if it’s available. Otherwise the empty string.thread
f"t:{thread} "wherethreadis the thread ID if it’s available. Otherwise the empty string.group_id
f"t:{group_id}"wheregroup_idis the group ID.path
The path represented as a POSIX-like path.
- Returns:
The formatted string if the record contains a state for the collector. Otherwise
default.
- tremors.collector.identifier = (<class 'tremors.collector.IndentifierState'>, <class 'tremors.collector.IdentifierFactory'>, <function identifier_formatter>)¶
The identifier collector bundle.
Examples
With
basicConfig()we configure the standard root logger with a stream handler that has a formatter that incorporates the record’sidentattribute in the message. We add the filter functionfltto the handler. The filter uses the formatter from the identifier collector bundle to extract the collector state from a record, and format it. Then the filter returns a modified record with the ident attribute set to the formatted state.Note
If we had added the filter to the logger instead of the handler, the filter would not be called for messages logged by descendant standard loggers. But by adding the filter to the handler, we ensure that it is called for all messages that the handler could emit.
import copy import logging from tremors import from_logged, logged from tremors.collector import identifier def flt(record): own_record = copy.copy(record) ident = identifier.formatter(own_record) own_record.ident = f"{ident} > " if ident else "" return own_record @logged(identifier.factory()) def parent(*, logger=from_logged): logger.info("foo") child() @logged(identifier.factory()) def child(*, logger=from_logged): logger.info("bar") logging.basicConfig( format="%(ident)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO ) logging.root.handlers[0].addFilter(flt) parent()
The logged messages contain the process ID, thread ID, group ID, and hierarchy as a path.
[p:... t:... g:...] parent > INFO:root:entered: parent [p:... t:... g:...] parent > INFO:root:foo [p:... t:... g:...] parent/child > INFO:root:entered: child [p:... t:... g:...] parent/child > INFO:root:bar [p:... t:... g:...] parent/child > INFO:root:exited: child [p:... t:... g:...] parent > INFO:root:exited: parent
We can expand on the previous example by throwing a function that uses standard logging into the mix.
child2callsother, which could be a function that you can’t change, and doesn’t use Tremors loggers. By default, standard loggers propagate their messages, so descendant messages will eventually be processed by the root logger’s handler with our filter. The collector formatter will not be able to extract the state from a descendant record, and will return the default value, the empty string. Thenrecord.identwill be the empty string, which will be interpolated into the formatted message.Note
Since the formatter expects the record to have an ident attribute, the filter must always add this attribute to the record.
@logged(identifier.factory()) def child(*, logger=from_logged) -> None: logger.info("bar") other() def other() -> None: logging.getLogger(__name__).info("baz") parent()
The functions that use Tremors loggers have entered and exited messages, and their messages contain identifier information, as before. The
otherfunction, which logs with a standard logger simply emits messages with empty ident sections.[p:... t:... g:...] parent > INFO:root:entered: parent [p:... t:... g:...] parent > INFO:root:foo [p:... t:... g:...] parent/child > INFO:root:entered: child [p:... t:... g:...] parent/child > INFO:root:bar INFO:__main__:baz [p:... t:... g:...] parent/child > INFO:root:exited: child [p:... t:... g:...] parent > INFO:root:exited: parent
- class tremors.collector.CounterState(count: int, step: int)[source]¶
Bases:
objectThe state of a collector created by the counter factory.
- class tremors.collector.CounterFactory(name: str = 'counter', *, level: int = 20, inherit: bool = False, initial: int = 0, step: int = 1)[source]¶
Bases:
CollectorFactory[CounterState]Create collectors to count logging calls.
- create() Collector[CounterState][source]¶
Create a collector.
- tremors.collector.counter_formatter(record: LogRecord, *, name: str = 'counter', fmt: object = '{count}', default: str = '0') str[source]¶
Extract a count from a record, and format it.
str(fmt)may contain the fieldcount. The formatted string is generated by replacing the field with the state’s count.- Returns:
The formatted string if the record contains a state for the collector. Otherwise
default.
- tremors.collector.counter = (<class 'tremors.collector.CounterState'>, <class 'tremors.collector.CounterFactory'>, <function counter_formatter>)¶
The counter collector bundle.
Examples
We configure standard logging in a similar manner to the examples in
identifier, except here we use counter collectors.import copy import logging from tremors import from_logged, logged from tremors.collector import counter def flt(record): own_record = copy.copy(record) count = counter.formatter(own_record, fmt="[{count}] ", default="") own_record.count = count return own_record @logged(counter.factory()) def parent(*, logger=from_logged): logger.info("foo") child() @logged(counter.factory()) def child(*, logger=from_logged): logger.info("bar") logging.basicConfig( format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO ) logging.root.handlers[0].addFilter(flt) parent()
The logged messages include the count, where the
parentandchildcounts are distinct from each other because each loggers uses its own counter collector.[1] INFO:root:entered: parent [2] INFO:root:foo [1] INFO:root:entered: child [2] INFO:root:bar [3] INFO:root:exited: child [3] INFO:root:exited: parent
Now let’s modify the previous example to have
parentandchilduse a shared counter by passing a collector instance that we get from the factory’screatemethod.shared_counter = counter.factory().create() @logged(shared_counter) def parent(*, logger=from_logged) -> None: logger.info("foo") child() @logged(shared_counter) def child(*, logger=from_logged) -> None: logger.info("bar") parent()
Here the count is shared between the two functions.
[1] INFO:root:entered: parent [2] INFO:root:foo [3] INFO:root:entered: child [4] INFO:root:bar [5] INFO:root:exited: child [6] INFO:root:exited: parent
- class tremors.collector.EnterCounterState(cache_key: str | None, step: int)[source]¶
Bases:
objectThe state of a collector created by the enter counter factory.
- class tremors.collector.EnterCounterFactory(name: str = 'enter_counter', *, cache_key: str | None = None, level: int = 20, inherit: bool = False, step: int = 1)[source]¶
Bases:
CollectorFactory[EnterCounterState]Create collectors to count when a logger is entered.
- create() Collector[EnterCounterState][source]¶
Create a collector.
- tremors.collector.enter_counter_formatter(record: LogRecord, *, name: str = 'enter_counter', fmt: object = '{enter_count}', default: str = '0') str[source]¶
Extract a count from a record, and format it.
str(fmt)may contain the fieldenter_count. The formatted string is generated by replacing the field with the state’s count.- Returns:
The formatted string if the record contains a state for the collector. Otherwise
default.
- tremors.collector.enter_counter = (<class 'tremors.collector.EnterCounterState'>, <class 'tremors.collector.EnterCounterFactory'>, <function enter_counter_formatter>)¶
The enter counter collector bundle.
Examples
We configure standard logging in a similar manner to the examples in
counter, except here we use enter counter collectors, which will only count entered logs.import copy import logging from tremors import from_logged, logged from tremors.collector import enter_counter def flt(record): own_record = copy.copy(record) count = enter_counter.formatter( own_record, fmt="[{enter_count}] ", default="" ) own_record.count = count return own_record @logged(enter_counter.factory()) def foo(run: int, *, logger=from_logged): logger.info("foo %d", run) @logged(enter_counter.factory()) def bar(run: int, *, logger=from_logged): logger.info("bar %d", run) logging.basicConfig( format="%(count)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO ) logging.root.handlers[0].addFilter(flt) foo(1) bar(1) foo(2) foo(3) bar(2)
The logged messages include the total number of times each function has been called.
[1] INFO:root:entered: foo [1] INFO:root:foo 1 [1] INFO:root:exited: foo [1] INFO:root:entered: bar [1] INFO:root:bar 1 [1] INFO:root:exited: bar [2] INFO:root:entered: foo [2] INFO:root:foo 2 [2] INFO:root:exited: foo [3] INFO:root:entered: foo [3] INFO:root:foo 3 [3] INFO:root:exited: foo [2] INFO:root:entered: bar [2] INFO:root:bar 2 [2] INFO:root:exited: bar
Let’s see what happens if a function doesn’t log the entered message.
@logged(enter_counter.factory(), enter_msg=False, exit_msg=False) def baz(run: int, *, logger=from_logged): logger.info("baz %d", run) foo(4) baz(1) foo(5) foo(6) baz(2)
The
foocounter keeps counting up from the previous example, but thebazcounter doesn’t increase at all because thebazlogger doesn’t log entered messages.[4] INFO:root:entered: foo [4] INFO:root:foo 4 [4] INFO:root:exited: foo [0] INFO:root:baz 1 [5] INFO:root:entered: foo [5] INFO:root:foo 5 [5] INFO:root:exited: foo [6] INFO:root:entered: foo [6] INFO:root:foo 6 [6] INFO:root:exited: foo [0] INFO:root:baz 2
- class tremors.collector.ElapsedState(t0: int | None, t: int | None)[source]¶
Bases:
objectThe state of a collector created by
ElapsedFactory.
- class tremors.collector.ElapsedFactory(name: str = 'elapsed', *, level: int = 20, inherit: bool = False)[source]¶
Bases:
CollectorFactory[ElapsedState]Create collectors to measure how much time has elapsed.
- create() Collector[ElapsedState][source]¶
Create a collector.
- tremors.collector.elapsed_formatter(record: LogRecord, *, name: str = 'elapsed', fmt: object = '{elapsed:.3f}', default: str = '') str[source]¶
Extract an
ElapsedStatefrom a record, and format it.str(fmt)may contain the fieldelapsed. The formatted string is generated by replacing the field with the state’s elapsed time in seconds.- Returns:
The formatted string if the record contains a state for the collector. Otherwise
default.
- tremors.collector.elapsed = (<class 'tremors.collector.ElapsedState'>, <class 'tremors.collector.ElapsedFactory'>, <function elapsed_formatter>)¶
The elapsed collector bundle.
Examples
We configure standard logging in a similar manner to the previous examples
identifier, except here we use elapsed collectors, and identifier collectors with a custom format to only show the path.import copy import logging import time from tremors import from_logged, logged from tremors.collector import elapsed, identifier def flt(record): own_record = copy.copy(record) ident_msg = identifier.formatter(own_record, fmt="{path}") elapsed_msg = elapsed.formatter(own_record) own_record.ident = f"{ident_msg} > " if ident_msg else "" own_record.elapsed = f"{elapsed_msg} " if elapsed_msg else "" return own_record @logged(identifier.factory(), elapsed.factory()) def parent(*, logger=from_logged): logger.info("sleeping for 1s...") time.sleep(1) child() @logged(identifier.factory(), elapsed.factory()) def child(*, logger=from_logged): logger.info("sleeping for 1s...") time.sleep(1) logging.basicConfig( format="%(elapsed)s%(ident)s%(levelname)s:%(name)s:%(message)s", level=logging.INFO, ) logging.root.handlers[0].addFilter(flt) parent()
The logged messages include the elapsed time since each elapsed collector started.
0.000 parent > INFO:root:entered: parent 0.000 parent > INFO:root:sleeping for 1s... 0.000 parent/child > INFO:root:entered: child 0.000 parent/child > INFO:root:sleeping for 1s... 1.001 parent/child > INFO:root:exited: child 2.001 parent > INFO:root:exited: parent
We can try the same thing again, only this time with a shared collectors. The parent’s identifier, and elapsed collectors will be inherited by its descendants.
@logged(identifier.factory(inherit=True), elapsed.factory(inherit=True)) def parent(*, logger=from_logged): logger.info("sleeping for 1s...") time.sleep(1) child() @logged def child(*, logger=from_logged): logger.info("sleeping for 1s...") time.sleep(1) grandchild() @logged def grandchild(*, logger=from_logged): logger.info("sleeping for 0.5s...") time.sleep(0.5) parent()
Notice that this time, the
childelapsed time starts at 1s instead of resetting to 0s.0.000 parent > INFO:root:entered: parent 0.000 parent > INFO:root:sleeping for 1s... 1.000 parent/child > INFO:root:entered: child 1.001 parent/child > INFO:root:sleeping for 1s... 2.001 parent/child/grandchild > INFO:root:entered: grandchild 2.001 parent/child/grandchild > INFO:root:sleeping for 0.5s... 2.502 parent/child/grandchild > INFO:root:exited: grandchild 2.502 parent/child > INFO:root:exited: child 2.502 parent > INFO:root:exited: parent