# File: _session_.py
# Author: Ryoichi Ando (ryoichi.ando@zozo.com)
# License: Apache v2.0
from ._scene_ import FixedScene
from ._plot_ import Plot
from ._utils_ import Utils
from ._parse_ import ParamParser, CppRustDocStringParser
from tqdm import tqdm
import pandas as pd
import signal
import subprocess
import numpy as np
import shutil
import psutil
import os
import threading
import time
import copy
from typing import Any, Optional
# Substring looked for in OS process names to recognise a running solver
# (used by is_running() and terminate() in this module).
PROCESS_NAME = "ppf-contact"
# Inline stylesheet prepended to the HTML log widgets. It defines the
# ".no-scroll" CSS class: hidden overflow, wrapped lines, monospace font.
CONSOLE_STYLE = """
<style>
.no-scroll {
overflow: hidden;
white-space: pre-wrap;
font-family: monospace;
}
</style>
"""
[docs]
class Param:
    """Class to manage simulation parameters.

    Every parameter is stored as an entry mapping whose ``"value"`` item
    holds the current value. On top of the static values, a parameter can be
    given a time schedule ("dynamic parameter") with the fluent calls
    ``dyn(key)``, ``time(t)``, ``change(value)`` and ``hold()``.
    """

    def __init__(self, app_root: str):
        """Initialize the Param class.

        Args:
            app_root (str): The root directory of the application. The
                defaults are parsed from ``<app_root>/src/args.rs``.
        """
        path = os.path.abspath(os.path.join(app_root, "src", "args.rs"))
        self._key = None
        self._default_param = ParamParser.get_default_params(path)
        # Deep copy on purpose: the entries are nested mappings, and a shallow
        # copy would share them with the defaults, so set() would overwrite
        # the default value and clear() could never restore it.
        self._param = copy.deepcopy(self._default_param)
        self._time = 0.0
        self._dyn_param = {}

    def copy(self) -> "Param":
        """Copy the Param object.

        Returns:
            Param: An independent deep copy of this object.
        """
        return copy.deepcopy(self)

    def set(self, key: str, value: Any) -> "Param":
        """Set a parameter value.

        Args:
            key (str): The parameter key (hyphen-separated).
            value (Any): The parameter value.

        Returns:
            Param: The updated Param object.

        Raises:
            ValueError: If the key contains an underscore or is unknown.
        """
        if "_" in key:
            raise ValueError("Key cannot contain underscore. Use '-' instead.")
        if key not in self._param:
            raise ValueError(f"Key {key} does not exist")
        self._param[key]["value"] = value
        return self

    def clear(self, key: str) -> "Param":
        """Reset a parameter to its default and drop its dynamic schedule.

        Args:
            key (str): The parameter key.

        Returns:
            Param: The updated Param object.

        Raises:
            KeyError: If the key is unknown.
        """
        self._param[key]["value"] = self._default_param[key]["value"]
        self._dyn_param.pop(key, None)
        return self

    def dyn(self, key: str) -> "Param":
        """Set a current dynamic parameter key.

        Args:
            key (str): The dynamic parameter key.

        Returns:
            Param: The updated Param object.

        Raises:
            ValueError: If the key is unknown.
        """
        if key not in self._param:
            raise ValueError(f"Key {key} does not exist")
        self._key = key
        return self

    def change(self, value: float) -> "Param":
        """Change the value of the dynamic parameter at the current time.

        The first change for a key also records the parameter's current
        static value at time 0.0 as the start of the schedule.

        Args:
            value (float): The new value of the dynamic parameter.

        Returns:
            Param: The updated Param object.

        Raises:
            ValueError: If no dynamic key has been selected with dyn().
        """
        if self._key is None:
            raise ValueError("Key is not set")
        if self._key in self._dyn_param:
            self._dyn_param[self._key].append((self._time, value))
        else:
            # Use the stored value, not the whole entry mapping; otherwise
            # export() would write the mapping's repr into the schedule.
            initial_val = self._param[self._key]["value"]
            self._dyn_param[self._key] = [
                (0.0, initial_val),
                (self._time, value),
            ]
        return self

    def hold(self) -> "Param":
        """Hold the current value of the dynamic parameter.

        Records the most recent value again at the current time.

        Returns:
            Param: The updated Param object.

        Raises:
            ValueError: If no dynamic key is selected, or if the key has no
                schedule yet and its static value is not a float.
        """
        if self._key is None:
            raise ValueError("Key is not set")
        if self._key in self._dyn_param:
            last_val = self._dyn_param[self._key][-1][1]
            self.change(last_val)
        else:
            val = self._param[self._key]["value"]
            if not isinstance(val, float):
                raise ValueError("Key must be float")
            self.change(val)
        return self

    def export(self, path: str):
        """Export the parameters to a file.

        Writes ``param.toml`` (static values, keys with ``-`` replaced by
        ``_``) and, if any schedule exists, ``dyn_param.txt`` into ``path``.

        Args:
            path (str): The path to the export directory (must exist).
        """
        if self._param:
            with open(os.path.join(path, "param.toml"), "w") as f:
                f.write("[param]\n")
                for key, entry in self._param.items():
                    name = key.replace("-", "_")
                    val = entry["value"]
                    if val is None:
                        f.write(f"{name} = false\n")
                    elif isinstance(val, str):
                        f.write(f'{name} = "{val}"\n')
                    elif isinstance(val, bool):
                        # NOTE(review): a False flag is left out of the file
                        # (existing behavior); presumably the solver then
                        # falls back to its own default - confirm.
                        if val:
                            f.write(f"{name} = true\n")
                    else:
                        f.write(f"{name} = {val}\n")
        if self._dyn_param:
            with open(os.path.join(path, "dyn_param.txt"), "w") as f:
                for key, vals in self._dyn_param.items():
                    f.write(f"[{key}]\n")
                    for stamp, val in vals:
                        f.write(f"{stamp} {val}\n")

    def time(self, time: float) -> "Param":
        """Set the current time for the dynamic parameter.

        Args:
            time (float): The current time.

        Returns:
            Param: The updated Param object.

        Raises:
            ValueError: If the time is not greater than the previous one.
        """
        if time <= self._time:
            raise ValueError("Time must be increasing")
        self._time = time
        return self

    def get(self, key: Optional[str] = None):
        """Get the value of a parameter.

        Args:
            key (Optional[str], optional): The parameter key.
                If not specified, all parameters are returned.

        Returns:
            Any: The value of the parameter, or the whole parameter table
            when no key is given.
        """
        if key is None:
            return self._param
        return self._param[key]["value"]

    def items(self):
        """Get all parameter items.

        Returns:
            ItemsView: The parameter items.
        """
        return self._param.items()
