fix: 自动布线失败时保留旧线并清理半成品

dev
Zhaowenlong 3 weeks ago
parent 065ae182e3
commit 41eec935d5

@ -926,8 +926,8 @@ def _detach_object_from_groups(doc, obj):
pass pass
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""): def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
removed = 0 matches = []
for obj in list(WiringObjects.iter_routed_wire_objects(doc)): for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
if (getattr(obj, "RouteType", "") or "").strip() != "RoutedConnection": if (getattr(obj, "RouteType", "") or "").strip() != "RoutedConnection":
continue continue
@ -945,6 +945,13 @@ def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""
) )
if not same_direction and not reverse_direction: if not same_direction and not reverse_direction:
continue continue
matches.append(obj)
return matches
def _remove_routing_connection_objects(doc, objects):
removed = 0
for obj in list(objects or []):
try: try:
_detach_object_from_groups(doc, obj) _detach_object_from_groups(doc, obj)
doc.removeObject(obj.Name) doc.removeObject(obj.Name)
@ -954,6 +961,13 @@ def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""
return removed return removed
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
return _remove_routing_connection_objects(
doc,
_matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid),
)
def _find_task_by_wire_uuid(doc, wire_uuid): def _find_task_by_wire_uuid(doc, wire_uuid):
if not wire_uuid: if not wire_uuid:
return None return None
@ -1051,42 +1065,57 @@ def route_eplan_connection_between_terminals(
collisions = detect_collisions(points, obstacles, ignored_segment_indices=ignored_collision_segments) collisions = detect_collisions(points, obstacles, ignored_segment_indices=ignored_collision_segments)
status = "CollisionWarning" if collisions else "Routed" status = "CollisionWarning" if collisions else "Routed"
existing_replacements = []
if opts.get("replace_existing", True): if opts.get("replace_existing", True):
_remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid) existing_replacements = _matching_existing_routing_connections(
doc,
start_uuid,
end_uuid,
wire_uuid=wire_uuid,
)
wire_name = _unique_name(doc, _wire_object_name(start_terminal, end_terminal, wire_uuid)) wire_name = _unique_name(doc, _wire_object_name(start_terminal, end_terminal, wire_uuid))
wire = _create_wire_geometry(doc, wire_name, points) wire = None
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
WiringObjects.set_routed_wire_semantics(
wire,
project_uuid,
wire_uuid,
wire_label or wire_mark or wire_uuid,
start_uuid,
end_uuid,
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
route_type="RoutedConnection",
route_status=status,
route_mode="EplanRoute",
net_uuid=net_uuid,
group_uuid=group_uuid,
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
routed_group.addObject(wire)
try: try:
routed_group.ViewObject.Visibility = True wire = _create_wire_geometry(doc, wire_name, points)
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
WiringObjects.set_routed_wire_semantics(
wire,
project_uuid,
wire_uuid,
wire_label or wire_mark or wire_uuid,
start_uuid,
end_uuid,
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
route_type="RoutedConnection",
route_status=status,
route_mode="EplanRoute",
net_uuid=net_uuid,
group_uuid=group_uuid,
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
routed_group.addObject(wire)
try:
routed_group.ViewObject.Visibility = True
except Exception:
pass
_style_wire(wire, collision_count=len(collisions))
task = _find_task_by_wire_uuid(doc, wire_uuid)
_set_task_status(task, status)
except Exception: except Exception:
pass if wire is not None:
_style_wire(wire, collision_count=len(collisions)) _remove_routing_connection_objects(doc, [wire])
raise
task = _find_task_by_wire_uuid(doc, wire_uuid) if existing_replacements:
_set_task_status(task, status) _remove_routing_connection_objects(doc, existing_replacements)
try: try:
doc.recompute() doc.recompute()

@ -367,6 +367,7 @@ class AutoRoutingTest(unittest.TestCase):
terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules() terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"] app = sys.modules["FreeCAD"]
doc = FakeDocument() doc = FakeDocument()
app.ActiveDocument = doc
terminal_objects.ensure_root_group(doc, "project-1") terminal_objects.ensure_root_group(doc, "project-1")
start = _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0)) start = _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
end = _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0)) end = _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
@ -396,6 +397,113 @@ class AutoRoutingTest(unittest.TestCase):
self.assertEqual([first], routed_wires) self.assertEqual([first], routed_wires)
self.assertIsNotNone(doc.getObject(first.Name)) self.assertIsNotNone(doc.getObject(first.Name))
def test_eplan_connection_route_keeps_existing_wire_when_new_geometry_creation_fails(self):
_install_fake_freecad()
terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
start = _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
end = _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
first = auto_routing.route_eplan_connection_between_terminals(
doc,
start,
end,
wire_uuid="wire-1",
)["wire"]
original_create_wire_geometry = auto_routing._create_wire_geometry
def failing_create_wire_geometry(_doc, _name, _points):
raise RuntimeError("create failed")
auto_routing._create_wire_geometry = failing_create_wire_geometry
try:
with self.assertRaises(RuntimeError):
auto_routing.route_eplan_connection_between_terminals(
doc,
start,
end,
wire_uuid="wire-1",
)
finally:
auto_routing._create_wire_geometry = original_create_wire_geometry
routed_wires = list(wiring_objects.iter_routed_wire_objects(doc))
self.assertEqual([first], routed_wires)
self.assertIsNotNone(doc.getObject(first.Name))
def test_eplan_connection_route_cleans_up_half_created_wire_when_draft_fallback_fails(self):
_install_fake_freecad()
terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
start = _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
end = _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
first = auto_routing.route_eplan_connection_between_terminals(
doc,
start,
end,
wire_uuid="wire-1",
)["wire"]
part_module = sys.modules["Part"]
draft_module = sys.modules.get("Draft")
if draft_module is None:
draft_module = types.ModuleType("Draft")
sys.modules["Draft"] = draft_module
original_make_polygon = part_module.makePolygon
original_make_wire = getattr(draft_module, "make_wire", None)
original_add_object = doc.addObject
def failing_make_polygon(*args, **kwargs):
raise RuntimeError("part unavailable")
def half_created_make_wire(points, closed=False, placement=None, face=None, support=None, bs2wire=False):
obj = doc.addObject("Part::FeaturePython", "Wire")
obj.Points = list(points)
raise RuntimeError("draft failed")
def failing_add_object(type_name, name):
if type_name == "App::FeaturePython":
raise RuntimeError("fallback failed")
return original_add_object(type_name, name)
part_module.makePolygon = failing_make_polygon
draft_module.make_wire = half_created_make_wire
doc.addObject = failing_add_object
try:
with self.assertRaises(RuntimeError):
auto_routing.route_eplan_connection_between_terminals(
doc,
start,
end,
wire_uuid="wire-1",
)
finally:
part_module.makePolygon = original_make_polygon
if original_make_wire is None:
delattr(draft_module, "make_wire")
else:
draft_module.make_wire = original_make_wire
doc.addObject = original_add_object
routed_wires = list(wiring_objects.iter_routed_wire_objects(doc))
self.assertEqual([first], routed_wires)
self.assertEqual(0, len([obj for obj in doc.Objects if obj.Name == "Wire"]))
def test_route_carrier_styles_make_generated_objects_distinguishable(self): def test_route_carrier_styles_make_generated_objects_distinguishable(self):
_install_fake_freecad() _install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, _auto_routing = _reload_modules() terminal_objects, _wiring_objects, routing_network, _auto_routing = _reload_modules()

Loading…
Cancel
Save