fix: 增强FreeCAD三维导入和布线诊断

dev
Zhaowenlong 3 weeks ago
parent e6ab63be4f
commit 5c813b66dd

@ -1182,6 +1182,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
"missing_endpoint_samples": [],
"collision_samples": [],
"errors": [],
"error_samples": [],
"routes": [],
}
if isinstance(prepared_layout, dict):
@ -1237,7 +1238,18 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
wire_style_id=_wire_item_value(item, "wire_style_id"),
)
except Exception as exc:
report["errors"].append(str(exc))
error_text = str(exc)
report["errors"].append(error_text)
if len(report["error_samples"]) < 8:
report["error_samples"].append(
{
"wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
"start_terminal_uuid": start_uuid,
"end_terminal_uuid": end_uuid,
"error": error_text,
}
)
continue
lane_indexes_by_pair[lane_key] = route_lane_index + 1
if result["route_status"] == "CollisionWarning":

@ -629,16 +629,6 @@ def _remove_template_terminal_hints(doc, container):
return removed
def _clear_group_contents(doc, group):
for child in list(getattr(group, "Group", []) or []):
child_name = getattr(child, "Name", "")
if child_name.startswith(TERMINAL_GROUP_PREFIX) or child_name.startswith(WIRE_GROUP_PREFIX):
continue
if getattr(child, "QetGroupKind", "").strip() in {GROUP_KIND_TERMINALS, GROUP_KIND_WIRES}:
continue
_remove_object_tree(doc, child)
def _existing_group_objects(doc, group):
result = []
for child in list(getattr(group, "Group", []) or []):
@ -654,6 +644,19 @@ def _is_exchange_sidecar_group(obj):
return getattr(obj, "QetGroupKind", "").strip() in {GROUP_KIND_TERMINALS, GROUP_KIND_WIRES}
def _existing_model_objects(doc, group):
return [
obj
for obj in _existing_group_objects(doc, group)
if not _is_exchange_sidecar_group(obj)
]
def _remove_model_objects(doc, objects):
for obj in list(objects or []):
_remove_object_tree(doc, obj)
def _keep_only_direct_model_children(device_group, direct_model_objects):
allowed_ids = {id(obj) for obj in direct_model_objects if obj is not None}
kept_children = []
@ -902,9 +905,12 @@ def _import_cabinet_model(doc, root_group, cabinet, report):
return
project_uuid = getattr(root_group, "QetProjectUuid", "").strip()
existing_group = _find_cabinet_group(doc, _cabinet_instance_id(cabinet))
previous_path = ""
if existing_group is not None:
previous_path = getattr(existing_group, "QetCabinetResolvedScenePath", "").strip()
cabinet_group = _ensure_cabinet_model_group(doc, root_group, cabinet, project_uuid)
existing_model_objects = _existing_group_objects(doc, cabinet_group)
previous_path = getattr(cabinet_group, "QetCabinetResolvedScenePath", "").strip()
existing_model_objects = _existing_model_objects(doc, cabinet_group)
same_source = _normalized_path_key(previous_path) == _normalized_path_key(resolved_scene_path)
if existing_model_objects and same_source:
report.setdefault("cabinet_reused", 0)
@ -917,7 +923,6 @@ def _import_cabinet_model(doc, root_group, cabinet, report):
return
had_existing_model = bool(existing_model_objects)
_clear_group_contents(doc, cabinet_group)
_ensure_string_property(
cabinet_group,
"QetCabinetResolvedScenePath",
@ -938,6 +943,7 @@ def _import_cabinet_model(doc, root_group, cabinet, report):
merge=False,
use_link_group=True,
)
_remove_model_objects(doc, existing_model_objects)
report["cabinet_imported"] += 1
if had_existing_model:
report.setdefault("cabinet_reimported", 0)
@ -947,6 +953,14 @@ def _import_cabinet_model(doc, root_group, cabinet, report):
report["cabinet_added"] += 1
_append_debug_log("DeviceImport cabinet import succeeded")
except Exception as exc:
if had_existing_model:
_ensure_string_property(
cabinet_group,
"QetCabinetResolvedScenePath",
"QET Exchange",
"Resolved local cabinet scene path from QET exchange",
previous_path,
)
report["cabinet_skipped_import_error"] += 1
report["warnings"].append(
"机柜 3D 导入失败:{0}".format(exc)
@ -1039,6 +1053,9 @@ def import_devices_from_payload(payload, scene_path=""):
instance_id = existing_instance_id or _generate_instance_id(project_uuid, element_uuid)
report.setdefault("generated_instance_ids", 0)
report["generated_instance_ids"] += 1
previous_model_path = ""
if existing_group is not None:
previous_model_path = getattr(existing_group, "QetResolvedModelPath", "").strip()
device_group, created_now = _ensure_device_group(
doc,
root_group,
@ -1048,7 +1065,7 @@ def import_devices_from_payload(payload, scene_path=""):
display_tag,
index,
)
_clear_group_contents(doc, device_group)
existing_model_objects = _existing_model_objects(doc, device_group)
try:
_append_debug_log(
@ -1060,7 +1077,16 @@ def import_devices_from_payload(payload, scene_path=""):
_append_debug_log(
"DeviceImport import succeeded for element_uuid={0}".format(element_uuid)
)
_remove_model_objects(doc, existing_model_objects)
except Exception as exc:
if existing_model_objects:
_ensure_string_property(
device_group,
"QetResolvedModelPath",
"QET Exchange",
"Resolved local model path from QET exchange",
previous_model_path,
)
report["skipped_import_error"] += 1
report["warnings"].append(
"{0} 导入失败:{1}".format(

@ -1241,6 +1241,41 @@ class AutoRoutingTest(unittest.TestCase):
self.assertEqual(1, route["network"]["carriers"])
self.assertEqual("WireDuct", route["route_track"]["segments"][0]["carrier"]["kind"])
def test_route_eplan_connections_records_wire_identity_for_errors(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
_terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-bad",
"wire_label": "N500",
"start_terminal_uuid": "terminal-start",
"end_terminal_uuid": "terminal-start",
}
],
}
report = auto_routing.route_eplan_connections_from_payload(doc, payload)
self.assertEqual(1, len(report["errors"]))
self.assertIn("error_samples", report)
self.assertEqual("wire-bad", report["error_samples"][0]["wire_uuid"])
self.assertEqual("N500", report["error_samples"][0]["wire_label"])
self.assertEqual("terminal-start", report["error_samples"][0]["start_terminal_uuid"])
self.assertEqual("terminal-start", report["error_samples"][0]["end_terminal_uuid"])
self.assertIn("different", report["error_samples"][0]["error"])
def test_route_eplan_connections_lane_index_is_per_terminal_pair(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()

@ -200,6 +200,9 @@ class FakeDocument:
if target in getattr(obj, "InList", []):
obj.InList.remove(target)
def recompute(self):
return None
def copyObject(self, source_obj, recursive):
copies = {}
@ -355,6 +358,108 @@ class FcstdDeviceImportTest(unittest.TestCase):
self.assertIn(group_a, root_group.Group)
self.assertIn(group_b, root_group.Group)
def test_failed_cabinet_reimport_keeps_existing_model(self):
with tempfile.TemporaryDirectory() as temp_dir:
cabinet_path = Path(temp_dir) / "cabinet.step"
cabinet_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
root_group = device_import._ensure_root_group(doc, None, "project-1")
cabinet = {
"location_id": 1,
"resolved_scene_path": str(cabinet_path),
"display_text": "Main Cabinet",
}
cabinet_group = device_import._ensure_cabinet_model_group(
doc,
root_group,
cabinet,
"project-1",
)
old_cabinet_path = str(Path(temp_dir) / "old-cabinet.step")
cabinet_group.QetCabinetResolvedScenePath = old_cabinet_path
old_body = doc.addObject("Part::Feature", "OldCabinetBody")
cabinet_group.addObject(old_body)
def failing_import(*_args, **_kwargs):
raise RuntimeError("simulated import failure")
device_import._import_model_into_group = failing_import
report = {
"cabinet_imported": 0,
"cabinet_added": 0,
"cabinet_reimported": 0,
"cabinet_reused": 0,
"cabinet_skipped_missing_model": 0,
"cabinet_skipped_missing_file": 0,
"cabinet_skipped_unsupported_format": 0,
"cabinet_skipped_import_error": 0,
"warnings": [],
}
device_import._import_cabinet_model(doc, root_group, cabinet, report)
self.assertEqual(1, report["cabinet_skipped_import_error"])
self.assertIs(doc.getObject("OldCabinetBody"), old_body)
self.assertIn(old_body, cabinet_group.Group)
self.assertEqual(old_cabinet_path, cabinet_group.QetCabinetResolvedScenePath)
def test_failed_device_reimport_keeps_existing_model(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "breaker.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
root_group = device_import._ensure_root_group(doc, None, "project-1")
device_group, _created = device_import._ensure_device_group(
doc,
root_group,
"device-1",
"instance-1",
str(model_path),
"QF1",
0,
)
old_model_path = str(Path(temp_dir) / "old-breaker.step")
device_group.QetResolvedModelPath = old_model_path
old_body = doc.addObject("Part::Feature", "OldDeviceBody")
device_group.addObject(old_body)
def failing_import(*_args, **_kwargs):
raise RuntimeError("simulated import failure")
device_import._import_model_into_group = failing_import
sys.modules["DevicePreview"].find_main_exchange_document = lambda _name: doc
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"element_uuid": "device-1",
"instance_id": "instance-1",
"display_tag": "QF1",
}
],
"device_models": [
{
"element_uuid": "device-1",
"resolved_model_path": str(model_path),
}
],
}
)
self.assertEqual(1, report["skipped_import_error"])
self.assertIs(doc.getObject("OldDeviceBody"), old_body)
self.assertIn(old_body, device_group.Group)
self.assertEqual(old_model_path, device_group.QetResolvedModelPath)
def test_fcstd_import_preserves_template_slots_without_live_template_lcs(self):
source = FakeDocument("Source", r"D:\models\breaker.FCStd")
_install_fake_freecad(source)

Loading…
Cancel
Save