# Source code for frontend._decoder_

# File: _decoder_.py
# Code: Claude Code and Codex
# Review: Ryoichi Ando (ryoichi.ando@zozo.com)
# License: Apache v2.0

import os
import pickle
from dataclasses import dataclass
from typing import Any, Optional

from . import _rust  # type: ignore[attr-defined]

from ._asset_ import AssetManager
from ._mesh_ import MeshManager
from ._plot_ import PlotManager
from ._scene_ import FixedScene, Scene
from ._session_ import FixedSession, Session
from ._utils_ import Utils


@dataclass
class ObjectInfo:
    """Metadata kept for one decoded object.

    Instances are stored per object UUID by :class:`SceneDecoder` and later
    handed to the Rust ``cross_stitch_apply_batch`` helper. That helper reads
    plain dicts, so :meth:`to_dict` is the conversion used at the FFI
    boundary.

    Which fields are filled depends on ``type``:

    * ``SOLID``: ``vert``, ``V``, ``F`` and, optionally, ``orig_to_sim``.
    * ``SHELL``: ``vert``, ``V`` (equal to ``vert``) and ``F``.
    * ``ROD``: ``vert`` only; ``V`` and ``F`` remain ``None``.
    """

    type: str
    vert: Any
    V: Optional[Any] = None
    F: Optional[Any] = None
    orig_to_sim: Optional[Any] = None

    def to_dict(self) -> dict:
        """Return a dict view that omits optional fields left as ``None``.

        ``type`` and ``vert`` are always present; ``V``, ``F`` and
        ``orig_to_sim`` follow, in that order, when they hold a value.
        """
        payload: dict = {"type": self.type, "vert": self.vert}
        for optional_key in ("V", "F", "orig_to_sim"):
            value = getattr(self, optional_key)
            if value is not None:
                payload[optional_key] = value
        return payload


