# File: _decoder_.py
# Code: Claude Code and Codex
# Review: Ryoichi Ando (ryoichi.ando@zozo.com)
# License: Apache v2.0
import os
import pickle
from dataclasses import dataclass
from typing import Any, Optional
from . import _rust # type: ignore[attr-defined]
from ._asset_ import AssetManager
from ._mesh_ import MeshManager
from ._plot_ import PlotManager
from ._scene_ import FixedScene, Scene
from ._session_ import FixedSession, Session
from ._utils_ import Utils
@dataclass
class ObjectInfo:
    """Per-object metadata recorded by :class:`SceneDecoder.populate_objects`.

    Which fields are filled in depends on ``type``:

    * ``SOLID`` populates ``vert``, ``V``, ``F``, and optionally ``orig_to_sim``.
    * ``SHELL`` populates ``vert``, ``V`` (== ``vert``), ``F``.
    * ``ROD`` populates ``vert`` only; ``V`` and ``F`` stay ``None``.

    The Rust ``cross_stitch_apply_batch`` consumer reads via dict access,
    so :meth:`to_dict` is used at the FFI boundary.
    """

    type: str
    vert: Any
    V: Optional[Any] = None
    F: Optional[Any] = None
    orig_to_sim: Optional[Any] = None

    def to_dict(self) -> dict:
        """Return a plain dict holding ``type``, ``vert`` and every optional field that is set.

        Optional fields whose value is ``None`` are left out of the result.
        """
        payload: dict = {"type": self.type, "vert": self.vert}
        # Fixed order keeps the key order identical to the field declaration order.
        for optional_name in ("V", "F", "orig_to_sim"):
            value = getattr(self, optional_name)
            if value is not None:
                payload[optional_name] = value
        return payload
[docs]
class BlenderApp:
    """Attach to a project exported by the Blender addon and build a runnable session.

    The addon writes ``data.pickle`` and ``param.pickle`` into a
    per-project directory under ``~/.local/share/ppf-cts/git-<branch>/<name>/``.
    :meth:`open` is the canonical entry point: it decodes those pickles,
    populates the scene, applies parameters, and builds a
    ``FixedSession`` accessible via :attr:`scene` and :attr:`session`.

    Example:
        Canonical notebook cell (as generated by the Blender addon),
        attach to a transferred project and run it::

            from frontend import BlenderApp

            app = BlenderApp.open("rod-example-3")
            app.scene.report()
            app.scene.preview()
            app.session.run()
            app.session.preview()
    """

    def __init__(self, name: str, verbose: bool = False, progress_callback=None):
        """Initialize the BlenderApp.

        Resolves the project paths, makes sure the cache directory exists and
        creates the asset / mesh managers. Nothing is decoded yet; the scene
        and session slots start out as ``None``.

        Args:
            name (str): The name of the Blender project.
            verbose (bool): Enable verbose logging.
            progress_callback: Optional callable ``fn(progress: float, info: str)``
                invoked during long-running operations. ``progress`` is in
                ``[0.0, 1.0]``.
        """
        self._verbose = verbose
        self._name = name
        self._progress_callback = progress_callback
        # Path math + branch resolution lives in Rust
        # (`dec::blender_app_paths`), so the Python and Windows-native
        # branches stay aligned with `App.get_data_dirpath` and the
        # server's `make_root`.
        paths = _rust.blender_app_paths(os.path.abspath(__file__), name)
        self._data_dirpath = paths["data_dirpath"]
        self._root = paths["root"]
        cache_root = paths["cache_root"]
        os.makedirs(cache_root, exist_ok=True)
        self._asset_manager = AssetManager()
        self._mesh_manager = MeshManager(cache_root)
        # Declared up front so every instance attribute exists after
        # construction; the decoder is created when the scene is populated.
        self._scene_decoder = None
        self._scene = None
        self._session = None
        self._fixed_scene = None
        self._fixed_session = None
def __getstate__(self):
state = self.__dict__.copy()
state["_progress_callback"] = None
return state
def _report_progress(self, progress: float, info: str):
if self._progress_callback is not None:
self._progress_callback(progress, info)
[docs]
@classmethod
def open(
    cls,
    name: str,
    verbose: bool = False,
    progress_callback=None,
) -> "BlenderApp":
    """Open a Blender project and build its scene and session.

    Behavior:
        1. If the Blender addon has uploaded both ``data.pickle`` and
           ``param.pickle`` under the project root (and ``upload_id.txt``
           is present), ``populate().make()`` is invoked to build a fresh
           scene and session in this process.
        2. Otherwise, ``FileNotFoundError`` is raised. The user must
           click "Transfer" in the Blender addon first (or migrate a
           legacy project that lacks ``upload_id.txt``).

    After ``open()``, ``app.scene`` and ``app.session`` are the built
    ``FixedScene`` / ``FixedSession`` objects, ready for report,
    preview, run, and stream.

    Args:
        name (str): The name of the Blender project.
        verbose (bool): Enable verbose logging.
        progress_callback: Optional callable ``fn(progress, info)``
            forwarded to the constructor.

    Returns:
        BlenderApp: The fully built application.

    Raises:
        FileNotFoundError: If either pickle or ``upload_id.txt`` is missing.

    Example:
        Notebook cell generated by the Blender addon, attaching to
        the transferred project and running it::

            from frontend import BlenderApp

            app = BlenderApp.open("<project>")
            app.scene.preview()
            app.session.run()
            app.session.preview()
    """
    app = cls(name, verbose=verbose, progress_callback=progress_callback)
    root = app._root

    uploaded = [
        os.path.join(root, filename)
        for filename in ("data.pickle", "param.pickle")
    ]
    if any(not os.path.exists(path) for path in uploaded):
        raise FileNotFoundError(
            f"Blender has not uploaded project '{name}' to {app._root}. "
            f"In the Blender addon, click 'Transfer' first."
        )

    # Invariant: whenever data.pickle and param.pickle both exist,
    # upload_id.txt must also exist. It's stamped by the Blender
    # addon's upload handler at write time and is the identity that
    # client build-tracking pins against. If it's missing, the
    # project predates upload_id tracking (or someone wrote pickles
    # out-of-band); refuse to build and tell the user how to migrate.
    if os.path.exists(os.path.join(root, "upload_id.txt")):
        app.populate().make()
        return app

    raise FileNotFoundError(
        f"Project '{name}' at {app._root} has data.pickle and "
        f"param.pickle but no upload_id.txt. This project predates "
        f"upload_id tracking. Re-upload from the Blender addon, "
        f"or run `python tools/backfill_upload_ids.py` to stamp a "
        f"fresh id on every legacy project."
    )
[docs]
def populate(self) -> "BlenderApp":
    """Populate the scene with objects decoded from ``data.pickle``.

    Also peeks at ``param.pickle`` for per-group fTetWild overrides so
    that tetrahedralization can use them at build time; full
    parameter application is deferred to :meth:`make`.

    Returns:
        BlenderApp: ``self`` for chaining.

    Example:
        Two-stage construction (useful when you want to inspect the
        mutable scene before parameters are applied)::

            from frontend import BlenderApp

            app = BlenderApp("my-project")
            app.populate()
            app.make()
            app.session.run()
    """
    data_path = os.path.join(self._root, "data.pickle")
    assert os.path.exists(data_path)
    self._report_progress(0.05, "Loading scene data...")

    # A fresh asset manager is created on every populate() call and shared
    # by the scene and the decoder.
    assets = AssetManager()
    self._asset_manager = assets
    self._scene = Scene("scene", PlotManager(), assets)
    self._scene_decoder = SceneDecoder(data_path, assets, self._mesh_manager)

    # Peek at param.pickle for per-group fTetWild overrides so
    # populate_objects can pass them into tetrahedralize() at build
    # time. The full param.pickle is applied later in make(). Fan the
    # group-level kwargs dict out to every UUID in the group via
    # the Rust helper so this loop stays aligned with the
    # ParamDecoder traversal.
    ftetwild_by_uuid: dict = {}
    param_path = os.path.join(self._root, "param.pickle")
    if os.path.exists(param_path):
        # See ``frontend/_cbor_bridge_.py`` for the byte-sniff contract.
        from . import _cbor_bridge_ as _cbor

        with open(param_path, "rb") as param_file:
            raw = param_file.read()
        decoded = _cbor.loads_param(raw) if _cbor.is_cbor(raw) else pickle.loads(raw)
        ftetwild_by_uuid = _rust.extract_ftetwild_by_uuid(decoded.get("group", []))

    def scaled_progress(progress, info):
        # Map the decoder's [0, 1] progress onto the 0.10-0.60 band.
        self._report_progress(0.10 + 0.50 * progress, info)

    self._scene_decoder.populate_objects(
        self._scene,
        verbose=self._verbose,
        progress_callback=scaled_progress,
        ftetwild_by_uuid=ftetwild_by_uuid,
    )
    return self