[docs]
class SessionManager:
    """Class to manage simulation sessions."""

    def __init__(self, app_root: str, proj_root: str, save_func):
        """Initialize the SessionManager class.

        Args:
            app_root (str): The root directory of the application.
            proj_root (str): The root directory of the project.
            save_func (Callable): The save function, handed to every session
                this manager creates.
        """
        self._app_root = app_root
        self._proj_root = proj_root
        self._save_func = save_func
        self._sessions = {}
        self._curr = None

    def list(self):
        """List all sessions.

        Returns:
            dict: The sessions, keyed by session name.
        """
        return self._sessions

    def select(self, name: str):
        """Select a session and make it the current one.

        Args:
            name (str): The name of the session.

        Returns:
            Session: The selected session.

        Raises:
            ValueError: If no session with that name exists.
        """
        if name not in self._sessions:
            raise ValueError(f"Session {name} does not exist")
        self._curr = name
        return self._sessions[name]

    def current(self):
        """Get the current session.

        Returns:
            Session: The current session, or None if none is selected.
        """
        if self._curr is None:
            return None
        return self._sessions[self._curr]

    def create(
        self, scene: "FixedScene", name: str = "", delete_if_exists: bool = True
    ) -> "Session":
        """Create a new session and make it the current one.

        Args:
            scene (FixedScene): The scene object.
            name (str): The name of the session. If not specified, current time is used.
            delete_if_exists (bool, optional): Whether to delete the session if it exists.
                Deleting also terminates a running solver.

        Returns:
            Session: The created session.

        Raises:
            ValueError: If the session exists and ``delete_if_exists`` is False.
        """
        if name == "":
            name = time.strftime("%Y-%m-%d-%H-%M-%S")
        existing = self._sessions.get(name)
        if existing is not None:
            if not delete_if_exists:
                raise ValueError(f"Session {name} already exists")
            # Same "stop the solver first" step that delete() and clear() use.
            self._terminate_or_raise(True)
            existing.delete()
        session = Session(self._app_root, self._proj_root, name, self._save_func)
        self._sessions[name] = session
        self._curr = name
        return session.init(scene)

    def _terminate_or_raise(self, force: bool):
        """Terminate the solver if it is running, or raise an exception.

        Args:
            force (bool): Whether to force termination.

        Raises:
            ValueError: If the solver is running and ``force`` is False.
        """
        if is_running():
            if force:
                terminate()
            else:
                raise ValueError("Solver is running. Terminate first.")

    def delete(self, name: str, force: bool = True):
        """Delete a session.

        An unknown name is ignored, but a running solver is still handled
        according to ``force`` before the lookup.

        Args:
            name (str): The name of the session.
            force (bool, optional): Whether to force deletion.

        Raises:
            ValueError: If the solver is running and ``force`` is False.
        """
        self._terminate_or_raise(force)
        if name in self._sessions:
            self._sessions[name].delete()
            del self._sessions[name]
            if name == self._curr:
                self._curr = None

    def clear(self, force: bool = True):
        """Clear all sessions.

        Args:
            force (bool, optional): Whether to force clearing.

        Raises:
            ValueError: If the solver is running and ``force`` is False.
        """
        self._terminate_or_raise(force)
        for session in self._sessions.values():
            session.delete()
        self._sessions = {}
        self._curr = None

    def param(self) -> "Param":
        """Get a new Param object.

        Returns:
            Param: A Param object built from the project root.
        """
        return Param(self._proj_root)
