Merge remote-tracking branch 'origin/dev' into dev

dev
jiangyini 4 weeks ago
commit b2e9d5b442

@ -102,6 +102,14 @@ def _point_payload(point):
}
def _route_length(points):
total = 0.0
normalized = [_vector(point) for point in points or []]
for index in range(len(normalized) - 1):
total += _distance(normalized[index], normalized[index + 1])
return total
def _is_finite_point(point):
try:
return all(
@ -514,26 +522,43 @@ def _set_string(obj, name, value, description="Auto-routing property"):
TerminalObjects.ensure_string_property(obj, name, "QET Routing", description, value)
def _route_payload(route_data, collisions):
def _route_payload(route_data, collisions, wire_style_id=""):
points = route_data.get("points", [])
return {
"algorithm": route_data.get("algorithm", ""),
"points": [_point_payload(point) for point in route_data.get("points", [])],
"length_mm": _route_length(points),
"wire_style_id": str(wire_style_id or "").strip(),
"points": [_point_payload(point) for point in points],
"collision_count": len(collisions),
"collisions": collisions,
"network": route_data.get("network", {}),
}
def _set_auto_metadata(wire, route_data, collisions):
def _set_auto_metadata(wire, route_data, collisions, wire_style_id=""):
length_mm = _route_length(route_data.get("points", []))
_set_string(
wire,
"QetAutoRouteAlgorithm",
route_data.get("algorithm", ""),
"Auto-routing algorithm used for this wire",
)
_set_string(
wire,
"QetAutoRouteLengthMm",
"{0:.3f}".format(length_mm),
"Auto route length in millimeters",
)
_set_string(
wire,
"QetWireStyleId",
str(wire_style_id or "").strip(),
"QET wire style ID",
)
_set_string(
wire,
"QetAutoRouteDiagnosticsJson",
json.dumps(_route_payload(route_data, collisions), ensure_ascii=False),
json.dumps(_route_payload(route_data, collisions, wire_style_id=wire_style_id), ensure_ascii=False),
"Auto-routing diagnostics",
)
if route_data.get("network"):
@ -870,6 +895,7 @@ def route_between_terminals(
group_uuid="",
wire_mark="",
wire_mark_is_manual=False,
wire_style_id="",
):
if doc is None:
raise AutoRoutingError("No FreeCAD document is available.")
@ -881,6 +907,7 @@ def route_between_terminals(
raise AutoRoutingError("Start and end terminal must be different.")
opts = _merged_options(options)
effective_wire_style_id = str(wire_style_id or opts.get("wire_style_id", "") or "").strip()
start_uuid = (getattr(start_terminal, "QetTerminalUuid", "") or "").strip()
end_uuid = (getattr(end_terminal, "QetTerminalUuid", "") or "").strip()
project_uuid = _project_uuid(doc, start_terminal, end_terminal)
@ -937,7 +964,7 @@ def route_between_terminals(
wire_mark=wire_mark,
wire_mark_is_manual=wire_mark_is_manual,
)
_set_auto_metadata(wire, route_data, collisions)
_set_auto_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
if wire not in getattr(routed_group, "Group", []):
@ -1094,6 +1121,7 @@ def route_all_from_payload(doc, payload, options=None):
group_uuid=_wire_item_value(item, "group_uuid"),
wire_mark=_wire_item_value(item, "wire_mark"),
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
wire_style_id=_wire_item_value(item, "wire_style_id"),
)
except Exception as exc:
report["errors"].append(str(exc))
@ -1110,6 +1138,7 @@ def route_all_from_payload(doc, payload, options=None):
}
)
report["missing_endpoint_uuids"] = sorted(missing_endpoint_uuids)
_write_auto_route_batch_diagnostic(doc, report)
return report
@ -1126,6 +1155,9 @@ def format_route_all_report(report):
prepared_layout.get("surface_carriers", 0),
prepared_layout.get("terminal_access_carriers", 0),
)
errors = report.get("errors", []) or []
if errors:
message += "\n首个错误:{0}".format(str(errors[0]))
auto_bound = report.get("auto_bound_terminals", 0)
auto_created = report.get("auto_created_terminals", 0)
if auto_bound or auto_created:
@ -1143,15 +1175,62 @@ def format_route_all_report(report):
)
if report.get("local_terminals", 0) > 0:
message += " 请先从 QET 重新导入/更新工程端子,使端子 UUID 不再是 local:...。"
sample = (report.get("missing_endpoint_samples") or [None])[0]
if sample:
message += "\n缺失示例:{0} -> {1}".format(
sample.get("start_terminal_uuid", ""),
sample.get("end_terminal_uuid", ""),
)
sample = (report.get("missing_endpoint_samples") or [None])[0]
if sample:
message += "\n缺失示例:{0} -> {1}".format(
sample.get("start_terminal_uuid", ""),
sample.get("end_terminal_uuid", ""),
)
return message
def _clear_auto_route_batch_diagnostics(doc):
group = WiringObjects.ensure_diagnostic_group(doc, _project_uuid(doc))
removed = 0
for obj in list(getattr(group, "Group", []) or []):
if (getattr(obj, "QetDiagnosticKind", "") or "").strip() != "AutoRouteBatch":
continue
try:
group.removeObject(obj)
except Exception:
try:
group.Group = [
candidate
for candidate in list(getattr(group, "Group", []) or [])
if candidate is not obj
]
except Exception:
pass
try:
if doc.getObject(getattr(obj, "Name", "")) is not None:
doc.removeObject(obj.Name)
removed += 1
except Exception:
pass
return removed
def _write_auto_route_batch_diagnostic(doc, report):
if doc is None or not isinstance(report, dict):
return None
if not report.get("errors") and not report.get("missing_endpoint_uuids") and report.get("collision_warnings", 0) <= 0:
return None
project_uuid = _project_uuid(doc)
group = WiringObjects.ensure_diagnostic_group(doc, project_uuid)
_clear_auto_route_batch_diagnostics(doc)
diagnostic = doc.addObject("App::DocumentObjectGroup", _unique_name(doc, "QETAutoRouteDiagnostic"))
diagnostic.Label = "QET Auto Route Diagnostic"
_set_string(diagnostic, "QetDiagnosticKind", "AutoRouteBatch", "QET diagnostic kind")
_set_string(
diagnostic,
"QetDiagnosticJson",
json.dumps(report, ensure_ascii=False),
"QET auto-routing batch diagnostic payload",
)
group.addObject(diagnostic)
return diagnostic
def _iter_wire_tasks(doc):
try:
task_group = doc.getObject("QETWiring_01_Tasks")
@ -1175,6 +1254,7 @@ def _wire_tasks_payload(doc):
"wire_label": (getattr(task, "QetWireLabel", "") or "").strip(),
"wire_mark": (getattr(task, "QetWireMark", "") or "").strip(),
"wire_mark_is_manual": bool(getattr(task, "QetWireMarkIsManual", False)),
"wire_style_id": (getattr(task, "QetWireStyleId", "") or "").strip(),
"net_uuid": (getattr(task, "QetNetUuid", "") or "").strip(),
"group_uuid": (getattr(task, "QetGroupUuid", "") or "").strip(),
"start_element_uuid": (getattr(task, "QetStartElementUuid", "") or "").strip(),