class BlenderApp:
    """Attach to a project exported by the Blender addon and build a runnable session.

    The addon writes ``data.pickle`` and ``param.pickle`` into a per-project
    directory under ``~/.local/share/ppf-cts/git-<branch>/<name>/``.
    :meth:`open` is the canonical entry point: it decodes those pickles,
    populates the scene, applies parameters, and builds a ``FixedSession``
    accessible via :attr:`scene` and :attr:`session`.

    Example:
        Canonical notebook cell (as generated by the Blender addon), attach
        to a transferred project and run it::

            from frontend import BlenderApp

            app = BlenderApp.open("rod-example-3")
            app.scene.report()
            app.scene.preview()
            app.session.run()
            app.session.preview()
    """

    def __init__(self, name: str, verbose: bool = False, progress_callback=None):
        """Initialize the BlenderApp.

        Args:
            name (str): The name of the Blender project.
            verbose (bool): Enable verbose logging.
            progress_callback: Optional callable
                ``fn(progress: float, info: str)`` invoked during
                long-running operations. ``progress`` is in ``[0.0, 1.0]``.
        """
        self._verbose = verbose
        self._name = name
        self._progress_callback = progress_callback
        # Path math + branch resolution lives in Rust
        # (`dec::blender_app_paths`), so the Python and Windows-native
        # branches stay aligned with `App.get_data_dirpath` and the
        # server's `make_root`.
        paths = _rust.blender_app_paths(os.path.abspath(__file__), name)
        self._data_dirpath = paths["data_dirpath"]
        self._root = paths["root"]
        cache_root = paths["cache_root"]
        os.makedirs(cache_root, exist_ok=True)
        self._asset_manager = AssetManager()
        self._mesh_manager = MeshManager(cache_root)
        self._scene = None
        # Set by populate(); declared here so make() never trips over a
        # missing attribute.
        self._scene_decoder = None
        self._session = None
        self._fixed_scene = None
        self._fixed_session = None

    def __getstate__(self):
        """Return the pickling state with the progress callback dropped."""
        state = self.__dict__.copy()
        state["_progress_callback"] = None
        return state

    def _report_progress(self, progress: float, info: str):
        """Forward ``(progress, info)`` to the callback, if one was given."""
        if self._progress_callback is not None:
            self._progress_callback(progress, info)

    @classmethod
    def open(
        cls,
        name: str,
        verbose: bool = False,
        progress_callback=None,
    ) -> "BlenderApp":
        """Open a Blender project and build its scene and session.

        Behavior:

        1. If the Blender addon has uploaded both ``data.pickle`` and
           ``param.pickle`` under the project root, ``populate().make()``
           is invoked to build a fresh scene and session in this process.
        2. Otherwise, ``FileNotFoundError`` is raised. The user must click
           "Transfer" in the Blender addon first.

        After ``open()``, ``app.scene`` and ``app.session`` are the built
        ``FixedScene`` / ``FixedSession`` objects, ready for report,
        preview, run, and stream.

        Raises:
            FileNotFoundError: If ``data.pickle``, ``param.pickle`` or
                ``upload_id.txt`` is missing from the project root.

        Example:
            Notebook cell generated by the Blender addon, attaching to the
            transferred project and running it::

                from frontend import BlenderApp

                app = BlenderApp.open("<project>")
                app.scene.preview()
                app.session.run()
                app.session.preview()
        """
        app = cls(name, verbose=verbose, progress_callback=progress_callback)
        data_path = os.path.join(app._root, "data.pickle")
        param_path = os.path.join(app._root, "param.pickle")
        if not (os.path.exists(data_path) and os.path.exists(param_path)):
            raise FileNotFoundError(
                f"Blender has not uploaded project '{name}' to {app._root}. "
                f"In the Blender addon, click 'Transfer' first."
            )
        # Invariant: whenever data.pickle and param.pickle both exist,
        # upload_id.txt must also exist. It's stamped by the Blender
        # addon's upload handler at write time and is the identity that
        # client build-tracking pins against. If it's missing, the
        # project predates upload_id tracking (or someone wrote pickles
        # out-of-band); refuse to build and tell the user how to migrate.
        upload_id_path = os.path.join(app._root, "upload_id.txt")
        if not os.path.exists(upload_id_path):
            raise FileNotFoundError(
                f"Project '{name}' at {app._root} has data.pickle and "
                f"param.pickle but no upload_id.txt. This project predates "
                f"upload_id tracking. Re-upload from the Blender addon, "
                f"or run `python tools/backfill_upload_ids.py` to stamp a "
                f"fresh id on every legacy project."
            )
        app.populate().make()
        return app

    def populate(self) -> "BlenderApp":
        """Populate the scene with objects decoded from ``data.pickle``.

        Also peeks at ``param.pickle`` for per-group fTetWild overrides so
        that tetrahedralization can use them at build time; full parameter
        application is deferred to :meth:`make`.

        Returns:
            BlenderApp: ``self`` for chaining.

        Example:
            Two-stage construction (useful when you want to inspect the
            mutable scene before parameters are applied)::

                from frontend import BlenderApp

                app = BlenderApp("my-project")
                app.populate()
                app.make()
                app.session.run()
        """
        data_path = os.path.join(self._root, "data.pickle")
        assert os.path.exists(data_path), f"data.pickle not found at {data_path}"
        self._report_progress(0.05, "Loading scene data...")
        self._asset_manager = AssetManager()
        self._scene = Scene("scene", PlotManager(), self._asset_manager)
        self._scene_decoder = SceneDecoder(
            data_path,
            self._asset_manager,
            self._mesh_manager,
        )
        # Peek at param.pickle for per-group fTetWild overrides so
        # populate_objects can pass them into tetrahedralize() at build
        # time. The full param.pickle is applied later in make(). Fan the
        # group-level kwargs dict out to every UUID in the group via
        # the Rust helper so this loop stays aligned with the
        # ParamDecoder traversal.
        ftetwild_by_uuid: dict = {}
        param_path = os.path.join(self._root, "param.pickle")
        if os.path.exists(param_path):
            # See ``frontend/_cbor_bridge_.py`` for the byte-sniff contract.
            from . import _cbor_bridge_ as _cbor

            with open(param_path, "rb") as _pf:
                _blob = _pf.read()
            if _cbor.is_cbor(_blob):
                _pdata = _cbor.loads_param(_blob)
            else:
                # NOTE(review): pickle.loads executes arbitrary code from
                # the blob; this path must only ever see files written by
                # the trusted Blender addon.
                _pdata = pickle.loads(_blob)
            ftetwild_by_uuid = _rust.extract_ftetwild_by_uuid(_pdata.get("group", []))
        self._scene_decoder.populate_objects(
            self._scene,
            verbose=self._verbose,
            # populate_objects reports in [0, 1]; map it onto the
            # [0.10, 0.60] slice of the overall build.
            progress_callback=lambda progress, info: self._report_progress(
                0.10 + 0.50 * progress,
                info,
            ),
            ftetwild_by_uuid=ftetwild_by_uuid,
        )
        return self

    def make(self) -> "BlenderApp":
        """Apply ``param.pickle`` to the populated scene and build a runnable session.

        Applies per-object parameters, pin configuration, cross-stitch
        constraints, explicit merge pairs, and invisible colliders to the
        scene, then builds the fixed scene and session. The resulting
        state is then written to ``app_state.pickle``; a failure to
        persist is not swallowed and propagates to the caller.

        Returns:
            BlenderApp: ``self`` for chaining.

        Example:
            Finish the build after :meth:`populate`, then run the session::

                from frontend import BlenderApp

                app = BlenderApp("my-project").populate().make()
                app.session.run()
                app.session.preview()
        """
        assert self._scene is not None, "Scene must be populated before making the app"
        param_path = os.path.join(self._root, "param.pickle")
        assert os.path.exists(param_path), f"param.pickle not found at {param_path}"
        self._report_progress(0.65, "Applying object parameters...")
        param_decoder = ParamDecoder().set_path(param_path)
        param_decoder.apply_to_objects(self._scene, verbose=self._verbose)
        self._report_progress(0.72, "Applying pin configuration...")
        param_decoder.apply_pin_config(self._scene, verbose=self._verbose)
        if param_decoder.cross_stitch:
            self._report_progress(0.78, "Applying cross-stitch constraints...")
            # Whole-batch port: Rust validates endpoints, projects
            # anchors for SOLID targets, builds each canonical dict,
            # and appends directly to ``self._scene._cross_stitch`` so
            # no Python-side per-entry append loop survives.
            obj_info_dict = {
                k: v.to_dict() for k, v in self._scene_decoder._object_info.items()
            }
            _rust.cross_stitch_apply_batch(
                param_decoder.cross_stitch,
                obj_info_dict,
                self._scene._cross_stitch,
                self._verbose,
            )
        if param_decoder.explicit_merge_pairs:
            self._scene.set_explicit_merge_pairs(param_decoder.explicit_merge_pairs)
        self._report_progress(0.82, "Applying invisible colliders...")
        param_decoder.apply_invisible_colliders(self._scene, verbose=self._verbose)
        self._report_progress(0.84, "Building scene: preparing objects...")
        self._fixed_scene = self._scene.build(
            # Scene.build reports in [0, 1]; map it onto [0.84, 0.94].
            progress_callback=lambda progress, info: self._report_progress(
                0.84 + 0.10 * progress,
                info,
            )
        )
        self._report_progress(0.94, "Initializing session...")
        self._session = Session(
            self._name,
            self._root,
            os.path.dirname(os.path.dirname(__file__)),
            self._data_dirpath,
            "session",
        ).init(self._fixed_scene)
        self._report_progress(0.97, "Applying session parameters...")
        param_decoder.apply_to_session(self._session, verbose=self._verbose)
        self._fixed_session = self._session.build()
        self._persist_app_state()
        self._report_progress(0.97, "Build decode complete.")
        return self

    def _persist_app_state(self) -> None:
        """Write this BlenderApp to ``{_root}/app_state.pickle``.

        This lets both the Blender-addon server and future notebook runs
        discover the built state. The Blender-addon server reads it back
        through ``server/engine.py:AppState.load`` when it needs to adopt
        an externally-built session.

        Raises any persistence error (disk full, permission denied,
        pickling failure) rather than silently swallowing it: a failed
        persist means the server-side state machine won't pick up this
        build, and the caller should know. The payload is written to a
        temporary path first and moved into place with ``os.replace``; if
        anything fails the temporary file is removed before the error is
        re-raised.

        Format note: this site uses raw ``pickle`` rather than a CBOR
        envelope. The payload is the entire ``BlenderApp`` object (asset /
        scene / mesh / fixed_session graph), no manager class on that graph
        carries a hand-written CBOR schema, and the consumer lives outside
        this repo (the Blender-addon server's ``AppState.load``). The
        producer-side wrappers in :mod:`frontend._session_` and
        :mod:`frontend._app_` use CBOR envelopes; this is the only
        intentional raw-pickle write in :mod:`frontend`.
        """
        # _process holds a subprocess Popen with non-picklable locks.
        if self._fixed_session is not None:
            self._fixed_session._process = None
        paths = _rust.app_state_persist_paths(self._root)
        tmp_path = paths["tmp_path"]
        os.makedirs(self._root, exist_ok=True)
        try:
            with open(tmp_path, "wb") as f:
                pickle.dump(self, f)
            os.replace(tmp_path, paths["final_path"])
        except BaseException:
            # Do not leave a half-written temp file behind; the original
            # error is what the caller needs to see, so cleanup problems
            # are deliberately ignored here.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @property
    def scene(self) -> "FixedScene":
        """The built fixed scene.

        Use this for ``.report()``, ``.preview()``, etc. The mutable
        pre-build ``Scene`` remains accessible as ``_scene`` for advanced
        callers.

        Example:
            Open a Blender-authored project and inspect the built scene::

                from frontend import BlenderApp

                app = BlenderApp.open("rod-example-3")
                app.scene.report()
                app.scene.preview()
        """
        assert self._fixed_scene is not None, (
            "Scene is not built yet. Call BlenderApp.open() or "
            ".populate().make() first."
        )
        return self._fixed_scene

    @property
    def session(self) -> "FixedSession":
        """The built fixed session.

        Use this for ``.run()``, ``.preview()``, ``.stream()``, etc.

        Example:
            Run a Blender-authored session and stream live output::

                from frontend import BlenderApp

                app = BlenderApp.open("rod-example-3")
                app.session.run()
                app.session.preview()
                app.session.stream()
        """
        assert self._fixed_session is not None, (
            "Session is not built yet. Call BlenderApp.open() or "
            ".populate().make() first."
        )
        return self._fixed_session