[docs]
class SessionInfo:
    """Holds the name of a session and the directory it lives in."""

    def __init__(self, name: str):
        """Create the record for a named session.

        The directory starts out as an empty string until set_path() is
        called.

        Args:
            name (str): The name of the session.
        """
        self._path = ""
        self._name = name

    def set_path(self, path: str):
        """Record the session directory.

        Args:
            path (str): The path to the session directory.
        """
        self._path = path

    @property
    def name(self) -> str:
        """The session name given at construction."""
        return self._name

    @property
    def path(self) -> str:
        """The session directory ("" until set_path() has been called)."""
        return self._path
class Zippable:
    """Handle to a directory that can be packed into a zip archive."""

    def __init__(self, dirpath: str):
        """Remember the directory to archive.

        Args:
            dirpath (str): The path to the directory.
        """
        self._dirpath = dirpath

    def zip(self):
        """Zip the directory into ``<dirpath>.zip``, reporting on stdout."""
        source_dir = self._dirpath
        print(f"zipping to {source_dir}.zip")
        # make_archive appends the ".zip" suffix to the base name itself.
        shutil.make_archive(source_dir, "zip", source_dir)
        print("done")
[docs]
class SessionExport:
    """Class to handle session export operations."""

    def __init__(self, session: "Session"):
        """Initialize the SessionExport class.

        Args:
            session (Session): The session object.
        """
        self._session = session

    def shell_command(
        self,
        param: "Param",
    ) -> str:
        """Generate a shell command to run the solver.

        Exports ``param`` into the session directory, then writes the solver
        invocation to an executable ``command.sh`` in the same directory.

        Args:
            param (Param): The simulation parameters.

        Returns:
            str: The path to the generated ``command.sh`` file.

        Raises:
            ValueError: If the solver binary does not exist.
        """
        session_dir = self._session.info.path
        param.export(session_dir)
        program_path = os.path.join(
            self._session._proj_root, "target", "release", "ppf-contact-solver"
        )
        if not os.path.exists(program_path):
            raise ValueError("Solver does not exist")
        command = " ".join(
            [
                program_path,
                f"--path {session_dir}",
                f"--output {self._session.output.path}",
            ]
        )
        path = os.path.join(session_dir, "command.sh")
        with open(path, "w") as f:
            f.write(command)
        os.chmod(path, 0o755)
        return path

    def animation(self, path: str, ext="ply", include_static: bool = True) -> "Zippable":
        """Export the animation frames.

        Any existing directory at ``path`` is removed first, so the export
        always starts from an empty directory.

        Args:
            path (str): The path to the export directory.
            ext (str, optional): The file extension. Defaults to "ply".
            include_static (bool, optional): Whether to include the static mesh.

        Returns:
            Zippable: A handle that can zip the export directory.
        """
        if os.path.exists(path):
            shutil.rmtree(path)
        # Always (re)create the directory: the old if/else left it missing
        # whenever it had existed before the call.
        os.makedirs(path)
        # NOTE(review): range() stops one short of latest_frame(), so the most
        # recent frame is not exported - confirm this is intended.
        for i in tqdm(range(self._session.get.latest_frame()), desc="export", ncols=70):
            self.frame(os.path.join(path, f"frame_{i}.{ext}"), i, include_static)
        return Zippable(path)

    def frame(
        self, path: str, frame: Optional[int] = None, include_static: bool = True
    ) -> "Session":
        """Export a specific frame.

        Falls back to the scene's initial vertices when no vertex data is
        available for the requested frame.

        Args:
            path (str): The path to the export file.
            frame (Optional[int], optional): The frame number. Defaults to None,
                which exports the latest frame.
            include_static (bool, optional): Whether to include the static mesh.

        Returns:
            Session: The session object.

        Raises:
            ValueError: If the session has no scene yet.
        """
        fixed = self._session._fixed
        if fixed is None:
            raise ValueError("Scene must be initialized")
        # vertex(None) already means "latest frame", so one call covers both
        # the explicit and the default case.
        result = self._session.get.vertex(frame)
        vert = fixed._vert if result is None else result[0]
        fixed.export(vert, path, include_static)
        return self._session