@ -25,6 +25,16 @@ def _bool_value(item, field_name):
return bool(item.get(field_name, False))
def _int_text_value(item, field_name):
value = item.get(field_name, "")
if value is None:
return ""
try:
return str(int(value)).strip()
except Exception:
return str(value).strip()
def _conductor_uuids(item):
values = item.get("conductor_uuids", [])
if not isinstance(values, list):
@ -92,6 +102,7 @@ def _normalize_wire_entry(item, index, device_labels=None):
"group_uuid": _string_value(item, "group_uuid"),
"wire_mark": wire_mark,
"wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"),
"wire_style_id": _int_text_value(item, "wire_style_id"),
"start_element_uuid": start_element_uuid,
"start_terminal_uuid": start_terminal_uuid,
"start_instance_id": _string_value(item, "start_instance_id"),
@ -179,6 +190,7 @@ def _ensure_string_property(obj, prop_name, value, description="QET wire task pr
def _set_task_extra_properties(task, entry):
_ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"])
_ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"])
_ensure_string_property(task, "QetWireStyleId", entry["wire_style_id"])
_ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"])
_ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"])
_ensure_string_property(task, "QetStartDeviceLabel", entry["start_device_label"])

@ -87,6 +87,16 @@ def main():
}
doc.saveAs(OUT_FCSTD)
App.closeDocument(doc.Name)
reopened = App.openDocument(OUT_FCSTD)
routed_group = reopened.getObject("QETWiring_04_Routed")
reopened_wires = list(getattr(routed_group, "Group", []) or []) if routed_group else []
payload["reopened_routed_wire_count"] = len(reopened_wires)
payload["reopened_has_auto_route"] = any(
(getattr(wire, "RouteType", "") or "").strip() == "AutoSuggested"
for wire in reopened_wires
)
with open(OUT_JSON, "w", encoding="utf-8") as handle:
json.dump(payload, handle, ensure_ascii=False, indent=2)
print(json.dumps(payload, ensure_ascii=False, indent=2))

