Source code for frontend._session_

# File: _session_.py
# Code: Claude Code and Codex
# Review: Ryoichi Ando (ryoichi.ando@zozo.com)
# License: Apache v2.0

"""Public entry point for session lifecycle classes.

This module owns ``SessionManager``, ``Session``, ``FixedSession``,
``SessionInfo``, and the ``fixed_session_to_cbor_dict`` helper. The
dynamic-parameter holder is in ``_session_param_``; the
inspection / export / log / get classes plus formatting helpers are
in ``_session_inspect_``.

Everything is re-exported here for backward compatibility (older code,
tests, and Blender integration import names directly from
``frontend._session_``).
"""

import asyncio
import copy
import os
import pickle
import platform
import shutil
import subprocess
import threading
import time

from typing import TYPE_CHECKING, Any, Optional

from tqdm.auto import tqdm

from . import _rust  # type: ignore[attr-defined]

from ._scene_ import FixedScene
from ._session_inspect_ import (
    CONSOLE_STYLE,
    SessionExport,
    SessionGet,
    SessionLog,
    SessionOutput,
    Zippable,
    display_log,
)
from ._session_param_ import ParamManager
from ._utils_ import Utils, get_export_base_path


if TYPE_CHECKING:
    from ._plot_ import Plot


class SessionManager:
    """Registry that creates, looks up, and disposes of simulation sessions.

    Example:
        Create a session from a built scene and launch it::

            session = app.session.create(scene)
            session.param.set("frames", 60).set("dt", 0.01)
            session = session.build()
            session.start(blocking=True)
    """

    def __init__(self, app_name: str, app_root: str, proj_root: str, data_dirpath: str):
        """Remember the application locations handed to every new session.

        Args:
            app_name (str): The name of the application.
            app_root (str): The root directory of the application.
            proj_root (str): The root directory of the project.
            data_dirpath (str): The data directory path.
        """
        # Sessions are keyed by name; the dict starts out empty.
        self._sessions = {}
        self._app_name = app_name
        self._app_root = app_root
        self._proj_root = proj_root
        self._data_dirpath = data_dirpath

    def list(self):
        """Return the name-to-session mapping tracked by this manager.

        Returns:
            dict: The internal mapping itself (not a copy).

        Example:
            Print the names of every session currently tracked by the app::

                for name in app.session.list():
                    print(name)
        """
        return self._sessions

    def select(self, name: str = "session"):
        """Look up a previously created session.

        Args:
            name (str): The name of the session. Defaults to ``"session"``.

        Returns:
            Session: The session registered under ``name``.

        Raises:
            ValueError: If no session with the given name exists.

        Example:
            Re-fetch a previously-created session by name::

                app.session.create(scene, name="run-A")
                session = app.session.select("run-A")
        """
        known = name in self._sessions
        # Validation (and the error it raises) is delegated to the Rust helper.
        _rust.scene_validate_session_exists(known, name)
        return self._sessions[name]

    def create(self, scene: FixedScene, name: str = "") -> "Session":
        """Register a new session for ``scene`` and initialize it.

        When ``name`` is empty an auto-generated name is used: ``"session"``
        for the first call, then ``"session-1"``, ``"session-2"``, ... for
        subsequent calls.

        Args:
            scene (FixedScene): The scene object.
            name (str): The name of the session. Defaults to ``""``
                (auto-generated).

        Returns:
            Session: The created session.

        Raises:
            Exception: If the scene has violations (self-intersections,
                contact-offset violations, etc.).

        Example:
            Create a session from a built fixed scene and configure it::

                scene = app.scene.create()
                scene.add("sheet").at(0, 0.5, 0)
                scene = scene.build()
                session = app.session.create(scene)
                session.param.set("frames", 60).set("dt", 0.01)
                session = session.build()
        """
        assert isinstance(scene, FixedScene), "Scene must be a FixedScene object"
        if scene.has_violations:
            violations = scene.get_violation_messages()
            raise Exception(_rust.session_violations_message(violations))

        counter_value = None
        if name == "":
            # Name generation is delegated to Rust; it receives the names
            # already taken and returns the chosen name plus its counter.
            taken = list(self._sessions)
            name, raw_counter = _rust.session_autogenerate_name(taken, "session")
            counter_value = int(raw_counter)

        new_session = Session(
            self._app_name,
            self._app_root,
            self._proj_root,
            self._data_dirpath,
            name,
            counter_value,
        )
        self._sessions[name] = new_session
        return new_session.init(scene)

    def _terminate_or_raise(self, force: bool):
        """Terminate a running solver, or refuse when ``force`` is ``False``.

        Args:
            force (bool): If ``True``, terminate a running solver; otherwise
                raise a ``ValueError``.

        Raises:
            ValueError: If the solver is running and ``force`` is ``False``.
        """
        if not Utils.busy():
            return
        if not force:
            raise ValueError("Solver is running. Terminate first.")
        Utils.terminate()

    def delete(self, name: str, force: bool = True):
        """Delete one session; unknown names are ignored.

        Args:
            name (str): The name of the session.
            force (bool, optional): Whether to terminate a running solver
                instead of raising.

        Example:
            Tear down a named session, terminating the solver if it is
            still running::

                app.session.delete("run-A", force=True)
        """
        self._terminate_or_raise(force)
        if name not in self._sessions:
            return
        # Delete the session first; the entry is dropped only afterwards.
        self._sessions[name].delete()
        del self._sessions[name]

    def clear(self, force: bool = True):
        """Delete every tracked session and reset the registry.

        Args:
            force (bool, optional): Whether to terminate a running solver
                instead of raising.

        Example:
            Remove every session and any running solver before starting
            fresh::

                app.session.clear(force=True)
        """
        self._terminate_or_raise(force)
        for tracked in self._sessions.values():
            tracked.delete()
        self._sessions = {}
