feat: 保留自动布线端点元数据

dev
Zhaowenlong 3 weeks ago
parent b5ee5c9c01
commit 9422fcc149

@ -553,9 +553,46 @@ def _set_string(obj, name, value, description="Routing connection property"):
TerminalObjects.ensure_string_property(obj, name, "QET Routing", description, value)
def _route_payload(route_data, collisions, wire_style_id=""):
def _clean_endpoint_metadata(endpoint_metadata):
if not isinstance(endpoint_metadata, dict):
return {}
allowed = (
"start_element_uuid",
"start_terminal_display",
"start_device_label",
"end_element_uuid",
"end_terminal_display",
"end_device_label",
"endpoint_label",
)
cleaned = {}
for key in allowed:
value = str(endpoint_metadata.get(key, "") or "").strip()
if value:
cleaned[key] = value
return cleaned
def _set_endpoint_metadata(wire, endpoint_metadata):
metadata = _clean_endpoint_metadata(endpoint_metadata)
property_names = {
"start_element_uuid": "QetStartElementUuid",
"start_terminal_display": "QetStartTerminalDisplay",
"start_device_label": "QetStartDeviceLabel",
"end_element_uuid": "QetEndElementUuid",
"end_terminal_display": "QetEndTerminalDisplay",
"end_device_label": "QetEndDeviceLabel",
"endpoint_label": "QetEndpointLabel",
}
for key, prop_name in property_names.items():
if key in metadata:
_set_string(wire, prop_name, metadata[key], "QET routed wire endpoint metadata")
return metadata
def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=None):
points = route_data.get("points", [])
return {
payload = {
"algorithm": route_data.get("algorithm", ""),
"length_mm": _route_length(points),
"wire_style_id": str(wire_style_id or "").strip(),
@ -566,10 +603,15 @@ def _route_payload(route_data, collisions, wire_style_id=""):
"network": route_data.get("network", {}),
"route_track": route_data.get("route_track", {}),
}
metadata = _clean_endpoint_metadata(endpoint_metadata)
if metadata:
payload["endpoint_metadata"] = metadata
return payload
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=""):
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id="", endpoint_metadata=None):
length_mm = _route_length(route_data.get("points", []))
cleaned_endpoint_metadata = _set_endpoint_metadata(wire, endpoint_metadata)
_set_string(
wire,
"QetRouteAlgorithm",
@ -591,7 +633,15 @@ def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id
_set_string(
wire,
"QetRouteDiagnosticsJson",
json.dumps(_route_payload(route_data, collisions, wire_style_id=wire_style_id), ensure_ascii=False),
json.dumps(
_route_payload(
route_data,
collisions,
wire_style_id=wire_style_id,
endpoint_metadata=cleaned_endpoint_metadata,
),
ensure_ascii=False,
),
"Routing connection diagnostics",
)
if route_data.get("network"):
@ -1024,6 +1074,7 @@ def route_eplan_connection_between_terminals(
wire_mark="",
wire_mark_is_manual=False,
wire_style_id="",
endpoint_metadata=None,
):
if doc is None:
raise AutoRoutingError("No FreeCAD document is available.")
@ -1096,7 +1147,13 @@ def route_eplan_connection_between_terminals(
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
_set_routing_connection_metadata(
wire,
route_data,
collisions,
wire_style_id=effective_wire_style_id,
endpoint_metadata=endpoint_metadata,
)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
@ -1282,6 +1339,15 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
lane_key = _route_lane_key(start_uuid, end_uuid)
route_lane_index = lane_indexes_by_pair.get(lane_key, 0)
try:
endpoint_metadata = {
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
"start_device_label": _wire_item_value(item, "start_device_label"),
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
}
result = route_eplan_connection_between_terminals(
doc,
start_terminal,
@ -1295,6 +1361,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
endpoint_metadata=endpoint_metadata,
)
except Exception as exc:
error_text = str(exc)
@ -1338,7 +1405,14 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
"wire_style_id": _wire_item_value(item, "wire_style_id"),
"start_terminal_uuid": start_uuid,
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
"start_device_label": _wire_item_value(item, "start_device_label"),
"end_terminal_uuid": end_uuid,
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
"end_device_label": _wire_item_value(item, "end_device_label"),
"endpoint_label": _wire_item_value(item, "endpoint_label"),
"algorithm": result["algorithm"],
"route_status": result["route_status"],
"length_mm": route_length,
@ -1561,10 +1635,13 @@ def _wire_tasks_payload(doc):
"start_instance_id": (getattr(task, "QetStartInstanceId", "") or "").strip(),
"start_terminal_uuid": (getattr(task, "QetStartTerminalUuid", "") or "").strip(),
"start_terminal_display": (getattr(task, "QetStartTerminalDisplay", "") or "").strip(),
"start_device_label": (getattr(task, "QetStartDeviceLabel", "") or "").strip(),
"end_element_uuid": (getattr(task, "QetEndElementUuid", "") or "").strip(),
"end_instance_id": (getattr(task, "QetEndInstanceId", "") or "").strip(),
"end_terminal_uuid": (getattr(task, "QetEndTerminalUuid", "") or "").strip(),
"end_terminal_display": (getattr(task, "QetEndTerminalDisplay", "") or "").strip(),
"end_device_label": (getattr(task, "QetEndDeviceLabel", "") or "").strip(),
"endpoint_label": (getattr(task, "QetEndpointLabel", "") or "").strip(),
}
)
return payload