[docs]
def make(self) -> "BlenderApp":
    """Apply ``param.pickle`` to the populated scene and build a runnable session.

    Applies per-object parameters, pin configuration, cross-stitch
    constraints, explicit merge pairs, and invisible colliders to the
    scene, then builds the fixed scene and session. The built state is
    then written via ``_persist_app_state``; nothing here catches its
    exceptions, so a persistence failure propagates to the caller.

    Returns:
        BlenderApp: ``self`` for chaining.

    Example:
        Finish the build after :meth:`populate`, then run the session::

            from frontend import BlenderApp

            app = BlenderApp("my-project").populate().make()
            app.session.run()
            app.session.preview()
    """
    assert self._scene is not None, "Scene must be populated before making the app"
    param_path = os.path.join(self._root, "param.pickle")
    assert os.path.exists(param_path)

    scene = self._scene
    verbose = self._verbose
    report = self._report_progress

    report(0.65, "Applying object parameters...")
    decoder = ParamDecoder().set_path(param_path)
    decoder.apply_to_objects(scene, verbose=verbose)

    report(0.72, "Applying pin configuration...")
    decoder.apply_pin_config(scene, verbose=verbose)

    stitches = decoder.cross_stitch
    if stitches:
        report(0.78, "Applying cross-stitch constraints...")
        # Whole-batch port: Rust validates endpoints, projects
        # anchors for SOLID targets, builds each canonical dict,
        # and appends directly to ``self._scene._cross_stitch`` so
        # no Python-side per-entry append loop survives.
        object_info = {
            uuid: info.to_dict()
            for uuid, info in self._scene_decoder._object_info.items()
        }
        _rust.cross_stitch_apply_batch(
            stitches,
            object_info,
            scene._cross_stitch,
            verbose,
        )

    merge_pairs = decoder.explicit_merge_pairs
    if merge_pairs:
        scene.set_explicit_merge_pairs(merge_pairs)

    report(0.82, "Applying invisible colliders...")
    decoder.apply_invisible_colliders(scene, verbose=verbose)

    report(0.84, "Building scene: preparing objects...")

    def build_progress(progress, info):
        # Map the scene build's [0, 1] progress onto the 0.84-0.94 band.
        report(0.84 + 0.10 * progress, info)

    self._fixed_scene = scene.build(progress_callback=build_progress)

    report(0.94, "Initializing session...")
    repo_dir = os.path.dirname(os.path.dirname(__file__))
    session = Session(
        self._name,
        self._root,
        repo_dir,
        self._data_dirpath,
        "session",
    )
    self._session = session.init(self._fixed_scene)

    report(0.97, "Applying session parameters...")
    decoder.apply_to_session(self._session, verbose=verbose)
    self._fixed_session = self._session.build()
    self._persist_app_state()
    # NOTE(review): the final report reuses 0.97 rather than 1.0 --
    # presumably the caller owns the remaining progress range; confirm.
    report(0.97, "Build decode complete.")
    return self
def _persist_app_state(self) -> None:
    """Write this BlenderApp to ``{_root}/app_state.pickle``.

    The file lets both the Blender-addon server and future notebook runs
    discover the built state. The Blender-addon server reads it back
    through ``server/engine.py:AppState.load`` when it needs to adopt
    an externally-built session.

    The object is dumped to a temporary path first and then moved over
    the final path with ``os.replace``. If the dump or the replace fails,
    the temporary file is removed (best effort) before the error is
    re-raised, so a failed persist does not leave a partial file behind.

    Side effect: ``self._fixed_session._process`` is set to ``None``
    before pickling and is not restored afterwards.

    Raises:
        Exception: Any persistence error (disk full, permission denied,
            pickling failure) is propagated rather than silently
            swallowed: a failed persist means the server-side state
            machine won't pick up this build, and the caller should know.

    Format note: this site uses raw ``pickle`` rather than a CBOR
    envelope. The payload is the entire ``BlenderApp`` object
    (asset / scene / mesh / fixed_session graph), no manager class
    on that graph carries a hand-written CBOR schema, and the
    consumer lives outside this repo (the Blender-addon server's
    ``AppState.load``). The producer-side wrappers in
    :mod:`frontend._session_` and :mod:`frontend._app_` use CBOR
    envelopes; this is the only intentional raw-pickle write in
    :mod:`frontend`.
    """
    # _process holds a subprocess Popen with non-picklable locks.
    if self._fixed_session is not None:
        self._fixed_session._process = None

    paths = _rust.app_state_persist_paths(self._root)
    tmp_path = paths["tmp_path"]
    os.makedirs(self._root, exist_ok=True)
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(self, f)
        os.replace(tmp_path, paths["final_path"])
    except BaseException:
        # Drop the partially written temp file, then surface the original
        # error. A failure to clean up must not mask that error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
@property
def scene(self) -> FixedScene:
"""The built fixed scene. Use this for ``.report()``,
``.preview()``, etc. The mutable pre-build ``Scene`` remains
accessible as ``_scene`` for advanced callers.
Example:
Open a Blender-authored project and inspect the built scene::
from frontend import BlenderApp
app = BlenderApp.open("rod-example-3")
app.scene.report()
app.scene.preview()
"""
assert self._fixed_scene is not None, (
"Scene is not built yet. Call BlenderApp.open() or "
".populate().make() first."
)
return self._fixed_scene
@property
def session(self) -> FixedSession:
"""The built fixed session. Use this for ``.run()``,
``.preview()``, ``.stream()``, etc.
Example:
Run a Blender-authored session and stream live output::
from frontend import BlenderApp
app = BlenderApp.open("rod-example-3")
app.session.run()
app.session.preview()
app.session.stream()
"""
assert self._fixed_session is not None, (
"Session is not built yet. Call BlenderApp.open() or "
".populate().make() first."
)
return self._fixed_session
class ParamDecoder:
    """Load and apply ``param.pickle`` written by the Blender addon.

    Parameters are split across object-level (velocity, fTetWild hints,
    per-key ``param.set`` values), pin configuration, cross-stitch
    constraints, explicit merge pairs, invisible walls / spheres, and
    session-level scene settings. :meth:`set_path` loads the pickle,
    then the ``apply_*`` methods dispatch each section.

    Example:
        Apply a pickle to an existing scene and session (this is what
        :meth:`BlenderApp.make` does internally)::

            from frontend._decoder_ import ParamDecoder

            decoder = ParamDecoder().set_path("/path/to/param.pickle")
            decoder.apply_to_objects(scene)
            decoder.apply_pin_config(scene)
            decoder.apply_invisible_colliders(scene)
            fixed_scene = scene.build()
            decoder.apply_to_session(session)
    """

    def __init__(self):
        """Create an empty decoder; call ``set_path`` to load parameter data."""
        self._data = None
        # Start from the same empty defaults that loading uses for absent
        # keys, so reading these before any data is loaded yields an empty
        # container instead of raising AttributeError.
        self._pin_config: dict = {}
        self._explicit_merge_pairs: list = []
        self._cross_stitch: list = []
def set_path(self, filepath: str) -> "ParamDecoder":
    """Load parameter data from a pickle file and cache it on this decoder.

    The file content is byte-sniffed: CBOR payloads go through the CBOR
    bridge, anything else is unpickled. The ``pin_config``,
    ``explicit_merge_pairs`` and ``cross_stitch`` sections are cached
    with empty defaults when absent.

    Args:
        filepath (str): Path to the pickle file. Must end in ``.pickle``.

    Returns:
        ParamDecoder: ``self`` for chaining.

    Example:
        Chain the load with a subsequent apply call::

            from frontend._decoder_ import ParamDecoder

            decoder = ParamDecoder().set_path("/path/to/param.pickle")
            decoder.apply_to_objects(scene)
    """
    _rust.validate_pickle_extension(filepath)
    with open(filepath, "rb") as stream:
        raw = stream.read()

    from . import _cbor_bridge_ as _cbor

    # NOTE: pickle.loads executes arbitrary code from the payload; this
    # path must only ever see files written by the trusted Blender addon.
    decode = _cbor.loads_param if _cbor.is_cbor(raw) else pickle.loads
    loaded = decode(raw)
    self._data = loaded

    _rust.validate_param_top_keys("group" in loaded, "scene" in loaded)
    self._pin_config = loaded.get("pin_config", {})
    self._explicit_merge_pairs = loaded.get("explicit_merge_pairs", [])
    self._cross_stitch = loaded.get("cross_stitch", [])
    return self
