Skip to content

[bugfix] Introduce optional file locking for appending to perflogs #3523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 70 additions & 46 deletions docs/config_reference.rst

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ archspec==0.2.5
ClusterShell==1.9.3
docutils==0.18.1; python_version < '3.9'
docutils==0.21.2; python_version >= '3.9'
fasteners==0.19
jinja2==3.0.3; python_version == '3.6'
jinja2==3.1.6; python_version >= '3.7'
jsonschema==3.2.0
Expand Down
89 changes: 76 additions & 13 deletions reframe/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD-3-Clause

import abc
import atexit
import itertools
import logging
import logging.handlers
Expand Down Expand Up @@ -149,7 +150,8 @@ class MultiFileHandler(logging.FileHandler):
'''

def __init__(self, prefix, mode='a', encoding=None, fmt=None,
perffmt=None, ignore_keys=None):
perffmt=None, ignore_keys=None, use_locking=False,
lockfile_mode=None):
super().__init__(prefix, mode, encoding, delay=True)

# Reset FileHandler's filename
Expand All @@ -164,6 +166,9 @@ def __init__(self, prefix, mode='a', encoding=None, fmt=None,
self.__perffmt = perffmt
self.__attr_patt = re.compile(r'\%\((.*?)\)s(.*?(?=%|$))?')
self.__ignore_keys = set(ignore_keys) if ignore_keys else set()
self.__use_locking = use_locking
self.__lockfile_mode = lockfile_mode
self.__locks = {}

def __generate_header(self, record):
# Generate the header from the record and fmt
Expand Down Expand Up @@ -208,6 +213,14 @@ def __generate_header(self, record):

return header

def __lock_file_name(self, logfile=None):
    '''Return the hidden lock-file path that guards *logfile*.

    If *logfile* is omitted, the handler's current ``baseFilename`` is
    used. The lock file is placed in the same directory as the log file
    and is named ``.<basename>.lock``.
    '''

    target = logfile if logfile is not None else self.baseFilename
    dirname, filename = os.path.split(target)
    stem = os.path.splitext(filename)[0]
    return os.path.join(dirname, f'.{stem}.lock')

def _emit_header(self, record):
if self.baseFilename in self.__streams:
return
Expand All @@ -234,13 +247,27 @@ def _emit_header(self, record):

os.rename(self.baseFilename, self.baseFilename + f'.h{hcnt}')
finally:
# Open the file for writing and write the header
fp = open(self.baseFilename,
mode=self.mode, encoding=self.encoding)
if record_header != header:
fp.write(f'{record_header}\n')
if self.__use_locking:
# When using locking, we need to open, append and write to
# the file at once
rwlock = osext.ReadWriteFileLock(self.__lock_file_name(),
self.__lockfile_mode)
with rwlock.write_lock():
with open(self.baseFilename, mode=self.mode,
encoding=self.encoding) as fp:
if record_header != header:
fp.write(f'{record_header}\n')

self.__streams[self.baseFilename] = None
self.__locks[self.baseFilename] = rwlock
else:
# Open the file for writing and write the header
fp = open(self.baseFilename,
mode=self.mode, encoding=self.encoding)
if record_header != header:
fp.write(f'{record_header}\n')

self.__streams[self.baseFilename] = fp
self.__streams[self.baseFilename] = fp

def emit(self, record):
try:
Expand All @@ -255,14 +282,22 @@ def emit(self, record):
check_basename = type(record.__rfm_check__).variant_name()
self.baseFilename = os.path.join(dirname, f'{check_basename}.log')
self._emit_header(record)
self.stream = self.__streams[self.baseFilename]
super().emit(record)
if self.__use_locking:
with self.__locks[self.baseFilename].write_lock():
with open(self.baseFilename, mode=self.mode,
encoding=self.encoding) as fp:
self.stream = fp
super().emit(record)
else:
self.stream = self.__streams[self.baseFilename]
super().emit(record)

def close(self):
    '''Close all open log streams.

    Entries registered as ``None`` are placeholders used when file
    locking is enabled (files are then opened/closed per emit), so they
    are skipped here.
    '''

    for stream in self.__streams.values():
        if stream:
            self.stream = stream
            super().close()


def _format_time_rfc3339(timestamp, datefmt):
Expand Down Expand Up @@ -459,9 +494,16 @@ def _create_filelog_handler(site_config, config_prefix):
format = site_config.get(f'{config_prefix}/format')
format_perf = site_config.get(f'{config_prefix}/format_perfvars')
ignore_keys = site_config.get(f'{config_prefix}/ignore_keys')
use_locking = site_config.get(f'{config_prefix}/locking_enable')
lockfile_mode = site_config.get(f'{config_prefix}/locking_file_mode')
if lockfile_mode is not None:
lockfile_mode = int(lockfile_mode, base=8)

return MultiFileHandler(filename_patt, mode='a+' if append else 'w+',
fmt=format, perffmt=format_perf,
ignore_keys=ignore_keys)
ignore_keys=ignore_keys,
use_locking=use_locking,
lockfile_mode=lockfile_mode)


@register_log_handler('syslog')
Expand Down Expand Up @@ -806,6 +848,13 @@ def debug(self, message, *args, **kwargs):
def debug2(self, message, *args, **kwargs):
self.log(DEBUG2, message, *args, **kwargs)

def shutdown(self):
    '''Shut down this logger: close every attached handler and detach it.

    Iterates over a snapshot of ``self.handlers``, because handlers are
    removed from the live list while iterating.
    '''

    attached = list(self.handlers)
    for handler in attached:
        handler.close()
        self.removeHandler(handler)


# This is a cache for warnings that we don't want to repeat
_WARN_ONCE = set()
Expand Down Expand Up @@ -1027,6 +1076,8 @@ def configure_logging(site_config):
_context_logger = null_logger
return

# Shutdown the previously setup loggers and close any files
shutdown()
_logger = _create_logger(site_config, 'handlers$', 'handlers')
_perf_logger = _create_logger(site_config, 'handlers_perflog')
_context_logger = LoggerAdapter(_logger)
Expand Down Expand Up @@ -1079,6 +1130,17 @@ def _fn(*args, **kwargs):
return _fn


@atexit.register
def shutdown():
    '''Shut down logging by closing the module-level loggers, if set.

    Registered with :mod:`atexit` so that any files still held open by
    log handlers are closed on interpreter exit.
    '''

    for logger in (_logger, _perf_logger):
        if logger:
            logger.shutdown()


# The following is meant to be used only by the unit tests

class logging_sandbox:
Expand All @@ -1098,6 +1160,7 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
    # Restore the module-level loggers — presumably saved on `self` by
    # __enter__() (its body is not visible here; confirm).
    global _logger, _perf_logger, _context_logger

    # Close any handlers installed while the sandbox was active before
    # putting the saved loggers back.
    shutdown()
    _logger = self._logger
    _perf_logger = self._perf_logger
    _context_logger = self._context_logger
24 changes: 6 additions & 18 deletions reframe/frontend/reporting/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import os
import re
import sqlite3
import sys
from filelock import FileLock

import reframe.utility.jsonext as jsonext
import reframe.utility.osext as osext
Expand Down Expand Up @@ -88,6 +86,11 @@ def __init__(self):
else:
self.__db_file_mode = mode

self.__db_lock = osext.ReadWriteFileLock(
os.path.join(os.path.dirname(self.__db_file), '.db.lock'),
self.__db_file_mode
)

def _db_file(self):
prefix = os.path.dirname(self.__db_file)
if not os.path.exists(self.__db_file):
Expand Down Expand Up @@ -124,22 +127,7 @@ def _db_connect(self, *args, **kwargs):
return sqlite3.connect(*args, **kwargs)

def _db_lock(self):
    '''Return the database lock held for writing.

    The underlying :class:`ReadWriteFileLock` is created once in
    ``__init__``; this method merely hands out its write-lock context
    manager.
    '''

    return self.__db_lock.write_lock()

def _db_create(self):
clsname = type(self).__name__
Expand Down
6 changes: 5 additions & 1 deletion reframe/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@
"ignore_keys": {
"type": "array",
"items": {"type": "string"}
}
},
"locking_enable": {"type": "boolean"},
"locking_file_mode": {"type": ["string", "null"]}
},
"required": ["prefix"]
}
Expand Down Expand Up @@ -633,6 +635,8 @@
"logging/handlers_perflog/filelog_append": true,
"logging/handlers_perflog/filelog_basedir": "./perflogs",
"logging/handlers_perflog/filelog_ignore_keys": [],
"logging/handlers_perflog/filelog_locking_enable": false,
"logging/handlers_perflog/filelog_locking_file_mode": null,
"logging/handlers_perflog/graylog_extras": {},
"logging/handlers_perflog/httpjson_extras": {},
"logging/handlers_perflog/httpjson_ignore_keys": [],
Expand Down
31 changes: 31 additions & 0 deletions reframe/utility/osext.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import collections.abc
import errno
import fasteners
import getpass
import grp
import os
Expand Down Expand Up @@ -863,6 +864,36 @@ def unique_abs_paths(paths, prune_children=True):
return list(unique_paths - children)


class temp_umask:
    '''Context manager that temporarily changes the process umask.

    On entry the umask is set to the mask given at construction; on exit
    the previous umask is restored.
    '''

    def __init__(self, mask):
        self.__mask = mask
        self.__saved = None

    def __enter__(self):
        # os.umask() returns the previous mask, which we keep for restore
        self.__saved = os.umask(self.__mask)

    def __exit__(self, exc_type, exc_val, exc_tb):
        os.umask(self.__saved)


class ReadWriteFileLock(fasteners.InterProcessReaderWriterLock):
    '''Inter-process readers-writer lock with an optional file mode.

    If *mode* is given, missing parent directories are created first and
    the umask is temporarily adjusted, so that the permission bits in
    *mode* apply exactly to the lock file itself.
    '''

    def __init__(self, path, mode=None):
        super().__init__(path)
        self._mode = mode

    def _do_open(self, *args, **kwargs):
        if self._mode is None:
            return super()._do_open(*args, **kwargs)

        # Create the directory structure ourselves, so that the umask
        # override below applies strictly to the lock file and not to any
        # parent directories created along the way.
        os.makedirs(os.path.dirname(self.path), exist_ok=True)
        with temp_umask(0o0777 ^ self._mode):
            return super()._do_open(*args, **kwargs)


def cray_cdt_version():
'''Return either the Cray Development Toolkit (CDT) version, the Cray
Programming Environment (CPE) version or :class:`None` if the version
Expand Down
5 changes: 1 addition & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ archspec==0.2.5
argcomplete==3.1.2; python_version < '3.8'
argcomplete==3.6.1; python_version >= '3.8'
ClusterShell==1.9.3
filelock==3.4.1; python_version == '3.6'
filelock==3.12.2; python_version == '3.7'
filelock==3.16.1; python_version == '3.8'
filelock==3.18.0; python_version > '3.8'
fasteners==0.19
importlib_metadata==4.0.1; python_version < '3.8'
jinja2==3.0.3; python_version == '3.6'
jinja2==3.1.6; python_version >= '3.7'
Expand Down
5 changes: 1 addition & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ install_requires =
argcomplete
argcomplete <= 3.1.2; python_version < '3.8'
ClusterShell
filelock
filelock<=3.16.1; python_version == '3.8'
filelock<=3.12.2; python_version == '3.7'
filelock<=3.4.1; python_version == '3.6'
fasteners
jinja2==3.0.3; python_version == '3.6'
jinja2
jsonschema
Expand Down
5 changes: 3 additions & 2 deletions unittests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def handler(logfile, rfc3339formatter):
def logger(handler):
    '''Create a ReFrame logger with *handler* attached.

    Yields the logger to the test and shuts it down afterwards, so that
    any files opened by the handler are closed.
    '''

    logger = rlog.Logger('reframe')
    logger.addHandler(handler)
    yield logger
    logger.shutdown()