class ParamDecoder:
    """Load and apply ``param.pickle`` written by the Blender addon.

    Parameters are split across object-level (velocity, fTetWild hints,
    per-key ``param.set`` values), pin configuration, cross-stitch
    constraints, explicit merge pairs, invisible walls / spheres, and
    session-level scene settings. :meth:`set_path` loads the pickle, then
    the ``apply_*`` methods dispatch each section.

    Example:
        Apply a pickle to an existing scene and session (this is what
        :meth:`BlenderApp.make` does internally)::

            from frontend._decoder_ import ParamDecoder

            decoder = ParamDecoder().set_path("/path/to/param.pickle")
            decoder.apply_to_objects(scene)
            decoder.apply_pin_config(scene)
            decoder.apply_invisible_colliders(scene)
            fixed_scene = scene.build()
            decoder.apply_to_session(session)
    """

    def __init__(self):
        self._data = None
        # Start from empty sections so the properties and apply_pin_config
        # are safe to touch before set_path() has run.
        self._pin_config: dict = {}
        self._explicit_merge_pairs: list = []
        self._cross_stitch: list = []

    def set_path(self, filepath: str) -> "ParamDecoder":
        """Load parameter data from a pickle file and cache it on this decoder.

        Args:
            filepath (str): Path to the pickle file. Must end in ``.pickle``.

        Returns:
            ParamDecoder: ``self`` for chaining.

        Example:
            Chain the load with a subsequent apply call::

                from frontend._decoder_ import ParamDecoder

                decoder = ParamDecoder().set_path("/path/to/param.pickle")
                decoder.apply_to_objects(scene)
        """
        _rust.validate_pickle_extension(filepath)
        with open(filepath, "rb") as f:
            blob = f.read()
        from . import _cbor_bridge_ as _cbor

        if _cbor.is_cbor(blob):
            self._data = _cbor.loads_param(blob)
        else:
            # NOTE(review): pickle.loads executes arbitrary code from the
            # blob; this path must only ever see files written by the
            # trusted Blender addon.
            self._data = pickle.loads(blob)
        _rust.validate_param_top_keys(
            "group" in self._data, "scene" in self._data
        )
        self._pin_config = self._data.get("pin_config", {})
        self._explicit_merge_pairs = self._data.get("explicit_merge_pairs", [])
        self._cross_stitch = self._data.get("cross_stitch", [])
        return self

    @property
    def explicit_merge_pairs(self) -> list:
        """The ``explicit_merge_pairs`` section (empty until :meth:`set_path`)."""
        return self._explicit_merge_pairs

    @property
    def cross_stitch(self) -> list:
        """The ``cross_stitch`` section (empty until :meth:`set_path`)."""
        return self._cross_stitch

    def apply_to_objects(self, scene: "Scene", verbose: bool = False):
        """Apply the loaded parameter data to the objects in ``scene``.

        Call :meth:`set_path` first. Per-object dicts (velocity,
        velocity-schedule, collision-windows) are keyed by UUID; other keys
        are forwarded to ``obj.param.set``. fTetWild overrides are consumed
        at populate-time and skipped here.

        Args:
            scene (Scene): The scene to which the parameters will be applied.
            verbose (bool): Enable verbose logging.

        Example:
            Load a pickle and apply per-object parameters to a populated
            scene::

                from frontend._decoder_ import ParamDecoder

                decoder = ParamDecoder().set_path("/path/to/param.pickle")
                decoder.apply_to_objects(scene, verbose=True)
        """
        assert self._data is not None, "Parameter data not set. Call set_path() first."
        if verbose:
            print("=== Object Parameters ===")
        for group_entry in self._data["group"]:
            params, objects = group_entry[0], group_entry[1]
            # Third tuple slot holds UUIDs aligned with ``objects``.
            # Per-object dicts (velocity / velocity-schedule / collision-
            # windows) are keyed on UUID by the encoder.
            _rust.validate_param_group_has_uuids(len(group_entry))
            obj_uuids = group_entry[2]
            for obj_name, obj_uuid in zip(objects, obj_uuids):
                _rust.validate_param_object_uuid(obj_name, obj_uuid)
                if verbose:
                    print(f"*** name: {obj_name} (uuid={obj_uuid}) ***")
                obj = scene.select(obj_uuid)
                obj.param.clear_all()
                for key, val in params.items():
                    if verbose:
                        print(f" {key}: {val}")
                    if key == "velocity":
                        if isinstance(val, dict):
                            v = val.get(obj_uuid) if obj_uuid else None
                            if v is not None:
                                obj.velocity(*v)
                        else:
                            obj.velocity(*val)
                    elif key == "velocity-schedule":
                        if isinstance(val, dict):
                            s = val.get(obj_uuid) if obj_uuid else None
                            if s:
                                obj.velocity_schedule(s)
                        else:
                            obj.velocity_schedule(val)
                    elif key == "collision-windows":
                        if isinstance(val, dict):
                            w = val.get(obj_uuid) if obj_uuid else None
                            if w:
                                obj.collision_windows(w)
                    elif key == "ftetwild":
                        # Consumed at populate-time via the param.pickle peek;
                        # no per-object ParamHolder slot by design (would
                        # break concat_tet_param / concat_tri_param key-set
                        # equality in _scene_.extend_param).
                        pass
                    else:
                        obj.param.set(key, val)

    def apply_to_session(self, session: "Session", verbose: bool = False):
        """Apply scene-level and dynamic parameters from the loaded data to ``session``.

        Call :meth:`set_path` first. Static scene parameters are forwarded
        to ``session.param.set``; ``inactive-momentum`` is additionally set
        as a dynamic hold. Dynamic parameter keyframes from ``dyn_param``
        are applied via ``session.param.dyn(...)``.

        Args:
            session (Session): The session to which the parameters will be
                applied.
            verbose (bool): Enable verbose logging.

        Example:
            Apply scene-level and dynamic parameters to a freshly
            initialized session::

                from frontend._decoder_ import ParamDecoder

                decoder = ParamDecoder().set_path("/path/to/param.pickle")
                session = app.session.create(scene).init(fixed_scene)
                decoder.apply_to_session(session)
                session.build().start()
        """
        assert self._data is not None, "Parameter data not set. Call set_path() first."
        if verbose:
            print("=== Session Parameters ===")
        session.param.clear_all()
        for k, v in self._data["scene"].items():
            if verbose:
                print(f" {k}: {v}")
            if k == "inactive-momentum" and v > 0:
                inactive_momentum_time = float(v)
                session.param.set("inactive-momentum")
                session.param.dyn("inactive-momentum").time(inactive_momentum_time).hold().change(False)
            else:
                session.param.set(k, v)
        # Apply dynamic parameters if present
        dyn_param = self._data.get("dyn_param", {})
        for key, entries in dyn_param.items():
            if len(entries) < 2:
                continue
            if verbose:
                print(f" dyn({key}): {len(entries)} keyframes")
            # First entry is initial value (already set via static params above).
            # Subsequent entries are (time, value, is_hold) tuples.
            builder = session.param.dyn(key)
            for entry in entries[1:]:
                t, v = entry[0], entry[1]
                is_hold = entry[2] if len(entry) > 2 else False
                if is_hold:
                    builder.time(t).hold()
                else:
                    if isinstance(v, list) and len(v) == 1:
                        builder.time(t).change(v[0])
                    else:
                        builder.time(t).change(v)

    def apply_pin_config(self, scene, verbose: bool = False):
        """Apply saved pin configuration to scene pin holders.

        Applies unpin time, pull strength, pin group id, embedded move
        keyframes, and explicit pin operations (spin / scale / move_by /
        torque) to each pin holder that has a matching config entry. Call
        after the scene is populated and pins are created. Does nothing
        when no pin configuration is loaded.

        Example:
            Reapply pin settings after populating objects::

                from frontend._decoder_ import ParamDecoder

                decoder = ParamDecoder().set_path("/path/to/param.pickle")
                decoder.apply_to_objects(scene)
                decoder.apply_pin_config(scene, verbose=True)
        """
        if not self._pin_config:
            return
        if verbose:
            print("=== Pin Config ===")
        for dyn_name, dyn_obj in scene.object_dict.items():
            # pin_config keyed by UUID; objects are registered by UUID
            obj_cfg = self._pin_config.get(dyn_name, {})
            if not obj_cfg:
                continue
            # Collapse the single all-vertex holder (built by
            # _apply_pin_mapping) into one holder per pin_group_id, so a
            # group's ops/animation apply once to the whole group. Without
            # this a keyframed N-vertex pin became N one-vertex holders,
            # each carrying every keyframe op -> N*M solver pin files.
            self._regroup_pin_holders(dyn_obj, obj_cfg)
            for pin_holder in dyn_obj.pin_list:
                # For solid objects, use stored Blender indices for config lookup
                lookup_indices = getattr(pin_holder._data, '_blender_pin_indices', None) or pin_holder.index
                for vi in lookup_indices:
                    cfg = obj_cfg.get(vi)
                    if cfg is None:
                        continue
                    self._apply_pin_cfg_entry(
                        pin_holder, dyn_name, vi, cfg, obj_cfg, verbose,
                    )
                    # Only the first vertex with a config entry drives the
                    # holder.
                    break

    @staticmethod
    def _regroup_pin_holders(dyn_obj, obj_cfg):
        """Rebuild ``dyn_obj``'s pin holders as one holder per pin group.

        ``_apply_pin_mapping`` registers a single holder spanning every
        pinned vertex of an object. An object with several distinct pin
        vertex groups (e.g. separate ``left`` / ``right`` pins) needs one
        holder per group so each group's config is applied to exactly its
        vertices. Grouping key is ``pin_group_id`` from ``obj_cfg``.

        Skips SOLID surface-mapped holders: those carry
        ``_blender_pin_indices`` and a sim-vertex set the regroup would
        lose, and ``_apply_pin_mapping`` already builds them correctly.
        """
        holders = list(dyn_obj.pin_list)
        if not holders:
            return
        if any(getattr(h._data, "_blender_pin_indices", None) for h in holders):
            return
        all_idx = [int(i) for h in holders for i in h.index]
        gid_order: list = []
        gid_verts: dict = {}
        for vi in all_idx:
            cfg = obj_cfg.get(vi)
            # Verts with no config entry (plain hold-fixed pins) group
            # under one sentinel key so they stay a single holder too.
            key = cfg.get("pin_group_id") if cfg else "__plain__"
            if key not in gid_verts:
                gid_verts[key] = []
                gid_order.append(key)
            gid_verts[key].append(vi)
        # Already one-holder-per-group: nothing to do.
        if len(holders) == len(gid_order):
            return
        dyn_obj.pin_list.clear()
        for key in gid_order:
            dyn_obj.pin(gid_verts[key])

    def _apply_pin_cfg_entry(self, pin_holder, dyn_name, vi, cfg, obj_cfg, verbose):
        """Apply a single pin-config entry to ``pin_holder``."""
        if "unpin_time" in cfg:
            pin_holder.unpin(cfg["unpin_time"])
            if verbose:
                print(f" {dyn_name}[{vi}]: unpin_time={cfg['unpin_time']}")
        if "pull_strength" in cfg:
            pin_holder.pull(cfg["pull_strength"])
        if "pin_group_id" in cfg:
            pin_holder._data.pin_group_id = cfg["pin_group_id"]
            if verbose:
                print(f" {dyn_name}[{vi}]: pin_group_id={cfg['pin_group_id']}")
        if "operations" not in cfg and "embedded_move_index" not in cfg:
            return
        embedded_ops = self._build_embedded_move_ops(pin_holder, obj_cfg)
        embedded_move_index = cfg.get("embedded_move_index", -1)
        explicit_operations = cfg.get("operations", [])
        # Validate: torque cannot be mixed with kinematic ops
        _rust.validate_pin_op_types(
            [str(o.get("type", "")) for o in explicit_operations],
            dyn_name,
        )
        # Clear existing operations
        pin_holder._data.operations = []
        # Re-add in the correct order
        explicit_idx = 0
        embedded_inserted = False
        total = len(explicit_operations) + (1 if embedded_move_index >= 0 else 0)
        if total == 0 and embedded_ops:
            # Only embedded move, no explicit ops
            pin_holder._data.operations.extend(embedded_ops)
            embedded_inserted = True
        for pos in range(total):
            if pos == embedded_move_index and not embedded_inserted:
                # Insert embedded move operations at this position
                pin_holder._data.operations.extend(embedded_ops)
                embedded_inserted = True
            else:
                if explicit_idx < len(explicit_operations):
                    op = explicit_operations[explicit_idx]
                    explicit_idx += 1
                    self._dispatch_pin_op(pin_holder, op)
        # If embedded move wasn't inserted yet (index beyond end), append
        if not embedded_inserted and embedded_move_index >= 0:
            pin_holder._data.operations.extend(embedded_ops)
        if verbose:
            print(f" {dyn_name}[{vi}]: operations={len(pin_holder.operations)}")

    @staticmethod
    def _build_embedded_move_ops(pin_holder, obj_cfg):
        """Build embedded ``MoveByOperation`` segments from ``pin_anim``.

        Each pinned vertex's ``PinData`` carries its own single-entry
        ``pin_anim`` track (``{vertex_index: PinAnim}``). Consecutive
        keyframes become ``MoveByOperation`` segments whose ``delta`` is
        the genuine per-vertex displacement, so the pin deforms — it is
        not rigidly translated.

        Returns:
            list: One ``MoveByOperation`` per pair of consecutive
            keyframes, or an empty list when no vertex has a track with at
            least two keyframes.
        """
        import numpy as np

        from ._scene_pin_ import MoveByOperation

        index = list(pin_holder.index)
        n_pin_verts = len(index)
        # Each vertex's own track lives in its own cfg entry, keyed by
        # that same vertex index. Gather them in pin_holder.index order.
        tracks = []
        times = None
        for v in index:
            cfg = obj_cfg.get(v)
            track = cfg.get("pin_anim", {}).get(v) if cfg else None
            tracks.append(track)
            # The first track found supplies the shared keyframe times.
            if track is not None and times is None:
                times = list(track["time"])
        if times is None or len(times) < 2:
            return []
        n_frames = len(times)
        # Per-vertex absolute-position tracks, aligned to the
        # pin_holder.index order. A vertex with no track keeps a zero
        # track (zero delta — unaffected by the embedded move).
        positions = np.zeros((n_frames, n_pin_verts, 3), dtype=np.float64)
        for j, track in enumerate(tracks):
            if track is not None:
                pos = np.asarray(track["position"], dtype=np.float64)
                # Tracks whose shape disagrees with the shared keyframe
                # count are left as zeros.
                if pos.shape == (n_frames, 3):
                    positions[:, j, :] = pos
        return [
            MoveByOperation(
                delta=np.ascontiguousarray(positions[k + 1] - positions[k]),
                t_start=times[k],
                t_end=times[k + 1],
                transition="linear",
            )
            for k in range(n_frames - 1)
        ]

    def _dispatch_pin_op(self, pin_holder, op):
        """Dispatch a single explicit pin op to its per-type handler.

        A non-linear ``transition`` is set on the holder for the duration
        of the call and reset to ``"linear"`` afterwards. Op types other
        than spin / scale / move_by / torque are ignored.
        """
        t_start = op.get("t_start", 0)
        t_end = op.get("t_end", 1)
        transition = op.get("transition", "linear")
        if transition != "linear":
            pin_holder.interp(transition)
        op_type = op["type"]
        if op_type == "spin":
            self._apply_pin_op_spin(pin_holder, op, t_start, t_end)
        elif op_type == "scale":
            self._apply_pin_op_scale(pin_holder, op, t_start, t_end)
        elif op_type == "move_by":
            self._apply_pin_op_move_by(pin_holder, op, t_start, t_end)
        elif op_type == "torque":
            self._apply_pin_op_torque(pin_holder, op, t_start, t_end)
        if transition != "linear":
            pin_holder.interp("linear")

    @staticmethod
    def _apply_pin_op_spin(pin_holder, op, t_start, t_end):
        """Forward a ``spin`` op to ``pin_holder.spin`` with its defaults."""
        center_mode = op.get("center_mode", "absolute")
        pin_holder.spin(
            center=op.get("center"),
            axis=op.get("axis", [0, 1, 0]),
            angular_velocity=op.get("angular_velocity", 360),
            t_start=t_start,
            t_end=t_end,
            center_mode=center_mode,
        )

    @staticmethod
    def _apply_pin_op_scale(pin_holder, op, t_start, t_end):
        """Forward a ``scale`` op to ``pin_holder.scale`` with its defaults."""
        center_mode = op.get("center_mode", "absolute")
        pin_holder.scale(
            op.get("factor", 1.0),
            t_start=t_start,
            t_end=t_end,
            center=op.get("center"),
            center_mode=center_mode,
        )

    @staticmethod
    def _apply_pin_op_move_by(pin_holder, op, t_start, t_end):
        """Forward a ``move_by`` op to ``pin_holder.move_by``."""
        pin_holder.move_by(
            op.get("delta", [0, 0, 0]),
            t_start=t_start,
            t_end=t_end,
        )

    @staticmethod
    def _apply_pin_op_torque(pin_holder, op, t_start, t_end):
        """Forward a ``torque`` op, remapping its hint vertex when possible."""
        blender_hint = int(op.get("hint_vertex", -1))
        # Translate Blender hint vertex to nearest tet vertex by position
        # (robust to re-tetrahedralization). Closest-vertex search
        # lives in Rust (`dec::closest_vertex_index`).
        tet_V = getattr(pin_holder._data, '_tet_V', None)
        blender_vert = getattr(pin_holder._data, '_blender_vert', None)
        if tet_V is not None and blender_vert is not None and blender_hint >= 0:
            import numpy as _np

            hint_pos = _np.ascontiguousarray(
                _np.asarray(blender_vert[blender_hint], dtype=_np.float64)
            )
            tet_arr = _np.ascontiguousarray(
                _np.asarray(tet_V, dtype=_np.float64)
            )
            sim_hint = int(_rust.closest_vertex_index(tet_arr, hint_pos))
        else:
            sim_hint = blender_hint
        pin_holder.torque(
            magnitude=op.get("magnitude", 1.0),
            axis_component=int(op.get("axis_component", 2)),
            hint_vertex=sim_hint,
            t_start=t_start,
            t_end=t_end,
        )

    def apply_invisible_colliders(self, scene, verbose: bool = False):
        """Create invisible wall and sphere colliders on the scene from the loaded pickle data.

        Call :meth:`set_path` first. Must be called BEFORE
        ``scene.build()``.

        Example:
            Add the pickle's invisible walls and spheres just before
            building::

                from frontend._decoder_ import ParamDecoder

                decoder = ParamDecoder().set_path("/path/to/param.pickle")
                decoder.apply_invisible_colliders(scene)
                fixed_scene = scene.build()
        """
        # Same precondition as the other apply_* methods; without it a
        # missing set_path() surfaced as an AttributeError on None.
        assert self._data is not None, "Parameter data not set. Call set_path() first."
        ic = self._data.get("invisible_colliders", {})
        if not ic:
            return
        if verbose:
            print("=== Invisible Colliders ===")
        for w in ic.get("walls", []):
            wall = scene.add.invisible.wall(w["position"], w["normal"])
            wall.param.set("contact-gap", w.get("contact_gap", 1e-3))
            wall.param.set("friction", w.get("friction", 0.0))
            wall.param.set("active-duration", w.get("active_duration", -1.0))
            thickness = _rust.validate_invisible_collider_thickness(
                "wall", float(w.get("thickness", 1.0))
            )
            wall.param.set("thickness", thickness)
            keyframes = w.get("keyframes", [])
            # The first keyframe is skipped; the collider is created at
            # ``w["position"]`` above.
            for kf in keyframes[1:]:
                wall.move_to(kf["position"], kf["time"])
            if verbose:
                print(f" Wall: pos={w['position']}, normal={w['normal']}, kf={len(keyframes)}")
        for s in ic.get("spheres", []):
            sphere = scene.add.invisible.sphere(s["position"], s["radius"])
            if s.get("hemisphere", False):
                sphere.hemisphere()
            if s.get("invert", False):
                sphere.invert()
            sphere.param.set("contact-gap", s.get("contact_gap", 1e-3))
            sphere.param.set("friction", s.get("friction", 0.0))
            sphere.param.set("active-duration", s.get("active_duration", -1.0))
            thickness = _rust.validate_invisible_collider_thickness(
                "sphere", float(s.get("thickness", 1.0))
            )
            sphere.param.set("thickness", thickness)
            keyframes = s.get("keyframes", [])
            for kf in keyframes[1:]:
                sphere.transform_to(kf["position"], kf["radius"], kf["time"])
            if verbose:
                print(f" Sphere: pos={s['position']}, r={s['radius']}, inv={s.get('invert')}, hemi={s.get('hemisphere')}, kf={len(keyframes)}")


