Coverage for bzfs_main/loggers.py: 100%
229 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Logging helpers that build default and syslog-enabled loggers; Centralizes logger setup so that all bzfs tools share
16uniform formatting and configuration."""
18from __future__ import annotations
19import argparse
20import logging
21import logging.config
22import logging.handlers
23import os
24import re
25import socket
26import sys
27from datetime import datetime
28from logging import Logger
29from pathlib import Path
30from typing import (
31 IO,
32 TYPE_CHECKING,
33 Any,
34)
36from bzfs_main.utils import (
37 LOG_STDERR,
38 LOG_STDOUT,
39 LOG_TRACE,
40 PROG_NAME,
41 die,
42 open_nofollow,
43)
45if TYPE_CHECKING: # pragma: no cover - for type hints only
46 from bzfs_main.configuration import LogParams
def get_logger_name() -> str:
    """Returns the canonical logger name shared by all bzfs tools."""
    return "bzfs_main.bzfs"
def get_logger_subname() -> str:
    """Returns the name of the child logger intended for use by --log-config-file."""
    return f"{get_logger_name()}.sub"
def reset_logger() -> None:
    """Detaches and closes all handlers and filters of the bzfs loggers (closing their files) and restores default state."""
    for name in (get_logger_name(), get_logger_subname()):
        logger = logging.getLogger(name)
        for hdlr in list(logger.handlers):
            logger.removeHandler(hdlr)
            hdlr.flush()  # flush buffered records before releasing the handler's resources
            hdlr.close()
        for flt in list(logger.filters):
            logger.removeFilter(flt)
        logger.setLevel(logging.NOTSET)
        logger.propagate = True
def get_logger(log_params: LogParams, args: argparse.Namespace, log: Logger | None = None) -> Logger:
    """Returns a logger configured from CLI arguments or an optional base logger.

    Params:
        log_params: resolved logging parameters (level, file paths, config vars).
        args: parsed CLI namespace; consulted for --log-config-file and syslog options.
        log: optional externally provided logger; returned unchanged if given.
    """
    _add_trace_loglevel()
    logging.addLevelName(LOG_STDERR, "STDERR")
    logging.addLevelName(LOG_STDOUT, "STDOUT")
    if log is not None:
        assert isinstance(log, Logger)
        return log  # use third party provided logger object
    clog: Logger | None = None
    if args.log_config_file:
        clog = _get_dict_config_logger(log_params, args)  # use logger defined in config file, and afterwards ...
    # ... add our own handlers unless matching handlers are already present
    default_log: Logger = _get_default_logger(log_params, args)
    return default_log if clog is None else clog
def _get_default_logger(log_params: LogParams, args: argparse.Namespace) -> Logger:
    """Creates the default logger with stream, file and optional syslog handlers.

    Handlers are attached to the parent logger, while duplicate-detection inspects the sub logger (the one that
    --log-config-file would configure) so that handlers already configured externally are not duplicated.

    Params:
        log_params: resolved logging parameters (level, log file path).
        args: parsed CLI namespace; consulted for the --log-syslog-* options.
    """
    sublog = logging.getLogger(get_logger_subname())
    log = logging.getLogger(get_logger_name())
    log.setLevel(log_params.log_level)
    log.propagate = False  # don't propagate log messages up to the root logger to avoid emitting duplicate messages

    # stdout handler, unless the sub logger already has a stdout/stderr stream handler
    if not any(isinstance(h, logging.StreamHandler) and h.stream in [sys.stdout, sys.stderr] for h in sublog.handlers):
        handler: logging.Handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(get_default_log_formatter(log_params=log_params))
        handler.setLevel(log_params.log_level)
        log.addHandler(handler)

    # file handler, unless a handler for the same absolute log file path is already present
    abs_log_file: str = os.path.abspath(log_params.log_file)
    if not any(isinstance(h, logging.FileHandler) and h.baseFilename == abs_log_file for h in sublog.handlers):
        handler = logging.FileHandler(log_params.log_file, encoding="utf-8")
        handler.setFormatter(get_default_log_formatter())
        handler.setLevel(log_params.log_level)
        log.addHandler(handler)

    address = args.log_syslog_address
    if address:  # optionally, also log to local or remote syslog
        address, socktype = _get_syslog_address(address, args.log_syslog_socktype)
        log_syslog_prefix = str(args.log_syslog_prefix).strip().replace("%", "")  # sanitize: '%' would break %-formatting
        handler = logging.handlers.SysLogHandler(address=address, facility=args.log_syslog_facility, socktype=socktype)
        handler.setFormatter(get_default_log_formatter(prefix=log_syslog_prefix + " "))
        handler.setLevel(args.log_syslog_level)
        log.addHandler(handler)
        # warn the user when the syslog handler's threshold will filter out messages they might expect to see
        if handler.level < sublog.getEffectiveLevel():
            log_level_name: str = logging.getLevelName(sublog.getEffectiveLevel())
            log.warning(
                "%s",
                f"No messages with priority lower than {log_level_name} will be sent to syslog because syslog "
                f"log level {args.log_syslog_level} is lower than overall log level {log_level_name}.",
            )

    # perf: tell logging framework not to gather unnecessary expensive info for each log record
    logging.logProcesses = False
    logging.logThreads = False
    logging.logMultiprocessing = False
    return log
# Maps each log level to the short prefix that the default formatter prepends to every formatted message.
LOG_LEVEL_PREFIXES: dict[int, str] = {
    logging.CRITICAL: "[C] CRITICAL:",
    logging.ERROR: "[E] ERROR:",
    logging.WARNING: "[W]",
    logging.INFO: "[I]",
    logging.DEBUG: "[D]",
    LOG_TRACE: "[T]",
}
def get_default_log_formatter(prefix: str = "", log_params: LogParams | None = None) -> logging.Formatter:
    """Returns a formatter for bzfs logs with optional prefix and column padding.

    Params:
        prefix: static text prepended to every formatted message (e.g. a syslog program prefix).
        log_params: when given, enables lazy lookup of the terminal width for progress-line padding;
            when None, padding is disabled (width 0).
    """
    level_prefixes_: dict[int, str] = LOG_LEVEL_PREFIXES
    log_stderr_: int = LOG_STDERR
    log_stdout_: int = LOG_STDOUT
    terminal_cols: list[int | None] = [0 if log_params is None else None]  # 'None' indicates "configure value later"

    class DefaultLogFormatter(logging.Formatter):
        """Formatter adding timestamps and padding for progress output."""

        def format(self, record: logging.LogRecord) -> str:
            """Formats the given record, adding timestamp and level prefix and padding."""
            levelno: int = record.levelno
            if levelno != log_stderr_ and levelno != log_stdout_:  # emit stdout and stderr "as-is" (no formatting)
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                ts_level: str = f"{timestamp} {level_prefixes_.get(levelno, '')} "
                msg: str = record.msg
                i: int = msg.find("%s")
                msg = ts_level + msg
                if i >= 1:
                    i += len(ts_level)
                    msg = msg[0:i].ljust(54) + msg[i:]  # right-pad msg if record.msg contains "%s" unless at start
                if record.exc_info or record.exc_text or record.stack_info:
                    # let the base class render the exception/stack text appended to the message
                    record.msg = msg
                    msg = super().format(record)
                elif record.args:
                    msg = msg % record.args  # apply lazy %-style args ourselves since we bypass the base formatter
            else:
                msg = super().format(record)

            msg = prefix + msg
            cols: int | None = terminal_cols[0]
            if cols is None:
                cols = self.ljust_cols()
            msg = msg.ljust(cols)  # w/ progress line, "overwrite" trailing chars of previous msg with spaces
            return msg

        @staticmethod
        def ljust_cols() -> int:
            """Lazily determines padding width from ProgressReporter settings."""
            # lock-free yet thread-safe late configuration-based init for prettier ProgressReporter output
            # log_params.params and available_programs are not fully initialized yet before detect_available_programs() ends
            cols: int = 0
            assert log_params is not None
            p = log_params.params
            if p is not None and "local" in p.available_programs:
                if "pv" in p.available_programs["local"]:
                    cols = p.terminal_columns
                    assert cols is not None
                terminal_cols[0] = cols  # finally, resolve to use this specific value henceforth
            return cols

    return DefaultLogFormatter()
def get_simple_logger(program: str) -> Logger:
    """Returns a minimal logger for simple tools, logging to stderr with a level prefix and the program name."""

    class _PrefixingFormatter(logging.Formatter):
        """Injects level prefix and program name fields into each record before delegating to the base formatter."""

        def format(self, record: logging.LogRecord) -> str:
            record.level_prefix = LOG_LEVEL_PREFIXES.get(record.levelno, "")
            record.program = program
            return super().format(record)

    _add_trace_loglevel()
    logger = logging.getLogger(program)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    has_stream_handler = any(isinstance(h, logging.StreamHandler) for h in logger.handlers)
    if not has_stream_handler:  # attach a handler once; repeated calls reuse the existing one
        stream_handler = logging.StreamHandler()
        layout = "%(asctime)s %(level_prefix)s [%(program)s] %(message)s"
        stream_handler.setFormatter(_PrefixingFormatter(fmt=layout, datefmt="%Y-%m-%d %H:%M:%S"))
        logger.addHandler(stream_handler)
    return logger
def _add_trace_loglevel() -> None:
    """Registers a custom TRACE logging level with the standard python logging framework.

    Idempotent; safe to call repeatedly from each logger factory in this module.
    """
    logging.addLevelName(LOG_TRACE, "TRACE")
226def _get_syslog_address(address: str, log_syslog_socktype: str) -> tuple[str | tuple[str, int], socket.SocketKind | None]:
227 """Normalizes syslog address to tuple form and returns socket type."""
228 address = address.strip()
229 socktype: socket.SocketKind | None = None
230 if ":" in address:
231 host, port_str = address.rsplit(":", 1)
232 addr = (host.strip(), int(port_str.strip()))
233 scktype: socket.SocketKind = (
234 socket.SOCK_DGRAM
235 if log_syslog_socktype == "UDP"
236 else socket.SOCK_STREAM # for TCP # pytype: disable=annotation-type-mismatch
237 )
238 return addr, scktype
239 return address, socktype
242def _remove_json_comments(config_str: str) -> str: # not standard but practical
243 """Strips line and end-of-line comments from a JSON string."""
244 lines: list[str] = []
245 for line in config_str.splitlines():
246 stripped: str = line.strip()
247 if stripped.startswith("#"):
248 line = "" # replace comment line with empty line to preserve line numbering
249 elif stripped.endswith("#"):
250 i = line.rfind("#", 0, line.rindex("#"))
251 if i >= 0:
252 line = line[0:i] # strip line-ending comment
253 lines.append(line)
254 return "\n".join(lines)
def _get_dict_config_logger(log_params: LogParams, args: argparse.Namespace) -> Logger:
    """Creates a logger from a JSON config file with variable substitution.

    The JSON may contain '#' comments (see _remove_json_comments) and ${name[:default]} placeholders, which are
    substituted from CLI-provided variables merged with the convenience variables defined below. The resulting dict
    is validated against a whitelist before being handed to logging.config.dictConfig(), to prevent arbitrary code
    execution from a malicious config file.

    Raises:
        ValueError: on an invalid variable name, or an empty variable without a default.
    """
    import json

    prefix: str = PROG_NAME + "."
    log_config_vars: dict[str, str] = {
        prefix + "sub.logger": get_logger_subname(),
        prefix + "get_default_log_formatter": __name__ + ".get_default_log_formatter",
        prefix + "log_level": log_params.log_level,
        prefix + "log_dir": log_params.log_dir,
        prefix + "log_file": os.path.basename(log_params.log_file),
        prefix + "timestamp": log_params.timestamp,
        prefix + "dryrun": "dryrun" if args.dryrun else "",
    }
    log_config_vars.update(log_params.log_config_vars)  # merge variables passed into CLI with convenience variables

    log_config_file_str: str = log_params.log_config_file
    if log_config_file_str.startswith("+"):  # leading '+' means: the value is a file path rather than inline JSON
        path: str = log_config_file_str[1:]
        basename_stem: str = Path(path).stem  # stem is basename without file extension ("bzfs_log_config")
        if not ("bzfs_log_config" in basename_stem and os.path.basename(path).endswith(".json")):
            die(f"--log-config-file: basename must contain 'bzfs_log_config' and end with '.json': {path}")
        with open_nofollow(path, "r", encoding="utf-8") as fd:  # open_nofollow refuses to follow symlinks
            log_config_file_str = fd.read()

    def substitute_log_config_vars(config_str: str, log_config_variables: dict[str, str]) -> str:
        """Substitutes ${name[:default]} placeholders within JSON with values from log_config_variables."""

        def substitute_fn(match: re.Match) -> str:
            """Returns JSON replacement for variable placeholder."""
            varname: str = match.group(1)
            error_msg: str | None = _validate_log_config_variable_name(varname)
            if error_msg:
                raise ValueError(error_msg)
            replacement: str | None = log_config_variables.get(varname)
            if not replacement:  # empty or missing value falls back to the inline default, if any
                default: str | None = match.group(3)
                if default is None:
                    raise ValueError("Missing default value in JSON for empty log config variable: ${" + varname + "}")
                replacement = default
            replacement = json.dumps(replacement)  # JSON escape special chars such as newlines, quotes, etc
            assert len(replacement) >= 2
            assert replacement.startswith('"')
            assert replacement.endswith('"')
            return replacement[1:-1]  # strip surrounding quotes added by dumps()

        pattern = re.compile(r"\$\{([^}:]*?)(:([^}]*))?}")  # Any char except } and :, followed by optional default part
        return pattern.sub(substitute_fn, config_str)

    log_config_file_str = _remove_json_comments(log_config_file_str)
    # lenient JSON parsing: tolerate configs that omit the outermost braces
    if not log_config_file_str.strip().startswith("{"):
        log_config_file_str = "{\n" + log_config_file_str
    if not log_config_file_str.strip().endswith("}"):
        log_config_file_str = log_config_file_str + "\n}"
    log_config_file_str = substitute_log_config_vars(log_config_file_str, log_config_vars)
    log_config_dict: dict = json.loads(log_config_file_str)
    _validate_log_config_dict(log_config_dict)  # security: whitelist check before dictConfig() instantiates callables
    logging.config.dictConfig(log_config_dict)
    return logging.getLogger(get_logger_subname())
def validate_log_config_variable(var: str) -> str | None:
    """Returns error message if NAME:VALUE pair is malformed else ``None``."""
    if not var.strip():
        return "Invalid log config NAME:VALUE variable. Variable must not be empty: " + var
    if ":" not in var:
        return "Invalid log config NAME:VALUE variable. Variable is missing a colon character: " + var
    name, _, _ = var.partition(":")  # NAME is everything before the first colon
    return _validate_log_config_variable_name(name)
329def _validate_log_config_variable_name(name: str) -> str | None:
330 """Validates log config variable name and return error message if invalid."""
331 if not name:
332 return "Invalid log config variable name. Name must not be empty: " + name
333 bad_chars: str = "${} " + '"' + "'"
334 if any(char in bad_chars for char in name):
335 return f"Invalid log config variable name. Name must not contain forbidden {bad_chars} characters: " + name
336 if any(char.isspace() for char in name):
337 return "Invalid log config variable name. Name must not contain whitespace: " + name
338 return None
341def _validate_log_config_dict(config: dict) -> None:
342 """Recursively scans the logging configuration dictionary to ensure that any instantiated objects via the '()' key are on
343 an approved whitelist; This prevents arbitrary code execution from a malicious config file."""
344 whitelist: set[str] = {
345 "logging.StreamHandler",
346 "logging.FileHandler",
347 "logging.handlers.SysLogHandler",
348 "socket.SOCK_DGRAM",
349 "socket.SOCK_STREAM",
350 "sys.stdout",
351 "sys.stderr",
352 __name__ + ".get_default_log_formatter",
353 }
354 if isinstance(config, dict):
355 for key, value in config.items():
356 if key == "()" and value not in whitelist:
357 die(f"--log-config-file: Disallowed callable '{value}'. For security, only specific classes are permitted.")
358 _validate_log_config_dict(value)
359 elif isinstance(config, list):
360 for item in config:
361 _validate_log_config_dict(item)
364#############################################################################
class Tee:
    """File-like object that duplicates writes to multiple streams."""

    def __init__(self, *files: IO[Any]) -> None:
        # target streams; writes and flushes fan out to all of them
        self.files = files

    def write(self, obj: str) -> None:
        """Writes ``obj`` to every target stream, flushing each one immediately."""
        for stream in self.files:
            stream.write(obj)
            stream.flush()  # flush right away so output appears promptly on all sinks

    def flush(self) -> None:
        """Flushes every target stream."""
        for stream in self.files:
            stream.flush()

    def fileno(self) -> int:
        """Returns the file descriptor of the first target stream."""
        return self.files[0].fileno()