[docs]
class SessionOutput:
    """Gives access to the output location of a session."""

    def __init__(self, session: "Session"):
        """Bind the helper to its session.

        Args:
            session (Session): The session object.
        """
        self._session = session

    @property
    def path(self) -> str:
        """The ``output`` directory inside the session directory."""
        session_dir = self._session.info.path
        return os.path.join(session_dir, "output")
class SessionLog:
    """Class to handle session log retrieval operations."""

    def __init__(self, session: "Session") -> None:
        """Initialize the SessionLog class.

        Args:
            session (Session): The session object. The table of log names is
                parsed from the ``src`` directory under its project root.
        """
        src_path = os.path.join(session._proj_root, "src")
        self._session = session
        self._log = CppRustDocStringParser.get_logging_docstrings(src_path)

    def names(self) -> list[str]:
        """Get the list of log names.

        Returns:
            list[str]: The list of log names.
        """
        return list(self._log.keys())

    def _tail_file(self, path: str, n_lines: Optional[int] = None) -> list[str]:
        """Get the last n lines of a file.

        Args:
            path (str): The path to the file.
            n_lines (Optional[int], optional): The number of lines. Defaults to
                None, which returns every line.

        Returns:
            list[str]: The last n lines of the file without their trailing
            newline, or an empty list if the file does not exist.
        """
        if not os.path.exists(path):
            return []
        with open(path, "r") as f:
            lines = [line.rstrip("\n") for line in f]
        if n_lines is None:
            return lines
        # lines[-0:] is the whole list, so a non-positive count needs its own
        # branch to mean "no lines".
        return lines[-n_lines:] if n_lines > 0 else []

    def stream(self, n_lines: Optional[int] = None) -> list[str]:
        """Get the last n lines of the log file.

        Args:
            n_lines (Optional[int], optional): The number of lines. Defaults to None.

        Returns:
            list[str]: The last n lines of the log file.
        """
        return self._tail_file(
            os.path.join(self._session.info.path, "output", "cudasim_log.txt"), n_lines
        )

    def stdout(self, n_lines: Optional[int] = None) -> list[str]:
        """Get the last n lines of the stdout log file.

        Args:
            n_lines (Optional[int], optional): The number of lines. Defaults to None.

        Returns:
            list[str]: The last n lines of the stdout log file.
        """
        return self._tail_file(
            os.path.join(self._session.info.path, "stdout.log"), n_lines
        )

    def stderr(self, n_lines: Optional[int] = None) -> list[str]:
        """Get the last n lines of the stderr log file.

        Args:
            n_lines (Optional[int], optional): The number of lines. Defaults to None.

        Returns:
            list[str]: The last n lines of the stderr log file.
        """
        return self._tail_file(
            os.path.join(self._session.info.path, "error.log"), n_lines
        )

    def numbers(self, name: str):
        """Get a pair of numbers from a log file.

        Each usable line holds two whitespace-separated numbers. Lines with
        fewer than two fields or non-numeric fields are skipped, so a line
        that is still being written does not abort the read.

        Args:
            name (str): The name of the log file.

        Returns:
            list[list[float]]: The list of pair of numbers (integral values
            are returned as int), or None if the file does not exist.

        Raises:
            KeyError: If ``name`` is not a known log name.
        """

        def float_or_int(var):
            var = float(var)
            return int(var) if var.is_integer() else var

        filename = self._log[name]["filename"]
        path = os.path.join(self._session.info.path, "output", "data", filename)
        if not os.path.exists(path):
            return None
        entries = []
        with open(path, "r") as f:
            for line in f:
                fields = line.split()
                if len(fields) < 2:
                    continue
                try:
                    entries.append([float_or_int(fields[0]), float_or_int(fields[1])])
                except ValueError:
                    continue
        return entries

    def number(self, name: str):
        """Get the latest value from a log file.

        Args:
            name (str): The name of the log file.

        Returns:
            float: The second number of the last entry, or None if the file
            is missing or has no usable entry.
        """
        entries = self.numbers(name)
        if entries:
            return entries[-1][1]
        return None
