Coverage for bzfs_main/loggers.py: 100%

229 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Logging helpers that build default and syslog-enabled loggers; Centralizes logger setup so that all bzfs tools share 

16uniform formatting and configuration.""" 

17 

18from __future__ import annotations 

19import argparse 

20import logging 

21import logging.config 

22import logging.handlers 

23import os 

24import re 

25import socket 

26import sys 

27from datetime import datetime 

28from logging import Logger 

29from pathlib import Path 

30from typing import ( 

31 IO, 

32 TYPE_CHECKING, 

33 Any, 

34) 

35 

36from bzfs_main.utils import ( 

37 LOG_STDERR, 

38 LOG_STDOUT, 

39 LOG_TRACE, 

40 PROG_NAME, 

41 die, 

42 open_nofollow, 

43) 

44 

45if TYPE_CHECKING: # pragma: no cover - for type hints only 

46 from bzfs_main.configuration import LogParams 

47 

48 

def get_logger_name() -> str:
    """Canonical logger name shared by all bzfs tools so they log into one hierarchy."""
    logger_name: str = "bzfs_main.bzfs"
    return logger_name

52 

53 

def get_logger_subname() -> str:
    """Child logger name that --log-config-file configurations are expected to target."""
    return f"{get_logger_name()}.sub"

57 

58 

def reset_logger() -> None:
    """Detaches and closes all handlers (closing their files) and restores both bzfs loggers to their default state."""
    for logger in (logging.getLogger(get_logger_name()), logging.getLogger(get_logger_subname())):
        # iterate over copies because we mutate the underlying lists while looping
        for hdlr in list(logger.handlers):
            logger.removeHandler(hdlr)
            hdlr.flush()
            hdlr.close()
        for flt in list(logger.filters):
            logger.removeFilter(flt)
        logger.setLevel(logging.NOTSET)
        logger.propagate = True

70 

71 

def get_logger(log_params: LogParams, args: argparse.Namespace, log: Logger | None = None) -> Logger:
    """Returns a logger configured from CLI arguments or an optional caller-provided base logger."""
    _add_trace_loglevel()
    logging.addLevelName(LOG_STDERR, "STDERR")
    logging.addLevelName(LOG_STDOUT, "STDOUT")

    if log is not None:
        assert isinstance(log, Logger)
        return log  # a third party supplied a ready-made logger object; use it unchanged
    if args.log_config_file:
        # build the logger defined in the config file first, and afterwards also attach our own
        # default handlers unless matching handlers are already present
        config_logger = _get_dict_config_logger(log_params, args)
    default_logger = _get_default_logger(log_params, args)
    return config_logger if args.log_config_file else default_logger

86 

87 

def _get_default_logger(log_params: LogParams, args: argparse.Namespace) -> Logger:
    """Builds the default bzfs logger with stdout stream, file, and optional syslog handlers attached."""
    sub_logger = logging.getLogger(get_logger_subname())
    logger = logging.getLogger(get_logger_name())
    logger.setLevel(log_params.log_level)
    logger.propagate = False  # don't propagate up to the root logger; that would emit duplicate messages

    # skip adding a stream handler if the config-file sublogger already writes to stdout/stderr
    has_stream_handler = any(
        isinstance(h, logging.StreamHandler) and h.stream in [sys.stdout, sys.stderr] for h in sub_logger.handlers
    )
    if not has_stream_handler:
        stream_handler: logging.Handler = logging.StreamHandler(stream=sys.stdout)
        stream_handler.setFormatter(get_default_log_formatter(log_params=log_params))
        stream_handler.setLevel(log_params.log_level)
        logger.addHandler(stream_handler)

    # skip adding a file handler if the config-file sublogger already writes to the same log file
    abs_log_file: str = os.path.abspath(log_params.log_file)
    has_file_handler = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == abs_log_file for h in sub_logger.handlers
    )
    if not has_file_handler:
        file_handler: logging.Handler = logging.FileHandler(log_params.log_file, encoding="utf-8")
        file_handler.setFormatter(get_default_log_formatter())
        file_handler.setLevel(log_params.log_level)
        logger.addHandler(file_handler)

    syslog_address = args.log_syslog_address
    if syslog_address:  # optionally, also log to local or remote syslog
        syslog_address, socktype = _get_syslog_address(syslog_address, args.log_syslog_socktype)
        syslog_prefix = str(args.log_syslog_prefix).strip().replace("%", "")  # sanitize
        syslog_handler: logging.Handler = logging.handlers.SysLogHandler(
            address=syslog_address, facility=args.log_syslog_facility, socktype=socktype
        )
        syslog_handler.setFormatter(get_default_log_formatter(prefix=syslog_prefix + " "))
        syslog_handler.setLevel(args.log_syslog_level)
        logger.addHandler(syslog_handler)
        if syslog_handler.level < sub_logger.getEffectiveLevel():
            effective_level_name: str = logging.getLevelName(sub_logger.getEffectiveLevel())
            logger.warning(
                "%s",
                f"No messages with priority lower than {effective_level_name} will be sent to syslog because syslog "
                f"log level {args.log_syslog_level} is lower than overall log level {effective_level_name}.",
            )

    # perf: tell logging framework not to gather unnecessary expensive info for each log record
    logging.logProcesses = False
    logging.logThreads = False
    logging.logMultiprocessing = False
    return logger

129 

130 

# Maps a numeric log level to the short tag rendered at the start of each formatted log line;
# CRITICAL and ERROR additionally spell out the level name so severe events stand out visually.
LOG_LEVEL_PREFIXES: dict[int, str] = {
    logging.CRITICAL: "[C] CRITICAL:",
    logging.ERROR: "[E] ERROR:",
    logging.WARNING: "[W]",
    logging.INFO: "[I]",
    logging.DEBUG: "[D]",
    LOG_TRACE: "[T]",
}

139 

140 

def get_default_log_formatter(prefix: str = "", log_params: LogParams | None = None) -> logging.Formatter:
    """Returns a formatter for bzfs logs with optional prefix and column padding.

    ``prefix`` is prepended to every formatted message (used e.g. for syslog). When ``log_params`` is given,
    the padding width is resolved lazily from it via ``ljust_cols()``; otherwise padding is disabled (width 0).
    """
    # local aliases captured by the closure; avoids repeated global lookups in the hot format() path
    level_prefixes_: dict[int, str] = LOG_LEVEL_PREFIXES
    log_stderr_: int = LOG_STDERR
    log_stdout_: int = LOG_STDOUT
    terminal_cols: list[int | None] = [0 if log_params is None else None]  # 'None' indicates "configure value later"

    class DefaultLogFormatter(logging.Formatter):
        """Formatter adding timestamps and padding for progress output."""

        def format(self, record: logging.LogRecord) -> str:
            """Formats the given record, adding timestamp and level prefix and padding."""
            levelno: int = record.levelno
            if levelno != log_stderr_ and levelno != log_stdout_:  # emit stdout and stderr "as-is" (no formatting)
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                ts_level: str = f"{timestamp} {level_prefixes_.get(levelno, '')} "
                msg: str = record.msg
                i: int = msg.find("%s")
                msg = ts_level + msg
                if i >= 1:
                    i += len(ts_level)
                    msg = msg[0:i].ljust(54) + msg[i:]  # right-pad msg if record.msg contains "%s" unless at start
                if record.exc_info or record.exc_text or record.stack_info:
                    # delegate to the base class so exception/stack text gets appended after the message
                    record.msg = msg
                    msg = super().format(record)
                elif record.args:
                    msg = msg % record.args  # plain %-interpolation; no exception/stack text to append
            else:
                # stdout/stderr pass-through records: base formatter just interpolates the message
                msg = super().format(record)

            msg = prefix + msg
            cols: int | None = terminal_cols[0]
            if cols is None:
                cols = self.ljust_cols()
            msg = msg.ljust(cols)  # w/ progress line, "overwrite" trailing chars of previous msg with spaces
            return msg

        @staticmethod
        def ljust_cols() -> int:
            """Lazily determines padding width from ProgressReporter settings."""
            # lock-free yet thread-safe late configuration-based init for prettier ProgressReporter output
            # log_params.params and available_programs are not fully initialized yet before detect_available_programs() ends
            cols: int = 0
            assert log_params is not None
            p = log_params.params
            if p is not None and "local" in p.available_programs:
                if "pv" in p.available_programs["local"]:
                    cols = p.terminal_columns
                    assert cols is not None
            terminal_cols[0] = cols  # finally, resolve to use this specific value henceforth
            return cols

    return DefaultLogFormatter()

194 

195 

def get_simple_logger(program: str) -> Logger:
    """Returns a minimal stderr logger, keyed by ``program``, for simple tools."""

    class LevelFormatter(logging.Formatter):
        """Injects level prefix and program name into log records."""

        def format(self, record: logging.LogRecord) -> str:
            """Attaches extra fields before delegating to base formatter."""
            record.level_prefix = LOG_LEVEL_PREFIXES.get(record.levelno, "")
            record.program = program
            return super().format(record)

    _add_trace_loglevel()
    logger = logging.getLogger(program)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    # only attach a handler once, even if this function is called repeatedly for the same program
    if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        stream_handler = logging.StreamHandler()
        fmt = "%(asctime)s %(level_prefix)s [%(program)s] %(message)s"
        stream_handler.setFormatter(LevelFormatter(fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S"))
        logger.addHandler(stream_handler)
    return logger

219 

220 

def _add_trace_loglevel() -> None:
    """Registers a custom TRACE logging level with the standard python logging framework.

    Idempotent: repeated calls simply re-register the same "TRACE" name for the LOG_TRACE level number.
    """
    logging.addLevelName(LOG_TRACE, "TRACE")

224 

225 

226def _get_syslog_address(address: str, log_syslog_socktype: str) -> tuple[str | tuple[str, int], socket.SocketKind | None]: 

227 """Normalizes syslog address to tuple form and returns socket type.""" 

228 address = address.strip() 

229 socktype: socket.SocketKind | None = None 

230 if ":" in address: 

231 host, port_str = address.rsplit(":", 1) 

232 addr = (host.strip(), int(port_str.strip())) 

233 scktype: socket.SocketKind = ( 

234 socket.SOCK_DGRAM 

235 if log_syslog_socktype == "UDP" 

236 else socket.SOCK_STREAM # for TCP # pytype: disable=annotation-type-mismatch 

237 ) 

238 return addr, scktype 

239 return address, socktype 

240 

241 

242def _remove_json_comments(config_str: str) -> str: # not standard but practical 

243 """Strips line and end-of-line comments from a JSON string.""" 

244 lines: list[str] = [] 

245 for line in config_str.splitlines(): 

246 stripped: str = line.strip() 

247 if stripped.startswith("#"): 

248 line = "" # replace comment line with empty line to preserve line numbering 

249 elif stripped.endswith("#"): 

250 i = line.rfind("#", 0, line.rindex("#")) 

251 if i >= 0: 

252 line = line[0:i] # strip line-ending comment 

253 lines.append(line) 

254 return "\n".join(lines) 

255 

256 

def _get_dict_config_logger(log_params: LogParams, args: argparse.Namespace) -> Logger:
    """Creates a logger from a JSON config file with variable substitution.

    The --log-config-file content is lenient JSON: '#' comments are stripped and the outermost braces may be
    omitted. ``${name[:default]}`` placeholders are substituted from CLI-provided variables merged with the
    convenience variables assembled below. The resulting dict is validated against a callable whitelist
    before being handed to ``logging.config.dictConfig`` to prevent arbitrary code execution.

    Raises ValueError on invalid variable names or on an empty variable without a default; dies on an
    unexpected config file path.
    """
    import json  # deferred import: only needed when --log-config-file is actually used

    prefix: str = PROG_NAME + "."
    # convenience variables made available for ${...} substitution inside the JSON config file
    log_config_vars: dict[str, str] = {
        prefix + "sub.logger": get_logger_subname(),
        prefix + "get_default_log_formatter": __name__ + ".get_default_log_formatter",
        prefix + "log_level": log_params.log_level,
        prefix + "log_dir": log_params.log_dir,
        prefix + "log_file": os.path.basename(log_params.log_file),
        prefix + "timestamp": log_params.timestamp,
        prefix + "dryrun": "dryrun" if args.dryrun else "",
    }
    log_config_vars.update(log_params.log_config_vars)  # merge variables passed into CLI with convenience variables

    log_config_file_str: str = log_params.log_config_file
    if log_config_file_str.startswith("+"):  # leading '+' means "the rest is a file path to read the config from"
        path: str = log_config_file_str[1:]
        basename_stem: str = Path(path).stem  # stem is basename without file extension ("bzfs_log_config")
        if not ("bzfs_log_config" in basename_stem and os.path.basename(path).endswith(".json")):
            die(f"--log-config-file: basename must contain 'bzfs_log_config' and end with '.json': {path}")
        # NOTE(review): open_nofollow presumably refuses to follow symlinks for security — see bzfs_main.utils
        with open_nofollow(path, "r", encoding="utf-8") as fd:
            log_config_file_str = fd.read()

    def substitute_log_config_vars(config_str: str, log_config_variables: dict[str, str]) -> str:
        """Substitutes ${name[:default]} placeholders within JSON with values from log_config_variables."""

        def substitute_fn(match: re.Match) -> str:
            """Returns JSON replacement for variable placeholder."""
            varname: str = match.group(1)
            error_msg: str | None = _validate_log_config_variable_name(varname)
            if error_msg:
                raise ValueError(error_msg)
            replacement: str | None = log_config_variables.get(varname)
            if not replacement:  # unset or empty value falls back to the inline default, if any
                default: str | None = match.group(3)
                if default is None:
                    raise ValueError("Missing default value in JSON for empty log config variable: ${" + varname + "}")
                replacement = default
            replacement = json.dumps(replacement)  # JSON escape special chars such as newlines, quotes, etc
            assert len(replacement) >= 2
            assert replacement.startswith('"')
            assert replacement.endswith('"')
            return replacement[1:-1]  # strip surrounding quotes added by dumps()

        pattern = re.compile(r"\$\{([^}:]*?)(:([^}]*))?}")  # Any char except } and :, followed by optional default part
        return pattern.sub(substitute_fn, config_str)

    log_config_file_str = _remove_json_comments(log_config_file_str)
    if not log_config_file_str.strip().startswith("{"):
        log_config_file_str = "{\n" + log_config_file_str  # lenient JSON parsing
    if not log_config_file_str.strip().endswith("}"):
        log_config_file_str = log_config_file_str + "\n}"  # lenient JSON parsing
    log_config_file_str = substitute_log_config_vars(log_config_file_str, log_config_vars)
    log_config_dict: dict = json.loads(log_config_file_str)
    _validate_log_config_dict(log_config_dict)  # security: whitelist callables before dictConfig() instantiates them
    logging.config.dictConfig(log_config_dict)
    return logging.getLogger(get_logger_subname())

318 

319 

def validate_log_config_variable(var: str) -> str | None:
    """Returns error message if NAME:VALUE pair is malformed else ``None``."""
    if not var.strip():
        return "Invalid log config NAME:VALUE variable. Variable must not be empty: " + var
    if ":" not in var:
        return "Invalid log config NAME:VALUE variable. Variable is missing a colon character: " + var
    name = var[: var.index(":")]  # everything before the first colon is the variable name
    return _validate_log_config_variable_name(name)

327 

328 

329def _validate_log_config_variable_name(name: str) -> str | None: 

330 """Validates log config variable name and return error message if invalid.""" 

331 if not name: 

332 return "Invalid log config variable name. Name must not be empty: " + name 

333 bad_chars: str = "${} " + '"' + "'" 

334 if any(char in bad_chars for char in name): 

335 return f"Invalid log config variable name. Name must not contain forbidden {bad_chars} characters: " + name 

336 if any(char.isspace() for char in name): 

337 return "Invalid log config variable name. Name must not contain whitespace: " + name 

338 return None 

339 

340 

def _validate_log_config_dict(config: dict) -> None:
    """Recursively walks the logging configuration, ensuring any object instantiated via the '()' key is on an
    approved whitelist; This prevents arbitrary code execution from a malicious config file."""
    allowed_callables: set[str] = {
        "logging.StreamHandler",
        "logging.FileHandler",
        "logging.handlers.SysLogHandler",
        "socket.SOCK_DGRAM",
        "socket.SOCK_STREAM",
        "sys.stdout",
        "sys.stderr",
        __name__ + ".get_default_log_formatter",
    }
    if isinstance(config, dict):
        for key, value in config.items():
            if key == "()" and value not in allowed_callables:
                die(f"--log-config-file: Disallowed callable '{value}'. For security, only specific classes are permitted.")
            _validate_log_config_dict(value)
    elif isinstance(config, list):
        for element in config:
            _validate_log_config_dict(element)

362 

363 

#############################################################################
class Tee:
    """File-like object that duplicates writes to multiple streams."""

    def __init__(self, *files: IO[Any]) -> None:
        # every write is mirrored to all of these streams; the first also provides fileno()
        self.files = files

    def write(self, obj: str) -> None:
        """Write ``obj`` to every target stream, flushing each one immediately."""
        for stream in self.files:
            stream.write(obj)
            stream.flush()

    def flush(self) -> None:
        """Flush every target stream."""
        for stream in self.files:
            stream.flush()

    def fileno(self) -> int:
        """Return the file descriptor of the first target stream."""
        first, *_ = self.files
        return first.fileno()