@property
def explicit_merge_pairs(self) -> list:
    """Explicit merge pairs cached by :meth:`set_path` (``"explicit_merge_pairs"`` key, ``[]`` if absent)."""
    return self._explicit_merge_pairs
@property
def cross_stitch(self) -> list:
    """Cross-stitch entries cached by :meth:`set_path` (``"cross_stitch"`` key, ``[]`` if absent)."""
    return self._cross_stitch
def apply_to_objects(self, scene: Scene, verbose: bool = False):
    """Apply the loaded parameter data to the objects in ``scene``.

    Call :meth:`set_path` first. Per-object dicts (velocity,
    velocity-schedule, collision-windows) are keyed by UUID; other
    keys are forwarded to ``obj.param.set``. fTetWild overrides are
    consumed at populate-time and skipped here. Each object's
    parameters are cleared before the group's values are applied.

    Args:
        scene (Scene): The scene to which the parameters will be applied.
        verbose (bool): Enable verbose logging.

    Example:
        Load a pickle and apply per-object parameters to a populated scene::

            from frontend._decoder_ import ParamDecoder

            decoder = ParamDecoder().set_path("/path/to/param.pickle")
            decoder.apply_to_objects(scene, verbose=True)
    """
    assert self._data is not None, "Parameter data not set. Call set_path() first."
    if verbose:
        print("=== Object Parameters ===")

    def pick(table, uuid):
        # Per-object tables are keyed by UUID; a falsy UUID never matches.
        return table.get(uuid) if uuid else None

    for group_entry in self._data["group"]:
        params = group_entry[0]
        objects = group_entry[1]
        # Third tuple slot holds UUIDs aligned with ``objects``.
        # Per-object dicts (velocity / velocity-schedule / collision-
        # windows) are keyed on UUID by the encoder.
        _rust.validate_param_group_has_uuids(len(group_entry))
        obj_uuids = group_entry[2]

        for obj_name, obj_uuid in zip(objects, obj_uuids):
            _rust.validate_param_object_uuid(obj_name, obj_uuid)
            if verbose:
                print(f"*** name: {obj_name} (uuid={obj_uuid}) ***")
            obj = scene.select(obj_uuid)
            obj.param.clear_all()

            for key, val in params.items():
                if verbose:
                    print(f" {key}: {val}")

                if key == "ftetwild":
                    # Consumed at populate-time via the param.pickle peek;
                    # no per-object ParamHolder slot by design (would
                    # break concat_tet_param / concat_tri_param key-set
                    # equality in _scene_.extend_param).
                    continue

                if key == "velocity":
                    if not isinstance(val, dict):
                        obj.velocity(*val)
                        continue
                    velocity = pick(val, obj_uuid)
                    if velocity is not None:
                        obj.velocity(*velocity)
                elif key == "velocity-schedule":
                    if not isinstance(val, dict):
                        obj.velocity_schedule(val)
                        continue
                    schedule = pick(val, obj_uuid)
                    if schedule:
                        obj.velocity_schedule(schedule)
                elif key == "collision-windows":
                    # Only the per-UUID dict form is applied; any other
                    # value is ignored.
                    if isinstance(val, dict):
                        windows = pick(val, obj_uuid)
                        if windows:
                            obj.collision_windows(windows)
                else:
                    obj.param.set(key, val)
def apply_to_session(self, session: Session, verbose: bool = False):
    """Apply scene-level and dynamic parameters from the loaded data to ``session``.

    Call :meth:`set_path` first. Static scene parameters are forwarded to
    ``session.param.set``; a positive ``inactive-momentum`` is instead set
    without a value and given a dynamic hold that changes to ``False`` at
    that time. Dynamic parameter keyframes from ``dyn_param`` are
    applied via ``session.param.dyn(...)``.

    Args:
        session (Session): The session to which the parameters will be applied.
        verbose (bool): Enable verbose logging.

    Example:
        Apply scene-level and dynamic parameters to a freshly initialized session::

            from frontend._decoder_ import ParamDecoder

            decoder = ParamDecoder().set_path("/path/to/param.pickle")
            session = app.session.create(scene).init(fixed_scene)
            decoder.apply_to_session(session)
            session.build().start()
    """
    assert self._data is not None, "Parameter data not set. Call set_path() first."
    if verbose:
        print("=== Session Parameters ===")

    session.param.clear_all()
    for k, v in self._data["scene"].items():
        if verbose:
            print(f" {k}: {v}")
        if not (k == "inactive-momentum" and v > 0):
            session.param.set(k, v)
            continue
        hold_until = float(v)
        session.param.set("inactive-momentum")
        session.param.dyn("inactive-momentum").time(hold_until).hold().change(False)

    # Apply dynamic parameters if present
    for key, entries in self._data.get("dyn_param", {}).items():
        if len(entries) < 2:
            continue
        if verbose:
            print(f" dyn({key}): {len(entries)} keyframes")
        # First entry is initial value (already set via static params above).
        # Subsequent entries are (time, value, is_hold) tuples.
        builder = session.param.dyn(key)
        for entry in entries[1:]:
            when, value = entry[0], entry[1]
            held = len(entry) > 2 and entry[2]
            step = builder.time(when)
            if held:
                step.hold()
                continue
            # A one-element list is unwrapped to its single value.
            unwrap = isinstance(value, list) and len(value) == 1
            step.change(value[0] if unwrap else value)