class SessionInfo:
    """Small record holding a session's name and its directory path.

    Example:
        Read the on-disk directory path for a built session::

            fixed_session = session.build()
            print(fixed_session.info.name)
            print(fixed_session.info.path)
    """

    def __init__(self, name: str):
        """Store the session name; the path starts out empty.

        The directory path is filled in later through :meth:`set_path`.

        Args:
            name (str): The name of the session.
        """
        self._path = ""
        self._name = name

    def set_path(self, path: str) -> "SessionInfo":
        """Record the session directory path.

        Args:
            path (str): The path to the session directory.

        Returns:
            SessionInfo: This instance, for chaining.

        Example:
            A direct call may be useful when relocating an existing session
            on disk::

                info = SessionInfo("my_run")
                info.set_path("/data/sessions/my_run")
                print(info.path)
        """
        self._path = path
        return self

    @property
    def name(self) -> str:
        """The session name given at construction.

        Example:
            Inspect the session name before starting a run::

                session = app.session.create(fixed_scene).build()
                print(session.info.name)
        """
        return self._name

    @property
    def path(self) -> str:
        """The session directory path (empty until :meth:`set_path` is called).

        Example:
            Read the on-disk session directory after building::

                session = app.session.create(fixed_scene).build()
                print(session.info.path)
        """
        return self._path
