Improve QET 3D manual wiring task sync

dev
Zhaowenlong 4 weeks ago
parent 61fa285472
commit 5841300d20

@ -3,8 +3,16 @@ from pathlib import Path
import uuid
import FreeCAD as App
import FreeCADGui as Gui
import ImportGui
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
try:
import ImportGui
except ImportError:
ImportGui = None
import DevicePreview
import TemplateSemantics
import TerminalObjects

@ -1,5 +1,7 @@
# FreeCADExchange GUI panel for guided manual 3D wiring.
from pathlib import Path
import FreeCAD as App
try:
@ -27,14 +29,24 @@ try:
except Exception:
ExchangeWriteBack = None
try:
import ImportGui
except Exception:
ImportGui = None
COMMAND_NAME = "QET_Exchange_OpenManualWiringPanel"
DEFAULT_TERMINAL_EXIT_LENGTH = 20.0
DEFAULT_CARRIER_BASE_LENGTH = 200.0
CARRIER_ROLE_LABELS = {
"wire_duct": "线槽",
"cabinet": "柜面",
"rail": "导轨",
}
BUILTIN_CARRIER_ASSETS = {
"wire_duct": "qet_wire_duct.FCStd",
"rail": "qet_din_rail.FCStd",
}
class ManualWiringPanelError(RuntimeError):
@ -55,6 +67,22 @@ def _console_error(message):
pass
def _repo_root():
return Path(__file__).resolve().parents[3]
def _builtin_carrier_asset_path(carrier_kind):
file_name = BUILTIN_CARRIER_ASSETS.get((carrier_kind or "").strip())
if not file_name:
return ""
return str(_repo_root() / "data" / "examples" / "qet_cabinet_assets" / file_name)
def _supported_carrier_asset(path):
suffix = Path(path or "").suffix.lower()
return suffix in {".step", ".stp", ".fcstd"}
def _active_document():
doc = getattr(App, "ActiveDocument", None)
if doc is None:
@ -163,14 +191,22 @@ def _iter_terminal_objects(doc):
]
def _find_terminal_by_uuid(doc, terminal_uuid):
def _find_terminal_by_uuid(doc, terminal_uuid, element_uuid=""):
target = (terminal_uuid or "").strip()
if not target:
return None
target_element = (element_uuid or "").strip()
fallback = None
for terminal in _iter_terminal_objects(doc):
if getattr(terminal, "QetTerminalUuid", "").strip() == target:
if getattr(terminal, "QetTerminalUuid", "").strip() != target:
continue
if not target_element:
return terminal
return None
if getattr(terminal, "QetElementUuid", "").strip() == target_element:
return terminal
if fallback is None:
fallback = terminal
return fallback
def _terminal_label(obj):
@ -225,21 +261,14 @@ def _dominant_axis(vector):
def _carrier_kind_from_object(obj):
candidates = []
current = obj
if current is not None:
candidates.append(current)
candidates.extend(list(getattr(current, "InList", []) or []))
for candidate in candidates:
carrier_kind = (getattr(candidate, "QetCarrierKind", "") or "").strip()
if carrier_kind:
return carrier_kind
carrier = _carrier_object_from_object(obj)
if carrier is not None:
return (getattr(carrier, "QetCarrierKind", "") or "").strip()
text_parts = []
for candidate in candidates:
text_parts.append(getattr(candidate, "Name", "") or "")
text_parts.append(getattr(candidate, "Label", "") or "")
if obj is not None:
text_parts.append(getattr(obj, "Name", "") or "")
text_parts.append(getattr(obj, "Label", "") or "")
text = " ".join(text_parts).lower()
if "线槽" in text or "duct" in text or "trunking" in text:
return "wire_duct"
@ -250,6 +279,20 @@ def _carrier_kind_from_object(obj):
return ""
def _carrier_object_from_object(obj):
candidates = []
current = obj
if current is not None:
candidates.append(current)
candidates.extend(list(getattr(current, "InList", []) or []))
for candidate in candidates:
carrier_kind = (getattr(candidate, "QetCarrierKind", "") or "").strip()
if carrier_kind:
return candidate
return None
def _edge_carrier_axis(edge):
vertexes = list(getattr(edge, "Vertexes", []) or [])
if len(vertexes) >= 2:
@ -278,6 +321,141 @@ def _selected_carrier_objects():
]
def _existing_object_names(doc):
return {getattr(obj, "Name", "") for obj in list(getattr(doc, "Objects", []) or [])}
def _new_objects_since(doc, before_names):
return [
obj
for obj in list(getattr(doc, "Objects", []) or [])
if getattr(obj, "Name", "") not in before_names
]
def _top_level_objects(objects):
object_set = set(objects or [])
result = []
for obj in objects or []:
parents = set(getattr(obj, "InList", []) or [])
if parents.intersection(object_set):
continue
result.append(obj)
return result
def _open_fcstd_source(path):
source_doc = App.openDocument(path, hidden=True, temporary=True)
return source_doc
def _import_carrier_objects_from_path(doc, path):
if not _supported_carrier_asset(path):
raise ManualWiringPanelError("请选择 STEP/STP/FCStd 线槽或导轨模型。")
suffix = Path(path).suffix.lower()
if suffix == ".fcstd":
source_doc = None
try:
source_doc = _open_fcstd_source(path)
copied = []
for source_obj in _top_level_objects(list(getattr(source_doc, "Objects", []) or [])):
copied.append(doc.copyObject(source_obj, True))
return copied
finally:
if source_doc is not None:
try:
App.closeDocument(source_doc.Name)
except Exception:
pass
if ImportGui is None:
raise ManualWiringPanelError("当前 FreeCAD 无法导入 STEP/STP 文件。")
before = _existing_object_names(doc)
ImportGui.insert(
name=path,
docName=doc.Name,
merge=False,
useLinkGroup=True,
)
return _top_level_objects(_new_objects_since(doc, before))
def _ensure_float_property(obj, prop_name, value, description):
if prop_name not in getattr(obj, "PropertiesList", []):
obj.addProperty("App::PropertyFloat", prop_name, "QET Wiring", description)
setattr(obj, prop_name, float(value or 0.0))
def _set_carrier_properties(obj, carrier_kind, source_path="", base_length=DEFAULT_CARRIER_BASE_LENGTH):
role_label = _carrier_role_label(carrier_kind)
TerminalObjects.ensure_string_property(
obj,
"QetCarrierKind",
"QET Wiring",
"3D wiring carrier kind",
carrier_kind,
)
TerminalObjects.ensure_string_property(
obj,
"QetCarrierRoleLabel",
"QET Wiring",
"3D wiring carrier role label",
role_label,
)
TerminalObjects.ensure_string_property(
obj,
"QetCarrierAxis",
"QET Wiring",
"3D wiring carrier axis",
"x",
)
TerminalObjects.ensure_string_property(
obj,
"QetCarrierSourcePath",
"QET Wiring",
"3D wiring carrier source path",
source_path,
)
_ensure_float_property(obj, "QetCarrierBaseLength", base_length, "Carrier base length")
return obj
def _iter_object_tree(obj):
yield obj
for child in list(getattr(obj, "Group", []) or []):
for nested in _iter_object_tree(child):
yield nested
def _scale_shape_x(obj, factor):
shape = getattr(obj, "Shape", None)
if shape is None or not hasattr(shape, "transformGeometry"):
return False
matrix = App.Matrix()
matrix.scale(float(factor), 1.0, 1.0)
obj.Shape = shape.transformGeometry(matrix)
return True
def _apply_carrier_length(carrier, length_mm):
length = max(float(length_mm or 0.0), 1.0)
base_length = float(getattr(carrier, "QetCarrierBaseLength", 0.0) or DEFAULT_CARRIER_BASE_LENGTH)
target_scale = length / base_length
current_scale = float(getattr(carrier, "QetCarrierScaleX", 1.0) or 1.0)
factor = target_scale / current_scale if current_scale else target_scale
for obj in _iter_object_tree(carrier):
try:
_scale_shape_x(obj, factor)
except Exception:
pass
_ensure_float_property(carrier, "QetCarrierLength", length, "Carrier length")
_ensure_float_property(carrier, "QetCarrierScaleX", target_scale, "Carrier X scale")
return carrier
def _selected_waypoint():
for picked in _selection_ex():
picked_points = list(getattr(picked, "PickedPoints", []) or [])
@ -324,7 +502,7 @@ def _selected_waypoint():
"carrier_kind": _carrier_kind_from_object(obj),
"carrier_axis": carrier_axis,
"source_label": getattr(obj, "Label", "") if obj is not None else "",
"source_object_name": getattr(obj, "Name", "") if obj is not None else "",
"source_object_name": getattr(_carrier_object_from_object(obj) or obj, "Name", "") if obj is not None else "",
"subelement_name": subelement_names[0] if subelement_names else "",
}
return None
@ -488,6 +666,65 @@ class ManualWiringController:
marked.append(obj)
return marked
def import_carrier_asset(
self,
asset_path,
carrier_kind,
length_mm=DEFAULT_CARRIER_BASE_LENGTH,
importer=None,
):
carrier_kind = (carrier_kind or "").strip()
if carrier_kind not in {"wire_duct", "rail"}:
raise ManualWiringPanelError("只能导入线槽或导轨资产。")
path = str(asset_path or "").strip()
if not path:
path = _builtin_carrier_asset_path(carrier_kind)
if not path:
raise ManualWiringPanelError("找不到内置线槽/导轨资产。")
doc = _active_document()
project_uuid = getattr(TerminalObjects.ensure_root_group(doc), "QetProjectUuid", "").strip()
carrier_group = WiringObjects.ensure_carrier_group(doc, project_uuid)
imported = list((importer or _import_carrier_objects_from_path)(doc, path) or [])
if not imported:
raise ManualWiringPanelError("没有从模型文件导入任何对象。")
role_label = _carrier_role_label(carrier_kind)
container_name = "QETCarrier_{0}".format(TerminalObjects.safe_token(carrier_kind))
container = doc.addObject("App::DocumentObjectGroup", container_name)
container.Label = "QET {0}".format(role_label or carrier_kind)
carrier_group.addObject(container)
for obj in imported:
if obj not in getattr(container, "Group", []):
container.addObject(obj)
_set_carrier_properties(container, carrier_kind, source_path=path)
for obj in imported:
_set_carrier_properties(obj, carrier_kind, source_path=path)
_apply_carrier_length(container, length_mm)
try:
doc.recompute()
except Exception:
pass
return container
def apply_length_to_selected_carriers(self, length_mm):
selected = _selected_carrier_objects()
if not selected:
raise ManualWiringPanelError("请先选择线槽或导轨对象。")
updated = []
for carrier in selected:
if not (getattr(carrier, "QetCarrierKind", "") or "").strip():
continue
updated.append(_apply_carrier_length(carrier, length_mm))
if not updated:
raise ManualWiringPanelError("所选对象不是已标记的线槽或导轨。")
try:
_active_document().recompute()
except Exception:
pass
return updated
def _clear_preview_objects(self):
doc = getattr(App, "ActiveDocument", None)
if doc is None:
@ -527,10 +764,12 @@ class ManualWiringController:
start_terminal = _find_terminal_by_uuid(
doc,
getattr(task, "QetStartTerminalUuid", ""),
element_uuid=getattr(task, "QetStartElementUuid", ""),
)
end_terminal = _find_terminal_by_uuid(
doc,
getattr(task, "QetEndTerminalUuid", ""),
element_uuid=getattr(task, "QetEndElementUuid", ""),
)
if start_terminal is None or end_terminal is None:
raise ManualWiringPanelError("导线任务的起点或终点工程端子未找到。")
@ -576,6 +815,7 @@ class ManualWiringController:
end_terminal = _find_terminal_by_uuid(
doc,
getattr(self.current_task, "QetEndTerminalUuid", ""),
element_uuid=getattr(self.current_task, "QetEndElementUuid", ""),
)
wire_kwargs = _task_wire_kwargs(self.current_task)
else:
@ -668,6 +908,15 @@ class ManualWiringTaskPanel:
self.exit_length_input.setSingleStep(5.0)
self.exit_length_input.setSuffix(" mm")
self.exit_length_input.setValue(self.controller.terminal_exit_length)
self.carrier_length_input = QtWidgets.QDoubleSpinBox()
self.carrier_length_input.setRange(1.0, 10000.0)
self.carrier_length_input.setDecimals(1)
self.carrier_length_input.setSingleStep(50.0)
self.carrier_length_input.setSuffix(" mm")
self.carrier_length_input.setValue(DEFAULT_CARRIER_BASE_LENGTH)
self.import_duct_button = QtWidgets.QPushButton("导入线槽")
self.import_rail_button = QtWidgets.QPushButton("导入导轨")
self.apply_carrier_length_button = QtWidgets.QPushButton("应用载体长度")
self.start_button = QtWidgets.QPushButton("设为起点")
self.mark_duct_button = QtWidgets.QPushButton("标记为线槽")
self.mark_cabinet_button = QtWidgets.QPushButton("标记为柜面")
@ -687,6 +936,15 @@ class ManualWiringTaskPanel:
exit_layout.addWidget(QtWidgets.QLabel("端子出线长度"))
exit_layout.addWidget(self.exit_length_input)
layout.addLayout(exit_layout)
carrier_length_layout = QtWidgets.QHBoxLayout()
carrier_length_layout.addWidget(QtWidgets.QLabel("载体长度"))
carrier_length_layout.addWidget(self.carrier_length_input)
layout.addLayout(carrier_length_layout)
import_layout = QtWidgets.QHBoxLayout()
import_layout.addWidget(self.import_duct_button)
import_layout.addWidget(self.import_rail_button)
import_layout.addWidget(self.apply_carrier_length_button)
layout.addLayout(import_layout)
carrier_layout = QtWidgets.QHBoxLayout()
carrier_layout.addWidget(self.mark_duct_button)
carrier_layout.addWidget(self.mark_cabinet_button)
@ -712,6 +970,9 @@ class ManualWiringTaskPanel:
self.use_task_button.clicked.connect(self.use_selected_task)
self.reload_tasks_button.clicked.connect(self._refresh_task_list)
self.exit_length_input.valueChanged.connect(self.set_exit_length)
self.import_duct_button.clicked.connect(self.import_wire_duct)
self.import_rail_button.clicked.connect(self.import_din_rail)
self.apply_carrier_length_button.clicked.connect(self.apply_carrier_length)
self.mark_duct_button.clicked.connect(self.mark_wire_duct)
self.mark_cabinet_button.clicked.connect(self.mark_cabinet)
self.mark_rail_button.clicked.connect(self.mark_rail)
@ -787,6 +1048,60 @@ class ManualWiringTaskPanel:
except Exception as exc:
self._set_error(str(exc))
def _select_carrier_asset_path(self, carrier_kind):
default_path = _builtin_carrier_asset_path(carrier_kind)
default_dir = str(Path(default_path).parent) if default_path else ""
title = "选择线槽模型" if carrier_kind == "wire_duct" else "选择导轨模型"
if not hasattr(QtWidgets, "QFileDialog"):
return default_path
path, _selected_filter = QtWidgets.QFileDialog.getOpenFileName(
self.form,
title,
default_dir,
"3D carrier asset (*.FCStd *.fcstd *.step *.stp);;All files (*.*)",
)
return path or default_path
def import_carrier(self, carrier_kind):
path = self._select_carrier_asset_path(carrier_kind)
carrier = self.controller.import_carrier_asset(
path,
carrier_kind,
length_mm=self.carrier_length_input.value(),
)
self._set_status(
"已导入{0}{1}".format(
_carrier_role_label(carrier_kind) or carrier_kind,
getattr(carrier, "Label", "") or getattr(carrier, "Name", ""),
)
)
def import_wire_duct(self):
try:
self.import_carrier("wire_duct")
except Exception as exc:
self._set_error(str(exc))
def import_din_rail(self):
try:
self.import_carrier("rail")
except Exception as exc:
self._set_error(str(exc))
def apply_carrier_length(self):
try:
updated = self.controller.apply_length_to_selected_carriers(
self.carrier_length_input.value()
)
self._set_status(
"已更新 {0} 个线槽/导轨长度为 {1:.1f} mm。".format(
len(updated),
self.carrier_length_input.value(),
)
)
except Exception as exc:
self._set_error(str(exc))
def mark_carrier(self, carrier_kind):
marked = self.controller.mark_selected_carriers(carrier_kind)
role_label = _carrier_role_label(carrier_kind) or carrier_kind