class SceneDecoder:
    """Decode ``data.pickle`` written by the Blender addon into scene objects.

    Handles canonical mesh deduplication, per-instance transforms, cached
    tetrahedralization for SOLID groups, pin registration with
    Blender-to-sim surface mapping, UV assignment for SHELL groups, and rod
    / static groups. Used internally by :class:`BlenderApp`.

    Example:
        Drive the decoder directly to populate a scene (this is what
        :meth:`BlenderApp.populate` does internally)::

            from frontend._asset_ import AssetManager
            from frontend._mesh_ import MeshManager
            from frontend._decoder_ import SceneDecoder

            assets = AssetManager()
            meshes = MeshManager("/tmp/cache")
            decoder = SceneDecoder("/path/to/data.pickle", assets, meshes)
            decoder.populate_objects(scene, verbose=True)
    """

    def __init__(
        self, filepath: str, asset_manager: "AssetManager", mesh_manager: "MeshManager"
    ):
        _rust.validate_pickle_extension(filepath)
        with open(filepath, "rb") as f:
            blob = f.read()
        # See ``frontend/_cbor_bridge_.py`` for the byte-sniff contract.
        # NOTE(review): the remainder of this constructor (decoding
        # ``blob``) continues on the following source line, outside this
        # span; only the part visible here is carried over.
        from . import _cbor_bridge_ as _cbor
