# FreeCADExchange write-back helpers.
#
# Collects the instance and terminal bindings stored on the device groups of a
# FreeCAD document and writes them to a ``3d_to_2d.json`` file. The write-back
# can be triggered from a GUI command or automatically after a document save.

import json
import os
from datetime import datetime
from pathlib import Path

import FreeCAD as App

import DeviceImport
import TerminalObjects as TerminalObjects

try:
    import FreeCADGui as Gui
except ImportError:
    # GUI module not importable: command registration is skipped below.
    Gui = None

# Attribute name used on the ``App`` module to remember the installed observer.
STATE_WRITEBACK_OBSERVER = "_qet_exchange_writeback_observer"
# Environment variable holding the path of the incoming exchange JSON file.
ENV_JSON_PATH = "QET_2D_TO_3D_JSON"


class ExchangeWriteBackError(RuntimeError):
    """Raised when the write-back cannot be performed."""


def _clean_text(value):
    """Return *value* as a whitespace-stripped string.

    ``None`` becomes ``""`` and non-string values are converted with ``str()``
    so that an unexpected property or payload type cannot raise
    ``AttributeError`` on ``.strip()``.
    """
    if value is None:
        return ""
    if not isinstance(value, str):
        value = str(value)
    return value.strip()


def _append_debug_log(message):
    """Forward *message* to DeviceImport's debug log; never raises."""
    try:
        DeviceImport._append_debug_log(message)
    except Exception:
        # Logging is best-effort and must not break the write-back.
        pass


def _project_uuid_from_payload(payload):
    """Return the stripped ``project_uuid`` of a dict payload, or ``""``."""
    if isinstance(payload, dict):
        # ``or ""`` keeps falsy values (None, 0, "") mapped to the empty string.
        return _clean_text(payload.get("project_uuid") or "")
    return ""


def _root_group(doc):
    """Return the object named ``TerminalObjects.ROOT_GROUP_NAME`` or ``None``."""
    try:
        return doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
    except Exception:
        return None


def _is_device_group(obj):
    """Tell whether *obj* is a device group.

    An object qualifies when its ``Name`` starts with
    ``DeviceImport.DEVICE_GROUP_PREFIX`` and its ``PropertiesList`` contains
    ``"QetElementUuid"``. Any error while inspecting it yields ``False``.
    """
    if obj is None:
        return False
    try:
        if not obj.Name.startswith(DeviceImport.DEVICE_GROUP_PREFIX):
            return False
        return "QetElementUuid" in getattr(obj, "PropertiesList", [])
    except Exception:
        return False


def _iter_device_groups(doc):
    """Yield the device groups of *doc*.

    When the root group exists only its direct children are considered;
    otherwise every object of the document is scanned.
    """
    root = _root_group(doc)
    if root is not None:
        # Snapshot the children so the iteration works on a stable list.
        for child in list(getattr(root, "Group", []) or []):
            if _is_device_group(child):
                yield child
        return
    for obj in doc.Objects:
        if _is_device_group(obj):
            yield obj


def _iter_terminal_objects(device_group):
    """Return the terminal objects found under *device_group* (``[]`` if none)."""
    terminal_container = TerminalObjects.find_child_group_by_kind(
        device_group,
        TerminalObjects.TERMINAL_GROUP_KIND,
    )
    if terminal_container is None:
        return []
    return TerminalObjects.collect_terminal_objects(terminal_container)


def _scene_path_from_doc(doc, scene_path=""):
    """Resolve the scene path used to locate the output file.

    Precedence: explicit *scene_path*, then the ``QET_FREECAD_SCENE_FILE``
    environment variable, then ``doc.FileName``. Returns ``""`` when none of
    them is set.
    """
    candidate = _clean_text(scene_path or "")
    if candidate:
        return candidate
    env_scene = os.environ.get("QET_FREECAD_SCENE_FILE", "").strip()
    if env_scene:
        return env_scene
    return _clean_text(getattr(doc, "FileName", ""))