def apply_pin_config(self, scene, verbose: bool = False):
"""Apply saved pin configuration to scene pin holders.
Applies unpin time, pull strength, pin group id, embedded move
keyframes, and explicit pin operations (spin / scale / move_by /
torque) to each pin holder that has a matching config entry.
Call after the scene is populated and pins are created.
Example:
Reapply pin settings after populating objects::
from frontend._decoder_ import ParamDecoder
decoder = ParamDecoder().set_path("/path/to/param.pickle")
decoder.apply_to_objects(scene)
decoder.apply_pin_config(scene, verbose=True)
"""
if not self._pin_config:
return
if verbose:
print("=== Pin Config ===")
for dyn_name, dyn_obj in scene.object_dict.items():
# pin_config keyed by UUID; objects are registered by UUID
obj_cfg = self._pin_config.get(dyn_name, {})
if not obj_cfg:
continue
# Collapse the single all-vertex holder (built by
# _apply_pin_mapping) into one holder per pin_group_id, so a
# group's ops/animation apply once to the whole group. Without
# this a keyframed N-vertex pin became N one-vertex holders,
# each carrying every keyframe op -> N*M solver pin files.
self._regroup_pin_holders(dyn_obj, obj_cfg)
for pin_holder in dyn_obj.pin_list:
# For solid objects, use stored Blender indices for config lookup
lookup_indices = getattr(pin_holder._data, '_blender_pin_indices', None) or pin_holder.index
for vi in lookup_indices:
cfg = obj_cfg.get(vi)
if cfg is None:
continue
self._apply_pin_cfg_entry(
pin_holder, dyn_name, vi, cfg, obj_cfg, verbose,
)
break
@staticmethod
def _regroup_pin_holders(dyn_obj, obj_cfg):
"""Rebuild ``dyn_obj``'s pin holders as one holder per pin group.
``_apply_pin_mapping`` registers a single holder spanning every
pinned vertex of an object. An object with several distinct pin
vertex groups (e.g. separate ``left`` / ``right`` pins) needs one
holder per group so each group's config is applied to exactly its
vertices. Grouping key is ``pin_group_id`` from ``obj_cfg``.
Skips SOLID surface-mapped holders: those carry
``_blender_pin_indices`` and a sim-vertex set the regroup would
lose, and ``_apply_pin_mapping`` already builds them correctly.
"""
holders = list(dyn_obj.pin_list)
if not holders:
return
if any(getattr(h._data, "_blender_pin_indices", None)
for h in holders):
return
all_idx = [int(i) for h in holders for i in h.index]
gid_order: list = []
gid_verts: dict = {}
for vi in all_idx:
cfg = obj_cfg.get(vi)
# Verts with no config entry (plain hold-fixed pins) group
# under one sentinel key so they stay a single holder too.
key = cfg.get("pin_group_id") if cfg else "__plain__"
if key not in gid_verts:
gid_verts[key] = []
gid_order.append(key)
gid_verts[key].append(vi)
# Already one-holder-per-group: nothing to do.
if len(holders) == len(gid_order):
return
dyn_obj.pin_list.clear()
for key in gid_order:
dyn_obj.pin(gid_verts[key])
def _apply_pin_cfg_entry(self, pin_holder, dyn_name, vi, cfg, obj_cfg,
                         verbose):
    """Apply a single pin-config entry to ``pin_holder``.

    Scalar settings (``unpin_time``, ``pull_strength``, ``pin_group_id``)
    are applied first. If the entry has neither ``operations`` nor
    ``embedded_move_index`` the method stops there. Otherwise the holder's
    operation list is reset and rebuilt: explicit operations are dispatched
    in order, with the embedded move segments spliced in at position
    ``embedded_move_index`` (or appended when that index lies beyond the
    end of the sequence).

    Args:
        pin_holder: The pin holder to configure.
        dyn_name: Object key (UUID) used for validation and log output.
        vi: The vertex index whose config entry matched; used for logging.
        cfg: The config entry (dict) for ``vi``.
        obj_cfg: The whole per-object config, used to build embedded moves.
        verbose: Print what is applied.
    """
    if "unpin_time" in cfg:
        pin_holder.unpin(cfg["unpin_time"])
        if verbose:
            print(f" {dyn_name}[{vi}]: unpin_time={cfg['unpin_time']}")
    if "pull_strength" in cfg:
        pin_holder.pull(cfg["pull_strength"])
    if "pin_group_id" in cfg:
        pin_holder._data.pin_group_id = cfg["pin_group_id"]
        if verbose:
            print(f" {dyn_name}[{vi}]: pin_group_id={cfg['pin_group_id']}")
    # Entries without any operation information leave the holder's existing
    # operations untouched.
    if "operations" not in cfg and "embedded_move_index" not in cfg:
        return
    embedded_ops = self._build_embedded_move_ops(pin_holder, obj_cfg)
    embedded_move_index = cfg.get("embedded_move_index", -1)
    explicit_operations = cfg.get("operations", [])
    # Validate: torque cannot be mixed with kinematic ops
    _rust.validate_pin_op_types(
        [str(o.get("type", "")) for o in explicit_operations],
        dyn_name,
    )
    # Clear existing operations
    pin_holder._data.operations = []
    # Re-add in the correct order
    explicit_idx = 0
    embedded_inserted = False
    # One slot per explicit op, plus one slot for the embedded move when it
    # has a non-negative position.
    total = len(explicit_operations) + (1 if embedded_move_index >= 0 else 0)
    if total == 0 and embedded_ops:
        # Only embedded move, no explicit ops
        pin_holder._data.operations.extend(embedded_ops)
        embedded_inserted = True
    for pos in range(total):
        if pos == embedded_move_index and not embedded_inserted:
            # Insert embedded move operations at this position
            pin_holder._data.operations.extend(embedded_ops)
            embedded_inserted = True
        else:
            # Consume the next explicit op, if any remain.
            if explicit_idx < len(explicit_operations):
                op = explicit_operations[explicit_idx]
                explicit_idx += 1
                self._dispatch_pin_op(pin_holder, op)
    # If embedded move wasn't inserted yet (index beyond end), append
    if not embedded_inserted and embedded_move_index >= 0:
        pin_holder._data.operations.extend(embedded_ops)
    if verbose:
        print(f" {dyn_name}[{vi}]: operations={len(pin_holder.operations)}")
@staticmethod
def _build_embedded_move_ops(pin_holder, obj_cfg):
    """Build embedded ``MoveByOperation`` segments from ``pin_anim``.

    Each pinned vertex's ``PinData`` carries its own single-entry
    ``pin_anim`` track (``{vertex_index: PinAnim}``). Consecutive
    keyframes become ``MoveByOperation`` segments whose ``delta`` is
    the genuine per-vertex displacement, so the pin deforms; it is
    not rigidly translated.

    The keyframe times are taken from the first vertex (in
    ``pin_holder.index`` order) that has a track. A track whose
    ``position`` array is not shaped ``(n_frames, 3)`` is skipped.

    Returns:
        list: One linear ``MoveByOperation`` per consecutive keyframe
        pair, or ``[]`` when no track exists or fewer than two
        keyframes are available.
    """
    import numpy as np
    from ._scene_pin_ import MoveByOperation

    index = list(pin_holder.index)

    def track_for(vertex):
        # Each vertex's own track lives in its own cfg entry, keyed by
        # that same vertex index.
        cfg = obj_cfg.get(vertex)
        return cfg.get("pin_anim", {}).get(vertex) if cfg else None

    # Tracks gathered in pin_holder.index order.
    tracks = [track_for(vertex) for vertex in index]
    first_track = next((track for track in tracks if track is not None), None)
    if first_track is None:
        return []
    times = list(first_track["time"])
    n_frames = len(times)
    if n_frames < 2:
        return []

    # Per-vertex absolute-position tracks, aligned to the
    # pin_holder.index order. A vertex with no track keeps a zero
    # track (zero delta, so it is unaffected by the embedded move).
    positions = np.zeros((n_frames, len(index), 3), dtype=np.float64)
    for column, track in enumerate(tracks):
        if track is None:
            continue
        samples = np.asarray(track["position"], dtype=np.float64)
        if samples.shape == (n_frames, 3):
            positions[:, column, :] = samples

    return [
        MoveByOperation(
            delta=np.ascontiguousarray(positions[k + 1] - positions[k]),
            t_start=times[k],
            t_end=times[k + 1],
            transition="linear",
        )
        for k in range(n_frames - 1)
    ]
