feat: 按共享路由段错开自动布线

dev
Zhaowenlong 3 weeks ago
parent 967ad4b7f1
commit a225104a51

@ -1296,6 +1296,32 @@ def _route_lane_key(start_uuid, end_uuid):
return tuple(endpoints)
def _route_segment_key(segment):
if not isinstance(segment, dict):
return None
carrier = segment.get("carrier") or {}
carrier_name = str(carrier.get("name", "") or "").strip()
from_key = tuple(segment.get("from_key", []) or [])
to_key = tuple(segment.get("to_key", []) or [])
if not from_key or not to_key:
return None
return (
carrier_name,
tuple(sorted((from_key, to_key))),
)
def _route_segment_keys(result):
route_track = result.get("route_track", {}) if isinstance(result, dict) else {}
segments = route_track.get("segments", []) if isinstance(route_track, dict) else []
keys = []
for segment in segments or []:
key = _route_segment_key(segment)
if key is not None:
keys.append(key)
return keys
def bind_wire_task_terminals_from_payload(doc, payload):
"""Bind local template terminals to QET terminal UUIDs without creating wires."""
if doc is None:
@ -1376,11 +1402,29 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
report["prepared_layout"] = prepared_layout
missing_endpoint_uuids = set()
lane_indexes_by_pair = {}
lane_indexes_by_segment = {}
def add_status(status):
key = str(status or "").strip() or "Unknown"
report["route_status_counts"][key] = report["route_status_counts"].get(key, 0) + 1
def create_route(route_lane_index, item, start_terminal, end_terminal, endpoint_metadata):
return route_eplan_connection_between_terminals(
doc,
start_terminal,
end_terminal,
route_index=route_lane_index,
options=options,
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
net_uuid=_wire_item_value(item, "net_uuid"),
group_uuid=_wire_item_value(item, "group_uuid"),
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
endpoint_metadata=endpoint_metadata,
)
for item in wires:
if not isinstance(item, dict):
report["skipped_invalid"] += 1
@ -1425,21 +1469,33 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
}
result = route_eplan_connection_between_terminals(
doc,
result = create_route(
route_lane_index,
item,
start_terminal,
end_terminal,
route_index=route_lane_index,
options=options,
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
net_uuid=_wire_item_value(item, "net_uuid"),
group_uuid=_wire_item_value(item, "group_uuid"),
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
endpoint_metadata=endpoint_metadata,
endpoint_metadata,
)
route_segment_keys = _route_segment_keys(result)
shared_lane_index = max(
[lane_indexes_by_segment.get(key, 0) for key in route_segment_keys] or [0]
)
final_lane_index = max(route_lane_index, shared_lane_index)
if final_lane_index != route_lane_index:
initial_wire = result.get("wire") if isinstance(result, dict) else None
try:
result = create_route(
final_lane_index,
item,
start_terminal,
end_terminal,
endpoint_metadata,
)
except Exception:
if initial_wire is not None:
_remove_routing_connection_objects(doc, [initial_wire])
raise
route_segment_keys = _route_segment_keys(result)
except Exception as exc:
error_text = str(exc)
report["errors"].append(error_text)
@ -1462,7 +1518,15 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
}
)
continue
lane_indexes_by_pair[lane_key] = route_lane_index + 1
lane_indexes_by_pair[lane_key] = max(
lane_indexes_by_pair.get(lane_key, 0),
int(result.get("lane", {}).get("index", 0) or 0) + 1,
)
for segment_key in route_segment_keys:
lane_indexes_by_segment[segment_key] = max(
lane_indexes_by_segment.get(segment_key, 0),
int(result.get("lane", {}).get("index", 0) or 0) + 1,
)
if result["route_status"] == "CollisionWarning":
report["collision_warnings"] += 1
add_status(result["route_status"])

@ -1870,6 +1870,54 @@ class AutoRoutingTest(unittest.TestCase):
self.assertEqual(0, report["routes"][1]["lane"]["index"])
self.assertEqual(1, report["routes"][2]["lane"]["index"])
def test_route_eplan_connections_lane_index_increments_for_shared_route_segments(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
_terminal(doc, terminal_objects, "TerminalStartA", "terminal-start-a", app.Vector(0, 0, 0))
_terminal(doc, terminal_objects, "TerminalEndA", "terminal-end-a", app.Vector(100, 0, 0))
_terminal(doc, terminal_objects, "TerminalStartB", "terminal-start-b", app.Vector(0, 0, 0))
_terminal(doc, terminal_objects, "TerminalEndB", "terminal-end-b", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-a",
"start_terminal_uuid": "terminal-start-a",
"end_terminal_uuid": "terminal-end-a",
},
{
"wire_id": "wire-b",
"start_terminal_uuid": "terminal-start-b",
"end_terminal_uuid": "terminal-end-b",
},
],
}
report = auto_routing.route_eplan_connections_from_payload(
doc,
payload,
options={"lane_spacing": 10.0, "lane_axis": "y"},
)
self.assertEqual(0, report["routes"][0]["lane"]["index"])
self.assertEqual(1, report["routes"][1]["lane"]["index"])
routed_group = doc.getObject("QETWiring_04_Routed")
second_wire = [
wire
for wire in list(getattr(routed_group, "Group", []) or [])
if getattr(wire, "QetWireUuid", "") == "wire-b"
][0]
self.assertTrue(any(abs(point.y - 10.0) <= 0.001 for point in second_wire.Points[1:-1]))
def test_route_eplan_connections_report_includes_collision_samples(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()

Loading…
Cancel
Save