def _output_path_for_scene(scene_path):
    """Return the ``3d_to_2d.json`` path derived from *scene_path*.

    A ``.fcstd`` file maps to a sibling file, a directory maps to a file inside
    it, and anything else maps to a file in its parent directory. Returns
    ``""`` for an empty *scene_path*.
    """
    scene_path = _clean_text(scene_path or "")
    if not scene_path:
        return ""
    path = Path(scene_path)
    if path.suffix.lower() == ".fcstd":
        return str(path.with_name("3d_to_2d.json"))
    if path.is_dir():
        return str(path / "3d_to_2d.json")
    # Fallback for names ending in .fcstd that the suffix check did not catch.
    if path.name.lower().endswith(".fcstd"):
        return str(path.with_name("3d_to_2d.json"))
    return str(path.parent / "3d_to_2d.json")


def _output_path_for_exchange_json():
    """Return the ``3d_to_2d.json`` path next to the file named by ENV_JSON_PATH.

    Returns ``""`` when the environment variable is unset or blank.
    """
    json_path = os.environ.get(ENV_JSON_PATH, "").strip()
    if not json_path:
        return ""
    return str(Path(json_path).with_name("3d_to_2d.json"))


def _format_timestamp():
    """Return the current local time as a timezone-aware ISO-8601 string."""
    return datetime.now().astimezone().isoformat(timespec="seconds")


def _collect_instance_bindings(doc):
    """Return unique ``element_uuid``/``instance_id`` pairs of the device groups.

    Groups missing either value are skipped; the first occurrence of each pair
    wins and document order is preserved.
    """
    bindings = []
    seen = set()
    for device_group in _iter_device_groups(doc):
        element_uuid = _clean_text(getattr(device_group, "QetElementUuid", ""))
        instance_id = _clean_text(getattr(device_group, "QetInstanceId", ""))
        if not element_uuid or not instance_id:
            continue
        key = (element_uuid, instance_id)
        if key in seen:
            continue
        seen.add(key)
        bindings.append(
            {
                "element_uuid": element_uuid,
                "instance_id": instance_id,
            }
        )
    return bindings


def _collect_terminal_bindings(doc):
    """Return unique ``terminal_uuid``/``instance_id`` pairs of all terminals.

    Terminals flagged as local (by UUID or by binding mode) are skipped. A
    terminal without its own ``QetInstanceId`` inherits the one of its device
    group; terminals still missing a UUID or instance id are skipped.
    """
    bindings = []
    seen = set()
    for device_group in _iter_device_groups(doc):
        instance_id = _clean_text(getattr(device_group, "QetInstanceId", ""))
        for terminal_obj in _iter_terminal_objects(device_group):
            terminal_uuid = _clean_text(
                getattr(terminal_obj, "QetTerminalUuid", "")
            )
            binding_mode = _clean_text(
                getattr(terminal_obj, "QetTerminalBindingMode", "")
            ).lower()
            if (
                TerminalObjects.is_local_terminal_uuid(terminal_uuid)
                or binding_mode == TerminalObjects.TERMINAL_BINDING_MODE_LOCAL
            ):
                continue
            terminal_instance_id = (
                _clean_text(getattr(terminal_obj, "QetInstanceId", ""))
                or instance_id
            )
            if not terminal_uuid or not terminal_instance_id:
                continue
            key = (terminal_uuid, terminal_instance_id)
            if key in seen:
                continue
            seen.add(key)
            bindings.append(
                {
                    "terminal_uuid": terminal_uuid,
                    "instance_id": terminal_instance_id,
                }
            )
    return bindings


def _project_uuid_from_doc(doc, payload=None):
    """Return the project UUID from the root group, falling back to *payload*."""
    root = _root_group(doc)
    if root is not None:
        project_uuid = _clean_text(getattr(root, "QetProjectUuid", ""))
        if project_uuid:
            return project_uuid
    return _project_uuid_from_payload(payload)


