"""Module to handle logging in edflow.
Can be imported by application code to get loggers and find out where outputs
should be stored.
"""
import logging
import os, sys
import datetime
import shutil
import subprocess
from socket import gethostname
from tqdm import tqdm
[docs]class run(object):
"""Singleton managing all directories for a run.
Calling the init method below will set up a logging directory structure
that should be used for this run. Application code can import this class
and use its attributes to figure out where to store their outputs.
Note
----
This class is intended to provide run information without the need to pass
it through. Thus it behaves like a singleton by storing all information on
the class object itself and not an instance of the class.
Attributes
----------
exists : bool
True if log structure was initialized.
now : str
Representing time of initialization.
postfix : str
User specified postfix of run directory or eval directory.
name : str
The name of the current run. Stays consistent on resuming.
git_tag : str
If activated and a git repo was found, this attribute contains the tag
name pointing to a commit recording the state of the repository when
this run was started.
resumed : bool
True if this run was resumed.
code_root : str
Path where code is copied from.
code : str
Path where code is copied to.
root : str
Path under which all outputs of the run should be stored.
train : str
Path to store train outputs in.
eval : str
Path to eval subfolders.
latest_eval : str
Path to store eval outputs in.
configs : str
Path to store configs in.
checkpoints : str
Path to store checkpoints in.
"""
exists = False
[docs] @classmethod
def init(
cls,
log_dir=None,
run_dir=None,
code_root=".",
postfix=None,
log_level="info",
git=False,
):
"""Initialize logging for this run.
After execution of this method, the log directory structure was
created, code was copied and commited if desired, and some basic system
information has been logged. Subsequent use of loggers from
log.get_logger will result in log files written to the run directory.
Parameters
----------
log_dir : str
Create new run directory under this directory.
run_dir : str
Resume in existing run directory.
code_root : str
Path to where the code lives. py and yaml files will be copied into
run directory.
postfix : str
Identifier appended to run directory if non-existent else to latest
eval directory.
log_level : str
Default log level for loggers.
git : bool
If True, put code into tagged commit.
"""
has_info = log_dir is not None or run_dir is not None
if not cls.exists and has_info:
cls.now = now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
cls.postfix = postfix
cls.code_root = code_root
if run_dir is None:
if postfix is not None:
name = now + "_" + postfix
else:
name = now
cls.resumed = False
cls.root = os.path.join(log_dir, name)
else:
if run_dir[-1] == "/":
run_dir = run_dir[:-1]
cls.resumed = True
cls.root = run_dir
cls.name = os.path.split(cls.root)[1]
# create directory structure
cls._setup()
cls._setup_new_eval()
cls.exists = True
# log run information
log.set_log_level(log_level)
cls.logger = log.get_logger("run")
cls.logger.info(" ".join(sys.argv))
cls.logger.info("root: {}".format(cls.root))
cls.logger.info("hostname: {}".format(gethostname()))
try:
# try to match tmux target by tty.
# only works if stdin was not messed.
tty = os.ttyname(sys.stdin.fileno())
tmux_target = subprocess.run(
[
"tmux list-panes -a -F"
+ "'#{session_id}:#{window_id}.#{pane_id} #{pane_tty}'"
+ "| grep {}".format(tty)
],
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
).stdout
tmux_target = tmux_target.split("\n")[0].split(" ")[0]
# output can be used as tmux target, eg 'tmux a {tmux_target}'
# to attach to the pane running the logged run
cls.logger.info("tmux: {}".format(tmux_target))
except Exception:
pass
cls.logger.info("pid: {}".format(os.getpid()))
cls.logger.info("pgid: {}".format(os.getpgrp()))
if "CUDA_VISIBLE_DEVICES" in os.environ:
cls.logger.info(
"cuda_devices: {}".format(os.environ["CUDA_VISIBLE_DEVICES"])
)
# log code
if not cls.resumed and cls.code_root is not None:
cls._copy_code()
if git:
cls.git_tag = cls._git_commit()
cls.logger.info("git_tag: {}".format(cls.git_tag))
cls.logger.info(cls())
@classmethod
def _setup(cls):
"""Make all the directories."""
subdirs = ["code", "train", "eval", "configs"]
subsubdirs = {"code": [], "train": ["checkpoints"], "eval": [], "configs": []}
root = cls.root
cls.repr = "Project structure:\n{}\n".format(root)
for sub in subdirs:
path = os.path.join(root, sub)
setattr(cls, sub, path)
if sub != "code":
# Code directory will be created by copy code
os.makedirs(path, exist_ok=True)
cls.repr += "├╴{}\n".format(sub)
for subsub in subsubdirs[sub]:
path = os.path.join(root, sub, subsub)
setattr(cls, subsub, path)
os.makedirs(path, exist_ok=True)
cls.repr += " ├╴{}\n".format(subsub)
@classmethod
def _setup_new_eval(cls):
"""Always create subfolder in eval to avoid clashes between
evaluations."""
name = cls.now
if cls.postfix is not None:
name = name + "_" + cls.postfix
cls.latest_eval = os.path.join(cls.eval, name)
os.makedirs(cls.latest_eval)
@classmethod
def _copy_code(cls):
"""Copies all code to the code directory of the run."""
src = cls.code_root
dst = "./" + cls.code
try:
filtered_dirs = ["__pycache__"]
def ignore(directory, files):
filtered = []
for f in files:
full_path = os.path.join(directory, f)
is_cool = False
if f[-3:] == ".py":
is_cool = True
elif f[-5:] == ".yaml":
is_cool = True
elif os.path.isdir(full_path):
if not (f.startswith(".") or f in filtered_dirs):
is_cool = True
if not is_cool:
filtered += [f]
return filtered
shutil.copytree(src, dst, symlinks=False, ignore=ignore)
except shutil.Error as err:
cls.logger.warning(err)
@classmethod
def _git_commit(cls):
"""commit code, tag that commit and reset repositories state
performs something like the following
CHEAD=$(git rev-parse HEAD)
git add <files...>
git add -u
git commit -m "edflow ..."
git tag -a edflow_date-and-time-project -m "more"
git reset --mixed $CHEAD
"""
try:
CHEAD = subprocess.run(
["git rev-parse HEAD"],
shell=True,
check=True,
stdout=subprocess.PIPE,
universal_newlines=True,
).stdout.strip()
except subprocess.CalledProcessError:
cls.logger.warning(
"Tried to commit state of project but the current working directory does not appear to be a git repository."
)
tagname = "error: no git repository found"
else:
tagname = "{}_{}".format(cls.now, cls.postfix)
message = "command: {}\nroot: {}".format(" ".join(sys.argv), cls.root)
try:
addcommand = "git add {pyfiles}; git add {yamlfiles}; git add -u".format(
pyfiles=os.path.join(cls.code_root, "\*.py"),
yamlfiles=os.path.join(cls.code_root, "\*.yaml"),
)
output = subprocess.run(
[addcommand],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
).stdout
cls.logger.debug(output)
if (
subprocess.run(
["git diff-index --quiet HEAD"], shell=True
).returncode
!= 0
):
# dirty working directory - add commit
command = "git commit -m '{commitmessage}'; git tag '{tagname}'".format(
commitmessage=message, tagname=tagname
)
else:
# nothing to add - put message into annotated tag
command = "git tag -a '{tagname}' -m '{tagmessage}'".format(
tagname=tagname, tagmessage=message
)
cls.logger.debug(command)
output = subprocess.run(
[command],
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
).stdout
cls.logger.debug(output)
except Exception as e:
cls.logger.warning(
"Tried to commit state of project but error occured: {}".format(e)
)
tagname = "error: {}".format(e)
finally:
cls.logger.debug("git reset --mixed {}".format(CHEAD))
output = subprocess.run(
["git reset --mixed {}".format(CHEAD)],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
).stdout
cls.logger.debug(output)
return tagname
def __repr__(self):
"""Nice file structure representation."""
return type(self).repr
[docs]class TqdmHandler(logging.StreamHandler):
"""A logging handler compatible with tqdm progress bars.
"""
[docs] def emit(self, record):
# check if stderr and stdout are two different ptys.
# this detects tampering by wandb which messes up tqdm logging.
# fix it by writing to stderr instead of stdout.
try:
file_ = sys.stdout
if not os.ttyname(sys.stdout.fileno()) == os.ttyname(sys.stderr.fileno()):
file_ = sys.stderr
except OSError:
# stdout or stderr is not a pty. default to stdout.
file_ = sys.stdout
msg = self.format(record)
tqdm.write(msg, file=file_)
[docs]class log(object):
"""Singleton managing all loggers for a run.
Note
----
This class is intended to provide logging facilities without the need to
pass it through. Thus it behaves like a singleton by storing all
information on the class object itself and not an instance of the class.
Attributes
----------
target : str
Current default target to write log file to.
level
Current default log level for new loggers.
loggers
List of all loggers.
"""
target = "root" # default directory of ProjectManager to log into
level = logging.INFO
loggers = []
[docs] @classmethod
def set_log_target(cls, which):
"""Set default target where log file is written to."""
cls.target = which
[docs] @classmethod
def get_logger(cls, name, which=None, level=None):
"""Get logger.
If run was initialized, returns a logger which is compatible with tqdm
progress bars and logs into a file in the run directory. Otherwise,
returns a basic logger.
Parameters
----------
name : str or object
Name of the logger. If not a string, the name of the given object
class is used.
which : str
Subdirectory in the project folder where log file is written to.
level : str
Log level of the logger.
"""
_fix_abseil()
which = which or cls.target
level = level or cls.level
if not isinstance(name, str):
name = type(name).__name__
if not run.exists:
if not isinstance(name, str):
name = type(name).__name__
logging.basicConfig(level=level)
logger = logging.getLogger(name)
logger.debug("edflow not initialized.")
return logger
log_dir = getattr(run, which)
logger = cls._create_logger(name, log_dir, level=level)
cls.loggers += [logger]
return logger
[docs] @classmethod
def set_log_level(cls, level):
"""Set log level of all existing and default log level of all future
loggers."""
level = getattr(logging, level.upper())
log.level = level
for logger in log.loggers:
logger.setLevel(level)
cls.get_logger("log").debug("Log level set to {}".format(level))
@staticmethod
def _create_logger(name, out_dir, level=logging.INFO):
"""Creates a logger with tqdm- and file-handler."""
# init logging
logger = logging.getLogger(name)
logger.setLevel(level)
if not len(logger.handlers) > 0:
ch = TqdmHandler()
fh = logging.FileHandler(filename=os.path.join(out_dir, "log.txt"))
fmt_string = "[%(levelname)s] [%(name)s]: %(message)s"
formatter = logging.Formatter(fmt_string)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.addHandler(fh)
return logger
def _fix_abseil():
# https://github.com/tensorflow/tensorflow/issues/26691#issuecomment-500369493
try:
import absl.logging
logging.root.removeHandler(absl.logging._absl_handler)
absl.logging._warn_preinit_stderr = False
except Exception:
pass
# backwards compatibility
LogSingleton = log
get_logger = log.get_logger