def _dispatch_pin_op(self, pin_holder, op):
"""Dispatch a single explicit pin op to its per-type handler."""
t_start = op.get("t_start", 0)
t_end = op.get("t_end", 1)
transition = op.get("transition", "linear")
if transition != "linear":
pin_holder.interp(transition)
op_type = op["type"]
if op_type == "spin":
self._apply_pin_op_spin(pin_holder, op, t_start, t_end)
elif op_type == "scale":
self._apply_pin_op_scale(pin_holder, op, t_start, t_end)
elif op_type == "move_by":
self._apply_pin_op_move_by(pin_holder, op, t_start, t_end)
elif op_type == "torque":
self._apply_pin_op_torque(pin_holder, op, t_start, t_end)
if transition != "linear":
pin_holder.interp("linear")
@staticmethod
def _apply_pin_op_spin(pin_holder, op, t_start, t_end):
center_mode = op.get("center_mode", "absolute")
pin_holder.spin(
center=op.get("center"),
axis=op.get("axis", [0, 1, 0]),
angular_velocity=op.get("angular_velocity", 360),
t_start=t_start,
t_end=t_end,
center_mode=center_mode,
)
@staticmethod
def _apply_pin_op_scale(pin_holder, op, t_start, t_end):
center_mode = op.get("center_mode", "absolute")
pin_holder.scale(
op.get("factor", 1.0),
t_start=t_start,
t_end=t_end,
center=op.get("center"),
center_mode=center_mode,
)
@staticmethod
def _apply_pin_op_move_by(pin_holder, op, t_start, t_end):
pin_holder.move_by(
op.get("delta", [0, 0, 0]),
t_start=t_start,
t_end=t_end,
)
@staticmethod
def _apply_pin_op_torque(pin_holder, op, t_start, t_end):
blender_hint = int(op.get("hint_vertex", -1))
# Translate Blender hint vertex to nearest tet vertex by position
# (robust to re-tetrahedralization). Closest-vertex search
# lives in Rust (`dec::closest_vertex_index`).
tet_V = getattr(pin_holder._data, '_tet_V', None)
blender_vert = getattr(pin_holder._data, '_blender_vert', None)
if tet_V is not None and blender_vert is not None and blender_hint >= 0:
import numpy as _np
hint_pos = _np.ascontiguousarray(
_np.asarray(blender_vert[blender_hint], dtype=_np.float64)
)
tet_arr = _np.ascontiguousarray(
_np.asarray(tet_V, dtype=_np.float64)
)
sim_hint = int(_rust.closest_vertex_index(tet_arr, hint_pos))
else:
sim_hint = blender_hint
pin_holder.torque(
magnitude=op.get("magnitude", 1.0),
axis_component=int(op.get("axis_component", 2)),
hint_vertex=sim_hint,
t_start=t_start,
t_end=t_end,
)
def apply_invisible_colliders(self, scene, verbose: bool = False):
"""Create invisible wall and sphere colliders on the scene from the loaded pickle data.
Must be called BEFORE ``scene.build()``.
Example:
Add the pickle's invisible walls and spheres just before building::
from frontend._decoder_ import ParamDecoder
decoder = ParamDecoder().set_path("/path/to/param.pickle")
decoder.apply_invisible_colliders(scene)
fixed_scene = scene.build()
"""
ic = self._data.get("invisible_colliders", {})
if not ic:
return
if verbose:
print("=== Invisible Colliders ===")
for w in ic.get("walls", []):
wall = scene.add.invisible.wall(w["position"], w["normal"])
wall.param.set("contact-gap", w.get("contact_gap", 1e-3))
wall.param.set("friction", w.get("friction", 0.0))
wall.param.set("active-duration", w.get("active_duration", -1.0))
thickness = _rust.validate_invisible_collider_thickness(
"wall", float(w.get("thickness", 1.0))
)
wall.param.set("thickness", thickness)
keyframes = w.get("keyframes", [])
for kf in keyframes[1:]:
wall.move_to(kf["position"], kf["time"])
if verbose:
print(f" Wall: pos={w['position']}, normal={w['normal']}, kf={len(keyframes)}")
for s in ic.get("spheres", []):
sphere = scene.add.invisible.sphere(s["position"], s["radius"])
if s.get("hemisphere", False):
sphere.hemisphere()
if s.get("invert", False):
sphere.invert()
sphere.param.set("contact-gap", s.get("contact_gap", 1e-3))
sphere.param.set("friction", s.get("friction", 0.0))
sphere.param.set("active-duration", s.get("active_duration", -1.0))
thickness = _rust.validate_invisible_collider_thickness(
"sphere", float(s.get("thickness", 1.0))
)
sphere.param.set("thickness", thickness)
keyframes = s.get("keyframes", [])
for kf in keyframes[1:]:
sphere.transform_to(kf["position"], kf["radius"], kf["time"])
if verbose:
print(f" Sphere: pos={s['position']}, r={s['radius']}, inv={s.get('invert')}, hemi={s.get('hemisphere')}, kf={len(keyframes)}")
class SceneDecoder:
    """Decode ``data.pickle`` written by the Blender addon into scene objects.

    Handles canonical mesh deduplication, per-instance transforms, cached
    tetrahedralization for SOLID groups, pin registration with
    Blender-to-sim surface mapping, UV assignment for SHELL groups, and
    rod / static groups. Used internally by :class:`BlenderApp`.

    Example:
        Drive the decoder directly to populate a scene (this is what
        :meth:`BlenderApp.populate` does internally)::

            from frontend._asset_ import AssetManager
            from frontend._mesh_ import MeshManager
            from frontend._decoder_ import SceneDecoder

            assets = AssetManager()
            meshes = MeshManager("/tmp/cache")
            decoder = SceneDecoder("/path/to/data.pickle", assets, meshes)
            decoder.populate_objects(scene, verbose=True)
    """

    def __init__(
        self, filepath: str, asset_manager: AssetManager, mesh_manager: MeshManager
    ):
        """Read and decode the scene file at ``filepath``.

        The file content is byte-sniffed: CBOR payloads go through the CBOR
        bridge, anything else is unpickled.

        Args:
            filepath (str): Path to the scene data file; its extension is
                validated by ``_rust.validate_pickle_extension``.
            asset_manager (AssetManager): Asset manager kept for later use.
            mesh_manager (MeshManager): Mesh manager kept for later use.
        """
        _rust.validate_pickle_extension(filepath)
        with open(filepath, "rb") as stream:
            raw = stream.read()

        # See ``frontend/_cbor_bridge_.py`` for the byte-sniff contract.
        from . import _cbor_bridge_ as _cbor

        # NOTE: pickle.loads executes arbitrary code from the payload; this
        # path must only ever see files written by the trusted Blender addon.
        decode = _cbor.loads_scene if _cbor.is_cbor(raw) else pickle.loads
        self._data = decode(raw)
        self._asset = asset_manager
        self._mesh = mesh_manager
        self._object_info: dict[str, ObjectInfo] = {}  # uuid -> ObjectInfo for stitch generation
def _tetra_cache_path(self, tri_mesh) -> str:
    """Return ``tri_mesh.cache_path`` for the filename ``_rust.tetra_cache_filename`` derives from ``tri_mesh.hash``."""
    return tri_mesh.cache_path(_rust.tetra_cache_filename(tri_mesh.hash))
def _summarize_tetra_jobs(self, tetra_jobs: list[dict]) -> str:
    """Return the summary string ``_rust.summarize_tetra_jobs`` produces for ``tetra_jobs``."""
    return _rust.summarize_tetra_jobs(tetra_jobs)
@staticmethod
def _apply_transform(local_vert, transform):
    """Apply a 4x4 transform to local-space vertices and return world-space vertices.

    Both inputs are converted to contiguous float64 arrays before being
    handed to ``_rust.apply_transform_4x4``.

    NOTE(review): the previous docstring stated that the result is float32;
    that conversion would happen inside the Rust helper and is not visible
    here -- confirm.
    """
    import numpy as np

    def as_contiguous_f64(values):
        return np.ascontiguousarray(np.asarray(values, dtype=np.float64))

    matrix = as_contiguous_f64(transform)
    points = as_contiguous_f64(local_vert)
    return _rust.apply_transform_4x4(points, matrix)