import _cbor_bridge_ as _cbor if _cbor.is_cbor(blob): self._data = _cbor.loads_scene(blob) else: self._data = pickle.loads(blob) self._asset = asset_manager self._mesh = mesh_manager self._object_info: dict[str, ObjectInfo] = {} # uuid -> ObjectInfo for stitch generation def _tetra_cache_path(self, tri_mesh) -> str: return tri_mesh.cache_path(_rust.tetra_cache_filename(tri_mesh.hash)) def _summarize_tetra_jobs(self, tetra_jobs: list[dict]) -> str: return _rust.summarize_tetra_jobs(tetra_jobs) @staticmethod def _apply_transform(local_vert, transform): """Apply a 4x4 transform to local-space vertices and return world-space vertices as float32.""" import numpy as np mat = np.ascontiguousarray(np.asarray(transform, dtype=np.float64)) v = np.ascontiguousarray(np.asarray(local_vert, dtype=np.float64)) return _rust.apply_transform_4x4(v, mat) def _resolve_object_mesh(self, obj, name, obj_uuid, mesh_ref, canonical_meshes): """Resolve local-space mesh data for an object during the second pass. Returns ``(local_vert, face, edge, uv, vert)`` where ``vert`` is the world-space vertices used by ``ObjectInfo`` and the pin-mapping pass. """ if mesh_ref and mesh_ref in canonical_meshes: ref = canonical_meshes[mesh_ref] local_vert = ref["vert"] face = ref["face"] edge = ref.get("edge") uv = ref.get("uv") elif "vert" in obj: local_vert = obj["vert"] # Local if transform present, world if legacy face = obj.get("face") edge = obj.get("edge") uv = obj.get("uv", None) else: _rust.validate_object_has_mesh(name, obj_uuid, False) raise AssertionError("unreachable") # pragma: no cover # World-space vertices (for object_info used by pin mapping etc.) 
vert = obj.get("_resolved_vert", local_vert) return local_vert, face, edge, uv, vert @staticmethod def _log_object_mesh(name, vert, face, edge, uv): """Verbose-mode per-object summary printed during the second pass.""" if edge is not None: print( f" * name: {name}, vert: {vert.shape}, edge: {edge.shape}, uv: {len(uv) if uv is not None else 'None'}" ) else: print( f" * name: {name}, vert: {vert.shape}, face: {face.shape if face is not None else 'None'}, uv: {len(uv) if uv is not None else 'None'}" ) def populate_objects(self, scene: Scene, verbose: bool = False, progress_callback=None, ftetwild_by_uuid: dict | None = None) -> Scene: """Populate ``scene`` with objects from the decoder's pickle data. Handles STATIC, SOLID, SHELL, and ROD groups, including canonical mesh deduplication by UUID, per-instance transforms, cached tetrahedralization (with per-UUID fTetWild overrides), pin registration with Blender-to-sim surface mapping, and stitches. Args: scene (Scene): The scene to populate. verbose (bool): Enable verbose logging. progress_callback: Optional callable ``fn(progress: float, info: str)`` invoked during loading. ``progress`` is in ``[0.0, 1.0]``. ftetwild_by_uuid (dict | None): Optional mapping of object UUID to fTetWild keyword arguments used during tetrahedralization. Returns: Scene: The populated scene. 
Example: Populate a fresh scene from a Blender pickle:: from frontend._asset_ import AssetManager from frontend._mesh_ import MeshManager from frontend._decoder_ import SceneDecoder decoder = SceneDecoder( "/path/to/data.pickle", AssetManager(), MeshManager("/tmp/cache"), ) decoder.populate_objects(scene, verbose=True) fixed_scene = scene.build() """ plan = self._plan_build(progress_callback) object_entries = plan["object_entries"] canonical_meshes = plan["canonical_meshes"] canonical_asset_name = plan["canonical_asset_name"] tetra_jobs = plan["tetra_jobs"] progress = plan["progress"] report = progress["report"] report(f"Build plan: {self._summarize_tetra_jobs(tetra_jobs)}") progress["completed"] += 0.5 object_iter = iter(object_entries) for group in self._data: objects = group.get("object", None) assert objects is not None, "Object data not found in the group." group_type = group.get("type") if verbose: print(f"--- new group: {group_type} ---") for obj in objects: entry = next(object_iter) name = obj.get("name", "") obj_uuid = obj.get("uuid", "") _rust.validate_scene_object_identity(name, obj_uuid) mesh_ref = obj.get("_mesh_ref_resolved") transform = obj.get("transform") local_vert, face, edge, uv, vert = self._resolve_object_mesh( obj, name, obj_uuid, mesh_ref, canonical_meshes, ) report(f"Loading {group_type}: {name}...") if verbose: self._log_object_mesh(name, vert, face, edge, uv) tet_mesh = None V = None F = None if group_type == "STATIC": _obj = self._populate_static( scene, obj, name, obj_uuid, local_vert, face, transform, verbose, ) elif group_type == "SOLID": _obj, tet_mesh, V, F = self._populate_solid( scene, obj, entry, name, obj_uuid, vert, object_entries, tetra_jobs, ftetwild_by_uuid, progress, ) elif group_type == "SHELL": _obj = self._populate_shell( scene, obj_uuid, mesh_ref, canonical_meshes, canonical_asset_name, name, local_vert, face, transform, vert, uv, ) elif group_type == "ROD": _obj = self._populate_rod( scene, name, obj_uuid, local_vert, 
edge, transform, vert, ) else: _rust.validate_group_type(group_type) _obj = None self._apply_pin_mapping( obj, _obj, group_type, vert, V, F, tet_mesh, verbose, ) self._apply_stitch(obj, _obj, name, verbose) progress["completed"] += entry["base_weight"] report(f"Loaded {group_type}: {name}") return scene def _plan_build(self, progress_callback): """First pass: build canonical mesh table, resolve mesh refs, and size the tetra-job list. Returns a dict with ``object_entries``, ``canonical_meshes``, ``canonical_asset_name``, ``tetra_jobs``, and a ``progress`` sub-dict carrying mutable counters plus the ``report`` callable used by the second pass. """ planning_steps = sum(len(group.get("object", [])) for group in self._data) planning_weight = max(1.0, 0.35 * planning_steps) # Preallocate ``object_entries`` to ``planning_steps`` slots so # the planning loop fills slots by index instead of growing the # list one ``append`` at a time. Each slot is later overwritten # with a fully-populated entry dict. object_entries: list = [None] * planning_steps # The ``progress`` dict carries mutable counters across the # planning loop, the per-group dispatchers, and ``report``. progress = {"completed": 0.0, "total": 0.0} object_work = 0.0 # First-pass tetra-job counter; the dedup-aware rebuild below # is what actually feeds ``_summarize_tetra_jobs`` so this just # stamps a provisional ``tetra_index``. tetra_idx_first_pass = 0 entry_idx = 0 # Mesh deduplication: canonical mesh data keyed by UUID canonical_meshes: dict = {} def report(info: str, total_work_override: float | None = None): if progress_callback is not None: total = progress["total"] if total_work_override is None else total_work_override progress_callback( progress["completed"] / total if total > 0 else 1.0, info, ) progress["report"] = report # First pass: resolve mesh_ref and build canonical mesh table. # canonical_meshes is keyed by UUID (encoder sends mesh_ref as UUID). 
# Also keep asset_name (Blender name) for asset registration. canonical_asset_name: dict = {} for group in self._data: objects = group.get("object", None) assert objects is not None, "Object data not found in the group." for obj in objects: name = obj.get("name", "") obj_uuid = obj.get("uuid", "") _rust.validate_scene_object_identity(name, obj_uuid) if "mesh_ref" not in obj and "vert" in obj: # Canonical mesh: store local data for referenced instances canonical_meshes[obj_uuid] = { "vert": obj.get("vert"), "face": obj.get("face"), "edge": obj.get("edge"), "uv": obj.get("uv", None), "stitch": obj.get("stitch", None), } canonical_asset_name[obj_uuid] = name planning_increment = planning_weight / planning_steps if planning_steps > 0 else 0.0 report("Scanning build plan...", planning_weight) for group in self._data: objects = group.get("object", None) assert objects is not None, "Object data not found in the group." group_type = group.get("type") for obj in objects: name = obj.get("name") report( f"Scanning build plan: {name}...", planning_weight, ) # Resolve mesh data: either from obj directly or via mesh_ref mesh_ref = obj.get("mesh_ref") transform = obj.get("transform") if mesh_ref is not None: _rust.validate_mesh_ref_known( name, str(mesh_ref), mesh_ref in canonical_meshes ) if mesh_ref is not None and mesh_ref in canonical_meshes: # Duplicate: get local mesh from canonical, apply transform ref = canonical_meshes[mesh_ref] local_vert = ref["vert"] face = ref["face"] vert = self._apply_transform(local_vert, transform) if transform is not None else local_vert obj["_resolved_vert"] = vert obj["_resolved_face"] = face obj["_resolved_uv"] = ref.get("uv") obj["_resolved_stitch"] = ref.get("stitch") obj["_resolved_edge"] = ref.get("edge") obj["_mesh_ref_resolved"] = mesh_ref elif transform is not None and "vert" in obj: # Canonical with transform: apply transform to local vertices vert = self._apply_transform(obj["vert"], transform) obj["_resolved_vert"] = vert 
obj["_resolved_face"] = obj.get("face") obj["_resolved_uv"] = obj.get("uv") obj["_resolved_stitch"] = obj.get("stitch") obj["_resolved_edge"] = obj.get("edge") obj["_mesh_ref_resolved"] = None else: # Legacy format: vertices already in world space obj["_resolved_vert"] = obj.get("vert") obj["_resolved_face"] = obj.get("face") obj["_resolved_uv"] = obj.get("uv") obj["_resolved_stitch"] = obj.get("stitch") obj["_resolved_edge"] = obj.get("edge") obj["_mesh_ref_resolved"] = None entry = { "group_type": group_type, "obj": obj, "name": name, "base_weight": 1.0, "tetra_weight": 0.0, "tetra_index": None, "tetra_cached": False, "tri_mesh": None, } if group_type == "SOLID": report( f"Checking tetra cache: {name}...", planning_weight, ) # For dedup: use LOCAL vertices for tet (same hash for duplicates) if mesh_ref and mesh_ref in canonical_meshes: tet_vert = canonical_meshes[mesh_ref]["vert"] tet_face = canonical_meshes[mesh_ref]["face"] elif "vert" in obj and obj.get("transform") is not None: tet_vert = obj["vert"] # Local-space tet_face = obj.get("face") else: tet_vert = obj["_resolved_vert"] # Legacy: world-space tet_face = obj["_resolved_face"] tri_mesh = self._mesh.create.tri(tet_vert, tet_face) cached = os.path.exists(self._tetra_cache_path(tri_mesh)) entry["tri_mesh"] = tri_mesh entry["tetra_cached"] = cached tetra_idx_first_pass += 1 entry["tetra_index"] = tetra_idx_first_pass entry["tetra_weight"] = 3.0 if cached else 8.0 object_work += entry["tetra_weight"] object_entries[entry_idx] = entry entry_idx += 1 object_work += entry["base_weight"] progress["completed"] += planning_increment progress["total"] = planning_weight + 0.5 + object_work # Dedup-by-tri_mesh-hash + tetra_jobs rebuild + per-entry # tetra_index reassignment all run in a single Rust pass, so # the per-entry ``tet_hash_seen`` dict and the per-entry # ``tetra_jobs.append`` Python loops are eliminated. 
The Rust # call returns the rebuilt jobs list and the work_delta that # gets folded into ``object_work``. tetra_jobs, _work_delta = _rust.dedup_and_rebuild_tetra_jobs(object_entries) object_work += _work_delta progress["total"] = planning_weight + 0.5 + object_work return { "object_entries": object_entries, "canonical_meshes": canonical_meshes, "canonical_asset_name": canonical_asset_name, "tetra_jobs": tetra_jobs, "progress": progress, } def _populate_static(self, scene, obj, name, obj_uuid, local_vert, face, transform, verbose): """STATIC group dispatcher: rest-pose mesh, transform-keyframe animation, UI-assigned static ops, or per-vertex deformation cache. Returns the Scene Object for downstream pin / stitch passes, or ``None`` for the rest-pose case (no further pin work needed). """ import numpy as np transform_anim = obj.get("transform_animation", None) static_ops = obj.get("static_ops", []) or [] static_deform = obj.get("static_deform_animation", None) _rust.validate_static_anim_xor_ops( name, transform_anim is not None, bool(static_ops), static_deform is not None, ) if verbose: print( f" > transform_animation: " f"{'YES' if transform_anim else 'NO'}, " f"static_ops: {len(static_ops)}, " f"static_deform_animation: " f"{'YES' if static_deform else 'NO'}" ) def _setup_pin_shell(): """Common setup for animated static objects: a zero-stiffness shell whose vertices are driven by pin operations. Returns the Object plus its rest-frame translation (``(obj, rest_t)``). """ self._asset.add.tri(name, local_vert, face) _o = scene.add(name, obj_uuid) if transform is not None: _o.mat4x4(transform) _o.param.set("density", 0.0) _o.param.set("young-mod", 0.0) _o.param.set("poiss-rat", 0.0) _o.param.set("bend", 0.0) # Mark for preview suppression: the shell's pins # are implementation detail, not user-chosen. 
_o._is_static_moving = True rest_t = ( np.asarray(transform, dtype=np.float64)[:3, 3] if transform is not None else np.zeros(3, dtype=np.float64) ) return _o, rest_t if transform_anim is not None: # Case 1: Blender keyframes drive the pose. The simulator # enforces the pin as a soft constraint, so its output # vertices drift slightly from the input keyframes. The # dynamics were resolved against the drifted positions, so # we must display those โ€” not the keyframes โ€” to keep the # collider visually consistent with the cloth. Include in # output PC2. _obj, rest_translation = _setup_pin_shell() _obj.pin().transform_keyframes( local_vert=local_vert, times=transform_anim["time"], translations=transform_anim["translation"], quaternions=transform_anim["quaternion"], scales=transform_anim["scale"], segments=transform_anim.get("segments", []), rest_translation=rest_translation, ) return _obj if static_ops: # Case 2: UI-assigned move/spin/scale ops. Blender has no # fcurves, so the remote sim is the source of truth (include # in output so PC2 can play it back). _obj, _ = _setup_pin_shell() pin = _obj.pin() for op in static_ops: t_start = float(op["t_start"]) t_end = float(op["t_end"]) transition = op.get("transition", "linear") if op["op_type"] == "MOVE_BY": pin.move_by( list(op["delta"]), t_start=t_start, t_end=t_end, transition=transition, ) elif op["op_type"] == "SPIN": # Center is always the object origin: in the # pin-shell's local op frame that's (0,0,0). pin.spin( center=[0.0, 0.0, 0.0], axis=list(op["axis"]), angular_velocity=float(op["angular_velocity"]), t_start=t_start, t_end=t_end, center_mode="absolute", ) elif op["op_type"] == "SCALE": pin.scale( scale=float(op["factor"]), t_start=t_start, t_end=t_end, center=[0.0, 0.0, 0.0], center_mode="absolute", ) else: _rust.validate_static_op_type(op["op_type"]) return _obj if static_deform is not None: # Case 3: per-frame depsgraph-baked vertex stream from the # Capture Deformation operator. 
local_vert is already # frame_start's depsgraph-evaluated mesh in solver world # space (the encoder swapped it in), and transform is # identity. The pin shell starts at vert_frames[0] and we # add one MoveByOperation per consecutive frame pair to # drive every vertex through the recorded trajectory. # # The pin is a soft constraint, so the simulator output # drifts from the captured cache. The cloth was resolved # against the drifted positions, so include the shell in # output PC2 and let MESH_CACHE overwrite the depsgraph- # driven mesh on display. times = list(static_deform["time"]) vert_frames = np.ascontiguousarray( static_deform["vert_frames"], dtype=np.float64, ) if vert_frames.ndim != 3 or vert_frames.shape[2] != 3: raise ValueError( f"static_deform_animation['vert_frames'] for " f"'{name}' must be (n_frames, n_verts, 3); got " f"shape {vert_frames.shape}" ) n_frames = vert_frames.shape[0] n_verts = vert_frames.shape[1] if n_frames != len(times): raise ValueError( f"static_deform_animation for '{name}': " f"len(time)={len(times)} != n_frames={n_frames}" ) if n_verts != len(local_vert): raise ValueError( f"static_deform_animation for '{name}': " f"cache has {n_verts} vertices but mesh has " f"{len(local_vert)}" ) _obj, _ = _setup_pin_shell() pin = _obj.pin() # Successive MoveBy segments compose: at t=times[k], # pin pos = local_vert + sum(deltas up to k) = vert_frames[k] # (because the encoder set local_vert == vert_frames[0]). for k in range(n_frames - 1): delta = np.ascontiguousarray( vert_frames[k + 1] - vert_frames[k], dtype=np.float64, ) pin.move_by( delta, t_start=float(times[k]), t_end=float(times[k + 1]), transition="linear", ) return _obj # Case 4: rest-pose static (plain collision mesh). 
self._asset.add.tri(name, local_vert, face) _static_obj = scene.add(name, obj_uuid) if transform is not None: _static_obj.mat4x4(transform) _static_obj.pin() return None def _populate_solid( self, scene, obj, entry, name, obj_uuid, vert, object_entries, tetra_jobs, ftetwild_by_uuid, progress, ): """SOLID group dispatcher: tetrahedralize (or reuse a canonical result), register the tet asset, add the scene instance, and record an ``ObjectInfo`` entry. Returns ``(_obj, tet_mesh, V, F)`` for the pin-mapping pass. """ report = progress["report"] reuse_from = entry.get("tetra_reuse_from") transform = obj.get("transform") if reuse_from is not None: # Reuse tet + asset from canonical mesh src = object_entries[reuse_from] tet_mesh = src["_tet_mesh_result"] asset_name = src["name"] report(f"Reusing tetrahedralization from {asset_name} for {name}") else: prefix = ( f"Tetrahedralizing {name} " f"({entry['tetra_index']}/{len(tetra_jobs)}, " f"{'cached' if entry['tetra_cached'] else 'new'})" ) report(f"{prefix}...") ftw_kwargs = ( (ftetwild_by_uuid or {}).get(obj_uuid, {}) or {} ) try: tet_mesh = entry["tri_mesh"].tetrahedralize( status_callback=lambda detail, prefix=prefix: report( f"{prefix}: {detail}" ), **ftw_kwargs, ) except ValueError as e: # Prepend the object name so the addon UI shows which # SOLID mesh failed (the underlying message already # explains the cause and suggests SHELL). 
raise ValueError(f"{name}: {e}") from e entry["_tet_mesh_result"] = tet_mesh progress["completed"] += entry["tetra_weight"] report( f"Finished tetrahedralizing {name} " f"({entry['tetra_index']}/{len(tetra_jobs)})" ) asset_name = name V_local, F, T = tet_mesh # Register asset with local-space vertices self._asset.add.tet(asset_name, V_local, F, T) V_local, F, T = tet_mesh # Add scene instance with per-instance transform _obj = scene.add(asset_name, obj_uuid) if transform is not None: _obj.mat4x4(transform) # Compute world-space V for object_info if transform is not None: V = self._apply_transform(V_local, transform) else: V = V_local solid_info = ObjectInfo(type="SOLID", vert=vert, V=V, F=F) if tet_mesh.has_surface_mapping(): import numpy as np tri_indices, coefs = tet_mesh.surface_map # Pass local-space tet vertices: the surface-map coefs were # computed in local space, so the in-Rust ``x0 + c1*b1 + c2*b2 + # c3*nฬ‚`` reconstruction (used to pick the closest of three # triangle corners) only matches the original Blender position # when V is in the same space as the coefs. Under non-uniform # world scale, mixing world V with local coefs shifts the bp by # tens of centimeters and can pick the wrong corner. orig_to_sim = _rust.solid_orig_to_sim( np.ascontiguousarray(np.asarray(tri_indices, dtype=np.int64)), np.ascontiguousarray(np.asarray(coefs, dtype=np.float64)), np.ascontiguousarray(np.asarray(F, dtype=np.int64)), np.ascontiguousarray(np.asarray(V_local, dtype=np.float64)), ) solid_info.orig_to_sim = orig_to_sim scene.set_surface_map(obj_uuid, tri_indices, coefs, F) self._object_info[obj_uuid] = solid_info return _obj, tet_mesh, V, F def _populate_shell( self, scene, obj_uuid, mesh_ref, canonical_meshes, canonical_asset_name, name, local_vert, face, transform, vert, uv, ): """SHELL group dispatcher: register (or reuse) the tri asset, add the scene instance, and record an ``ObjectInfo`` entry. Returns the Scene Object for the pin / stitch passes. 
""" if mesh_ref and mesh_ref in canonical_meshes: asset_name = canonical_asset_name[mesh_ref] else: asset_name = name self._asset.add.tri(asset_name, local_vert, face) _obj = scene.add(asset_name, obj_uuid) if transform is not None: _obj.mat4x4(transform) self._object_info[obj_uuid] = ObjectInfo( type="SHELL", vert=vert, V=vert, F=face, ) if uv is not None: assert len(uv) == len(face), "UV length must match face length." _obj.set_uv(uv) return _obj def _populate_rod(self, scene, name, obj_uuid, local_vert, edge, transform, vert): """ROD group dispatcher: register the rod asset, add the scene instance, and record an ``ObjectInfo`` entry. Returns the Scene Object for the pin / stitch passes. """ _rust.validate_rod_has_edges(name, edge is not None) self._asset.add.rod(name, local_vert, edge) _obj = scene.add(name, obj_uuid) if transform is not None: _obj.mat4x4(transform) self._object_info[obj_uuid] = ObjectInfo(type="ROD", vert=vert) return _obj def _apply_pin_mapping(self, obj, _obj, group_type, vert, V, F, tet_mesh, verbose): """Pin-mapping pass: register pins on ``_obj`` from ``obj['pin']``, building the Blender-to-sim surface transfer for SOLID groups with a surface mapping and falling back to direct per-index pins otherwise. """ if _obj is None or "pin" not in obj: return pin_index = obj["pin"] if verbose: print(f" > pin: {len(pin_index)}") if group_type == "SOLID" and tet_mesh is not None and tet_mesh.has_surface_mapping(): import numpy as np # Build Blender->sim surface transfer support from the # interpolation map. A pinned Blender surface vertex # contributes to every simulation surface vertex whose # in-plane weight is positive on the mapped triangle. # Frame coefs store (c1, c2, c3); the implicit bary # weights are (1-c1-c2, c1, c2), which reduce to the # old bary weights when c3 ~ 0 (vertex lies on the # triangle). c3 (normal offset) does not affect the # pin transfer support. 
tri_indices_pin, coefs_pin = tet_mesh.surface_map def mapped_surface_vertices(blender_index: int) -> list[int]: ti = tri_indices_pin[blender_index] tri_verts = F[ti] c = coefs_pin[blender_index] w = np.array([1.0 - c[0] - c[1], c[0], c[1]]) sim_verts = [ int(tri_verts[k]) for k in range(len(tri_verts)) if w[k] > 1e-4 ] if not sim_verts: sim_verts = [int(tri_verts[int(np.argmax(w))])] return sim_verts n_surface_verts = len(vert) if len(pin_index) == n_surface_verts: # Use the transpose of the interpolation map: # collect every simulation surface vertex that # receives positive weight from a pinned Blender # surface vertex. sim_verts = sorted({ vi for i in pin_index for vi in mapped_surface_vertices(i) }) holder = _obj.pin(sim_verts) holder._data._blender_pin_indices = list(pin_index) holder._data._tet_V = V holder._data._blender_vert = vert if verbose: print(f" > pin (mapped surface): {len(sim_verts)} verts") else: # Partial pin: each Blender pin vertex maps to the # simulation surface vertices with positive # interpolation weight. for i in pin_index: holder = _obj.pin(mapped_surface_vertices(i)) holder._data._blender_pin_indices = [i] holder._data._tet_V = V holder._data._blender_vert = vert else: # One holder for the whole pinned set. apply_pin_config # later splits it per pin_group_id when the object has # multiple distinct pin vertex groups. (A per-vertex # _obj.pin([i]) loop here made N holders, and a keyframed # pin then wrote N x M operation files for the solver.) if pin_index: _obj.pin(list(pin_index)) def _apply_stitch(self, obj, _obj, name, verbose): """Stitch pass: register a stitch asset and attach it to ``_obj`` when a stitch was resolved during planning. 
""" if _obj is None: return resolved_stitch = obj.get("_resolved_stitch", obj.get("stitch")) if resolved_stitch is None: return stitch_data = resolved_stitch stitch_name = f"{name}_stitch" if verbose: print(f" > stitch: {len(stitch_data[0])} edges") self._asset.add.stitch(stitch_name, stitch_data) _obj.stitch(stitch_name)