[docs] class FixedSession: """Class to manage a fixed simulation session. Returned by :meth:`Session.build`. Use it to launch the solver, monitor it, pull results, and export. Example: Build a session from a configured :class:`Session` and run it to completion, then export:: fixed_session = session.build() fixed_session.start(blocking=True) fixed_session.export.animation().zip() """ def __init__(self, session: "Session"): """Initialize the FixedSession from a parent Session. Deletes any prior on-disk session directory, exports the fixed scene, and writes the solver launcher script. Args: session (Session): The parent session object. Raises: ValueError: If the parent session has no fixed scene. """ self._session = session self._process: Optional[subprocess.Popen] = None self._update_preview_interval = 0.1 self._update_terminal_interval = 0.1 self._update_table_interval = 0.1 self._info = SessionInfo(session.name).set_path( os.path.join(session.app_root, session.name) ) self._export = SessionExport(self) self._get = SessionGet(self) self._output = SessionOutput(self) self._param = session.param.copy() self._default_opts: dict[str, Any] = { "flat_shading": False, "wireframe": False, "pin": False, "stitch": False, } if self.fixed_scene is not None: self.delete() self.fixed_scene.export_fixed(self.info.path, True) else: raise ValueError("Scene and param must be initialized") self._cmd_path = self.export.shell_command(self._param) @property def info(self) -> SessionInfo: """Get the session information. Example: Read the session name and directory path:: session = app.session.create(fixed_scene).build() print(session.info.name, session.info.path) """ return self._info @property def export(self) -> SessionExport: """Get the session export object. 
Example: Export the session shell command for reproducible replays:: session = app.session.create(fixed_scene).build() cmd_path = session.export.shell_command(session.session.param) """ return self._export @property def get(self) -> SessionGet: """Get the session get object. Example: Retrieve solver-emitted log channel names:: session = session.build().start(blocking=True) channels = session.get.log.names() """ return self._get @property def output(self) -> SessionOutput: """Get the session output object. Example: Locate the solver output directory:: session = app.session.create(fixed_scene).build() print(session.output.path) """ return self._output @property def session(self) -> "Session": """Get the session object. Example: Reach back to the parent :class:`Session` for its parameters:: fixed = app.session.create(fixed_scene).build() params = fixed.session.param """ return self._session
[docs] def print(self, message): """Print a message. Args: message (str): The message to print. Example: Emit a status line that renders nicely in Jupyter or stdout:: session.print("Launching solver...") """ if Utils.in_jupyter_notebook(): from IPython.display import display display(message) else: print(message)
[docs] def is_running(self) -> bool: """Check if the solver process is running. This method first checks the stored process handle (most reliable), then falls back to Utils.busy() for broader detection. Returns: bool: True if the solver is running, False otherwise. Example: Poll the solver state after launching non-blocking:: session.start() while session.is_running(): time.sleep(1) """ # First check the stored process handle (most reliable) if self._process is not None: try: if self._process.poll() is None: return True # Process has exited, clear the reference self._process = None except OSError: # Process handle became invalid self._process = None # Fall back to Utils.busy() for cases where process was started externally return Utils.busy()
[docs] def run(self, blocking: Optional[bool] = None) -> "FixedSession": """Idempotent launcher: ensure the solver is running. - If the solver is already running (this process or another host process detected via ``Utils.busy()``), return without relaunching. - Otherwise, start a fresh simulation from frame 0. To pick up a previously-saved state, use :meth:`resume` explicitly, since ``run()`` never auto-resumes. Args: blocking (Optional[bool]): If ``True``, block until the solver finishes. If ``None``, defaults to blocking outside Jupyter and non-blocking inside Jupyter. Returns: FixedSession: This session. Example: Ensure the solver is running without restarting it if it already is:: session.run(blocking=True) """ if self.is_running(): return self # force=True here bypasses start()'s auto-resume branch so we # always begin from frame 0. It won't terminate anything because # is_running() was just False. return self.start(force=True, blocking=blocking)
def _analyze_solver_error(self, log_lines, err_lines): """Analyze log and error files for specific failure patterns. Args: log_lines (list): Lines from stdout log file err_lines (list): Lines from stderr log file Returns: str or None: Single most critical error message, or None if no specific error found """ return _rust.analyze_solver_error(list(log_lines), list(err_lines))
[docs] def delete(self): """Delete the session. Example: Remove the on-disk session directory before a clean rerun:: session.delete() """ _rust.delete_session_dir(self.info.path)
def _check_ready(self): """Check if the session is ready.""" if self.fixed_scene is None: raise ValueError("Scene must be initialized")
[docs] def finished(self) -> bool: """Check if the session has finished. Any stderr lines present are printed as a side effect. Returns: bool: ``True`` if a ``finished.txt`` marker exists in the output directory, ``False`` otherwise. Example: Assert the run completed cleanly in CI:: if app.ci: assert session.finished() """ error = self.get.log.stderr() if len(error) > 0: for line in error: print(line) return _rust.marker_exists(self.output.path, "finished.txt")
[docs] def initialize_finished(self) -> bool: """Check if the session initialization has finished. Any stderr lines present are printed as a side effect. Returns: bool: ``True`` if an ``initialize_finish.txt`` marker exists in the output directory, ``False`` otherwise. Example: Wait for solver initialization to complete before continuing:: while not session.initialize_finished(): time.sleep(1) """ error = self.get.log.stderr() if len(error) > 0: for line in error: print(line) return _rust.marker_exists(self.output.path, "initialize_finish.txt")
[docs] def resume( self, frame: int = -1, force: bool = True, blocking: Optional[bool] = None, ) -> "FixedSession": """Resume the solver from a saved state. Args: frame (int): The saved frame to resume from. If ``-1``, resumes from the most recent saved frame. Defaults to ``-1``. force (bool): Forwarded to :meth:`start`. Defaults to ``True``. blocking (Optional[bool]): Forwarded to :meth:`start`. Returns: FixedSession: This session. Example: Pick up where a previous run left off:: session.resume() # latest saved frame session.resume(frame=120) # specific saved frame """ if self._param is None: print("Session is not yet started") return self target = _rust.select_resume_frame(self.get.saved(), int(frame)) if target is None: if frame == -1: return self print(f"No saved state found: frame: {frame}") return self return self.start(force, blocking, int(target))
[docs] def start( self, force: bool = False, blocking: Optional[bool] = None, load: int = 0, ) -> "FixedSession": """Start the session. Inside a Jupyter notebook the function returns immediately by default and the solver runs in the background; outside Jupyter it blocks until the solver finishes. Pass ``blocking`` explicitly to override this behavior. If saved states exist and ``force`` is ``False``, this delegates to :meth:`resume` from the latest saved frame. Args: force (bool, optional): If ``True``, terminate any running solver and skip the auto-resume branch. Defaults to ``False``. blocking (Optional[bool], optional): Whether to block until the solver finishes. Defaults to ``None`` (auto-detect based on Jupyter). load (int, optional): The frame number to load from saved states. Defaults to ``0`` (start fresh). Returns: FixedSession: The started session. Example: Launch the solver and return immediately for notebook-style monitoring:: session.start().preview() session.stream() Or block until finished when running as a script:: session.start(blocking=True) """ Utils.check_gpu() # Driver version check + error message templating live in Rust; # detection (`nvidia-smi --query-gpu=driver_version`) stays in # Python because it shells out. 
err = _rust.validate_driver_version(Utils.get_driver_version(), 520) if err is not None: raise ValueError(err) nvidia_smi_dir = os.path.join(self.info.path, "nvidia-smi") os.makedirs(nvidia_smi_dir, exist_ok=True) nvidia_smi_path = os.path.join(nvidia_smi_dir, "nvidia-smi.txt") try: result = subprocess.run( ["nvidia-smi"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: with open(nvidia_smi_path, "w") as f: f.write(result.stdout) except (subprocess.TimeoutExpired, FileNotFoundError) as e: print(f"Warning: Could not export nvidia-smi output: {e}") nvidia_smi_q_path = os.path.join(nvidia_smi_dir, "nvidia-smi-q.txt") try: result = subprocess.run( ["nvidia-smi", "-q"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: with open(nvidia_smi_q_path, "w") as f: f.write(result.stdout) except (subprocess.TimeoutExpired, FileNotFoundError) as e: print(f"Warning: Could not export nvidia-smi -q output: {e}") if os.path.exists(self.save_and_quit_file_path()): os.remove(self.save_and_quit_file_path()) self._check_ready() if self.is_running(): if force: Utils.terminate() self._process = None else: from IPython.display import display self.print("Solver is already running. Terminate first.") display(self._terminate_button("Terminate Now")) return self frame = self.get.saved() if frame and not force: from IPython.display import display self.print(f"Solver has saved states. 
Resuming from {max(frame)}") return self.resume(max(frame), True, blocking) if self._cmd_path: if load == 0: export_path = _rust.export_base_path_for( get_export_base_path(), self._session.app_name, self.info.name, ) if os.path.exists(export_path): shutil.rmtree(export_path) log_path, err_path = _rust.stdout_error_log_paths(self.info.path) which = "windows" if platform.system() == "Windows" else "unix" command = _rust.solver_subprocess_command(self._cmd_path, load, which) with open(log_path, "w") as stdout_file, open(err_path, "w") as stderr_file: if platform.system() == "Windows": # Windows self._process = subprocess.Popen( command, shell=True, stdout=stdout_file, stderr=stderr_file, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, # pyright: ignore[reportAttributeAccessIssue] cwd=self._session.proj_root, ) else: self._process = subprocess.Popen( command, shell=True, stdout=stdout_file, stderr=stderr_file, start_new_session=True, cwd=self._session.proj_root, ) process = self._process if blocking is None: blocking = not Utils.in_jupyter_notebook() if not blocking: # Wait briefly for process to initialize and verify it's stable time.sleep(0.2) # Give process time to start if not self.is_running(): # Process exited within 0.2s. Could be a startup crash # (rc != 0) or a legitimately-short run that finished # before the poll (rc == 0). The latter happens in # practice when resuming from a saved frame near the # end of the simulation: load state, run remaining # frame(s), exit cleanly. Without this distinction # the solver wrapper would misclassify a successful # short run as ``"Solver failed to start"``. rc = process.poll() if process is not None else None if rc is None or rc == 0: # Clean exit (or process not yet observable but # is_running() reported false; give it the # benefit of the doubt rather than raise on # stale state). 
return self err_lines = _rust.read_lines_with_newlines(err_path) log_lines = _rust.read_lines_with_newlines(log_path) error_message = self._analyze_solver_error(log_lines, err_lines) if error_message: raise ValueError(error_message) elif err_lines: raise ValueError(_rust.solver_failed_short_message(err_lines)) else: raise ValueError(_rust.solver_failed_to_start_message(rc)) if blocking: while not os.path.exists(log_path) and not os.path.exists(err_path): time.sleep(1) if process.poll() is not None: log_lines = _rust.read_lines_with_newlines(log_path) err_lines = _rust.read_lines_with_newlines(err_path) display_log(log_lines) display_log(err_lines) error_message = self._analyze_solver_error(log_lines, err_lines) if error_message: raise ValueError(error_message) else: raise ValueError("Solver failed to start") else: time.sleep(1) while self.is_running(): if self.initialize_finished(): break time.sleep(1) if not self.initialize_finished(): log_lines = _rust.read_lines_with_newlines(log_path) err_lines = _rust.read_lines_with_newlines(err_path) display_log(log_lines) display_log(err_lines) error_message = self._analyze_solver_error(log_lines, err_lines) if error_message: raise ValueError(error_message) else: raise ValueError( "Solver initialization failed - check log files for details" ) print(f">>> Log path: {log_path}") print(">>> Waiting for solver to finish...") total_frames = self._param.get("frames") assert isinstance(total_frames, int) with tqdm(total=total_frames, desc="progress") as pbar: last_frame = 0 while process.poll() is None: frame = self.get.latest_frame() if frame > last_frame: pbar.update(frame - last_frame) last_frame = frame time.sleep(1) err_lines = _rust.read_lines_with_newlines(err_path) if len(err_lines) > 0: print("*** Solver FAILED ***") else: print("*** Solver finished ***") n_logs = 32 log_lines = _rust.read_lines_with_newlines(log_path) print(">>> Log:") for line in log_lines[-n_logs:]: print(line.rstrip()) if len(err_lines) > 0: 
print(">>> Error:") for line in err_lines: print(line.rstrip()) print(f">>> Error log path: {err_path}") fixed_scene = self.fixed_scene vals = ( [ x for x in fixed_scene.tri_param.get("strain-limit", []) if isinstance(x, float) ] if fixed_scene is not None else [] ) self._default_opts["max-area"] = 1.0 + _rust.max_strain_limit_default_zero(vals) else: raise ValueError("Command path is not set. Call build() first.") return self
def _terminate_button(self, description: str = "Terminate Solver"): """Create a terminate button. Args: description (str, optional): The button description. Returns: Optional[widgets.Button]: The terminate button. """ if Utils.in_jupyter_notebook(): import ipywidgets as widgets async def _terminate_async(button): button.disabled = True button.description = "Terminating..." Utils.terminate() self._process = None while self.is_running(): await asyncio.sleep(0.25) button.description = "Terminated" def _terminate(button): asyncio.ensure_future(_terminate_async(button)) button = widgets.Button(description=description) button.on_click(_terminate) return button else: return None
[docs] def save_and_quit_file_path(self) -> str: """Get the flag-file path that signals the solver to save and quit. If this file exists, the solver will save the session and quit. After the session is saved, the file is removed. Example: Check for the save-and-quit sentinel file:: path = session.save_and_quit_file_path() print(os.path.exists(path)) """ return _rust.save_and_quit_file_path(self.info.path)
[docs] def save_and_quit(self): """Save the session and quit the solver. Example: Ask a running solver to checkpoint and exit gracefully:: session.save_and_quit() while session.is_running(): time.sleep(1) """ _rust.touch_save_and_quit(self.info.path)
def _save_and_quit_button(self, description: str = "Save and Quit"): """Create a save-and-quit button. Args: description (str, optional): The button description. Returns: Optional[widgets.Button]: The save-and-quit button. """ if Utils.in_jupyter_notebook(): import ipywidgets as widgets async def _save_and_quit_async(button): button.disabled = True button.description = "Requesting..." self.save_and_quit() while self.is_running(): await asyncio.sleep(0.25) self._process = None button.description = "Done" def _save_and_quit(button): asyncio.ensure_future(_save_and_quit_async(button)) button = widgets.Button(description=description) button.on_click(_save_and_quit) return button else: return None
[docs] def update_options(self, options: dict) -> dict: """Return a copy of ``options`` with missing defaults filled in. Args: options (dict): User-supplied render options. Returns: dict: A new dictionary combining ``options`` with the session's default option values. Example: Add the session's default render flags to a user-provided options dict:: opts = session.update_options({"flat_shading": True}) """ options = dict(options) for key, value in self._default_opts.items(): if key not in options: options[key] = value return options
[docs] def preview( self, options: Optional[dict] = None, live_update: bool = True, engine: str = "threejs", ) -> Optional["Plot"]: """Live-view the session inside a Jupyter notebook. Outside Jupyter this is a no-op and returns ``None``. Args: options (dict, optional): The render options. live_update (bool, optional): Whether to enable live updates. Defaults to ``True``. engine (str, optional): The rendering engine. Defaults to ``"threejs"``. Returns: Optional[Plot]: The plot object, or ``None`` when not running inside a Jupyter notebook. Example: Kick off a run and watch it in-notebook while tailing stdout:: session.start().preview() # live frame playback session.stream() # tail solver stdout """ if options is None: options = {} options = self.update_options(options) if Utils.in_jupyter_notebook(): import ipywidgets as widgets from IPython.display import display fixed_scene = self.fixed_scene if fixed_scene is None: raise ValueError("Scene must be initialized") else: result = self.get.vertex() if result is None: vert, curr_frame = fixed_scene.vertex(True), 0 else: vert, curr_frame = result plot = fixed_scene.preview( vert, options, show_slider=False, engine=engine ) table = widgets.HTML() terminate_button = self._terminate_button() save_and_quit_button = self._save_and_quit_button() if live_update and self.is_running(): def update_dataframe(table, curr_frame): summary = self.get.log.summary() max_stretch = summary.get("stretch") data = { "Frame": [curr_frame], "Time/Frame": [summary.get("time-per-frame")], "Time/Step": [summary.get("time-per-step")], "#Contact": [summary.get("num-contact")], "#Newton": [summary.get("newton-steps")], "#PCG": [summary.get("pcg-iter")], } if max_stretch is not None: data["Max Stretch"] = [max_stretch] from ._utils_ import dict_to_html_table table.value = dict_to_html_table( data, classes="table table-striped" ) async def live_preview_async(): """Async coroutine for live preview updates. 
Using async instead of threading allows the event loop to process button events between updates, preventing UI unresponsiveness. """ nonlocal plot nonlocal terminate_button nonlocal save_and_quit_button nonlocal table nonlocal options nonlocal curr_frame try: assert plot is not None assert self.fixed_scene is not None while True: last_frame = self.get.latest_frame() if curr_frame != last_frame: curr_frame = last_frame result = self.get.vertex(curr_frame) if result is not None: vert, _ = result color = self.fixed_scene.color(vert, options) update_dataframe(table, curr_frame) plot.update(vert, color) if not self.is_running(): break await asyncio.sleep(self._update_preview_interval) assert terminate_button is not None assert save_and_quit_button is not None terminate_button.disabled = True terminate_button.description = "Terminated" save_and_quit_button.disabled = True await asyncio.sleep(self._update_preview_interval) last_frame = self.get.latest_frame() update_dataframe(table, last_frame) vertex_data = self.get.vertex(last_frame) if vertex_data is not None: vert, _ = vertex_data color = self.fixed_scene.color(vert, options) plot.update(vert, color) except Exception as e: print(f"live_preview error: {e}") async def live_table_async(): """Async coroutine for table updates.""" nonlocal table try: while True: update_dataframe(table, curr_frame) if not self.is_running(): break await asyncio.sleep(self._update_table_interval) except Exception as e: print(f"live_table error: {e}") # Use async coroutines instead of threads to allow event loop # to process button events between updates asyncio.ensure_future(live_preview_async()) asyncio.ensure_future(live_table_async()) display(widgets.HBox((terminate_button, save_and_quit_button))) display(table) return plot else: return None
[docs] def animate( self, options: Optional[dict] = None, engine: str = "threejs" ) -> "FixedSession": """Show the animation inside a Jupyter notebook. Outside Jupyter this is a no-op. Loads all available frames from disk and exposes a slider plus a reload button to pull in frames produced after the initial load. Args: options (dict, optional): The render options. engine (str, optional): The rendering engine. Defaults to ``"threejs"``. Returns: FixedSession: This session. Example: Replay frames once the run has finished (or has enough frames on disk):: session.animate() """ if options is None: options = {} options = self.update_options(options) if Utils.in_jupyter_notebook(): import ipywidgets as widgets from IPython.display import display fixed_scene = self.fixed_scene if fixed_scene is None: raise ValueError("Scene must be initialized") else: plot = fixed_scene.preview( fixed_scene.vertex(True), options, show_slider=False, engine=engine, ) try: if fixed_scene is not None: # Wait for at least one frame to be ready frame_count = self.get.latest_frame() if frame_count == 0: print( "Waiting for simulation to generate at least one frame..." ) while self.get.latest_frame() == 0: if not self.is_running(): print( "Simulation finished but no frames were generated." ) return self time.sleep(0.5) frame_count = self.get.latest_frame() print(f"Found {frame_count} frame(s). 
Loading animation...") vert_list = [] for i in tqdm(range(frame_count), desc="loading frames"): result = self.get.vertex(i) if result is not None: vert, _ = result vert_list.append(vert) # Create status label and reload button status_label = widgets.Label( value=f"Loaded {len(vert_list)} frames" ) reload_button = widgets.Button(description="Reload") display(widgets.HBox([reload_button, status_label])) def update(frame=1): nonlocal vert_list nonlocal plot assert plot is not None if fixed_scene is not None and frame - 1 < len(vert_list): vert = vert_list[frame - 1] color = fixed_scene.color(vert, options) # Always recompute normals for correct lighting plot.update(vert, color, recompute_normals=True) # Create the interactive slider slider = widgets.IntSlider( min=1, max=frame_count, step=1, value=1, description="frame" ) output = widgets.interactive_output(update, {"frame": slider}) def _reload(button): nonlocal vert_list nonlocal slider nonlocal status_label button.disabled = True button.description = "Reloading..." try: # Reload frames from disk new_frame_count = self.get.latest_frame() if new_frame_count > len(vert_list): for i in range(len(vert_list), new_frame_count): result = self.get.vertex(i) if result is not None: vert, _ = result vert_list.append(vert) # Update the slider range slider.max = new_frame_count # Update status label status_label.value = ( f"Loaded {len(vert_list)} frames" ) button.description = "Reload" except Exception: button.description = "Reload" finally: button.disabled = False reload_button.on_click(_reload) # Display slider and output display(slider, output) except Exception as _: pass return self
[docs] def stream(self, n_lines=40) -> "FixedSession": """Stream the tail of the session stdout log inside a Jupyter notebook. Outside Jupyter this is a no-op. Args: n_lines (int, optional): The number of trailing lines to display. Defaults to ``40``. Returns: FixedSession: This session. Example: Kick off a run and watch both the live preview and stdout tail in a notebook cell:: session.start().preview() session.stream() """ if Utils.in_jupyter_notebook(): import ipywidgets as widgets from IPython.display import display log_widget = widgets.HTML() display(log_widget) button = widgets.Button(description="Stop Live Stream") terminate_button = self._terminate_button() save_and_quit_button = self._save_and_quit_button() display(widgets.HBox((button, terminate_button, save_and_quit_button))) assert button is not None assert terminate_button is not None assert save_and_quit_button is not None stop = False log_path, err_path = _rust.stdout_error_log_paths(self.info.path) if os.path.exists(log_path): def live_stream(self): nonlocal stop nonlocal button nonlocal log_widget nonlocal log_path nonlocal err_path nonlocal terminate_button nonlocal save_and_quit_button assert button is not None assert terminate_button is not None assert save_and_quit_button is not None while not stop: # Read last n_lines from log file (cross-platform) # via Rust; the helper handles the missing-file # case and the lines[-n:] + strip() squash. 
tail_output = _rust.read_log_tail_joined(log_path, n_lines) log_widget.value = ( CONSOLE_STYLE + f"<pre style='no-scroll'>{tail_output}</pre>" ) if not self.is_running(): log_widget.value += "<p style='color: red;'>Terminated.</p>" lines = _rust.read_lines_with_newlines(err_path) if len(lines) > 0: log_widget.value += "<p style='color: red;'>" for line in lines: log_widget.value += line + "\n" log_widget.value += "</p>" button.disabled = True terminate_button.disabled = True save_and_quit_button.disabled = True break time.sleep(self._update_terminal_interval) thread = threading.Thread(target=live_stream, args=(self,)) thread.start() async def toggle_stream_async(b): nonlocal stop nonlocal thread if thread.is_alive(): stop = True b.disabled = True b.description = "Stopping..." while thread.is_alive(): await asyncio.sleep(0.1) b.disabled = False b.description = "Start Live Stream" else: thread = threading.Thread(target=live_stream, args=(self,)) stop = False thread.start() b.description = "Stop Live Stream" def toggle_stream(b): asyncio.ensure_future(toggle_stream_async(b)) button.on_click(toggle_stream) else: log_widget.value = "No log file found." terminate_button.disabled = True save_and_quit_button.disabled = True button.disabled = True return self
@property
def fixed_scene(self) -> Optional[FixedScene]:
    """Return the fixed scene exposed by the wrapped session.

    Returns:
        Optional[FixedScene]: The ``fixed_scene`` attribute of the
        underlying session object.

    Example:
        Retrieve the bound scene from a fixed session::

            fixed = app.session.create(scene).build()
            scene_ref = fixed.fixed_scene
    """
    owner = self._session
    return owner.fixed_scene
class Session:
    """Class to setup a simulation session.

    Instances are created via :meth:`SessionManager.create`, configured
    via the :attr:`param` manager, then finalized with :meth:`build` to
    produce a :class:`FixedSession`.

    Example:
        Configure a session and build it into a runnable fixed session::

            session = app.session.create(scene)
            session.param.set("frames", 120).set("dt", 0.01)
            fixed_session = session.build()
            fixed_session.start(blocking=True)
    """

    def __init__(
        self,
        app_name: str,
        app_root: str,
        proj_root: str,
        data_dirpath: str,
        name: str,
        autogenerated: Optional[int] = None,
    ):
        """Initialize the Session class.

        Args:
            app_name (str): The name of the application.
            app_root (str): The root directory of the application.
            proj_root (str): The root directory of the project.
            data_dirpath (str): The data directory path.
            name (str): The name of the session.
            autogenerated (Optional[int]): Counter value if autogenerated,
                None otherwise.
        """
        self._app_name = app_name
        self._name = name
        self._app_root = app_root
        self._proj_root = proj_root
        self._data_dirpath = data_dirpath
        self._autogenerated = autogenerated
        # Populated later by ``init`` and ``build`` respectively.
        self._fixed_scene = None
        self._fixed_session = None
        self._param = ParamManager()

    @property
    def param(self) -> ParamManager:
        """Get the session parameter manager.

        Example:
            Configure solver parameters before building the session::

                session = app.session.create(scene)
                session.param.set("frames", 120).set("dt", 0.01)
        """
        return self._param

    @property
    def fixed_scene(self) -> Optional[FixedScene]:
        """Get the fixed scene.

        Returns:
            Optional[FixedScene]: The fixed scene object, or ``None`` if
            :meth:`init` has not attached one yet.

        Example:
            Inspect the bound scene before finalizing::

                session = app.session.create(scene)
                print(session.fixed_scene)
        """
        return self._fixed_scene

    @property
    def fixed_session(self) -> Optional[FixedSession]:
        """Get the fixed session.

        Returns:
            Optional[FixedSession]: The fixed session object, or ``None``
            if :meth:`build` has not been called yet.

        Example:
            Access the built runnable session after calling :meth:`build`::

                session = app.session.create(scene)
                session.build()
                fixed = session.fixed_session
        """
        return self._fixed_session

    @property
    def proj_root(self) -> str:
        """Get the project root directory.

        Example:
            Print the project root for the current session::

                session = app.session.create(scene)
                print(session.proj_root)
        """
        return self._proj_root

    @property
    def app_name(self) -> str:
        """Get the application name.

        Example:
            Read the owning application name::

                session = app.session.create(scene)
                print(session.app_name)
        """
        return self._app_name

    @property
    def name(self) -> str:
        """Get the session name.

        Example:
            Retrieve the session name for logging or display::

                session = app.session.create(scene)
                print(session.name)
        """
        return self._name

    @property
    def app_root(self) -> str:
        """Get the application root directory.

        Example:
            Locate the application root on disk::

                session = app.session.create(scene)
                print(session.app_root)
        """
        return self._app_root

    def _check_ready(self):
        """Check if the session is ready.

        Raises:
            ValueError: If no fixed scene has been attached.
        """
        if self._fixed_scene is None:
            raise ValueError("Scene must be initialized")

    def init(self, scene: FixedScene) -> "Session":
        """Attach a fixed scene to this session.

        Args:
            scene (FixedScene): The fixed scene.

        Returns:
            Session: This session, for chaining.

        Example:
            :meth:`SessionManager.create` calls this internally, but it
            can also be used to re-bind a scene to an existing session::

                session.init(scene).param.set("frames", 60)
        """
        self._fixed_scene = scene
        return self

    def build(self) -> FixedSession:
        """Build and persist a :class:`FixedSession` from this session.

        Pickles the built session into its directory and creates a
        symlink (or a ``.txt`` fallback on Windows without symlink
        privileges) under the data dir for convenient access.

        Returns:
            FixedSession: The newly built fixed session.

        Example:
            Finalize a configured session and start it::

                session.param.set("frames", 60).set("dt", 0.01)
                fixed_session = session.build()
                fixed_session.start(blocking=True)
        """
        self._fixed_session = FixedSession(self)
        symlink_name = _rust.session_build_symlink_name(
            self._app_name,
            self._name,
            self._autogenerated,
        )
        self._save_fixed_session(self._fixed_session, symlink_name)
        return self._fixed_session

    def _save_fixed_session(
        self, fixed_session: FixedSession, name: Optional[str] = None
    ):
        """Save the fixed session to a recoverable file and create a symlink.

        Args:
            fixed_session (FixedSession): The fixed session to persist.
            name (Optional[str]): Link name to create under the data
                directory. When empty or ``None`` no link is created.
        """
        from . import _cbor_bridge_ as _cbor

        _, session_path = _rust.session_fixed_dir_layout(
            self._app_root, fixed_session.info.name
        )
        # Native CBOR envelope: structured fields the user can inspect
        # with any CBOR reader, plus a ``pickle_blob`` for the deep
        # ``Session`` / ``ParamHolder`` / ``FixedScene`` graph that
        # has no schema-level CBOR representation today.
        pickled = pickle.dumps(fixed_session)
        payload = fixed_session_to_cbor_dict(fixed_session, pickled)
        # Encode BEFORE opening the file: "wb" truncates on open, so an
        # encoder failure inside the ``with`` block would wipe out a
        # previously saved, still-recoverable session file.
        envelope = _cbor.dumps_envelope(_cbor.KIND_FIXED_SESSION, payload)
        with open(session_path, "wb") as f:
            f.write(envelope)

        if not name:
            return

        symlink_path = _rust.session_symlink_target_path(self._data_dirpath, name)
        os.makedirs(os.path.dirname(symlink_path), exist_ok=True)
        # ``lexists`` is also true for dangling symlinks, so a stale link
        # or a leftover regular file is cleared either way.
        if os.path.lexists(symlink_path):
            os.remove(symlink_path)
        try:
            os.symlink(fixed_session.info.path, symlink_path)
        except OSError:
            # On Windows, symlinks may require elevated privileges
            # Fall back to writing a text file with the path
            with open(symlink_path + ".txt", "w") as f:
                f.write(fixed_session.info.path)
def fixed_session_to_cbor_dict(
    fixed_session: "FixedSession", pickle_blob: bytes
) -> dict:
    """Assemble the CBOR map payload describing a recoverable fixed session.

    Human-inspectable metadata (session name, on-disk paths, scalar
    parameter values) is placed at the top level so that a generic CBOR
    reader can identify a saved session without unpickling anything.
    ``pickle_blob`` carries the full ``Session`` / ``ParamHolder`` /
    ``FixedScene`` graph for rehydration in :meth:`App.recover`.

    Args:
        fixed_session (FixedSession): The session being persisted.
        pickle_blob (bytes): The already-pickled session, stored verbatim.

    Returns:
        dict: A map with the keys ``info_name``, ``info_path``,
        ``output_path``, ``cmd_path``, ``params`` and ``pickle_blob``.
    """
    # Only CBOR-friendly scalar metadata is mirrored; complex values
    # (arrays, schedule tuples) survive via ``pickle_blob``.
    scalar_types = (bool, int, float, str, type(None))
    try:
        scalar_params: dict[str, Any] = {
            str(param_name): param_value
            for param_name, param_value in fixed_session.session.param.items()
            if isinstance(param_value, scalar_types)
        }
    except Exception:
        # Param introspection is metadata-only; failing here would block
        # the whole save. The pickle_blob still carries the authoritative
        # state, so fall back to an empty map.
        scalar_params = {}

    session_info = fixed_session.info

    try:
        resolved_output = fixed_session.output.path
    except Exception:
        # The output location is optional metadata as well.
        resolved_output = ""

    command_path = getattr(fixed_session, "_cmd_path", "")
    if not command_path:
        command_path = ""

    payload = {}
    payload["info_name"] = session_info.name
    payload["info_path"] = session_info.path
    payload["output_path"] = resolved_output
    payload["cmd_path"] = command_path
    payload["params"] = scalar_params
    payload["pickle_blob"] = pickle_blob
    return payload