def _resolve_object_mesh(self, obj, name, obj_uuid, mesh_ref, canonical_meshes):
"""Resolve local-space mesh data for an object during the second
pass. Returns ``(local_vert, face, edge, uv, vert)`` where
``vert`` is the world-space vertices used by ``ObjectInfo`` and
the pin-mapping pass.
"""
if mesh_ref and mesh_ref in canonical_meshes:
ref = canonical_meshes[mesh_ref]
local_vert = ref["vert"]
face = ref["face"]
edge = ref.get("edge")
uv = ref.get("uv")
elif "vert" in obj:
local_vert = obj["vert"] # Local if transform present, world if legacy
face = obj.get("face")
edge = obj.get("edge")
uv = obj.get("uv", None)
else:
_rust.validate_object_has_mesh(name, obj_uuid, False)
raise AssertionError("unreachable") # pragma: no cover
# World-space vertices (for object_info used by pin mapping etc.)
vert = obj.get("_resolved_vert", local_vert)
return local_vert, face, edge, uv, vert
@staticmethod
def _log_object_mesh(name, vert, face, edge, uv):
"""Verbose-mode per-object summary printed during the second pass."""
if edge is not None:
print(
f" * name: {name}, vert: {vert.shape}, edge: {edge.shape}, uv: {len(uv) if uv is not None else 'None'}"
)
else:
print(
f" * name: {name}, vert: {vert.shape}, face: {face.shape if face is not None else 'None'}, uv: {len(uv) if uv is not None else 'None'}"
)
def populate_objects(
    self,
    scene: Scene,
    verbose: bool = False,
    progress_callback=None,
    ftetwild_by_uuid: dict | None = None,
) -> Scene:
    """Fill ``scene`` with every object described by the decoded pickle data.

    A build plan is computed first (:meth:`_plan_build`); the groups in
    ``self._data`` are then walked in order and each object is routed to
    the builder matching its group type (``STATIC``, ``SOLID``,
    ``SHELL`` or ``ROD``). Pins and stitches are applied right after
    each object is created. Any other group type is passed to
    ``_rust.validate_group_type``.

    Args:
        scene (Scene): The scene to populate.
        verbose (bool): Print per-group and per-object details.
        progress_callback: Optional callable
            ``fn(progress: float, info: str)`` invoked while loading.
        ftetwild_by_uuid (dict | None): Optional mapping of object UUID
            to keyword arguments forwarded to tetrahedralization of that
            SOLID object.

    Returns:
        Scene: The same ``scene`` object, populated.

    Example:
        Populate a fresh scene from a Blender pickle::

            from frontend._asset_ import AssetManager
            from frontend._mesh_ import MeshManager
            from frontend._decoder_ import SceneDecoder

            decoder = SceneDecoder(
                "/path/to/data.pickle", AssetManager(), MeshManager("/tmp/cache"),
            )
            decoder.populate_objects(scene, verbose=True)
            fixed_scene = scene.build()
    """
    plan = self._plan_build(progress_callback)
    entries = plan["object_entries"]
    meshes = plan["canonical_meshes"]
    asset_names = plan["canonical_asset_name"]
    jobs = plan["tetra_jobs"]
    progress = plan["progress"]
    report = progress["report"]

    report(f"Build plan: {self._summarize_tetra_jobs(jobs)}")
    progress["completed"] += 0.5

    # Plan entries were produced in the same group/object order as the
    # traversal below, so they are consumed one per object.
    pending = iter(entries)
    for group in self._data:
        members = group.get("object")
        assert members is not None, "Object data not found in the group."
        kind = group.get("type")
        if verbose:
            print(f"--- new group: {kind} ---")

        for obj in members:
            entry = next(pending)
            name = obj.get("name", "")
            obj_uuid = obj.get("uuid", "")
            _rust.validate_scene_object_identity(name, obj_uuid)

            mesh_ref = obj.get("_mesh_ref_resolved")
            transform = obj.get("transform")
            resolved = self._resolve_object_mesh(
                obj, name, obj_uuid, mesh_ref, meshes,
            )
            local_vert, face, edge, uv, vert = resolved

            report(f"Loading {kind}: {name}...")
            if verbose:
                self._log_object_mesh(name, vert, face, edge, uv)

            # Only the SOLID builder fills these in.
            tet_mesh = V = F = None
            if kind == "STATIC":
                scene_obj = self._populate_static(
                    scene, obj, name, obj_uuid, local_vert, face, transform, verbose,
                )
            elif kind == "SOLID":
                scene_obj, tet_mesh, V, F = self._populate_solid(
                    scene, obj, entry, name, obj_uuid, vert,
                    entries, jobs, ftetwild_by_uuid, progress,
                )
            elif kind == "SHELL":
                scene_obj = self._populate_shell(
                    scene, obj_uuid, mesh_ref, meshes,
                    asset_names, name, local_vert, face, transform, vert, uv,
                )
            elif kind == "ROD":
                scene_obj = self._populate_rod(
                    scene, name, obj_uuid, local_vert, edge, transform, vert,
                )
            else:
                _rust.validate_group_type(kind)
                scene_obj = None

            self._apply_pin_mapping(
                obj, scene_obj, kind, vert, V, F, tet_mesh, verbose,
            )
            self._apply_stitch(obj, scene_obj, name, verbose)

            progress["completed"] += entry["base_weight"]
            report(f"Loaded {kind}: {name}")
    return scene
def _plan_build(self, progress_callback):
    """Run the planning pass that precedes object construction.

    The pass (1) records every canonical mesh keyed by object UUID,
    (2) resolves each object's mesh reference and stores the results on
    the object dict under ``_resolved_*`` / ``_mesh_ref_resolved``, and
    (3) creates one plan entry per object, including the triangle mesh
    and cache status for SOLID objects.

    Returns:
        dict: ``object_entries``, ``canonical_meshes``,
        ``canonical_asset_name``, ``tetra_jobs`` and ``progress``. The
        ``progress`` dict holds the mutable ``completed`` / ``total``
        counters plus the ``report`` callable reused by the second pass.
    """
    total_objects = sum(len(group.get("object", [])) for group in self._data)
    planning_weight = max(1.0, 0.35 * total_objects)
    # One slot per object, filled by index in traversal order.
    object_entries: list = [None] * total_objects
    # Counters shared by this pass, the per-group builders and ``report``.
    progress = {"completed": 0.0, "total": 0.0}

    def report(info: str, total_work_override: float | None = None):
        if progress_callback is None:
            return
        total = progress["total"] if total_work_override is None else total_work_override
        fraction = progress["completed"] / total if total > 0 else 1.0
        progress_callback(fraction, info)

    progress["report"] = report

    # Step 1: canonical mesh table, keyed by UUID (the encoder sends
    # mesh_ref as a UUID). The Blender name is kept alongside for asset
    # registration.
    canonical_meshes: dict = {}
    canonical_asset_name: dict = {}
    for group in self._data:
        members = group.get("object")
        assert members is not None, "Object data not found in the group."
        for obj in members:
            name = obj.get("name", "")
            obj_uuid = obj.get("uuid", "")
            _rust.validate_scene_object_identity(name, obj_uuid)
            if "mesh_ref" in obj or "vert" not in obj:
                continue
            # Canonical mesh: keep the local data for referencing instances.
            canonical_meshes[obj_uuid] = {
                key: obj.get(key)
                for key in ("vert", "face", "edge", "uv", "stitch")
            }
            canonical_asset_name[obj_uuid] = name

    # Step 2: resolve references and create the plan entries.
    planning_increment = planning_weight / total_objects if total_objects > 0 else 0.0
    report("Scanning build plan...", planning_weight)
    object_work = 0.0
    # Provisional numbering only; the Rust rebuild below reassigns
    # ``tetra_index`` after deduplication.
    solid_counter = 0
    slot = 0
    for group in self._data:
        members = group.get("object")
        assert members is not None, "Object data not found in the group."
        group_type = group.get("type")
        for obj in members:
            name = obj.get("name")
            report(f"Scanning build plan: {name}...", planning_weight)

            mesh_ref = obj.get("mesh_ref")
            transform = obj.get("transform")
            if mesh_ref is not None:
                _rust.validate_mesh_ref_known(
                    name, str(mesh_ref), mesh_ref in canonical_meshes
                )

            if mesh_ref is not None and mesh_ref in canonical_meshes:
                # Instance: local mesh from the canonical entry, placed
                # with this object's transform when it has one.
                source = canonical_meshes[mesh_ref]
                shared_vert = source["vert"]
                if transform is not None:
                    world_vert = self._apply_transform(shared_vert, transform)
                else:
                    world_vert = shared_vert
                resolved_ref = mesh_ref
            elif transform is not None and "vert" in obj:
                # Canonical mesh with a transform: move local vertices.
                source = obj
                world_vert = self._apply_transform(obj["vert"], transform)
                resolved_ref = None
            else:
                # Legacy format: vertices are already in world space.
                source = obj
                world_vert = obj.get("vert")
                resolved_ref = None

            obj["_resolved_vert"] = world_vert
            for key in ("face", "uv", "stitch", "edge"):
                obj[f"_resolved_{key}"] = source.get(key)
            obj["_mesh_ref_resolved"] = resolved_ref

            entry = {
                "group_type": group_type,
                "obj": obj,
                "name": name,
                "base_weight": 1.0,
                "tetra_weight": 0.0,
                "tetra_index": None,
                "tetra_cached": False,
                "tri_mesh": None,
            }

            if group_type == "SOLID":
                report(f"Checking tetra cache: {name}...", planning_weight)
                # Tetrahedralize LOCAL vertices where available so that
                # duplicates share the same mesh hash.
                if mesh_ref and mesh_ref in canonical_meshes:
                    canonical = canonical_meshes[mesh_ref]
                    tet_vert, tet_face = canonical["vert"], canonical["face"]
                elif "vert" in obj and obj.get("transform") is not None:
                    tet_vert, tet_face = obj["vert"], obj.get("face")  # local-space
                else:
                    # Legacy: world-space
                    tet_vert, tet_face = obj["_resolved_vert"], obj["_resolved_face"]

                tri_mesh = self._mesh.create.tri(tet_vert, tet_face)
                cached = os.path.exists(self._tetra_cache_path(tri_mesh))
                solid_counter += 1
                entry.update(
                    tri_mesh=tri_mesh,
                    tetra_cached=cached,
                    tetra_index=solid_counter,
                    tetra_weight=3.0 if cached else 8.0,
                )
                object_work += entry["tetra_weight"]

            object_entries[slot] = entry
            slot += 1
            object_work += entry["base_weight"]
            progress["completed"] += planning_increment

    progress["total"] = planning_weight + 0.5 + object_work

    # The Rust pass receives the plan entries and returns the rebuilt
    # job list together with a work delta that is folded into the total.
    # Per the original notes it deduplicates by tri-mesh hash and
    # reassigns each entry's ``tetra_index``.
    tetra_jobs, work_delta = _rust.dedup_and_rebuild_tetra_jobs(object_entries)
    object_work += work_delta
    progress["total"] = planning_weight + 0.5 + object_work

    return {
        "object_entries": object_entries,
        "canonical_meshes": canonical_meshes,
        "canonical_asset_name": canonical_asset_name,
        "tetra_jobs": tetra_jobs,
        "progress": progress,
    }