[docs]
class SessionGet:
    """Class to handle session data retrieval operations."""

    def __init__(self, session: "Session"):
        """Initialize the SessionGet class.

        Args:
            session (Session): The session object.
        """
        self._session = session
        self._log = SessionLog(session)

    @property
    def log(self) -> "SessionLog":
        """Get the session log object."""
        return self._log

    def _frame_numbers(self) -> list[int]:
        """List the frame numbers of the ``vert_<n>.bin`` files on disk.

        Returns:
            list[int]: The frame numbers found in the session's ``output``
            directory, sorted ascending; empty if the directory is missing.
            Files whose name does not carry a plain number are ignored.
        """
        path = os.path.join(self._session.info.path, "output")
        if not os.path.exists(path):
            return []
        prefix, suffix = "vert_", ".bin"
        frames = []
        for name in os.listdir(path):
            if name.startswith(prefix) and name.endswith(suffix):
                digits = name[len(prefix) : -len(suffix)]
                if digits.isdecimal():
                    frames.append(int(digits))
        return sorted(frames)

    def _read_vertex(self, path: str) -> Optional[np.ndarray]:
        """Read one vertex file as an ``(N, 3)`` float32 array.

        Args:
            path (str): The path to the vertex file.

        Returns:
            Optional[np.ndarray]: The vertices, or None if the file is
            missing, cannot be reshaped into rows of three float32 values, or
            holds a different number of rows than the scene has vertices.
        """
        try:
            with open(path, "rb") as f:
                data = f.read()
            vert = np.frombuffer(data, dtype=np.float32).reshape(-1, 3)
        except (FileNotFoundError, ValueError):
            return None
        if len(vert) != len(self._session._fixed._vert):
            return None
        return vert

    def vertex_frame_count(self) -> int:
        """Get the highest frame number that has vertex data.

        Returns:
            int: The highest frame number among the ``vert_<n>.bin`` files,
            or 0 if there is none.
        """
        frames = self._frame_numbers()
        return frames[-1] if frames else 0

    def latest_frame(self) -> int:
        """Get the latest frame number.

        Returns:
            int: The latest frame number, or 0 if no frame has been written.
        """
        frames = self._frame_numbers()
        return frames[-1] if frames else 0

    def vertex(self, n: Optional[int] = None) -> Optional[tuple[np.ndarray, int]]:
        """Get the vertex data for a specific frame.

        Args:
            n (Optional[int], optional): The frame number. If not specified, the latest frame is returned. Defaults to None.

        Returns:
            Optional[tuple[np.ndarray, int]]: The vertex data and frame number,
            or None if the frame is unavailable or its file is unusable.

        Raises:
            ValueError: If the session has no scene yet.
        """
        if self._session._fixed is None:
            raise ValueError("Scene must be initialized")
        if n is None:
            frames = self._frame_numbers()
            if not frames:
                return None
            n = frames[-1]
        path = os.path.join(self._session.info.path, "output", f"vert_{n}.bin")
        # Both the latest-frame and the explicit-frame case share one reader,
        # so the vertex-count check applies to both.
        vert = self._read_vertex(path)
        if vert is None:
            return None
        return (vert, n)