def write_back_document(doc=None, scene_path="", payload=None):
    """Write the bindings of *doc* to ``3d_to_2d.json`` and return a report.

    Args:
        doc: Document to export; defaults to ``App.ActiveDocument``.
        scene_path: Optional scene path used to derive the output location.
        payload: Optional dict consulted for ``project_uuid`` when the
            document's root group does not provide one.

    Returns:
        A dict with ``schema_version``, ``project_uuid``, ``generated_at``,
        ``instances``, ``terminals`` and ``output_path``. The written JSON
        contains all of these except ``output_path``.

    Raises:
        ExchangeWriteBackError: No document is available, or the output path
            or project UUID cannot be determined.
    """
    if doc is None:
        doc = App.ActiveDocument
    if doc is None:
        raise ExchangeWriteBackError("No active FreeCAD document is available.")

    scene_path = _scene_path_from_doc(doc, scene_path)
    # The exchange-JSON location takes precedence over the scene location.
    output_path = _output_path_for_exchange_json() or _output_path_for_scene(scene_path)
    if not output_path:
        raise ExchangeWriteBackError(
            "Cannot determine the 3d_to_2d.json output path."
        )

    project_uuid = _project_uuid_from_doc(doc, payload)
    if not project_uuid:
        raise ExchangeWriteBackError(
            "Cannot determine project_uuid for write-back."
        )

    document = {
        "schema_version": "1.0",
        "project_uuid": project_uuid,
        "generated_at": _format_timestamp(),
        "instances": _collect_instance_bindings(doc),
        "terminals": _collect_terminal_bindings(doc),
    }
    # The returned report carries the output path; the file on disk does not.
    report = dict(document, output_path=output_path)

    target = Path(output_path)
    os.makedirs(str(target.parent), exist_ok=True)
    target.write_text(
        json.dumps(document, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    _append_debug_log(
        "write_back_document completed: instances={0}, terminals={1}, path={2}".format(
            len(report["instances"]),
            len(report["terminals"]),
            output_path,
        )
    )
    try:
        App.Console.PrintMessage(
            "[FreeCADExchange] Wrote 3d_to_2d.json to {0}\n".format(output_path)
        )
    except Exception:
        # Console output is best-effort.
        pass
    return report


def _is_exchange_document(doc):
    """Tell whether *doc* has the root group or at least one device group."""
    if doc is None:
        return False
    if _root_group(doc) is not None:
        return True
    return any(_is_device_group(obj) for obj in doc.Objects)


class _WriteBackObserver:
    """Document observer that runs the write-back after a document is saved."""

    def slotFinishSaveDocument(self, doc, name):
        """Write back *doc* after saving; failures are logged, never raised."""
        if not _is_exchange_document(doc):
            return
        try:
            write_back_document(doc, scene_path=name)
        except Exception as exc:
            _append_debug_log("write-back after save failed: {0}".format(exc))
            try:
                App.Console.PrintError(
                    "[FreeCADExchange] write-back after save failed: {0}\n".format(exc)
                )
            except Exception:
                pass


def ensure_document_observer_installed():
    """Install the write-back observer once and return it.

    The observer is remembered on the ``App`` module under
    ``STATE_WRITEBACK_OBSERVER``. Returns ``None`` when registration fails.
    """
    existing = getattr(App, STATE_WRITEBACK_OBSERVER, None)
    if existing is not None:
        return existing
    observer = _WriteBackObserver()
    try:
        App.addDocumentObserver(observer)
    except Exception as exc:
        _append_debug_log("failed to add write-back observer: {0}".format(exc))
        return None
    setattr(App, STATE_WRITEBACK_OBSERVER, observer)
    return observer


class CommandWriteBack:
    """GUI command that writes back the active document."""

    def GetResources(self):
        """Return the menu text and tooltip of the command."""
        return {
            "MenuText": "Write Back 3D Binding",
            "ToolTip": "Generate 3d_to_2d.json from the current FreeCAD document",
        }

    def IsActive(self):
        """The command is available whenever a document is active."""
        return App.ActiveDocument is not None

    def Activated(self):
        """Run the write-back and report the outcome on the console."""
        try:
            report = write_back_document(App.ActiveDocument)
        except Exception as exc:
            try:
                App.Console.PrintError(
                    "[FreeCADExchange] Write-back failed: {0}\n".format(exc)
                )
            except Exception:
                pass
            return
        try:
            App.Console.PrintMessage(
                "[FreeCADExchange] Write-back completed: {0} instances, {1} terminals\n".format(
                    len(report["instances"]),
                    len(report["terminals"]),
                )
            )
        except Exception:
            pass


_COMMANDS_REGISTERED = False


def register_commands():
    """Register the write-back command with the GUI once, when a GUI exists."""
    global _COMMANDS_REGISTERED
    if _COMMANDS_REGISTERED:
        return
    if Gui is None:
        return
    try:
        Gui.addCommand("QET_Exchange_WriteBack", CommandWriteBack())
        _COMMANDS_REGISTERED = True
    except Exception as exc:
        _append_debug_log("failed to register write-back command: {0}".format(exc))


register_commands()
ensure_document_observer_installed()