diff --git a/src/Mod/FreeCADExchange/AutoRouting.py b/src/Mod/FreeCADExchange/AutoRouting.py index e259a9a..544385b 100644 --- a/src/Mod/FreeCADExchange/AutoRouting.py +++ b/src/Mod/FreeCADExchange/AutoRouting.py @@ -1182,6 +1182,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la "missing_endpoint_samples": [], "collision_samples": [], "errors": [], + "error_samples": [], "routes": [], } if isinstance(prepared_layout, dict): @@ -1237,7 +1238,18 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la wire_style_id=_wire_item_value(item, "wire_style_id"), ) except Exception as exc: - report["errors"].append(str(exc)) + error_text = str(exc) + report["errors"].append(error_text) + if len(report["error_samples"]) < 8: + report["error_samples"].append( + { + "wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"), + "wire_label": _wire_item_value(item, "wire_label", "wire_mark"), + "start_terminal_uuid": start_uuid, + "end_terminal_uuid": end_uuid, + "error": error_text, + } + ) continue lane_indexes_by_pair[lane_key] = route_lane_index + 1 if result["route_status"] == "CollisionWarning": diff --git a/src/Mod/FreeCADExchange/DeviceImport.py b/src/Mod/FreeCADExchange/DeviceImport.py index 646c3e3..f9cbb4f 100644 --- a/src/Mod/FreeCADExchange/DeviceImport.py +++ b/src/Mod/FreeCADExchange/DeviceImport.py @@ -629,16 +629,6 @@ def _remove_template_terminal_hints(doc, container): return removed -def _clear_group_contents(doc, group): - for child in list(getattr(group, "Group", []) or []): - child_name = getattr(child, "Name", "") - if child_name.startswith(TERMINAL_GROUP_PREFIX) or child_name.startswith(WIRE_GROUP_PREFIX): - continue - if getattr(child, "QetGroupKind", "").strip() in {GROUP_KIND_TERMINALS, GROUP_KIND_WIRES}: - continue - _remove_object_tree(doc, child) - - def _existing_group_objects(doc, group): result = [] for child in list(getattr(group, "Group", []) or []): @@ -654,6 +644,19 @@ def _is_exchange_sidecar_group(obj): return getattr(obj, "QetGroupKind", "").strip() in {GROUP_KIND_TERMINALS, GROUP_KIND_WIRES} +def _existing_model_objects(doc, group): + return [ + obj + for obj in _existing_group_objects(doc, group) + if not _is_exchange_sidecar_group(obj) + ] + + +def _remove_model_objects(doc, objects): + for obj in list(objects or []): + _remove_object_tree(doc, obj) + + def _keep_only_direct_model_children(device_group, direct_model_objects): allowed_ids = {id(obj) for obj in direct_model_objects if obj is not None} kept_children = [] @@ -902,9 +905,12 @@ def _import_cabinet_model(doc, root_group, cabinet, report): return project_uuid = getattr(root_group, "QetProjectUuid", "").strip() + existing_group = _find_cabinet_group(doc, _cabinet_instance_id(cabinet)) + previous_path = "" + if existing_group is not None: + previous_path = getattr(existing_group, "QetCabinetResolvedScenePath", "").strip() cabinet_group = _ensure_cabinet_model_group(doc, root_group, cabinet, project_uuid) - existing_model_objects = _existing_group_objects(doc, cabinet_group) - previous_path = getattr(cabinet_group, "QetCabinetResolvedScenePath", "").strip() + existing_model_objects = _existing_model_objects(doc, cabinet_group) same_source = _normalized_path_key(previous_path) == _normalized_path_key(resolved_scene_path) if existing_model_objects and same_source: report.setdefault("cabinet_reused", 0) @@ -917,7 +923,6 @@ def _import_cabinet_model(doc, root_group, cabinet, report): return had_existing_model = bool(existing_model_objects) - _clear_group_contents(doc, cabinet_group) _ensure_string_property( cabinet_group, "QetCabinetResolvedScenePath", @@ -938,6 +943,7 @@ def _import_cabinet_model(doc, root_group, cabinet, report): merge=False, use_link_group=True, ) + _remove_model_objects(doc, existing_model_objects) report["cabinet_imported"] += 1 if had_existing_model: report.setdefault("cabinet_reimported", 0) @@ -947,6 +953,14 @@ def _import_cabinet_model(doc, root_group, cabinet, report): report["cabinet_added"] += 1 _append_debug_log("DeviceImport cabinet import succeeded") except Exception as exc: + if had_existing_model: + _ensure_string_property( + cabinet_group, + "QetCabinetResolvedScenePath", + "QET Exchange", + "Resolved local cabinet scene path from QET exchange", + previous_path, + ) report["cabinet_skipped_import_error"] += 1 report["warnings"].append( "机柜 3D 导入失败:{0}".format(exc) @@ -1039,6 +1053,9 @@ def import_devices_from_payload(payload, scene_path=""): instance_id = existing_instance_id or _generate_instance_id(project_uuid, element_uuid) report.setdefault("generated_instance_ids", 0) report["generated_instance_ids"] += 1 + previous_model_path = "" + if existing_group is not None: + previous_model_path = getattr(existing_group, "QetResolvedModelPath", "").strip() device_group, created_now = _ensure_device_group( doc, root_group, @@ -1048,7 +1065,7 @@ def import_devices_from_payload(payload, scene_path=""): display_tag, index, ) - _clear_group_contents(doc, device_group) + existing_model_objects = _existing_model_objects(doc, device_group) try: _append_debug_log( @@ -1060,7 +1077,16 @@ def import_devices_from_payload(payload, scene_path=""): _append_debug_log( "DeviceImport import succeeded for element_uuid={0}".format(element_uuid) ) + _remove_model_objects(doc, existing_model_objects) except Exception as exc: + if existing_model_objects: + _ensure_string_property( + device_group, + "QetResolvedModelPath", + "QET Exchange", + "Resolved local model path from QET exchange", + previous_model_path, + ) report["skipped_import_error"] += 1 report["warnings"].append( "{0} 导入失败:{1}".format( diff --git a/tests/python/freecad_exchange_auto_routing_test.py b/tests/python/freecad_exchange_auto_routing_test.py index 7648c1f..8d1ffbc 100644 --- a/tests/python/freecad_exchange_auto_routing_test.py +++ b/tests/python/freecad_exchange_auto_routing_test.py @@ -1241,6 +1241,41 @@ class AutoRoutingTest(unittest.TestCase): self.assertEqual(1, route["network"]["carriers"]) self.assertEqual("WireDuct", route["route_track"]["segments"][0]["carrier"]["kind"]) + def test_route_eplan_connections_records_wire_identity_for_errors(self): + _install_fake_freecad() + terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules() + app = sys.modules["FreeCAD"] + doc = FakeDocument() + terminal_objects.ensure_root_group(doc, "project-1") + _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0)) + routing_network.create_route_carrier( + doc, + [app.Vector(0, 0, 20), app.Vector(100, 0, 20)], + project_uuid="project-1", + kind="WireDuct", + ) + payload = { + "project_uuid": "project-1", + "wires": [ + { + "wire_id": "wire-bad", + "wire_label": "N500", + "start_terminal_uuid": "terminal-start", + "end_terminal_uuid": "terminal-start", + } + ], + } + + report = auto_routing.route_eplan_connections_from_payload(doc, payload) + + self.assertEqual(1, len(report["errors"])) + self.assertIn("error_samples", report) + self.assertEqual("wire-bad", report["error_samples"][0]["wire_uuid"]) + self.assertEqual("N500", report["error_samples"][0]["wire_label"]) + self.assertEqual("terminal-start", report["error_samples"][0]["start_terminal_uuid"]) + self.assertEqual("terminal-start", report["error_samples"][0]["end_terminal_uuid"]) + self.assertIn("different", report["error_samples"][0]["error"]) + def test_route_eplan_connections_lane_index_is_per_terminal_pair(self): _install_fake_freecad() terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules() diff --git a/tests/python/freecad_exchange_device_import_fcstd_test.py b/tests/python/freecad_exchange_device_import_fcstd_test.py index ec567d5..0326859 100644 --- a/tests/python/freecad_exchange_device_import_fcstd_test.py +++ b/tests/python/freecad_exchange_device_import_fcstd_test.py @@ -200,6 +200,9 @@ class FakeDocument: if target in getattr(obj, "InList", []): obj.InList.remove(target) + def recompute(self): + return None + def copyObject(self, source_obj, recursive): copies = {} @@ -355,6 +358,108 @@ class FcstdDeviceImportTest(unittest.TestCase): self.assertIn(group_a, root_group.Group) self.assertIn(group_b, root_group.Group) + def test_failed_cabinet_reimport_keeps_existing_model(self): + with tempfile.TemporaryDirectory() as temp_dir: + cabinet_path = Path(temp_dir) / "cabinet.step" + cabinet_path.write_text("fake step placeholder", encoding="utf-8") + _install_fake_freecad(None) + + device_import, _ = _reload_modules() + + doc = FakeDocument("QETScene") + root_group = device_import._ensure_root_group(doc, None, "project-1") + cabinet = { + "location_id": 1, + "resolved_scene_path": str(cabinet_path), + "display_text": "Main Cabinet", + } + cabinet_group = device_import._ensure_cabinet_model_group( + doc, + root_group, + cabinet, + "project-1", + ) + old_cabinet_path = str(Path(temp_dir) / "old-cabinet.step") + cabinet_group.QetCabinetResolvedScenePath = old_cabinet_path + old_body = doc.addObject("Part::Feature", "OldCabinetBody") + cabinet_group.addObject(old_body) + + def failing_import(*_args, **_kwargs): + raise RuntimeError("simulated import failure") + + device_import._import_model_into_group = failing_import + report = { + "cabinet_imported": 0, + "cabinet_added": 0, + "cabinet_reimported": 0, + "cabinet_reused": 0, + "cabinet_skipped_missing_model": 0, + "cabinet_skipped_missing_file": 0, + "cabinet_skipped_unsupported_format": 0, + "cabinet_skipped_import_error": 0, + "warnings": [], + } + device_import._import_cabinet_model(doc, root_group, cabinet, report) + + self.assertEqual(1, report["cabinet_skipped_import_error"]) + self.assertIs(doc.getObject("OldCabinetBody"), old_body) + self.assertIn(old_body, cabinet_group.Group) + self.assertEqual(old_cabinet_path, cabinet_group.QetCabinetResolvedScenePath) + + def test_failed_device_reimport_keeps_existing_model(self): + with tempfile.TemporaryDirectory() as temp_dir: + model_path = Path(temp_dir) / "breaker.step" + model_path.write_text("fake step placeholder", encoding="utf-8") + _install_fake_freecad(None) + + device_import, _ = _reload_modules() + + doc = FakeDocument("QETScene") + root_group = device_import._ensure_root_group(doc, None, "project-1") + device_group, _created = device_import._ensure_device_group( + doc, + root_group, + "device-1", + "instance-1", + str(model_path), + "QF1", + 0, + ) + old_model_path = str(Path(temp_dir) / "old-breaker.step") + device_group.QetResolvedModelPath = old_model_path + old_body = doc.addObject("Part::Feature", "OldDeviceBody") + device_group.addObject(old_body) + + def failing_import(*_args, **_kwargs): + raise RuntimeError("simulated import failure") + + device_import._import_model_into_group = failing_import + sys.modules["DevicePreview"].find_main_exchange_document = lambda _name: doc + + report = device_import.import_devices_from_payload( + { + "project_uuid": "project-1", + "devices": [ + { + "element_uuid": "device-1", + "instance_id": "instance-1", + "display_tag": "QF1", + } + ], + "device_models": [ + { + "element_uuid": "device-1", + "resolved_model_path": str(model_path), + } + ], + } + ) + + self.assertEqual(1, report["skipped_import_error"]) + self.assertIs(doc.getObject("OldDeviceBody"), old_body) + self.assertIn(old_body, device_group.Group) + self.assertEqual(old_model_path, device_group.QetResolvedModelPath) + def test_fcstd_import_preserves_template_slots_without_live_template_lcs(self): source = FakeDocument("Source", r"D:\models\breaker.FCStd") _install_fake_freecad(source)