[docs]
class Session:
    """Class to manage a simulation session."""

    def __init__(self, app_root: str, proj_root: str, name: str, save_func):
        """Initialize the Session class.

        Args:
            app_root (str): The root directory of the application.
            proj_root (str): The root directory of the project.
            name (str): The name of the session.
            save_func (Callable): The save function, called at the end of init().
        """
        self._in_jupyter_notebook = Utils.in_jupyter_notebook()
        self._app_root = app_root
        self._proj_root = proj_root
        self._fixed = None
        self._save_func = save_func
        # Refresh periods (seconds) of the live widgets.
        self._update_preview_interval = 1.0 / 60.0
        self._update_terminal_interval = 1.0 / 30.0
        self._update_table_interval = 0.25
        self._info = SessionInfo(name)
        self._export = SessionExport(self)
        self._get = SessionGet(self)
        self._output = SessionOutput(self)
        self.delete()

    @property
    def info(self) -> "SessionInfo":
        """Get the session information."""
        return self._info

    @property
    def export(self) -> "SessionExport":
        """Get the session export object."""
        return self._export

    @property
    def get(self) -> "SessionGet":
        """Get the session get object."""
        return self._get

    @property
    def output(self) -> "SessionOutput":
        """Get the session output object."""
        return self._output

    def print(self, message):
        """Print a message.

        Uses IPython's ``display`` inside a Jupyter notebook and the built-in
        ``print`` otherwise.

        Args:
            message (str): The message to print.
        """
        if self._in_jupyter_notebook:
            from IPython.display import display

            display(message)
        else:
            print(message)

    def delete(self):
        """Delete the session directory, if it exists."""
        if os.path.exists(self.info.path):
            shutil.rmtree(self.info.path)

    def _check_ready(self):
        """Check if the session is ready.

        Raises:
            ValueError: If the session has no scene yet.
        """
        if self._fixed is None:
            raise ValueError("Scene must be initialized")

    def init(self, scene: "FixedScene") -> "Session":
        """Initialize the session with a fixed scene.

        Sets the session directory to
        ``<app_root>/session/<scene name>/<session name>``, recreates it
        empty, exports the scene into it and calls the save function. If the
        solver is running, nothing is changed on disk and the session is
        returned as is.

        Args:
            scene (FixedScene): The fixed scene.

        Returns:
            Session: The initialized session.
        """
        path = os.path.expanduser(
            os.path.join(self._app_root, "session", scene._name, self.info.name)
        )
        self.info.set_path(path)
        if is_running():
            self.print("Solver is already running. Teriminate first.")
            if self._in_jupyter_notebook:
                from IPython.display import display

                display(self._terminate_button("Terminate Now"))
            return self
        self._fixed = scene
        if os.path.exists(self.info.path):
            shutil.rmtree(self.info.path)
        # Always (re)create the directory: the old if/else left it missing
        # whenever a previous run had already created it.
        os.makedirs(self.info.path)
        if self._fixed is not None:
            self._fixed.export_fixed(self.info.path, True)
        else:
            raise ValueError("Scene and param must be initialized")
        self._save_func()
        return self

    def finished(self) -> bool:
        """Check if the session is finished.

        Any content of the error log is printed as a side effect.

        Returns:
            bool: True if the session is finished, False otherwise.
        """
        finished_path = os.path.join(self.output.path, "finished.txt")
        for line in self.get.log.stderr():
            print(line)
        return os.path.exists(finished_path)

    def start(self, param: "Param", force: bool = True, blocking=False) -> "Session":
        """Start the session.

        For Jupyter Notebook, the function will return immediately and the solver
        will run in the background. If blocking is set to True, the function will block
        until the solver is finished.
        When Jupyter Notebook is not detected, the function will block until the solver
        is finished.

        Args:
            param (Param): The simulation parameters.
            force (bool, optional): Whether to force start, terminating a
                solver that is already running.
            blocking (bool, optional): Whether to block the execution.

        Returns:
            Session: The started session.

        Raises:
            ValueError: If the scene is not initialized or the solver fails
                to start.
        """
        self._check_ready()
        if is_running():
            if force:
                terminate()
            else:
                self.print("Solver is already running. Teriminate first.")
                # The button only exists in a notebook, same as in init().
                if self._in_jupyter_notebook:
                    from IPython.display import display

                    display(self._terminate_button("Terminate Now"))
                return self
        cmd_path = self.export.shell_command(param)
        err_path = os.path.join(self.info.path, "error.log")
        log_path = os.path.join(self.info.path, "stdout.log")
        with open(cmd_path, "r") as f:
            command = f.read()
        # The child keeps its own copies of the descriptors, so this process
        # can close the log files as soon as the solver has been spawned.
        with open(log_path, "w") as log_file, open(err_path, "w") as err_file:
            process = subprocess.Popen(
                command.split(),
                stdout=log_file,
                stderr=err_file,
                start_new_session=True,
                cwd=self._proj_root,
            )
        if process.poll() is not None:
            raise ValueError("Solver failed to start")
        # Wait until the solver has written its initialization marker or died.
        init_path = os.path.join(self.info.path, "output", "data", "initialize.out")
        time.sleep(1)
        while is_running():
            if os.path.exists(init_path):
                break
            time.sleep(1)
        if not os.path.exists(init_path):
            with open(err_path, "r") as f:
                err_content = f.readlines()
            display_log(err_content)
            raise ValueError("Solver failed to start")
        if blocking or not self._in_jupyter_notebook:
            print(f">>> Log path: {log_path}")
            print(">>> Waiting for solver to finish...")
            total_frames = param.get("frames")
            assert isinstance(total_frames, int)
            with tqdm(total=total_frames, desc="Progress") as pbar:
                last_frame = 0
                while process.poll() is None:
                    frame = self.get.latest_frame()
                    if frame > last_frame:
                        pbar.update(frame - last_frame)
                        last_frame = frame
                    time.sleep(1)
            err_lines = []
            if os.path.exists(err_path):
                with open(err_path, "r") as f:
                    err_lines = f.readlines()
            if err_lines:
                print("*** Solver FAILED ***")
            else:
                print("*** Solver finished ***")
            n_logs = 16
            with open(log_path, "r") as f:
                log_lines = f.readlines()
            print(">>> Log:")
            for line in log_lines[-n_logs:]:
                print(line.rstrip())
            if err_lines:
                print(">>> Error:")
                for line in err_lines:
                    print(line.rstrip())
                print(f">>> Error log path: {err_path}")
        return self

    def _terminate_button(self, description: str = "Terminate Solver"):
        """Create a terminate button.

        Args:
            description (str, optional): The button description.

        Returns:
            Optional[widgets.Button]: The terminate button, or None outside a
            Jupyter notebook.
        """
        if not self._in_jupyter_notebook:
            return None
        import ipywidgets as widgets

        def _terminate(button):
            button.disabled = True
            button.description = "Terminating..."
            terminate()
            while is_running():
                time.sleep(0.25)
            button.description = "Terminated"

        button = widgets.Button(description=description)
        button.on_click(_terminate)
        return button

    def preview(self, live_update: bool = True) -> Optional["Plot"]:
        """Live view the session.

        Shows the latest available frame. While the solver is running and
        ``live_update`` is set, two background threads keep the plot and a
        statistics table up to date until the solver stops.

        Args:
            live_update (bool, optional): Whether to enable live update.

        Returns:
            Optional[Plot]: The plot object, or None outside a Jupyter notebook.

        Raises:
            ValueError: If the session has no scene yet.
        """
        if not self._in_jupyter_notebook:
            return None
        import ipywidgets as widgets
        from IPython.display import display

        if self._fixed is None:
            raise ValueError("Scene must be initialized")
        shading = {"wireframe": False}
        result = self.get.vertex()
        if result is None:
            vert, curr_frame = self._fixed._vert, 0
        else:
            vert, curr_frame = result
        plot = self._fixed.preview(
            vert, shading=shading, show_pin=False, show_stitch=False
        )
        table = widgets.HTML()
        button = self._terminate_button()

        def convert_integer(number) -> str:
            """Format a count with a k/M/B suffix ("N/A" for None)."""
            if number is None:
                return "N/A"
            elif number < 1000:
                return str(number)
            elif number < 1_000_000:
                return f"{number/1_000:.2f}k"
            elif number < 1_000_000_000:
                return f"{number/1_000_000:.2f}M"
            else:
                return f"{number/1_000_000_000:.2f}B"

        def convert_time(msec) -> str:
            """Format a millisecond value as ms, s or m ("N/A" for None)."""
            if msec is None:
                return "N/A"
            elif msec < 1_000:
                return f"{int(msec)}ms"
            elif msec < 60_000:
                return f"{msec/1_000:.2f}s"
            else:
                return f"{msec/60_000:.2f}m"

        if live_update and is_running():

            def update_dataframe(table, curr_frame):
                """Rebuild the statistics table from the latest log values."""
                time_per_frame = convert_time(self.get.log.number("time-per-frame"))
                time_per_step = convert_time(self.get.log.number("time-per-step"))
                n_contact = convert_integer(self.get.log.number("num-contact"))
                n_newton = convert_integer(self.get.log.number("newton-steps"))
                max_sigma = self.get.log.number("max-sigma")
                n_pcg = convert_integer(self.get.log.number("pcg-iter"))
                data = {
                    "Frame": [str(curr_frame)],
                    "Time/Frame": [time_per_frame],
                    "Time/Step": [time_per_step],
                    "#Contact": [n_contact],
                    "#Newton": [n_newton],
                    "#PCG": [n_pcg],
                }
                if max_sigma is not None:
                    stretch = f"{100.0 * (max_sigma - 1.0):.2f}%"
                    data["Max Stretch"] = [stretch]
                df = pd.DataFrame(data)
                table.value = df.to_html(
                    classes="table table-striped", border=0, index=False
                )

            def live_preview():
                """Follow the newest frame until the solver stops."""
                nonlocal curr_frame
                assert plot is not None
                while True:
                    last_frame = self.get.latest_frame()
                    if curr_frame != last_frame:
                        curr_frame = last_frame
                        result = self.get.vertex(curr_frame)
                        if result is not None:
                            update_dataframe(table, curr_frame)
                            plot.update(result[0])
                    if not is_running():
                        break
                    time.sleep(self._update_preview_interval)
                assert button is not None
                button.disabled = True
                button.description = "Terminated"
                time.sleep(self._update_preview_interval)
                # One last refresh so the final frame is shown.
                last_frame = self.get.latest_frame()
                update_dataframe(table, last_frame)
                result = self.get.vertex(last_frame)
                # vertex() returns None when the frame cannot be read;
                # unpacking it directly would kill this thread.
                if result is not None:
                    plot.update(result[0])

            def live_table():
                """Refresh the statistics table until the solver stops."""
                while True:
                    update_dataframe(table, curr_frame)
                    if not is_running():
                        break
                    time.sleep(self._update_table_interval)

            threading.Thread(target=live_preview).start()
            threading.Thread(target=live_table).start()
            display(button)
            display(table)
        return plot

    def animate(self) -> "Session":
        """Show the animation.

        Loads the available frames and shows a slider to scrub through them.
        Does nothing outside a Jupyter notebook.

        Returns:
            Session: The animated session.

        Raises:
            ValueError: If the session has no scene yet.
        """
        if not self._in_jupyter_notebook:
            return self
        import ipywidgets as widgets

        if self._fixed is None:
            raise ValueError("Scene must be initialized")
        shading = {"wireframe": False}
        plot = self._fixed.preview(
            self._fixed._vert,
            shading=shading,
            show_pin=False,
            show_stitch=False,
        )
        try:
            frame_count = self.get.vertex_frame_count()
            vert_list = []
            # NOTE(review): range() stops one short of the highest frame
            # number, so the newest frame is not loaded - confirm intent.
            for i in tqdm(range(frame_count), desc="Loading frames", ncols=70):
                result = self.get.vertex(i)
                if result is not None:
                    vert_list.append(result[0])
            if vert_list:

                def update(frame=1):
                    assert plot is not None
                    plot.update(vert_list[frame - 1])

                # Frames that could not be read are skipped above, so the
                # slider spans what was loaded rather than frame_count;
                # otherwise its upper end could index past vert_list.
                widgets.interact(update, frame=(1, len(vert_list)))
        except Exception as err:
            # Still best effort, but no longer silent.
            self.print(f"Failed to show the animation: {err}")
        return self

    def stream(self, n_lines=40) -> "Session":
        """Stream the session logs.

        Shows the tail of the solver log in a widget that a background thread
        refreshes until the solver stops or the stream is toggled off. Does
        nothing outside a Jupyter notebook.

        Args:
            n_lines (int, optional): The number of lines to stream. Defaults to 40.

        Returns:
            Session: The session object.
        """
        if not self._in_jupyter_notebook:
            return self
        import ipywidgets as widgets
        from IPython.display import display

        log_widget = widgets.HTML()
        display(log_widget)
        button = widgets.Button(description="Stop Live Stream")
        terminate_button = self._terminate_button()
        display(widgets.HBox((button, terminate_button)))
        stop = False
        log_path = os.path.join(self.info.path, "output", "cudasim_log.txt")
        err_path = os.path.join(self.info.path, "error.log")
        if not os.path.exists(log_path):
            log_widget.value = "No log file found."
            button.disabled = True
            return self

        def live_stream():
            assert terminate_button is not None
            while not stop:
                result = subprocess.run(
                    ["tail", f"-n{n_lines}", log_path],
                    capture_output=True,
                    text=True,
                )
                # ".no-scroll" is a CSS class in CONSOLE_STYLE, so it has to
                # be referenced with class=, not style=.
                log_widget.value = (
                    CONSOLE_STYLE
                    + f"<pre class='no-scroll'>{result.stdout.strip()}</pre>"
                )
                if not is_running():
                    footer = "<p style='color: red;'>Terminated.</p>"
                    if os.path.exists(err_path):
                        with open(err_path, "r") as f:
                            lines = f.readlines()
                        if lines:
                            footer += "<p style='color: red;'>"
                            footer += "".join(line + "\n" for line in lines)
                            footer += "</p>"
                    log_widget.value += footer
                    button.disabled = True
                    terminate_button.disabled = True
                    break
                time.sleep(self._update_terminal_interval)

        thread = threading.Thread(target=live_stream)
        thread.start()

        def toggle_stream(b):
            nonlocal stop
            nonlocal thread
            if thread.is_alive():
                stop = True
                thread.join()
                b.description = "Start Live Stream"
            else:
                thread = threading.Thread(target=live_stream)
                stop = False
                thread.start()
                b.description = "Stop Live Stream"

        button.on_click(toggle_stream)
        return self