@ -3,7 +3,11 @@
from collections import OrderedDict
import FreeCAD as App
import FreeCADGui as Gui
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
import DeviceImport
import TerminalObjects as TerminalObjects
@ -76,6 +80,59 @@ def _payload_device_lookup(payload):
}
def _payload_device_instance_by_element(payload):
result = {}
for item in payload.get("devices", []) or []:
if not isinstance(item, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
instance_id = (item.get("instance_id") or "").strip()
if element_uuid and instance_id and element_uuid not in result:
result[element_uuid] = instance_id
return result
def _wire_endpoint_terminal_entries(payload, existing_keys):
wires = payload.get("wires", []) or []
if not isinstance(wires, list):
return []
instance_by_element = _payload_device_instance_by_element(payload)
seen = set(existing_keys or set())
entries = []
for wire in wires:
if not isinstance(wire, dict):
continue
for side in ("start", "end"):
terminal_uuid = (wire.get("{0}_terminal_uuid".format(side)) or "").strip()
element_uuid = (wire.get("{0}_element_uuid".format(side)) or "").strip()
instance_id = (wire.get("{0}_instance_id".format(side)) or "").strip()
if not instance_id and element_uuid:
instance_id = instance_by_element.get(element_uuid, "")
if not terminal_uuid or not (element_uuid or instance_id):
continue
key = (element_uuid, terminal_uuid)
if key in seen:
continue
seen.add(key)
terminal_display = (
wire.get("{0}_terminal_display".format(side))
or wire.get("{0}_terminal_label".format(side))
or ""
)
entries.append(
{
"terminal_uuid": terminal_uuid,
"element_uuid": element_uuid,
"instance_id": instance_id,
"terminal_display": terminal_display,
"slot_name_hint": terminal_display,
}
)
return entries
def _terminal_belongs_to_payload_devices(entry, device_lookup):
instance_id = entry["instance_id"]
element_uuid = entry["element_uuid"]
@ -95,6 +152,21 @@ def _ensure_visible(obj):
pass
def _set_terminal_geometry_source(obj, source):
source_text = (source or "").strip()
if source_text == "fallback":
source_text = "generated_bbox_fallback"
if not source_text:
source_text = "template"
TerminalObjects.ensure_string_property(
obj,
"QetTerminalGeometrySource",
"QET Exchange",
"How this engineering terminal geometry was resolved",
source_text,
)
def _hide_object(obj):
try:
if getattr(obj, "ViewObject", None) is not None:
@ -266,6 +338,7 @@ def _create_terminal_object(doc, terminal_uuid, entry, slot, terminal_group, pro
label=terminal_label,
slot_name=slot.get("name", ""),
)
_set_terminal_geometry_source(terminal_obj, slot.get("source", "template"))
_ensure_visible(terminal_obj)
return terminal_obj
@ -287,6 +360,17 @@ def import_terminals_from_payload(payload, scene_path=""):
terminal_entries = payload.get("terminals", [])
if not isinstance(terminal_entries, list):
raise TerminalImportError("Field 'terminals' must be a list.")
terminal_entries = list(terminal_entries)
terminal_entry_keys = set()
for item in terminal_entries:
if not isinstance(item, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
terminal_uuid = (item.get("terminal_uuid") or "").strip()
if element_uuid and terminal_uuid:
terminal_entry_keys.add((element_uuid, terminal_uuid))
synthesized_entries = _wire_endpoint_terminal_entries(payload, terminal_entry_keys)
terminal_entries.extend(synthesized_entries)
device_lookup = _payload_device_lookup(payload)
@ -300,6 +384,8 @@ def import_terminals_from_payload(payload, scene_path=""):
"removed_terminals": 0,
"reused_template_hints": 0,
"matched_by_slot_hint": 0,
"generated_fallback_slots": 0,
"synthesized_wire_endpoint_terminals": len(synthesized_entries),
"skipped_missing_slot": 0,
"skipped_missing_device": 0,
"skipped_invalid_entry": 0,
@ -364,6 +450,14 @@ def import_terminals_from_payload(payload, scene_path=""):
resolved_model_path,
len(entries),
)
if len(fallback_slots) < len(entries):
generated_slots = TemplateSemantics.build_fallback_terminal_slots(
device_group,
len(entries),
)
if generated_slots:
fallback_slots = generated_slots
report["generated_fallback_slots"] += len(generated_slots)
slot_lookup = _build_slot_lookup(template_slots)
for index, entry in enumerate(entries):
@ -415,6 +509,7 @@ def import_terminals_from_payload(payload, scene_path=""):
label=_terminal_entry_label(entry, slot, terminal_uuid),
slot_name=slot.get("name", ""),
)
_set_terminal_geometry_source(terminal_obj, slot.get("source", "template"))
try:
terminal_obj.Placement = _slot_placement(slot)
except Exception:
@ -443,6 +538,7 @@ def import_terminals_from_payload(payload, scene_path=""):
label=_terminal_entry_label(entry, slot, terminal_uuid),
slot_name=slot.get("name", ""),
)
_set_terminal_geometry_source(terminal_obj, slot.get("source", "template"))
try:
terminal_obj.Placement = _slot_placement(slot)
except Exception:
@ -465,6 +561,8 @@ def import_terminals_from_payload(payload, scene_path=""):
continue
if terminal_obj in used_objects:
continue
if TerminalObjects.is_local_terminal_uuid(terminal_uuid):
continue
report["warnings"].append(
"Removed stale terminal {0} from device {1}.".format(
terminal_uuid, device_element_uuid
@ -474,10 +572,11 @@ def import_terminals_from_payload(payload, scene_path=""):
report["removed_terminals"] += 1
doc.recompute()
try:
Gui.SendMsgToActiveView("ViewFit")
except Exception:
pass
if Gui is not None:
try:
Gui.SendMsgToActiveView("ViewFit")
except Exception:
pass
_append_debug_log(
"TerminalImport finished: imported={0}, updated={1}, removed={2}, skipped_unmatched_parent={3}, skipped_missing_slot={4}".format(

@ -132,6 +132,40 @@ def _find_task_by_wire_uuid(task_group, wire_uuid):
return None
def _remove_task_object(doc, task_group, task):
try:
if task in list(getattr(task_group, "Group", []) or []):
task_group.removeObject(task)
except Exception:
try:
task_group.Group = [
candidate
for candidate in list(getattr(task_group, "Group", []) or [])
if candidate is not task
]
except Exception:
pass
try:
doc.removeObject(getattr(task, "Name", ""))
except Exception:
pass
def _remove_stale_wire_tasks(doc, task_group, active_wire_uuids):
active = {(wire_uuid or "").strip() for wire_uuid in active_wire_uuids if (wire_uuid or "").strip()}
removed = 0
for candidate in list(getattr(task_group, "Group", []) or []):
wire_uuid = (getattr(candidate, "QetWireUuid", "") or "").strip()
route_type = (getattr(candidate, "RouteType", "") or "").strip()
if not wire_uuid or route_type != "Task":
continue
if wire_uuid in active:
continue
_remove_task_object(doc, task_group, candidate)
removed += 1
return removed
def _ensure_string_property(obj, prop_name, value, description="QET wire task property"):
TerminalObjects.ensure_string_property(
obj,
@ -221,11 +255,13 @@ def import_wire_tasks_from_payload(payload, doc=None):
"total_wires": len(wires),
"imported_tasks": 0,
"updated_tasks": 0,
"removed_stale_tasks": 0,
"skipped_invalid": 0,
"warnings": [],
}
device_labels = _device_display_map(payload)
active_wire_uuids = []
for index, item in enumerate(wires):
try:
entry = _normalize_wire_entry(item, index, device_labels=device_labels)
@ -234,10 +270,17 @@ def import_wire_tasks_from_payload(payload, doc=None):
report["warnings"].append(str(exc))
continue
active_wire_uuids.append(entry["wire_uuid"])
_task, created = _upsert_wire_task(doc, task_group, project_uuid, entry)
if created:
report["imported_tasks"] += 1
else:
report["updated_tasks"] += 1
report["removed_stale_tasks"] = _remove_stale_wire_tasks(
doc,
task_group,
active_wire_uuids,
)
return report

@ -347,6 +347,62 @@ class ManualWiringPanelTest(unittest.TestCase):
self.assertEqual("线槽", getattr(carrier, "QetCarrierRoleLabel", ""))
self.assertIn(carrier, carrier_group.Group)
def test_controller_imports_wire_duct_carrier_from_asset_path(self):
_selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
app.ActiveDocument = doc
terminal_objects.ensure_root_group(doc, "project-1")
def importer(doc, path):
obj = doc.addObject("Part::Feature", "ImportedWireDuct")
obj.Label = "Imported Wire Duct"
return [obj]
carrier = panel.ManualWiringController().import_carrier_asset(
r"D:\assets\duct.FCStd",
"wire_duct",
length_mm=600.0,
importer=importer,
)
carrier_group = doc.getObject("QETWiring_02_Carriers")
self.assertEqual("wire_duct", getattr(carrier, "QetCarrierKind", ""))
self.assertEqual("线槽", getattr(carrier, "QetCarrierRoleLabel", ""))
self.assertEqual(r"D:\assets\duct.FCStd", getattr(carrier, "QetCarrierSourcePath", ""))
self.assertEqual(200.0, getattr(carrier, "QetCarrierBaseLength", None))
self.assertEqual(600.0, getattr(carrier, "QetCarrierLength", None))
self.assertIn(carrier, carrier_group.Group)
self.assertEqual(3.0, getattr(carrier, "QetCarrierScaleX", None))
def test_controller_applies_length_to_selected_carrier(self):
selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
app.ActiveDocument = doc
terminal_objects.ensure_root_group(doc, "project-1")
carrier = doc.addObject("App::DocumentObjectGroup", "Carrier")
terminal_objects.ensure_string_property(
carrier,
"QetCarrierKind",
"QET Wiring",
"Carrier kind",
"rail",
)
carrier.addProperty("App::PropertyFloat", "QetCarrierBaseLength", "QET Wiring", "Base length")
carrier.QetCarrierBaseLength = 200.0
selection_state["selection"] = [carrier]
updated = panel.ManualWiringController().apply_length_to_selected_carriers(500.0)
self.assertEqual([carrier], updated)
self.assertEqual(500.0, getattr(carrier, "QetCarrierLength", None))
self.assertEqual(2.5, getattr(carrier, "QetCarrierScaleX", None))
def test_controller_deletes_last_waypoint_and_preview_point(self):
selection_state = _install_fake_freecad()
terminal_objects, panel = _reload_modules()
@ -647,6 +703,70 @@ class ManualWiringPanelTest(unittest.TestCase):
self.assertEqual("terminal-end", getattr(wire, "QetEndTerminalUuid", ""))
self.assertEqual("Routed", getattr(task, "RouteStatus", ""))
def test_controller_prefers_task_element_uuid_when_terminal_uuid_is_reused(self):
_install_fake_freecad()
terminal_objects, panel = _reload_modules()
wiring_objects = importlib.import_module("WiringObjects")
app = sys.modules["FreeCAD"]
doc = FakeDocument()
app.ActiveDocument = doc
root = terminal_objects.ensure_root_group(doc, "project-1")
def add_device(element_uuid, instance_id, terminal_name):
device = doc.addObject("App::DocumentObjectGroup", "QETDevice_" + element_uuid)
root.addObject(device)
terminal_objects.ensure_string_property(device, "QetElementUuid", "QET Exchange", "", element_uuid)
terminal_objects.ensure_string_property(device, "QetInstanceId", "QET Exchange", "", instance_id)
terminal = doc.addObject("Part::LocalCoordinateSystem", terminal_name)
terminal.Placement = app.Placement(app.Vector(1, 2, 3), app.Rotation())
device.addObject(terminal)
terminal_objects.set_terminal_semantics(
terminal,
"project-1",
element_uuid,
"terminal-reused",
instance_id,
label=terminal_name,
)
return terminal
wrong_start = add_device("device-a", "instance-a", "WrongStart")
correct_start = add_device("device-b", "instance-b", "CorrectStart")
correct_end = add_device("device-c", "instance-c", "CorrectEnd")
_ = wrong_start
task = wiring_objects.create_wire_task(
doc,
"project-1",
"wire-1",
"W001",
"terminal-reused",
"terminal-reused",
"",
"",
)
terminal_objects.ensure_string_property(
task,
"QetStartElementUuid",
"QET Wiring",
"",
"device-b",
)
terminal_objects.ensure_string_property(
task,
"QetEndElementUuid",
"QET Wiring",
"",
"device-c",
)
controller = panel.ManualWiringController()
controller.set_task_from_object(task)
self.assertIs(correct_start, controller.start_terminal)
self.assertIs(correct_end, panel._find_terminal_by_uuid(doc, "terminal-reused", element_uuid="device-c"))
if __name__ == "__main__":
unittest.main()

@ -128,7 +128,7 @@ def _reload_modules():
class TerminalImportTemplateSlotPolicyTest(unittest.TestCase):
def test_import_skips_terminal_when_device_has_no_template_slots(self):
def test_import_creates_fallback_terminal_when_device_has_no_template_slots(self):
_install_fake_freecad()
terminal_import, terminal_objects, device_import = _reload_modules()
@ -183,9 +183,170 @@ class TerminalImportTemplateSlotPolicyTest(unittest.TestCase):
terminal_objects.TERMINAL_GROUP_KIND,
)
self.assertEqual(0, report["imported_terminals"])
self.assertEqual(1, report.get("skipped_missing_slot"))
self.assertEqual([], terminal_objects.collect_terminal_objects(terminal_group))
terminals = terminal_objects.collect_terminal_objects(terminal_group)
self.assertEqual(1, report["imported_terminals"])
self.assertEqual(0, report.get("skipped_missing_slot"))
self.assertEqual(1, report.get("generated_fallback_slots"))
self.assertEqual(1, len(terminals))
self.assertEqual("terminal-a", terminals[0].QetTerminalUuid)
self.assertEqual("generated_bbox_fallback", terminals[0].QetTerminalGeometrySource)
def test_import_preserves_local_terminals_when_payload_has_no_entry_for_device(self):
_install_fake_freecad()
terminal_import, terminal_objects, device_import = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, project_uuid="project-1")
device = doc.addObject("App::Part", "QETDevice_device_a")
root.addObject(device)
terminal_objects.ensure_string_property(
device,
"QetProjectUuid",
"QET Exchange",
"Project UUID",
"project-1",
)
terminal_objects.ensure_string_property(
device,
"QetElementUuid",
"QET Exchange",
"Element UUID",
"device-a",
)
terminal_objects.ensure_string_property(
device,
"QetInstanceId",
"QET Exchange",
"Instance ID",
"instance-a",
)
terminal_group = terminal_objects.ensure_terminal_group(
doc,
device,
project_uuid="project-1",
instance_id="instance-a",
)
local_terminal = terminal_objects.create_lcs_object(
doc,
"QETTerminal_instance_a_P1",
placement=app.Placement(app.Vector(1, 0, 0), app.Rotation()),
label="P1",
)
terminal_group.addObject(local_terminal)
terminal_objects.set_terminal_semantics(
local_terminal,
"project-1",
"device-a",
"local:instance-a:P1",
"instance-a",
label="P1",
slot_name="P1",
)
report = terminal_import.import_terminals_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"element_uuid": "device-a",
"instance_id": "instance-a",
}
],
"terminals": [],
}
)
terminals = terminal_objects.collect_terminal_objects(terminal_group)
self.assertEqual(0, report["removed_terminals"])
self.assertEqual([local_terminal], terminals)
self.assertEqual("local:instance-a:P1", local_terminal.QetTerminalUuid)
def test_import_synthesizes_missing_terminal_entries_from_wire_endpoints(self):
_install_fake_freecad()
terminal_import, terminal_objects, device_import = _reload_modules()
doc = FakeDocument()
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, project_uuid="project-1")
def add_device(element_uuid, instance_id):
device = doc.addObject("App::Part", "QETDevice_" + element_uuid)
root.addObject(device)
terminal_objects.ensure_string_property(
device,
"QetProjectUuid",
"QET Exchange",
"Project UUID",
"project-1",
)
terminal_objects.ensure_string_property(
device,
"QetElementUuid",
"QET Exchange",
"Element UUID",
element_uuid,
)
terminal_objects.ensure_string_property(
device,
"QetInstanceId",
"QET Exchange",
"Instance ID",
instance_id,
)
return device
start_device = add_device("device-a", "instance-a")
end_device = add_device("device-b", "instance-b")
report = terminal_import.import_terminals_from_payload(
{
"project_uuid": "project-1",
"devices": [
{"element_uuid": "device-a", "instance_id": "instance-a"},
{"element_uuid": "device-b", "instance_id": "instance-b"},
],
"terminals": [],
"wires": [
{
"wire_id": "wire-1",
"start_element_uuid": "device-a",
"start_terminal_uuid": "terminal-a",
"start_instance_id": "",
"end_element_uuid": "device-b",
"end_terminal_uuid": "terminal-b",
"end_instance_id": "",
}
],
}
)
start_terminals = terminal_objects.collect_terminal_objects(
terminal_objects.ensure_terminal_group(
doc,
start_device,
project_uuid="project-1",
instance_id="instance-a",
)
)
end_terminals = terminal_objects.collect_terminal_objects(
terminal_objects.ensure_terminal_group(
doc,
end_device,
project_uuid="project-1",
instance_id="instance-b",
)
)
self.assertEqual(2, report["imported_terminals"])
self.assertEqual(2, report["synthesized_wire_endpoint_terminals"])
self.assertEqual("terminal-a", start_terminals[0].QetTerminalUuid)
self.assertEqual("device-a", start_terminals[0].QetElementUuid)
self.assertEqual("terminal-b", end_terminals[0].QetTerminalUuid)
self.assertEqual("device-b", end_terminals[0].QetElementUuid)
def test_import_uses_slot_name_hint_to_match_template_slots(self):
_install_fake_freecad()

