Coverage for bzfs_main/loggers.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Logging helpers that build default and syslog-enabled loggers; Centralizes logger setup so that all bzfs tools share 

16uniform formatting and configuration. 

17 

18Each bzfs.Job has its own separate Logger object such that multiple Jobs can run concurrently in the same Python process 

19without interfering with each other's logging semantics. This is achieved by passing a unique ``logger_name_suffix`` per job 

20to ``get_logger()`` so each job receives an isolated Logger instance. Callers are responsible for closing any loggers they 

21own via ``reset_logger()``. 

22""" 

23from __future__ import ( 

24 annotations, 

25) 

26import argparse 

27import contextlib 

28import logging 

29import sys 

30from datetime import ( 

31 datetime, 

32) 

33from logging import ( 

34 Logger, 

35) 

36from typing import ( 

37 TYPE_CHECKING, 

38 Any, 

39 Final, 

40) 

41 

42from bzfs_main.utils import ( 

43 LOG_STDERR, 

44 LOG_STDOUT, 

45 LOG_TRACE, 

46) 

47 

48if TYPE_CHECKING: # pragma: no cover - for type hints only 

49 from bzfs_main.configuration import ( 

50 LogParams, 

51 ) 

52 

53 

54def _get_logger_name() -> str: 

55 """Returns the canonical logger name used throughout bzfs.""" 

56 return "bzfs_main.bzfs" 

57 

58 

59def _resolve_logger_name(logger_name_suffix: str) -> str: 

60 """Returns the logger name for the given optional logger suffix. 

61 

62 Each bzfs.Job instance gets a distinct Logger name and hence uses a separate private (thread-safe) logging.Logger object. 