@ -1,4 +1,5 @@
import importlib
import json
import sys
import types
import unittest
@ -270,6 +271,37 @@ class AutoRoutingTest(unittest.TestCase):
self.assertEqual("Routed", result["route_status"])
self.assertTrue(any(point.y == 30.0 for point in result["points"]))
def test_auto_route_stores_length_and_wire_style_diagnostics(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
start = _terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
end = _terminal(doc, terminal_objects, "TerminalEnd", "terminal-end", app.Vector(100, 0, 0))
routing_network.create_route_carrier(
doc,
[app.Vector(0, 0, 20), app.Vector(100, 0, 20)],
project_uuid="project-1",
kind="WireDuct",
)
result = auto_routing.route_between_terminals(
doc,
start,
end,
wire_uuid="wire-1",
wire_label="N4111",
options={"wire_style_id": "42"},
)
wire = result["wire"]
payload = json.loads(wire.QetAutoRouteDiagnosticsJson)
self.assertGreater(float(wire.QetAutoRouteLengthMm), 0.0)
self.assertEqual("42", wire.QetWireStyleId)
self.assertEqual("42", payload["wire_style_id"])
self.assertGreater(payload["length_mm"], 0.0)
def test_route_carrier_styles_make_generated_objects_distinguishable(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, routing_network, _auto_routing = _reload_modules()
@ -897,6 +929,34 @@ class AutoRoutingTest(unittest.TestCase):
self.assertTrue(report["missing_endpoint_samples"][0]["start_found"])
self.assertFalse(report["missing_endpoint_samples"][0]["end_found"])
def test_route_all_writes_diagnostic_object_for_missing_terminal(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, _routing_network, auto_routing = _reload_modules()
app = sys.modules["FreeCAD"]
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
_terminal(doc, terminal_objects, "TerminalStart", "terminal-start", app.Vector(0, 0, 0))
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-1",
"start_terminal_uuid": "terminal-start",
"end_terminal_uuid": "terminal-missing",
}
],
}
report = auto_routing.route_all_from_payload(doc, payload)
diagnostic_group = doc.getObject("QETWiring_05_Diagnostics")
self.assertEqual(1, report["skipped_missing_terminal"])
self.assertIsNotNone(diagnostic_group)
self.assertEqual(1, len(diagnostic_group.Group))
diagnostic = diagnostic_group.Group[0]
self.assertEqual("AutoRouteBatch", diagnostic.QetDiagnosticKind)
self.assertIn("terminal-missing", diagnostic.QetDiagnosticJson)
def test_route_all_report_calls_out_local_unbound_terminals(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, _routing_network, auto_routing = _reload_modules()
@ -929,6 +989,35 @@ class AutoRoutingTest(unittest.TestCase):
self.assertIn("端子匹配失败", message)
self.assertIn("local:", message)
def test_route_all_report_includes_network_and_first_error(self):
_install_fake_freecad()
_terminal_objects, _wiring_objects, _routing_network, auto_routing = _reload_modules()
report = {
"total_wires": 2,
"routed": 1,
"collision_warnings": 1,
"skipped_missing_terminal": 1,
"prepared_layout": {
"wire_duct_carriers": 2,
"surface_carriers": 4,
"terminal_access_carriers": 6,
},
"missing_endpoint_samples": [
{
"start_terminal_uuid": "terminal-a",
"end_terminal_uuid": "terminal-b",
}
],
"errors": ["没有可用的线槽/路由路径网络"],
}
message = auto_routing.format_route_all_report(report)
self.assertIn("routed=1", message)
self.assertIn("线槽路径 2 条", message)
self.assertIn("首个错误:没有可用的线槽/路由路径网络", message)
self.assertIn("缺失示例terminal-a -> terminal-b", message)
def test_bind_wire_task_terminals_from_payload_does_not_create_wires(self):
_install_fake_freecad()
terminal_objects, wiring_objects, _routing_network, auto_routing = _reload_modules()

@ -106,6 +106,44 @@ def _reload_modules():
class WiringImportTest(unittest.TestCase):
def test_import_wire_tasks_preserves_auto_routing_metadata(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, wiring_import = _reload_modules()
doc = FakeDocument()
terminal_objects.ensure_root_group(doc, "project-1")
payload = {
"project_uuid": "project-1",
"wires": [
{
"wire_id": "wire-1",
"wire_label": "W1",
"wire_mark": "N4111",
"wire_mark_is_manual": True,
"wire_style_id": 42,
"net_uuid": "net-1",
"group_uuid": "group-1",
"start_element_uuid": "device-a",
"start_instance_id": "instance-a",
"start_terminal_uuid": "terminal-a",
"start_terminal_display": "A1",
"end_element_uuid": "device-b",
"end_instance_id": "instance-b",
"end_terminal_uuid": "terminal-b",
"end_terminal_display": "B1",
}
],
}
report = wiring_import.import_wire_tasks_from_payload(payload, doc)
task = doc.getObject("QETWiring_01_Tasks").Group[0]
self.assertEqual(1, report["imported_tasks"])
self.assertEqual("42", task.QetWireStyleId)
self.assertEqual("instance-a", task.QetStartInstanceId)
self.assertEqual("instance-b", task.QetEndInstanceId)
self.assertEqual("A1", task.QetStartTerminalDisplay)
self.assertEqual("B1", task.QetEndTerminalDisplay)
def test_import_wire_tasks_creates_and_updates_qet_tasks(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, wiring_import = _reload_modules()

Loading…
Cancel
Save