Source code for magnumnp.loggers.scalar_logger

#
# This file is part of the magnum.np distribution
# (https://gitlab.com/magnum.np/magnum.np).
# Copyright (c) 2023 magnum.np team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import torch
import os
from collections.abc import Iterable
from functools import reduce
from magnumnp.common import logging

__all__ = ["ScalarLogger"]

[docs] class ScalarLogger(object):
[docs] def __init__(self, filename, columns, every = 1, fsync_every = 1): """ Simple logger class to log scalar values into a tab separated file. *Arguments* filename (:class:`str`) The name of the log file columns ([:class:`str` | :class:`function`]) The columns to be written to the log file every (:class:`int`) Write row to log file every nth call fsync_every (:class:`int`) Call fsync every nth write to empty OS buffer *Example* .. code-block:: python # provide key strings with are available in state logger = ScalarLogger('log.dat', ['t','m']) # provide func(state) or tuple (name, func(state)) logger = ScalarLogger('log.dat', [('t[ns]', lambda state: state.t*1e9)]) # provide predifined functions logger = ScalarLogger('log.dat', [demag.h, demag.E]) # Actually log a row state = State(mesh) logger << state """ # create directory if not existent if not os.path.dirname(filename) == '' and \ not os.path.exists(os.path.dirname(filename)): try: os.makedirs(os.path.dirname(filename)) except OSError as exc: # Guard against race condition if exc.errno != errno.EEXIST: raise self._filename = filename self._file = None self._every = every self._fsync_every = fsync_every * every self._i = 0 self._columns = columns
def add_column(self, column): if self._file is not None: raise RuntimeError("You cannot add columns after first log row has been written.") self._columns.append(column) def log(self, state): self._i += 1 if ((self._i - 1) % self._every > 0): return values = [] for column in self._columns: if isinstance(column, str): name = column raw_value = getattr(state, column) elif hasattr(column, '__call__'): try: name = column.__self__.__class__.__name__ + "." + column.__name__ except: name = 'unnamed' raw_value = column(state) elif isinstance(column, tuple) or isinstance(column, list): name = column[0] raw_value = column[1](state) else: raise RuntimeError('Column type not supported.') if isinstance(raw_value, torch.Tensor): value = state.avg(raw_value).tolist() else: value = raw_value values.append((name, value)) if self._file is None: self._file = open(self._filename, 'w') self._write_header(values) self._write_row(values) if ((self._i - 1) % self._fsync_every == 0): os.fsync(self._file.fileno()) def __lshift__(self, state): self.log(state) def _write_header(self, columns): headings = [] for column in columns: if isinstance(column[1], Iterable): if (len(column[1]) == 3): for i in ('x', 'y', 'z'): headings.append(column[0] + '_' + i) else: for i in range(len(column[1])): headings.append(column[0] + '_' + str(i)) else: headings.append(column[0]) format_str = "#" + " ".join(["%-22s"] * len(headings)) + "\n" self._file.write(format_str % tuple(headings)) self._file.flush() def _write_row(self, columns): flat_values = reduce(lambda x,y: x+y, map(lambda x: tuple(x[1]) if isinstance(x[1], Iterable) else (x[1],), columns)) format_str = " ".join(["%+.15e"] * len(flat_values)) + "\n" self._file.write(format_str % flat_values) self._file.flush() def __del__(self): if self._file is not None: self._file.close()
[docs] def resumable_step(self): """ Returns the last step the logger can resume from, e.g. if the logger logs every 10th step and the first (i = 0) step was already logged, the result is 10. *Returns* :class:`int` The step number the logger is able to resume from """ if self._file is not None: raise RuntimeError("Cannot resume from log file that is already open for writing.") i = 0 with open(self._filename, 'r') as f: for i, l in enumerate(f): pass return i * self._every
[docs] def resume(self, i): """ Try to resume existing log file from log step i. The log file is truncated accordingly. *Arguments* i (:class:`int`) The log step to resume from """ number = (self.resumable_step() - i) / self._every # from https://superuser.com/questions/127786/efficiently-remove-the-last-two-lines-of-an-extremely-large-text-file count = 0 #with open(self._filename, 'r+b') as f: # f.seek(0, os.SEEK_END) # end = f.tell() # while f.tell() > 0: # f.seek(-1, os.SEEK_CUR) # char = f.read(1) # if char != '\n' and f.tell() == end: # raise RuntimeError("Cannot resume: logfile does not end with a newline.") # if char == '\n': # count += 1 # if count == number + 1: # f.truncate() # break # f.seek(-1, os.SEEK_CUR) with open(self._filename, 'r+b', buffering=0) as f: f.seek(0, os.SEEK_END) end = f.tell() while f.tell() > 0: f.seek(-1, os.SEEK_CUR) #print(f.tell()) char = f.read(1) if char != b'\n' and f.tell() == end: raise RuntimeError("Cannot resume: logfile does not end with a newline.") #print ("No change: file does not end with a newline") #exit(1) if char == b'\n': count += 1 if count == number + 1: f.truncate() break #print ("Removed " + str(number) + " lines from end of file") #exit(0) f.seek(-1, os.SEEK_CUR) self._i = i if self._i > 0: self._file = open(self._filename, 'a')