@pytest.fixture
Expand Down Expand Up @@ -545,7 +546,7 @@ def url_scheme(request):

def test_httpjson_handler_no_port(make_exec_ctx, config_file,
url_scheme, logging_sandbox):
ctx = make_exec_ctx(
make_exec_ctx(
config_file({
'level': 'info',
'handlers_perflog': [{
Expand Down
28 changes: 28 additions & 0 deletions unittests/test_perflogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,31 @@ def validate(self):

assert len(lines) == 2
assert lines[1] == 'fail|sanity error: no way|None|None\n'


def test_perf_logging_locking(make_runner, make_exec_ctx,
                              config_perflog, perf_test, tmp_path):
    '''Check that perflog file logging works with file locking enabled.'''

    record_fmt = ('%(check_job_completion_time)s,%(version)s,'
                  '%(check_display_name)s,%(check_system)s,'
                  '%(check_partition)s,%(check_environ)s,'
                  '%(check_jobid)s,%(check_result)s,%(check_perfvalues)s')
    # NOTE(review): the config schema adds `locking_enable` for filelog
    # handlers — confirm that `use_locking` is the key actually honored
    # here.
    make_exec_ctx(config_perflog(
        fmt='',
        logging_opts={
            'handlers_perflog': [{
                'type': 'filelog',
                'use_locking': True,
                'prefix': '%(check_system)s/%(check_partition)s',
                'level': 'info',
                'format': record_fmt
            }]
        }
    ))
    logging.configure_logging(rt.runtime().site_config)
    runner = make_runner()
    cases = executors.generate_testcases([perf_test])
    _assert_no_logging_error(runner.runall, cases)

    # Expect a header line plus exactly one performance record
    logfile = tmp_path / 'perflogs' / 'generic' / 'default' / '_MyPerfTest.log'
    assert os.path.exists(logfile)
    assert _count_lines(logfile) == 2
Loading