def is_running() -> bool:
    """Check if the solver is running.

    A process counts as the solver when its name contains PROCESS_NAME and
    it is not a zombie.

    Returns:
        bool: True if the solver is running, False otherwise.
    """
    for proc in psutil.process_iter(["pid", "name", "status"]):
        # psutil stores None for an attribute it could not read (e.g. access
        # denied); "in None" would raise TypeError, so fall back to "".
        name = proc.info["name"] or ""
        if PROCESS_NAME in name and proc.info["status"] != psutil.STATUS_ZOMBIE:
            return True
    return False
def terminate():
    """Terminate the solver.

    Sends SIGTERM to every non-zombie process whose name contains
    PROCESS_NAME. The call does not wait for the processes to exit.
    """
    for proc in psutil.process_iter(["pid", "name", "status"]):
        # psutil stores None for an attribute it could not read (e.g. access
        # denied); "in None" would raise TypeError, so fall back to "".
        name = proc.info["name"] or ""
        if PROCESS_NAME in name and proc.info["status"] != psutil.STATUS_ZOMBIE:
            try:
                os.kill(proc.info["pid"], signal.SIGTERM)
            except ProcessLookupError:
                # The process exited between the scan and the signal, which is
                # the outcome we wanted anyway.
                pass
def display_log(lines: list[str]):
    """Display the log lines.

    Renders the lines in an HTML widget inside a Jupyter notebook and prints
    them to stdout otherwise.

    Args:
        lines (list[str]): The log lines, with or without trailing newlines.
    """
    # Callers pass readlines() output; drop the line terminators so joining
    # with "\n" does not double-space the text.
    text = "\n".join(line.rstrip("\n") for line in lines)
    if Utils.in_jupyter_notebook():
        import ipywidgets as widgets
        from IPython.display import display

        log_widget = widgets.HTML()
        # ".no-scroll" is a CSS class in CONSOLE_STYLE, so it has to be
        # referenced with class=, not style=.
        log_widget.value = CONSOLE_STYLE + f"<pre class='no-scroll'>{text}</pre>"
        display(log_widget)
    elif text:
        # Without a notebook there is no widget; print so the log is not lost.
        print(text)