@ -84,6 +84,17 @@ class FakeDocument:
return obj
return None
def removeObject(self, name):
obj = self.getObject(name)
if obj is None:
return
for candidate in list(self.Objects):
if obj in getattr(candidate, "Group", []):
candidate.Group.remove(obj)
if candidate in getattr(obj, "InList", []):
obj.InList.remove(candidate)
self.Objects.remove(obj)
def _reload_modules():
for name in ["TerminalObjects", "WiringObjects", "WiringImport"]:
@ -160,6 +171,44 @@ class WiringImportTest(unittest.TestCase):
self.assertEqual("W001-updated", task_group.Group[0].QetWireMark)
self.assertEqual("Routed", task_group.Group[0].RouteStatus)
def test_reimport_removes_stale_wire_tasks_not_in_current_payload(self):
_install_fake_freecad()
terminal_objects, wiring_objects, wiring_import = _reload_modules()
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
task_group = wiring_objects.ensure_task_group(doc, "project-1")
stale_task = wiring_objects.create_wire_task(
doc,
"project-1",
"conductor:old-wire",
"old wire",
"old-start",
"old-end",
"",
"",
)
self.assertIn(stale_task, task_group.Group)
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "direction:new-wire:0",
"start_terminal_uuid": "terminal-a",
"end_terminal_uuid": "terminal-b",
}
],
}
report = wiring_import.import_wire_tasks_from_payload(payload, doc)
self.assertEqual(1, report["imported_tasks"])
self.assertEqual(1, report["removed_stale_tasks"])
self.assertIsNone(doc.getObject(stale_task.Name))
self.assertEqual(1, len(task_group.Group))
self.assertEqual("direction:new-wire:0", task_group.Group[0].QetWireUuid)
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save