#
# This file is part of the magnum.np distribution
# (https://gitlab.com/magnum.np/magnum.np).
# Copyright (c) 2023 magnum.np team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from magnumnp.loggers import ScalarLogger, FieldLogger
from magnumnp.common import logging
__all__ = ["Logger"]
class Logger(object):
"""
Combined Scalar- and Field-Logger class
    *Arguments*
        directory (:class:`str`)
            The directory where the log files are stored
        scalars ([:class:`str` | :class:`function`])
            The scalar columns to be written to the scalar log file
        scalars_every (:class:`int`)
            Write scalars to the log file only every nth call
        fields ([:class:`str` | :class:`function`])
            The fields to be written to the field log files
        fields_every (:class:`int`)
            Write fields to the log files only every nth call
        scale (:class:`float`)
            Scaling factor which is passed on to the :class:`FieldLogger`
*Example*
.. code-block:: python
            # provide key strings which are available in the state
            logger = Logger('data', ['m', demag.h], ['m'], fields_every = 100)

            # actually log scalars and fields
            state = State(mesh)
            logger << state
"""
def __init__(self, directory, scalars = [], fields = [], scalars_every = 1, fields_every = 1, scale = 1.):
self.loggers = {}
self._resume_time = None
if len(scalars) > 0:
self.loggers["scalars"] = ScalarLogger(os.path.join(directory, "log.dat"), scalars, every = scalars_every)
if len(fields) > 0:
self.loggers["fields"] = FieldLogger(os.path.join(directory, "fields.pvd"), fields, every = fields_every, scale = scale)
def log(self, state):
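        """
        Logs the given state with all registered loggers. The first call
        directly after a resume is skipped to avoid duplicate log entries.

        *Arguments*
            state (:class:`State`)
                The state to be logged
        """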
if state.t == self._resume_time: # avoid logging directly after resume
self._resume_time = None
return
for logger in self.loggers.values():
logger << state
def __lshift__(self, state):
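        """Shorthand for :code:`log`, which allows the syntax :code:`logger << state`."""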
self.log(state)
def resume(self, state):
"""
        Tries to resume from existing log files. If a resume is possible,
        the state object is updated with the latest available values of t
        and m, and the different log files are aligned and resumed.
        If a resume is not possible, the state is not modified and the
        simulation starts from the beginning.
*Arguments*
state (:class:`State`)
The state to be resumed from the log data
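
        *Example*
            .. code-block:: python

                # minimal sketch: mesh and the initial magnetization m0 are
                # assumed to be set up as usual
                logger = Logger("data", ['m'], ['m'])
                state = State(mesh)
                state.m = m0
                logger.resume(state) # restores state.t and state.m if possible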
"""
        if "fields" not in self.loggers:
            logging.warning("Resume not possible without field logger. Start over.")
            return
        last_recorded_step = self.loggers["fields"].last_recorded_step()
        if last_recorded_step is None:
            logging.warning("Resume not possible. Start over.")
            return
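        # every log file needs data at least up to the last completely
        # recorded field step, so all loggers can be aligned at that step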
resumable_step = min(map(lambda logger: logger.resumable_step(), self.loggers.values()))
assert resumable_step >= last_recorded_step + 1
state.m, state.t = self.loggers["fields"].step_data(last_recorded_step, "m")
logging.info_green("Resuming from step %d (t = %g)." % (last_recorded_step, state.t))
for logger in self.loggers.values():
logger.resume(last_recorded_step + 1)
self._resume_time = state.t
def is_resumable(self):
"""
        Returns True if the logger can resume from existing log files.
*Returns*
:class:`bool`
True if resumable, False otherwise
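
        *Example*
            .. code-block:: python

                # minimal sketch: only try to resume if valid log data exists
                if logger.is_resumable():
                    logger.resume(state)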
"""
        return "fields" in self.loggers and self.loggers["fields"].last_recorded_step() is not None