From 5841300d2005237329a1868b2fbb1ec169e1c716 Mon Sep 17 00:00:00 2001 From: Zhaowenlong Date: Wed, 27 May 2026 11:11:31 +0800 Subject: [PATCH] Improve QET 3D manual wiring task sync --- src/Mod/FreeCADExchange/DeviceImport.py | 12 +- src/Mod/FreeCADExchange/ManualWiringPanel.py | 349 +++++++++++++++++- src/Mod/FreeCADExchange/TerminalImport.py | 109 +++++- src/Mod/FreeCADExchange/WiringImport.py | 43 +++ ...eecad_exchange_manual_wiring_panel_test.py | 120 ++++++ ...nge_terminal_import_template_slots_test.py | 169 ++++++++- .../freecad_exchange_wiring_import_test.py | 49 +++ 7 files changed, 823 insertions(+), 28 deletions(-) diff --git a/src/Mod/FreeCADExchange/DeviceImport.py b/src/Mod/FreeCADExchange/DeviceImport.py index 053e19a..d20890a 100644 --- a/src/Mod/FreeCADExchange/DeviceImport.py +++ b/src/Mod/FreeCADExchange/DeviceImport.py @@ -3,8 +3,16 @@ from pathlib import Path import uuid import FreeCAD as App -import FreeCADGui as Gui -import ImportGui + +try: + import FreeCADGui as Gui +except ImportError: + Gui = None + +try: + import ImportGui +except ImportError: + ImportGui = None import DevicePreview import TemplateSemantics import TerminalObjects diff --git a/src/Mod/FreeCADExchange/ManualWiringPanel.py b/src/Mod/FreeCADExchange/ManualWiringPanel.py index 6b99025..53ed966 100644 --- a/src/Mod/FreeCADExchange/ManualWiringPanel.py +++ b/src/Mod/FreeCADExchange/ManualWiringPanel.py @@ -1,5 +1,7 @@ # FreeCADExchange GUI panel for guided manual 3D wiring. +from pathlib import Path + import FreeCAD as App try: @@ -27,14 +29,24 @@ try: except Exception: ExchangeWriteBack = None +try: + import ImportGui +except Exception: + ImportGui = None + COMMAND_NAME = "QET_Exchange_OpenManualWiringPanel" DEFAULT_TERMINAL_EXIT_LENGTH = 20.0 +DEFAULT_CARRIER_BASE_LENGTH = 200.0 CARRIER_ROLE_LABELS = { "wire_duct": "线槽", "cabinet": "柜面", "rail": "导轨", } +BUILTIN_CARRIER_ASSETS = { + "wire_duct": "qet_wire_duct.FCStd", + "rail": "qet_din_rail.FCStd", +} class ManualWiringPanelError(RuntimeError): @@ -55,6 +67,22 @@ def _console_error(message): pass +def _repo_root(): + return Path(__file__).resolve().parents[3] + + +def _builtin_carrier_asset_path(carrier_kind): + file_name = BUILTIN_CARRIER_ASSETS.get((carrier_kind or "").strip()) + if not file_name: + return "" + return str(_repo_root() / "data" / "examples" / "qet_cabinet_assets" / file_name) + + +def _supported_carrier_asset(path): + suffix = Path(path or "").suffix.lower() + return suffix in {".step", ".stp", ".fcstd"} + + def _active_document(): doc = getattr(App, "ActiveDocument", None) if doc is None: @@ -163,14 +191,22 @@ def _iter_terminal_objects(doc): ] -def _find_terminal_by_uuid(doc, terminal_uuid): +def _find_terminal_by_uuid(doc, terminal_uuid, element_uuid=""): target = (terminal_uuid or "").strip() if not target: return None + target_element = (element_uuid or "").strip() + fallback = None for terminal in _iter_terminal_objects(doc): - if getattr(terminal, "QetTerminalUuid", "").strip() == target: + if getattr(terminal, "QetTerminalUuid", "").strip() != target: + continue + if not target_element: return terminal - return None + if getattr(terminal, "QetElementUuid", "").strip() == target_element: + return terminal + if fallback is None: + fallback = terminal + return fallback def _terminal_label(obj): @@ -225,21 +261,14 @@ def _dominant_axis(vector): def _carrier_kind_from_object(obj): - candidates = [] - current = obj - if current is not None: - candidates.append(current) - candidates.extend(list(getattr(current, "InList", []) or [])) - - for candidate in candidates: - carrier_kind = (getattr(candidate, "QetCarrierKind", "") or "").strip() - if carrier_kind: - return carrier_kind + carrier = _carrier_object_from_object(obj) + if carrier is not None: + return (getattr(carrier, "QetCarrierKind", "") or "").strip() text_parts = [] - for candidate in candidates: - text_parts.append(getattr(candidate, "Name", "") or "") - text_parts.append(getattr(candidate, "Label", "") or "") + if obj is not None: + text_parts.append(getattr(obj, "Name", "") or "") + text_parts.append(getattr(obj, "Label", "") or "") text = " ".join(text_parts).lower() if "线槽" in text or "duct" in text or "trunking" in text: return "wire_duct" @@ -250,6 +279,20 @@ def _carrier_kind_from_object(obj): return "" +def _carrier_object_from_object(obj): + candidates = [] + current = obj + if current is not None: + candidates.append(current) + candidates.extend(list(getattr(current, "InList", []) or [])) + + for candidate in candidates: + carrier_kind = (getattr(candidate, "QetCarrierKind", "") or "").strip() + if carrier_kind: + return candidate + return None + + def _edge_carrier_axis(edge): vertexes = list(getattr(edge, "Vertexes", []) or []) if len(vertexes) >= 2: @@ -278,6 +321,141 @@ def _selected_carrier_objects(): ] +def _existing_object_names(doc): + return {getattr(obj, "Name", "") for obj in list(getattr(doc, "Objects", []) or [])} + + +def _new_objects_since(doc, before_names): + return [ + obj + for obj in list(getattr(doc, "Objects", []) or []) + if getattr(obj, "Name", "") not in before_names + ] + + +def _top_level_objects(objects): + object_set = set(objects or []) + result = [] + for obj in objects or []: + parents = set(getattr(obj, "InList", []) or []) + if parents.intersection(object_set): + continue + result.append(obj) + return result + + +def _open_fcstd_source(path): + source_doc = App.openDocument(path, hidden=True, temporary=True) + return source_doc + + +def _import_carrier_objects_from_path(doc, path): + if not _supported_carrier_asset(path): + raise ManualWiringPanelError("请选择 STEP/STP/FCStd 线槽或导轨模型。") + + suffix = Path(path).suffix.lower() + if suffix == ".fcstd": + source_doc = None + try: + source_doc = _open_fcstd_source(path) + copied = [] + for source_obj in _top_level_objects(list(getattr(source_doc, "Objects", []) or [])): + copied.append(doc.copyObject(source_obj, True)) + return copied + finally: + if source_doc is not None: + try: + App.closeDocument(source_doc.Name) + except Exception: + pass + + if ImportGui is None: + raise ManualWiringPanelError("当前 FreeCAD 无法导入 STEP/STP 文件。") + before = _existing_object_names(doc) + ImportGui.insert( + name=path, + docName=doc.Name, + merge=False, + useLinkGroup=True, + ) + return _top_level_objects(_new_objects_since(doc, before)) + + +def _ensure_float_property(obj, prop_name, value, description): + if prop_name not in getattr(obj, "PropertiesList", []): + obj.addProperty("App::PropertyFloat", prop_name, "QET Wiring", description) + setattr(obj, prop_name, float(value or 0.0)) + + +def _set_carrier_properties(obj, carrier_kind, source_path="", base_length=DEFAULT_CARRIER_BASE_LENGTH): + role_label = _carrier_role_label(carrier_kind) + TerminalObjects.ensure_string_property( + obj, + "QetCarrierKind", + "QET Wiring", + "3D wiring carrier kind", + carrier_kind, + ) + TerminalObjects.ensure_string_property( + obj, + "QetCarrierRoleLabel", + "QET Wiring", + "3D wiring carrier role label", + role_label, + ) + TerminalObjects.ensure_string_property( + obj, + "QetCarrierAxis", + "QET Wiring", + "3D wiring carrier axis", + "x", + ) + TerminalObjects.ensure_string_property( + obj, + "QetCarrierSourcePath", + "QET Wiring", + "3D wiring carrier source path", + source_path, + ) + _ensure_float_property(obj, "QetCarrierBaseLength", base_length, "Carrier base length") + return obj + + +def _iter_object_tree(obj): + yield obj + for child in list(getattr(obj, "Group", []) or []): + for nested in _iter_object_tree(child): + yield nested + + +def _scale_shape_x(obj, factor): + shape = getattr(obj, "Shape", None) + if shape is None or not hasattr(shape, "transformGeometry"): + return False + matrix = App.Matrix() + matrix.scale(float(factor), 1.0, 1.0) + obj.Shape = shape.transformGeometry(matrix) + return True + + +def _apply_carrier_length(carrier, length_mm): + length = max(float(length_mm or 0.0), 1.0) + base_length = float(getattr(carrier, "QetCarrierBaseLength", 0.0) or DEFAULT_CARRIER_BASE_LENGTH) + target_scale = length / base_length + current_scale = float(getattr(carrier, "QetCarrierScaleX", 1.0) or 1.0) + factor = target_scale / current_scale if current_scale else target_scale + + for obj in _iter_object_tree(carrier): + try: + _scale_shape_x(obj, factor) + except Exception: + pass + + _ensure_float_property(carrier, "QetCarrierLength", length, "Carrier length") + _ensure_float_property(carrier, "QetCarrierScaleX", target_scale, "Carrier X scale") + return carrier + + def _selected_waypoint(): for picked in _selection_ex(): picked_points = list(getattr(picked, "PickedPoints", []) or []) @@ -324,7 +502,7 @@ def _selected_waypoint(): "carrier_kind": _carrier_kind_from_object(obj), "carrier_axis": carrier_axis, "source_label": getattr(obj, "Label", "") if obj is not None else "", - "source_object_name": getattr(obj, "Name", "") if obj is not None else "", + "source_object_name": getattr(_carrier_object_from_object(obj) or obj, "Name", "") if obj is not None else "", "subelement_name": subelement_names[0] if subelement_names else "", } return None @@ -488,6 +666,65 @@ class ManualWiringController: marked.append(obj) return marked + def import_carrier_asset( + self, + asset_path, + carrier_kind, + length_mm=DEFAULT_CARRIER_BASE_LENGTH, + importer=None, + ): + carrier_kind = (carrier_kind or "").strip() + if carrier_kind not in {"wire_duct", "rail"}: + raise ManualWiringPanelError("只能导入线槽或导轨资产。") + path = str(asset_path or "").strip() + if not path: + path = _builtin_carrier_asset_path(carrier_kind) + if not path: + raise ManualWiringPanelError("找不到内置线槽/导轨资产。") + + doc = _active_document() + project_uuid = getattr(TerminalObjects.ensure_root_group(doc), "QetProjectUuid", "").strip() + carrier_group = WiringObjects.ensure_carrier_group(doc, project_uuid) + imported = list((importer or _import_carrier_objects_from_path)(doc, path) or []) + if not imported: + raise ManualWiringPanelError("没有从模型文件导入任何对象。") + + role_label = _carrier_role_label(carrier_kind) + container_name = "QETCarrier_{0}".format(TerminalObjects.safe_token(carrier_kind)) + container = doc.addObject("App::DocumentObjectGroup", container_name) + container.Label = "QET {0}".format(role_label or carrier_kind) + carrier_group.addObject(container) + for obj in imported: + if obj not in getattr(container, "Group", []): + container.addObject(obj) + + _set_carrier_properties(container, carrier_kind, source_path=path) + for obj in imported: + _set_carrier_properties(obj, carrier_kind, source_path=path) + _apply_carrier_length(container, length_mm) + try: + doc.recompute() + except Exception: + pass + return container + + def apply_length_to_selected_carriers(self, length_mm): + selected = _selected_carrier_objects() + if not selected: + raise ManualWiringPanelError("请先选择线槽或导轨对象。") + updated = [] + for carrier in selected: + if not (getattr(carrier, "QetCarrierKind", "") or "").strip(): + continue + updated.append(_apply_carrier_length(carrier, length_mm)) + if not updated: + raise ManualWiringPanelError("所选对象不是已标记的线槽或导轨。") + try: + _active_document().recompute() + except Exception: + pass + return updated + def _clear_preview_objects(self): doc = getattr(App, "ActiveDocument", None) if doc is None: @@ -527,10 +764,12 @@ class ManualWiringController: start_terminal = _find_terminal_by_uuid( doc, getattr(task, "QetStartTerminalUuid", ""), + element_uuid=getattr(task, "QetStartElementUuid", ""), ) end_terminal = _find_terminal_by_uuid( doc, getattr(task, "QetEndTerminalUuid", ""), + element_uuid=getattr(task, "QetEndElementUuid", ""), ) if start_terminal is None or end_terminal is None: raise ManualWiringPanelError("导线任务的起点或终点工程端子未找到。") @@ -576,6 +815,7 @@ class ManualWiringController: end_terminal = _find_terminal_by_uuid( doc, getattr(self.current_task, "QetEndTerminalUuid", ""), + element_uuid=getattr(self.current_task, "QetEndElementUuid", ""), ) wire_kwargs = _task_wire_kwargs(self.current_task) else: @@ -668,6 +908,15 @@ class ManualWiringTaskPanel: self.exit_length_input.setSingleStep(5.0) self.exit_length_input.setSuffix(" mm") self.exit_length_input.setValue(self.controller.terminal_exit_length) + self.carrier_length_input = QtWidgets.QDoubleSpinBox() + self.carrier_length_input.setRange(1.0, 10000.0) + self.carrier_length_input.setDecimals(1) + self.carrier_length_input.setSingleStep(50.0) + self.carrier_length_input.setSuffix(" mm") + self.carrier_length_input.setValue(DEFAULT_CARRIER_BASE_LENGTH) + self.import_duct_button = QtWidgets.QPushButton("导入线槽") + self.import_rail_button = QtWidgets.QPushButton("导入导轨") + self.apply_carrier_length_button = QtWidgets.QPushButton("应用载体长度") self.start_button = QtWidgets.QPushButton("设为起点") self.mark_duct_button = QtWidgets.QPushButton("标记为线槽") self.mark_cabinet_button = QtWidgets.QPushButton("标记为柜面") @@ -687,6 +936,15 @@ class ManualWiringTaskPanel: exit_layout.addWidget(QtWidgets.QLabel("端子出线长度")) exit_layout.addWidget(self.exit_length_input) layout.addLayout(exit_layout) + carrier_length_layout = QtWidgets.QHBoxLayout() + carrier_length_layout.addWidget(QtWidgets.QLabel("载体长度")) + carrier_length_layout.addWidget(self.carrier_length_input) + layout.addLayout(carrier_length_layout) + import_layout = QtWidgets.QHBoxLayout() + import_layout.addWidget(self.import_duct_button) + import_layout.addWidget(self.import_rail_button) + import_layout.addWidget(self.apply_carrier_length_button) + layout.addLayout(import_layout) carrier_layout = QtWidgets.QHBoxLayout() carrier_layout.addWidget(self.mark_duct_button) carrier_layout.addWidget(self.mark_cabinet_button) @@ -712,6 +970,9 @@ class ManualWiringTaskPanel: self.use_task_button.clicked.connect(self.use_selected_task) self.reload_tasks_button.clicked.connect(self._refresh_task_list) self.exit_length_input.valueChanged.connect(self.set_exit_length) + self.import_duct_button.clicked.connect(self.import_wire_duct) + self.import_rail_button.clicked.connect(self.import_din_rail) + self.apply_carrier_length_button.clicked.connect(self.apply_carrier_length) self.mark_duct_button.clicked.connect(self.mark_wire_duct) self.mark_cabinet_button.clicked.connect(self.mark_cabinet) self.mark_rail_button.clicked.connect(self.mark_rail) @@ -787,6 +1048,60 @@ class ManualWiringTaskPanel: except Exception as exc: self._set_error(str(exc)) + def _select_carrier_asset_path(self, carrier_kind): + default_path = _builtin_carrier_asset_path(carrier_kind) + default_dir = str(Path(default_path).parent) if default_path else "" + title = "选择线槽模型" if carrier_kind == "wire_duct" else "选择导轨模型" + if not hasattr(QtWidgets, "QFileDialog"): + return default_path + path, _selected_filter = QtWidgets.QFileDialog.getOpenFileName( + self.form, + title, + default_dir, + "3D carrier asset (*.FCStd *.fcstd *.step *.stp);;All files (*.*)", + ) + return path or default_path + + def import_carrier(self, carrier_kind): + path = self._select_carrier_asset_path(carrier_kind) + carrier = self.controller.import_carrier_asset( + path, + carrier_kind, + length_mm=self.carrier_length_input.value(), + ) + self._set_status( + "已导入{0}:{1}".format( + _carrier_role_label(carrier_kind) or carrier_kind, + getattr(carrier, "Label", "") or getattr(carrier, "Name", ""), + ) + ) + + def import_wire_duct(self): + try: + self.import_carrier("wire_duct") + except Exception as exc: + self._set_error(str(exc)) + + def import_din_rail(self): + try: + self.import_carrier("rail") + except Exception as exc: + self._set_error(str(exc)) + + def apply_carrier_length(self): + try: + updated = self.controller.apply_length_to_selected_carriers( + self.carrier_length_input.value() + ) + self._set_status( + "已更新 {0} 个线槽/导轨长度为 {1:.1f} mm。".format( + len(updated), + self.carrier_length_input.value(), + ) + ) + except Exception as exc: + self._set_error(str(exc)) + def mark_carrier(self, carrier_kind): marked = self.controller.mark_selected_carriers(carrier_kind) role_label = _carrier_role_label(carrier_kind) or carrier_kind diff --git a/src/Mod/FreeCADExchange/TerminalImport.py b/src/Mod/FreeCADExchange/TerminalImport.py index 3419203..8cf91aa 100644 --- a/src/Mod/FreeCADExchange/TerminalImport.py +++ b/src/Mod/FreeCADExchange/TerminalImport.py @@ -3,7 +3,11 @@ from collections import OrderedDict import FreeCAD as App -import FreeCADGui as Gui + +try: + import FreeCADGui as Gui +except ImportError: + Gui = None import DeviceImport import TerminalObjects as TerminalObjects @@ -76,6 +80,59 @@ def _payload_device_lookup(payload): } +def _payload_device_instance_by_element(payload): + result = {} + for item in payload.get("devices", []) or []: + if not isinstance(item, dict): + continue + element_uuid = (item.get("element_uuid") or "").strip() + instance_id = (item.get("instance_id") or "").strip() + if element_uuid and instance_id and element_uuid not in result: + result[element_uuid] = instance_id + return result + + +def _wire_endpoint_terminal_entries(payload, existing_keys): + wires = payload.get("wires", []) or [] + if not isinstance(wires, list): + return [] + + instance_by_element = _payload_device_instance_by_element(payload) + seen = set(existing_keys or set()) + entries = [] + for wire in wires: + if not isinstance(wire, dict): + continue + for side in ("start", "end"): + terminal_uuid = (wire.get("{0}_terminal_uuid".format(side)) or "").strip() + element_uuid = (wire.get("{0}_element_uuid".format(side)) or "").strip() + instance_id = (wire.get("{0}_instance_id".format(side)) or "").strip() + if not instance_id and element_uuid: + instance_id = instance_by_element.get(element_uuid, "") + if not terminal_uuid or not (element_uuid or instance_id): + continue + + key = (element_uuid, terminal_uuid) + if key in seen: + continue + seen.add(key) + terminal_display = ( + wire.get("{0}_terminal_display".format(side)) + or wire.get("{0}_terminal_label".format(side)) + or "" + ) + entries.append( + { + "terminal_uuid": terminal_uuid, + "element_uuid": element_uuid, + "instance_id": instance_id, + "terminal_display": terminal_display, + "slot_name_hint": terminal_display, + } + ) + return entries + + def _terminal_belongs_to_payload_devices(entry, device_lookup): instance_id = entry["instance_id"] element_uuid = entry["element_uuid"] @@ -95,6 +152,21 @@ def _ensure_visible(obj): pass +def _set_terminal_geometry_source(obj, source): + source_text = (source or "").strip() + if source_text == "fallback": + source_text = "generated_bbox_fallback" + if not source_text: + source_text = "template" + TerminalObjects.ensure_string_property( + obj, + "QetTerminalGeometrySource", + "QET Exchange", + "How this engineering terminal geometry was resolved", + source_text, + ) + + def _hide_object(obj): try: if getattr(obj, "ViewObject", None) is not None: @@ -266,6 +338,7 @@ def _create_terminal_object(doc, terminal_uuid, entry, slot, terminal_group, pro label=terminal_label, slot_name=slot.get("name", ""), ) + _set_terminal_geometry_source(terminal_obj, slot.get("source", "template")) _ensure_visible(terminal_obj) return terminal_obj @@ -287,6 +360,17 @@ def import_terminals_from_payload(payload, scene_path=""): terminal_entries = payload.get("terminals", []) if not isinstance(terminal_entries, list): raise TerminalImportError("Field 'terminals' must be a list.") + terminal_entries = list(terminal_entries) + terminal_entry_keys = set() + for item in terminal_entries: + if not isinstance(item, dict): + continue + element_uuid = (item.get("element_uuid") or "").strip() + terminal_uuid = (item.get("terminal_uuid") or "").strip() + if element_uuid and terminal_uuid: + terminal_entry_keys.add((element_uuid, terminal_uuid)) + synthesized_entries = _wire_endpoint_terminal_entries(payload, terminal_entry_keys) + terminal_entries.extend(synthesized_entries) device_lookup = _payload_device_lookup(payload) @@ -300,6 +384,8 @@ def import_terminals_from_payload(payload, scene_path=""): "removed_terminals": 0, "reused_template_hints": 0, "matched_by_slot_hint": 0, + "generated_fallback_slots": 0, + "synthesized_wire_endpoint_terminals": len(synthesized_entries), "skipped_missing_slot": 0, "skipped_missing_device": 0, "skipped_invalid_entry": 0, @@ -364,6 +450,14 @@ def import_terminals_from_payload(payload, scene_path=""): resolved_model_path, len(entries), ) + if len(fallback_slots) < len(entries): + generated_slots = TemplateSemantics.build_fallback_terminal_slots( + device_group, + len(entries), + ) + if generated_slots: + fallback_slots = generated_slots + report["generated_fallback_slots"] += len(generated_slots) slot_lookup = _build_slot_lookup(template_slots) for index, entry in enumerate(entries): @@ -415,6 +509,7 @@ def import_terminals_from_payload(payload, scene_path=""): label=_terminal_entry_label(entry, slot, terminal_uuid), slot_name=slot.get("name", ""), ) + _set_terminal_geometry_source(terminal_obj, slot.get("source", "template")) try: terminal_obj.Placement = _slot_placement(slot) except Exception: @@ -443,6 +538,7 @@ def import_terminals_from_payload(payload, scene_path=""): label=_terminal_entry_label(entry, slot, terminal_uuid), slot_name=slot.get("name", ""), ) + _set_terminal_geometry_source(terminal_obj, slot.get("source", "template")) try: terminal_obj.Placement = _slot_placement(slot) except Exception: @@ -465,6 +561,8 @@ def import_terminals_from_payload(payload, scene_path=""): continue if terminal_obj in used_objects: continue + if TerminalObjects.is_local_terminal_uuid(terminal_uuid): + continue report["warnings"].append( "Removed stale terminal {0} from device {1}.".format( terminal_uuid, device_element_uuid @@ -474,10 +572,11 @@ def import_terminals_from_payload(payload, scene_path=""): report["removed_terminals"] += 1 doc.recompute() - try: - Gui.SendMsgToActiveView("ViewFit") - except Exception: - pass + if Gui is not None: + try: + Gui.SendMsgToActiveView("ViewFit") + except Exception: + pass _append_debug_log( "TerminalImport finished: imported={0}, updated={1}, removed={2}, skipped_unmatched_parent={3}, skipped_missing_slot={4}".format( diff --git a/src/Mod/FreeCADExchange/WiringImport.py b/src/Mod/FreeCADExchange/WiringImport.py index edf39dc..47080a9 100644 --- a/src/Mod/FreeCADExchange/WiringImport.py +++ b/src/Mod/FreeCADExchange/WiringImport.py @@ -132,6 +132,40 @@ def _find_task_by_wire_uuid(task_group, wire_uuid): return None +def _remove_task_object(doc, task_group, task): + try: + if task in list(getattr(task_group, "Group", []) or []): + task_group.removeObject(task) + except Exception: + try: + task_group.Group = [ + candidate + for candidate in list(getattr(task_group, "Group", []) or []) + if candidate is not task + ] + except Exception: + pass + try: + doc.removeObject(getattr(task, "Name", "")) + except Exception: + pass + + +def _remove_stale_wire_tasks(doc, task_group, active_wire_uuids): + active = {(wire_uuid or "").strip() for wire_uuid in active_wire_uuids if (wire_uuid or "").strip()} + removed = 0 + for candidate in list(getattr(task_group, "Group", []) or []): + wire_uuid = (getattr(candidate, "QetWireUuid", "") or "").strip() + route_type = (getattr(candidate, "RouteType", "") or "").strip() + if not wire_uuid or route_type != "Task": + continue + if wire_uuid in active: + continue + _remove_task_object(doc, task_group, candidate) + removed += 1 + return removed + + def _ensure_string_property(obj, prop_name, value, description="QET wire task property"): TerminalObjects.ensure_string_property( obj, @@ -221,11 +255,13 @@ def import_wire_tasks_from_payload(payload, doc=None): "total_wires": len(wires), "imported_tasks": 0, "updated_tasks": 0, + "removed_stale_tasks": 0, "skipped_invalid": 0, "warnings": [], } device_labels = _device_display_map(payload) + active_wire_uuids = [] for index, item in enumerate(wires): try: entry = _normalize_wire_entry(item, index, device_labels=device_labels) @@ -234,10 +270,17 @@ def import_wire_tasks_from_payload(payload, doc=None): report["warnings"].append(str(exc)) continue + active_wire_uuids.append(entry["wire_uuid"]) _task, created = _upsert_wire_task(doc, task_group, project_uuid, entry) if created: report["imported_tasks"] += 1 else: report["updated_tasks"] += 1 + report["removed_stale_tasks"] = _remove_stale_wire_tasks( + doc, + task_group, + active_wire_uuids, + ) + return report diff --git a/tests/python/freecad_exchange_manual_wiring_panel_test.py b/tests/python/freecad_exchange_manual_wiring_panel_test.py index 10c1009..ef41f95 100644 --- a/tests/python/freecad_exchange_manual_wiring_panel_test.py +++ b/tests/python/freecad_exchange_manual_wiring_panel_test.py @@ -347,6 +347,62 @@ class ManualWiringPanelTest(unittest.TestCase): self.assertEqual("线槽", getattr(carrier, "QetCarrierRoleLabel", "")) self.assertIn(carrier, carrier_group.Group) + def test_controller_imports_wire_duct_carrier_from_asset_path(self): + _selection_state = _install_fake_freecad() + terminal_objects, panel = _reload_modules() + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + app.ActiveDocument = doc + terminal_objects.ensure_root_group(doc, "project-1") + + def importer(doc, path): + obj = doc.addObject("Part::Feature", "ImportedWireDuct") + obj.Label = "Imported Wire Duct" + return [obj] + + carrier = panel.ManualWiringController().import_carrier_asset( + r"D:\assets\duct.FCStd", + "wire_duct", + length_mm=600.0, + importer=importer, + ) + + carrier_group = doc.getObject("QETWiring_02_Carriers") + self.assertEqual("wire_duct", getattr(carrier, "QetCarrierKind", "")) + self.assertEqual("线槽", getattr(carrier, "QetCarrierRoleLabel", "")) + self.assertEqual(r"D:\assets\duct.FCStd", getattr(carrier, "QetCarrierSourcePath", "")) + self.assertEqual(200.0, getattr(carrier, "QetCarrierBaseLength", None)) + self.assertEqual(600.0, getattr(carrier, "QetCarrierLength", None)) + self.assertIn(carrier, carrier_group.Group) + self.assertEqual(3.0, getattr(carrier, "QetCarrierScaleX", None)) + + def test_controller_applies_length_to_selected_carrier(self): + selection_state = _install_fake_freecad() + terminal_objects, panel = _reload_modules() + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + app.ActiveDocument = doc + terminal_objects.ensure_root_group(doc, "project-1") + carrier = doc.addObject("App::DocumentObjectGroup", "Carrier") + terminal_objects.ensure_string_property( + carrier, + "QetCarrierKind", + "QET Wiring", + "Carrier kind", + "rail", + ) + carrier.addProperty("App::PropertyFloat", "QetCarrierBaseLength", "QET Wiring", "Base length") + carrier.QetCarrierBaseLength = 200.0 + selection_state["selection"] = [carrier] + + updated = panel.ManualWiringController().apply_length_to_selected_carriers(500.0) + + self.assertEqual([carrier], updated) + self.assertEqual(500.0, getattr(carrier, "QetCarrierLength", None)) + self.assertEqual(2.5, getattr(carrier, "QetCarrierScaleX", None)) + def test_controller_deletes_last_waypoint_and_preview_point(self): selection_state = _install_fake_freecad() terminal_objects, panel = _reload_modules() @@ -647,6 +703,70 @@ class ManualWiringPanelTest(unittest.TestCase): self.assertEqual("terminal-end", getattr(wire, "QetEndTerminalUuid", "")) self.assertEqual("Routed", getattr(task, "RouteStatus", "")) + def test_controller_prefers_task_element_uuid_when_terminal_uuid_is_reused(self): + _install_fake_freecad() + terminal_objects, panel = _reload_modules() + wiring_objects = importlib.import_module("WiringObjects") + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + app.ActiveDocument = doc + root = terminal_objects.ensure_root_group(doc, "project-1") + + def add_device(element_uuid, instance_id, terminal_name): + device = doc.addObject("App::DocumentObjectGroup", "QETDevice_" + element_uuid) + root.addObject(device) + terminal_objects.ensure_string_property(device, "QetElementUuid", "QET Exchange", "", element_uuid) + terminal_objects.ensure_string_property(device, "QetInstanceId", "QET Exchange", "", instance_id) + terminal = doc.addObject("Part::LocalCoordinateSystem", terminal_name) + terminal.Placement = app.Placement(app.Vector(1, 2, 3), app.Rotation()) + device.addObject(terminal) + terminal_objects.set_terminal_semantics( + terminal, + "project-1", + element_uuid, + "terminal-reused", + instance_id, + label=terminal_name, + ) + return terminal + + wrong_start = add_device("device-a", "instance-a", "WrongStart") + correct_start = add_device("device-b", "instance-b", "CorrectStart") + correct_end = add_device("device-c", "instance-c", "CorrectEnd") + _ = wrong_start + + task = wiring_objects.create_wire_task( + doc, + "project-1", + "wire-1", + "W001", + "terminal-reused", + "terminal-reused", + "", + "", + ) + terminal_objects.ensure_string_property( + task, + "QetStartElementUuid", + "QET Wiring", + "", + "device-b", + ) + terminal_objects.ensure_string_property( + task, + "QetEndElementUuid", + "QET Wiring", + "", + "device-c", + ) + + controller = panel.ManualWiringController() + controller.set_task_from_object(task) + + self.assertIs(correct_start, controller.start_terminal) + self.assertIs(correct_end, panel._find_terminal_by_uuid(doc, "terminal-reused", element_uuid="device-c")) + if __name__ == "__main__": unittest.main() diff --git a/tests/python/freecad_exchange_terminal_import_template_slots_test.py b/tests/python/freecad_exchange_terminal_import_template_slots_test.py index 781330d..5fb8446 100644 --- a/tests/python/freecad_exchange_terminal_import_template_slots_test.py +++ b/tests/python/freecad_exchange_terminal_import_template_slots_test.py @@ -128,7 +128,7 @@ def _reload_modules(): class TerminalImportTemplateSlotPolicyTest(unittest.TestCase): - def test_import_skips_terminal_when_device_has_no_template_slots(self): + def test_import_creates_fallback_terminal_when_device_has_no_template_slots(self): _install_fake_freecad() terminal_import, terminal_objects, device_import = _reload_modules() @@ -183,9 +183,170 @@ class TerminalImportTemplateSlotPolicyTest(unittest.TestCase): terminal_objects.TERMINAL_GROUP_KIND, ) - self.assertEqual(0, report["imported_terminals"]) - self.assertEqual(1, report.get("skipped_missing_slot")) - self.assertEqual([], terminal_objects.collect_terminal_objects(terminal_group)) + terminals = terminal_objects.collect_terminal_objects(terminal_group) + + self.assertEqual(1, report["imported_terminals"]) + self.assertEqual(0, report.get("skipped_missing_slot")) + self.assertEqual(1, report.get("generated_fallback_slots")) + self.assertEqual(1, len(terminals)) + self.assertEqual("terminal-a", terminals[0].QetTerminalUuid) + self.assertEqual("generated_bbox_fallback", terminals[0].QetTerminalGeometrySource) + + def test_import_preserves_local_terminals_when_payload_has_no_entry_for_device(self): + _install_fake_freecad() + terminal_import, terminal_objects, device_import = _reload_modules() + app = sys.modules["FreeCAD"] + + doc = FakeDocument() + device_import._ensure_document = lambda scene_path: doc + root = device_import._ensure_root_group(doc, project_uuid="project-1") + device = doc.addObject("App::Part", "QETDevice_device_a") + root.addObject(device) + terminal_objects.ensure_string_property( + device, + "QetProjectUuid", + "QET Exchange", + "Project UUID", + "project-1", + ) + terminal_objects.ensure_string_property( + device, + "QetElementUuid", + "QET Exchange", + "Element UUID", + "device-a", + ) + terminal_objects.ensure_string_property( + device, + "QetInstanceId", + "QET Exchange", + "Instance ID", + "instance-a", + ) + terminal_group = terminal_objects.ensure_terminal_group( + doc, + device, + project_uuid="project-1", + instance_id="instance-a", + ) + local_terminal = terminal_objects.create_lcs_object( + doc, + "QETTerminal_instance_a_P1", + placement=app.Placement(app.Vector(1, 0, 0), app.Rotation()), + label="P1", + ) + terminal_group.addObject(local_terminal) + terminal_objects.set_terminal_semantics( + local_terminal, + "project-1", + "device-a", + "local:instance-a:P1", + "instance-a", + label="P1", + slot_name="P1", + ) + + report = terminal_import.import_terminals_from_payload( + { + "project_uuid": "project-1", + "devices": [ + { + "element_uuid": "device-a", + "instance_id": "instance-a", + } + ], + "terminals": [], + } + ) + + terminals = terminal_objects.collect_terminal_objects(terminal_group) + + self.assertEqual(0, report["removed_terminals"]) + self.assertEqual([local_terminal], terminals) + self.assertEqual("local:instance-a:P1", local_terminal.QetTerminalUuid) + + def test_import_synthesizes_missing_terminal_entries_from_wire_endpoints(self): + _install_fake_freecad() + terminal_import, terminal_objects, device_import = _reload_modules() + + doc = FakeDocument() + device_import._ensure_document = lambda scene_path: doc + root = device_import._ensure_root_group(doc, project_uuid="project-1") + + def add_device(element_uuid, instance_id): + device = doc.addObject("App::Part", "QETDevice_" + element_uuid) + root.addObject(device) + terminal_objects.ensure_string_property( + device, + "QetProjectUuid", + "QET Exchange", + "Project UUID", + "project-1", + ) + terminal_objects.ensure_string_property( + device, + "QetElementUuid", + "QET Exchange", + "Element UUID", + element_uuid, + ) + terminal_objects.ensure_string_property( + device, + "QetInstanceId", + "QET Exchange", + "Instance ID", + instance_id, + ) + return device + + start_device = add_device("device-a", "instance-a") + end_device = add_device("device-b", "instance-b") + + report = terminal_import.import_terminals_from_payload( + { + "project_uuid": "project-1", + "devices": [ + {"element_uuid": "device-a", "instance_id": "instance-a"}, + {"element_uuid": "device-b", "instance_id": "instance-b"}, + ], + "terminals": [], + "wires": [ + { + "wire_id": "wire-1", + "start_element_uuid": "device-a", + "start_terminal_uuid": "terminal-a", + "start_instance_id": "", + "end_element_uuid": "device-b", + "end_terminal_uuid": "terminal-b", + "end_instance_id": "", + } + ], + } + ) + + start_terminals = terminal_objects.collect_terminal_objects( + terminal_objects.ensure_terminal_group( + doc, + start_device, + project_uuid="project-1", + instance_id="instance-a", + ) + ) + end_terminals = terminal_objects.collect_terminal_objects( + terminal_objects.ensure_terminal_group( + doc, + end_device, + project_uuid="project-1", + instance_id="instance-b", + ) + ) + + self.assertEqual(2, report["imported_terminals"]) + self.assertEqual(2, report["synthesized_wire_endpoint_terminals"]) + self.assertEqual("terminal-a", start_terminals[0].QetTerminalUuid) + self.assertEqual("device-a", start_terminals[0].QetElementUuid) + self.assertEqual("terminal-b", end_terminals[0].QetTerminalUuid) + self.assertEqual("device-b", end_terminals[0].QetElementUuid) def test_import_uses_slot_name_hint_to_match_template_slots(self): _install_fake_freecad() diff --git a/tests/python/freecad_exchange_wiring_import_test.py b/tests/python/freecad_exchange_wiring_import_test.py index 4397cb5..da4f3bd 100644 --- a/tests/python/freecad_exchange_wiring_import_test.py +++ b/tests/python/freecad_exchange_wiring_import_test.py @@ -84,6 +84,17 @@ class FakeDocument: return obj return None + def removeObject(self, name): + obj = self.getObject(name) + if obj is None: + return + for candidate in list(self.Objects): + if obj in getattr(candidate, "Group", []): + candidate.Group.remove(obj) + if candidate in getattr(obj, "InList", []): + obj.InList.remove(candidate) + self.Objects.remove(obj) + def _reload_modules(): for name in ["TerminalObjects", "WiringObjects", "WiringImport"]: @@ -160,6 +171,44 @@ class WiringImportTest(unittest.TestCase): self.assertEqual("W001-updated", task_group.Group[0].QetWireMark) self.assertEqual("Routed", task_group.Group[0].RouteStatus) + def test_reimport_removes_stale_wire_tasks_not_in_current_payload(self): + _install_fake_freecad() + terminal_objects, wiring_objects, wiring_import = _reload_modules() + + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + task_group = wiring_objects.ensure_task_group(doc, "project-1") + stale_task = wiring_objects.create_wire_task( + doc, + "project-1", + "conductor:old-wire", + "old wire", + "old-start", + "old-end", + "", + "", + ) + self.assertIn(stale_task, task_group.Group) + + payload = { + "project_uuid": "project-1", + "wires": [ + { + "wire_id": "direction:new-wire:0", + "start_terminal_uuid": "terminal-a", + "end_terminal_uuid": "terminal-b", + } + ], + } + + report = wiring_import.import_wire_tasks_from_payload(payload, doc) + + self.assertEqual(1, report["imported_tasks"]) + self.assertEqual(1, report["removed_stale_tasks"]) + self.assertIsNone(doc.getObject(stale_task.Name)) + self.assertEqual(1, len(task_group.Group)) + self.assertEqual("direction:new-wire:0", task_group.Group[0].QetWireUuid) + if __name__ == "__main__": unittest.main()