@ -1572,6 +1572,87 @@ class AutoRoutingTest(unittest.TestCase):
self.assertEqual(1, route["network"]["carriers"])
self.assertEqual("WireDuct", route["route_track"]["segments"][0]["carrier"]["kind"])
def test_route_eplan_connections_preserves_endpoint_metadata_on_routed_wire(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
_terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
_terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-1",
"start_element_uuid": "device-a",
"start_terminal_uuid": "terminal-start",
"start_terminal_display": "A1",
"end_element_uuid": "device-b",
"end_terminal_uuid": "terminal-end",
"end_terminal_display": "B1",
}
],
}
report = auto_routing.route_eplan_connections_from_payload(doc, payload)
routed_group = doc.getObject("QETWiring_04_Routed")
wire = routed_group.Group[0]
diagnostics = json.loads(wire.QetRouteDiagnosticsJson)
self.assertEqual("device-a", wire.QetStartElementUuid)
self.assertEqual("A1", wire.QetStartTerminalDisplay)
self.assertEqual("device-b", wire.QetEndElementUuid)
self.assertEqual("B1", wire.QetEndTerminalDisplay)
self.assertEqual("device-a", report["routes"][0]["start_element_uuid"])
self.assertEqual("B1", report["routes"][0]["end_terminal_display"])
self.assertEqual("A1", diagnostics["endpoint_metadata"]["start_terminal_display"])
def test_route_eplan_connection_tasks_preserve_task_endpoint_labels_on_routed_wire(self):
_install_fake_freecad()
terminal_objects, wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
_terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
_terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
task = wiring_objects.create_wire_task(
doc,
"project-1",
"wire-1",
"N4111",
"terminal-start",
"terminal-end",
"instance-a",
"instance-b",
)
terminal_objects.ensure_string_property(task, "QetStartDeviceLabel", "QET Wiring", "", "QF1")
terminal_objects.ensure_string_property(task, "QetEndDeviceLabel", "QET Wiring", "", "X1")
terminal_objects.ensure_string_property(task, "QetEndpointLabel", "QET Wiring", "", "QF1:A1 -> X1:B1")
report = auto_routing.route_eplan_connection_tasks(doc)
routed_group = doc.getObject("QETWiring_04_Routed")
wire = routed_group.Group[0]
diagnostics = json.loads(wire.QetRouteDiagnosticsJson)
self.assertEqual("QF1", wire.QetStartDeviceLabel)
self.assertEqual("X1", wire.QetEndDeviceLabel)
self.assertEqual("QF1:A1 -> X1:B1", wire.QetEndpointLabel)
self.assertEqual("QF1:A1 -> X1:B1", report["routes"][0]["endpoint_label"])
self.assertEqual("QF1", diagnostics["endpoint_metadata"]["start_device_label"])
def test_route_eplan_connections_records_wire_identity_for_errors(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()

Loading…
Cancel
Save