def _populate_static(self, scene, obj, name, obj_uuid, local_vert, face, transform, verbose):
    """Build a STATIC object, returning its scene handle when it is animated.

    The inputs are checked in this order: ``transform_animation``
    (keyframed pose), ``static_ops`` (UI-assigned move/spin/scale),
    ``static_deform_animation`` (per-vertex cache), and finally the
    plain rest-pose collider. ``_rust.validate_static_anim_xor_ops`` is
    called up front with a flag for each of the three animated inputs.

    Returns:
        The created Scene Object for the three animated cases, so the
        pin / stitch passes can run on it; ``None`` for the rest-pose
        case.

    Raises:
        ValueError: If ``static_deform_animation["vert_frames"]`` is not
            shaped ``(n_frames, n_verts, 3)``, or its frame count differs
            from ``len(time)``, or its vertex count differs from
            ``len(local_vert)``.
    """
    import numpy as np

    transform_anim = obj.get("transform_animation")
    static_ops = obj.get("static_ops", []) or []
    static_deform = obj.get("static_deform_animation")
    _rust.validate_static_anim_xor_ops(
        name,
        transform_anim is not None,
        bool(static_ops),
        static_deform is not None,
    )
    if verbose:
        anim_flag = 'YES' if transform_anim else 'NO'
        deform_flag = 'YES' if static_deform else 'NO'
        print(
            f" > transform_animation: {anim_flag}, "
            f"static_ops: {len(static_ops)}, "
            f"static_deform_animation: {deform_flag}"
        )

    def make_pin_shell():
        """Create the zero-stiffness shell used by every animated case.

        Its vertices are driven by pin operations. Returns
        ``(shell, rest_translation)``.
        """
        self._asset.add.tri(name, local_vert, face)
        shell = scene.add(name, obj_uuid)
        if transform is not None:
            shell.mat4x4(transform)
        for key in ("density", "young-mod", "poiss-rat", "bend"):
            shell.param.set(key, 0.0)
        # Mark for preview suppression: the shell's pins are an
        # implementation detail, not user-chosen.
        shell._is_static_moving = True
        if transform is not None:
            rest_t = np.asarray(transform, dtype=np.float64)[:3, 3]
        else:
            rest_t = np.zeros(3, dtype=np.float64)
        return shell, rest_t

    if transform_anim is not None:
        # Case 1: Blender keyframes drive the pose. The simulator
        # enforces the pin as a soft constraint, so its output vertices
        # drift slightly from the input keyframes. The dynamics were
        # resolved against the drifted positions, so those are what must
        # be displayed (not the keyframes) to keep the collider visually
        # consistent with the cloth. Include in output PC2.
        shell, rest_translation = make_pin_shell()
        shell.pin().transform_keyframes(
            local_vert=local_vert,
            times=transform_anim["time"],
            translations=transform_anim["translation"],
            quaternions=transform_anim["quaternion"],
            scales=transform_anim["scale"],
            segments=transform_anim.get("segments", []),
            rest_translation=rest_translation,
        )
        return shell

    if static_ops:
        # Case 2: UI-assigned move/spin/scale ops. Blender has no
        # fcurves, so the remote sim is the source of truth (include in
        # output so PC2 can play it back).
        shell, _ = make_pin_shell()
        pin = shell.pin()
        # The object origin is (0,0,0) in the pin shell's local op
        # frame, so spin and scale are always centred there.
        origin = [0.0, 0.0, 0.0]
        for op in static_ops:
            t_start = float(op["t_start"])
            t_end = float(op["t_end"])
            transition = op.get("transition", "linear")
            op_type = op["op_type"]
            if op_type == "MOVE_BY":
                pin.move_by(
                    list(op["delta"]),
                    t_start=t_start,
                    t_end=t_end,
                    transition=transition,
                )
            elif op_type == "SPIN":
                pin.spin(
                    center=list(origin),
                    axis=list(op["axis"]),
                    angular_velocity=float(op["angular_velocity"]),
                    t_start=t_start,
                    t_end=t_end,
                    center_mode="absolute",
                )
            elif op_type == "SCALE":
                pin.scale(
                    scale=float(op["factor"]),
                    t_start=t_start,
                    t_end=t_end,
                    center=list(origin),
                    center_mode="absolute",
                )
            else:
                _rust.validate_static_op_type(op["op_type"])
        return shell

    if static_deform is not None:
        # Case 3: per-frame depsgraph-baked vertex stream from the
        # Capture Deformation operator. local_vert is already
        # frame_start's depsgraph-evaluated mesh in solver world space
        # (the encoder swapped it in), and transform is identity. The
        # pin shell starts at vert_frames[0] and one move-by operation
        # is added per consecutive frame pair to drive every vertex
        # through the recorded trajectory.
        #
        # The pin is a soft constraint, so the simulator output drifts
        # from the captured cache. The cloth was resolved against the
        # drifted positions, so include the shell in output PC2 and let
        # MESH_CACHE overwrite the depsgraph-driven mesh on display.
        times = list(static_deform["time"])
        vert_frames = np.ascontiguousarray(
            static_deform["vert_frames"], dtype=np.float64,
        )
        if vert_frames.ndim != 3 or vert_frames.shape[2] != 3:
            raise ValueError(
                f"static_deform_animation['vert_frames'] for "
                f"'{name}' must be (n_frames, n_verts, 3); got "
                f"shape {vert_frames.shape}"
            )
        frame_count, vert_count = vert_frames.shape[:2]
        if frame_count != len(times):
            raise ValueError(
                f"static_deform_animation for '{name}': "
                f"len(time)={len(times)} != n_frames={frame_count}"
            )
        if vert_count != len(local_vert):
            raise ValueError(
                f"static_deform_animation for '{name}': "
                f"cache has {vert_count} vertices but mesh has "
                f"{len(local_vert)}"
            )
        shell, _ = make_pin_shell()
        pin = shell.pin()
        # Successive move-by segments compose: at t=times[k] the pin
        # position is local_vert + sum(deltas up to k) = vert_frames[k]
        # (because the encoder set local_vert == vert_frames[0]).
        frame_pairs = zip(times, times[1:], vert_frames, vert_frames[1:])
        for t_from, t_to, frame_from, frame_to in frame_pairs:
            pin.move_by(
                np.ascontiguousarray(frame_to - frame_from, dtype=np.float64),
                t_start=float(t_from),
                t_end=float(t_to),
                transition="linear",
            )
        return shell

    # Case 4: rest-pose static (plain collision mesh).
    self._asset.add.tri(name, local_vert, face)
    collider = scene.add(name, obj_uuid)
    if transform is not None:
        collider.mat4x4(transform)
    collider.pin()
    return None