63 """ 

64 logger_name: str = _get_logger_name() 

65 return logger_name + "." + logger_name_suffix if logger_name_suffix else logger_name 

66 

67 

68def reset_logger(log: Logger) -> None: 

69 """Removes and closes logging handlers (and closes their files) and resets logger to default state.""" 

70 for handler in log.handlers.copy(): 

71 log.removeHandler(handler) 

72 with contextlib.suppress(BrokenPipeError): 

73 handler.flush() 

74 handler.close() 

75 for _filter in log.filters.copy(): 

76 log.removeFilter(_filter) 

77 log.setLevel(logging.NOTSET) 

78 log.propagate = True 

79 

80 

81def get_logger( 

82 log_params: LogParams, args: argparse.Namespace, log: Logger | None = None, logger_name_suffix: str = "" 

83) -> Logger: 

84 """Returns a logger configured from CLI arguments or an optional base logger.""" 

85 if log is not None: 

86 assert isinstance(log, Logger) 

87 return log # use third party provided logger object 

88 return _get_default_logger(log_params, args, logger_name_suffix=logger_name_suffix) 

89 

90 

91def _get_default_logger(log_params: LogParams, args: argparse.Namespace, logger_name_suffix: str = "") -> Logger: 

92 """Creates the default logger with stream, file and optional syslog handlers.""" 

93 logger_name: str = _resolve_logger_name(logger_name_suffix) 

94 log = Logger(logger_name) # noqa: LOG001 do not register logger with Logger.manager to avoid potential memory leak 

95 log.setLevel(log_params.log_level) 

96 log.propagate = False # don't propagate log messages up to the root logger to avoid emitting duplicate messages 

97 

98 handler: logging.Handler = logging.StreamHandler(stream=sys.stdout) 

99 handler.setFormatter(get_default_log_formatter(log_params=log_params)) 

100 handler.setLevel(log_params.log_level) 

101 log.addHandler(handler) 

102 

103 handler = logging.FileHandler(log_params.log_file, encoding="utf-8") 

104 handler.setFormatter(get_default_log_formatter()) 

105 handler.setLevel(log_params.log_level) 

106 log.addHandler(handler) 

107 

108 address = args.log_syslog_address 

109 if address: # optionally, also log to local or remote syslog 

110 from logging import handlers # lazy import for startup perf 

111 

112 address, socktype = _get_syslog_address(address, args.log_syslog_socktype) 

113 log_syslog_prefix = str(args.log_syslog_prefix).strip().replace("%", "") # sanitize 

114 handler = handlers.SysLogHandler(address=address, facility=args.log_syslog_facility, socktype=socktype) 

115 handler.setFormatter(get_default_log_formatter(prefix=log_syslog_prefix + " ")) 

116 handler.setLevel(args.log_syslog_level) 

117 log.addHandler(handler) 

118 if handler.level < log.getEffectiveLevel(): 

119 log_level_name: str = logging.getLevelName(log.getEffectiveLevel()) 

120 log.warning( 

121 "%s", 

122 f"No messages with priority lower than {log_level_name} will be sent to syslog because syslog " 

123 f"log level {args.log_syslog_level} is lower than overall log level {log_level_name}.", 

124 ) 

125 return log 

126 

127 

128LOG_LEVEL_PREFIXES: Final[dict[int, str]] = { 

129 logging.CRITICAL: "[C] CRITICAL:", 

130 logging.ERROR: "[E] ERROR:", 

131 logging.WARNING: "[W]", 

132 logging.INFO: "[I]", 

133 logging.DEBUG: "[D]", 

134 LOG_TRACE: "[T]", 

135} 

136 

137 

138def get_default_log_formatter(prefix: str = "", log_params: LogParams | None = None) -> logging.Formatter: 

139 """Returns a formatter for bzfs logs with optional prefix and column padding.""" 

140 level_prefixes_: dict[int, str] = LOG_LEVEL_PREFIXES.copy() 

141 log_stderr_: int = LOG_STDERR 

142 log_stdout_: int = LOG_STDOUT 

143 terminal_cols: list[int | None] = [0 if log_params is None else None] # 'None' indicates "configure value later" 

144 

145 class DefaultLogFormatter(logging.Formatter): 

146 """Formatter adding timestamps and padding for progress output.""" 

147 

148 def format(self, record: logging.LogRecord) -> str: 

149 """Formats the given record, adding timestamp and level prefix and padding.""" 

150 levelno: int = record.levelno 

151 if levelno != log_stderr_ and levelno != log_stdout_: # emit stdout and stderr "as-is" (no formatting) 

152 timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds") # 2024-09-03 12:26:15 

153 ts_level: str = f"{timestamp} {level_prefixes_.get(levelno, '')} " 

154 msg: str = record.msg 

155 i: int = msg.find("%s") 

156 msg = ts_level + msg 

157 if i >= 1: 

158 i += len(ts_level) 

159 msg = msg[0:i].ljust(54) + msg[i:] # right-pad msg if record.msg contains "%s" unless at start 

160 if record.exc_info or record.exc_text or record.stack_info: 

161 record.msg = msg 

162 msg = super().format(record) 

163 elif record.args: 

164 msg = msg % record.args 

165 else: 

166 msg = super().format(record) 

167 

168 msg = prefix + msg 

169 cols: int | None = terminal_cols[0] 

170 if cols is None: 

171 cols = self.ljust_cols() 

172 msg = msg.ljust(cols) # w/ progress line, "overwrite" trailing chars of previous msg with spaces 

173 return msg 

174 

175 @staticmethod 

176 def ljust_cols() -> int: 

177 """Lazily determines padding width from ProgressReporter settings.""" 

178 # lock-free yet thread-safe late configuration-based init for prettier ProgressReporter output 

179 # log_params.params and available_programs are not fully initialized yet before detect_available_programs() ends 

180 cols: int = 0 

181 assert log_params is not None 

182 p = log_params.params 

183 if p is not None and "local" in p.available_programs: 

184 if "pv" in p.available_programs["local"]: 

185 cols = p.terminal_columns 

186 assert cols is not None 

187 terminal_cols[0] = cols # finally, resolve to use this specific value henceforth 

188 return cols 

189 

190 return DefaultLogFormatter() 

191 

192 

193def get_simple_logger(program: str, logger_name_suffix: str = "") -> Logger: 

194 """Returns a minimal logger for simple tools.""" 

195 

196 level_prefixes_: dict[int, str] = LOG_LEVEL_PREFIXES.copy() 

197 logger_name = program + "." + logger_name_suffix if logger_name_suffix else program 

198 

199 class LevelFormatter(logging.Formatter): 

200 """Injects level prefix and program name into log records.""" 

201 

202 def format(self, record: logging.LogRecord) -> str: 

203 """Attaches extra fields before delegating to base formatter.""" 

204 record.level_prefix = level_prefixes_.get(record.levelno, "") 

205 record.program = program 

206 return super().format(record) 

207 

208 log = Logger(logger_name) # noqa: LOG001 do not register logger with Logger.manager to avoid potential memory leak 

209 log.setLevel(logging.INFO) 

210 log.propagate = False 

211 handler = logging.StreamHandler() 

212 handler.setFormatter( 

213 LevelFormatter(fmt="%(asctime)s %(level_prefix)s [%(program)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") 

214 ) 

215 log.addHandler(handler) 

216 return log 

217 

218 

219def set_logging_runtime_defaults() -> None: 

220 """Applies process-wide logging runtime defaults for production use.""" 

221 logging.addLevelName(LOG_TRACE, "TRACE") 

222 logging.addLevelName(LOG_STDERR, "STDERR") 

223 logging.addLevelName(LOG_STDOUT, "STDOUT") 

224 # perf: tell logging framework not to gather unnecessary expensive info for each log record 

225 logging.logProcesses = False 

226 logging.logThreads = False 

227 logging.logMultiprocessing = False 

228 logging.raiseExceptions = False # avoid noisy tracebacks from logging handler errors like BrokenPipeError in production 

229 

230 

231def _get_syslog_address(address: str, log_syslog_socktype: str) -> tuple[str | tuple[str, int], Any]: 

232 """Normalizes syslog address to tuple form and returns socket type.""" 

233 import socket # lazy import for startup perf 

234 

235 address = address.strip() 

236 socktype: socket.SocketKind | None = None 

237 if ":" in address: 

238 host, port_str = address.rsplit(":", 1) 

239 addr = (host.strip(), int(port_str.strip())) 

240 scktype: socket.SocketKind = socket.SOCK_DGRAM if log_syslog_socktype == "UDP" else socket.SOCK_STREAM # for TCP 

241 return addr, scktype 

242 return address, socktype