def _populate_solid(
    self,
    scene,
    obj,
    entry,
    name,
    obj_uuid,
    vert,
    object_entries,
    tetra_jobs,
    ftetwild_by_uuid,
    progress,
):
    """Build a SOLID object from a fresh or reused tetrahedralization.

    When the plan entry carries ``tetra_reuse_from``, the tet mesh and
    asset name of that earlier entry are reused. Otherwise the entry's
    triangle mesh is tetrahedralized (forwarding any per-UUID keyword
    arguments from ``ftetwild_by_uuid``), the result is stored on the
    entry as ``_tet_mesh_result`` and registered as a tet asset. In both
    cases a scene instance is added and an ``ObjectInfo`` is recorded
    under ``obj_uuid``.

    Returns:
        tuple: ``(_obj, tet_mesh, V, F)`` for the pin-mapping pass, where
        ``V`` is the tet vertices after applying the object's transform
        (or the local vertices when there is none).

    Raises:
        ValueError: Re-raised from tetrahedralization with the object
            name prepended.
    """
    report = progress["report"]
    transform = obj.get("transform")
    donor_index = entry.get("tetra_reuse_from")
    needs_tetrahedralization = donor_index is None

    if needs_tetrahedralization:
        prefix = (
            f"Tetrahedralizing {name} "
            f"({entry['tetra_index']}/{len(tetra_jobs)}, "
            f"{'cached' if entry['tetra_cached'] else 'new'})"
        )
        report(f"{prefix}...")
        per_uuid = ftetwild_by_uuid or {}
        ftw_kwargs = per_uuid.get(obj_uuid, {}) or {}

        def relay_status(detail, prefix=prefix):
            report(f"{prefix}: {detail}")

        try:
            tet_mesh = entry["tri_mesh"].tetrahedralize(
                status_callback=relay_status,
                **ftw_kwargs,
            )
        except ValueError as e:
            # Prepend the object name so the addon UI shows which SOLID
            # mesh failed (the underlying message already explains the
            # cause and suggests SHELL).
            raise ValueError(f"{name}: {e}") from e
        entry["_tet_mesh_result"] = tet_mesh
        progress["completed"] += entry["tetra_weight"]
        report(
            f"Finished tetrahedralizing {name} "
            f"({entry['tetra_index']}/{len(tetra_jobs)})"
        )
        asset_name = name
    else:
        # Reuse tet + asset from the canonical mesh's plan entry.
        donor = object_entries[donor_index]
        tet_mesh = donor["_tet_mesh_result"]
        asset_name = donor["name"]
        report(f"Reusing tetrahedralization from {asset_name} for {name}")

    V_local, F, T = tet_mesh
    if needs_tetrahedralization:
        # Register the asset with local-space vertices; a reused asset
        # was already registered by its donor.
        self._asset.add.tet(asset_name, V_local, F, T)

    # Scene instance with its per-instance transform.
    _obj = scene.add(asset_name, obj_uuid)
    if transform is not None:
        _obj.mat4x4(transform)

    # World-space V for object_info.
    V = V_local if transform is None else self._apply_transform(V_local, transform)

    info = ObjectInfo(type="SOLID", vert=vert, V=V, F=F)
    if tet_mesh.has_surface_mapping():
        import numpy as np

        tri_indices, coefs = tet_mesh.surface_map
        # Pass local-space tet vertices: the surface-map coefs were
        # computed in local space, so the in-Rust
        # ``x0 + c1*b1 + c2*b2 + c3*n_hat`` reconstruction (used to pick
        # the closest of three triangle corners) only matches the
        # original Blender position when V is in the same space as the
        # coefs. Under non-uniform world scale, mixing world V with
        # local coefs shifts the point by tens of centimeters and can
        # pick the wrong corner.
        info.orig_to_sim = _rust.solid_orig_to_sim(
            np.ascontiguousarray(np.asarray(tri_indices, dtype=np.int64)),
            np.ascontiguousarray(np.asarray(coefs, dtype=np.float64)),
            np.ascontiguousarray(np.asarray(F, dtype=np.int64)),
            np.ascontiguousarray(np.asarray(V_local, dtype=np.float64)),
        )
        scene.set_surface_map(obj_uuid, tri_indices, coefs, F)

    self._object_info[obj_uuid] = info
    return _obj, tet_mesh, V, F
def _populate_shell(
    self, scene, obj_uuid, mesh_ref, canonical_meshes, canonical_asset_name,
    name, local_vert, face, transform, vert, uv,
):
    """Build a SHELL object and return its scene handle.

    An object whose ``mesh_ref`` points at a known canonical mesh reuses
    that mesh's asset name; any other object registers a new triangle
    asset under its own name. The scene instance is then added, given
    its transform when present, recorded in ``self._object_info``, and
    assigned ``uv`` when provided.
    """
    references_canonical = mesh_ref and mesh_ref in canonical_meshes
    if references_canonical:
        asset_name = canonical_asset_name[mesh_ref]
    else:
        asset_name = name
        self._asset.add.tri(asset_name, local_vert, face)

    shell = scene.add(asset_name, obj_uuid)
    if transform is not None:
        shell.mat4x4(transform)

    # For shells the stored ``V`` is the same array as ``vert``.
    info = ObjectInfo(type="SHELL", vert=vert, V=vert, F=face)
    self._object_info[obj_uuid] = info

    if uv is None:
        return shell
    assert len(uv) == len(face), "UV length must match face length."
    shell.set_uv(uv)
    return shell
def _populate_rod(self, scene, name, obj_uuid, local_vert, edge, transform, vert):
    """Build a ROD object and return its scene handle.

    ``_rust.validate_rod_has_edges`` is told whether ``edge`` is present
    before the rod asset is registered. The scene instance is added,
    given its transform when present, and recorded in
    ``self._object_info`` with only ``vert`` populated.
    """
    has_edges = edge is not None
    _rust.validate_rod_has_edges(name, has_edges)
    self._asset.add.rod(name, local_vert, edge)

    rod = scene.add(name, obj_uuid)
    if transform is not None:
        rod.mat4x4(transform)

    info = ObjectInfo(type="ROD", vert=vert)
    self._object_info[obj_uuid] = info
    return rod
def _apply_pin_mapping(self, obj, _obj, group_type, vert, V, F, tet_mesh, verbose):
"""Pin-mapping pass: register pins on ``_obj`` from ``obj['pin']``,
building the Blender-to-sim surface transfer for SOLID groups
with a surface mapping and falling back to direct per-index pins
otherwise.
"""
if _obj is None or "pin" not in obj:
return
pin_index = obj["pin"]
if verbose:
print(f" > pin: {len(pin_index)}")
if group_type == "SOLID" and tet_mesh is not None and tet_mesh.has_surface_mapping():
import numpy as np
# Build Blender->sim surface transfer support from the
# interpolation map. A pinned Blender surface vertex
# contributes to every simulation surface vertex whose
# in-plane weight is positive on the mapped triangle.
# Frame coefs store (c1, c2, c3); the implicit bary
# weights are (1-c1-c2, c1, c2), which reduce to the
# old bary weights when c3 ~ 0 (vertex lies on the
# triangle). c3 (normal offset) does not affect the
# pin transfer support.
tri_indices_pin, coefs_pin = tet_mesh.surface_map
def mapped_surface_vertices(blender_index: int) -> list[int]:
ti = tri_indices_pin[blender_index]
tri_verts = F[ti]
c = coefs_pin[blender_index]
w = np.array([1.0 - c[0] - c[1], c[0], c[1]])
sim_verts = [
int(tri_verts[k]) for k in range(len(tri_verts))
if w[k] > 1e-4
]
if not sim_verts:
sim_verts = [int(tri_verts[int(np.argmax(w))])]
return sim_verts
n_surface_verts = len(vert)
if len(pin_index) == n_surface_verts:
# Use the transpose of the interpolation map:
# collect every simulation surface vertex that
# receives positive weight from a pinned Blender
# surface vertex.
sim_verts = sorted({
vi
for i in pin_index
for vi in mapped_surface_vertices(i)
})
holder = _obj.pin(sim_verts)
holder._data._blender_pin_indices = list(pin_index)
holder._data._tet_V = V
holder._data._blender_vert = vert
if verbose:
print(f" > pin (mapped surface): {len(sim_verts)} verts")
else:
# Partial pin: each Blender pin vertex maps to the
# simulation surface vertices with positive
# interpolation weight.
for i in pin_index:
holder = _obj.pin(mapped_surface_vertices(i))
holder._data._blender_pin_indices = [i]
holder._data._tet_V = V
holder._data._blender_vert = vert
else:
# One holder for the whole pinned set. apply_pin_config
# later splits it per pin_group_id when the object has
# multiple distinct pin vertex groups. (A per-vertex
# _obj.pin([i]) loop here made N holders, and a keyframed
# pin then wrote N x M operation files for the solver.)
if pin_index:
_obj.pin(list(pin_index))
def _apply_stitch(self, obj, _obj, name, verbose):
"""Stitch pass: register a stitch asset and attach it to ``_obj``
when a stitch was resolved during planning.
"""
if _obj is None:
return
resolved_stitch = obj.get("_resolved_stitch", obj.get("stitch"))
if resolved_stitch is None:
return
stitch_data = resolved_stitch
stitch_name = f"{name}_stitch"
if verbose:
print(f" > stitch: {len(stitch_data[0])} edges")
self._asset.add.stitch(stitch_name, stitch_data)
_obj.stitch(stitch_name)