|
|
|
|
@ -23,9 +23,8 @@ import WiringObjects
|
|
|
|
|
DEFAULT_OPTIONS = {
|
|
|
|
|
# 端子出来先走一小段,避免导线贴着设备外壳起步。
|
|
|
|
|
"terminal_exit_length": 20.0,
|
|
|
|
|
"lane_axis": "auto",
|
|
|
|
|
"lane_axis": "y",
|
|
|
|
|
"lane_spacing": 10.0,
|
|
|
|
|
"segment_reuse_penalty": 200.0,
|
|
|
|
|
# 线槽网络相关参数。
|
|
|
|
|
"use_routing_network": True,
|
|
|
|
|
"network_entry_max_distance": 1000.0,
|
|
|
|
|
@ -140,30 +139,9 @@ def _with_axis(point, axis, value):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _auto_lane_axis(route_points):
|
|
|
|
|
points = [_vector(point) for point in route_points or []]
|
|
|
|
|
if len(points) < 2:
|
|
|
|
|
return "y"
|
|
|
|
|
extents = {"x": 0.0, "y": 0.0, "z": 0.0}
|
|
|
|
|
for index in range(len(points) - 1):
|
|
|
|
|
start = points[index]
|
|
|
|
|
end = points[index + 1]
|
|
|
|
|
extents["x"] += abs(float(end.x) - float(start.x))
|
|
|
|
|
extents["y"] += abs(float(end.y) - float(start.y))
|
|
|
|
|
extents["z"] += abs(float(end.z) - float(start.z))
|
|
|
|
|
dominant_axis = max(extents, key=lambda axis: extents[axis])
|
|
|
|
|
if dominant_axis == "y":
|
|
|
|
|
return "x"
|
|
|
|
|
if dominant_axis == "x":
|
|
|
|
|
return "y"
|
|
|
|
|
return "x"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _lane_payload(route_index, options, route_points=None):
|
|
|
|
|
def _lane_payload(route_index, options):
|
|
|
|
|
opts = options or {}
|
|
|
|
|
lane_axis = (opts.get("lane_axis") or "y").lower()
|
|
|
|
|
if lane_axis == "auto":
|
|
|
|
|
lane_axis = _auto_lane_axis(route_points)
|
|
|
|
|
if lane_axis not in {"x", "y", "z"}:
|
|
|
|
|
lane_axis = "y"
|
|
|
|
|
lane_index = max(int(route_index or 0), 0)
|
|
|
|
|
@ -228,79 +206,6 @@ def _append_orthogonal(points, target_point, preferred_axis=None):
|
|
|
|
|
_append_unique(points, point)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _collinear_points(first, middle, last):
|
|
|
|
|
ax = float(middle.x) - float(first.x)
|
|
|
|
|
ay = float(middle.y) - float(first.y)
|
|
|
|
|
az = float(middle.z) - float(first.z)
|
|
|
|
|
bx = float(last.x) - float(middle.x)
|
|
|
|
|
by = float(last.y) - float(middle.y)
|
|
|
|
|
bz = float(last.z) - float(middle.z)
|
|
|
|
|
cross_x = ay * bz - az * by
|
|
|
|
|
cross_y = az * bx - ax * bz
|
|
|
|
|
cross_z = ax * by - ay * bx
|
|
|
|
|
dot = ax * bx + ay * by + az * bz
|
|
|
|
|
return (
|
|
|
|
|
abs(cross_x) <= 0.000001
|
|
|
|
|
and abs(cross_y) <= 0.000001
|
|
|
|
|
and abs(cross_z) <= 0.000001
|
|
|
|
|
and dot >= -0.000001
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _route_point_key(point, tolerance=0.001):
|
|
|
|
|
scale = 1.0 / float(tolerance or 0.001)
|
|
|
|
|
return (
|
|
|
|
|
int(round(float(point.x) * scale)),
|
|
|
|
|
int(round(float(point.y) * scale)),
|
|
|
|
|
int(round(float(point.z) * scale)),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _simplify_collinear_points(points, preserved_point_keys=None):
|
|
|
|
|
normalized = [_vector(point) for point in points or [] if _is_finite_point(_vector(point))]
|
|
|
|
|
if len(normalized) <= 2:
|
|
|
|
|
return normalized
|
|
|
|
|
preserved_indices = {0, 1, len(normalized) - 2, len(normalized) - 1}
|
|
|
|
|
preserved_point_keys = set(preserved_point_keys or [])
|
|
|
|
|
simplified = [normalized[0]]
|
|
|
|
|
simplified_indices = [0]
|
|
|
|
|
for index, point in enumerate(normalized[1:], start=1):
|
|
|
|
|
_append_unique(simplified, point)
|
|
|
|
|
if len(simplified_indices) < len(simplified):
|
|
|
|
|
simplified_indices.append(index)
|
|
|
|
|
while len(simplified) >= 3 and _collinear_points(
|
|
|
|
|
simplified[-3],
|
|
|
|
|
simplified[-2],
|
|
|
|
|
simplified[-1],
|
|
|
|
|
):
|
|
|
|
|
if (
|
|
|
|
|
simplified_indices[-2] in preserved_indices
|
|
|
|
|
or _route_point_key(simplified[-2]) in preserved_point_keys
|
|
|
|
|
):
|
|
|
|
|
break
|
|
|
|
|
simplified.pop(-2)
|
|
|
|
|
simplified_indices.pop(-2)
|
|
|
|
|
return simplified
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _important_route_node_keys(network, path_keys, path_result):
|
|
|
|
|
edges = network.get("edges", {}) if isinstance(network, dict) else {}
|
|
|
|
|
important = {
|
|
|
|
|
key
|
|
|
|
|
for key in path_keys or []
|
|
|
|
|
if len(edges.get(key, []) or []) != 2
|
|
|
|
|
}
|
|
|
|
|
segments = path_result.get("segments", []) if isinstance(path_result, dict) else []
|
|
|
|
|
for index in range(1, len(path_keys or []) - 1):
|
|
|
|
|
previous_segment = segments[index - 1] if index - 1 < len(segments) else {}
|
|
|
|
|
next_segment = segments[index] if index < len(segments) else {}
|
|
|
|
|
previous_carrier = (previous_segment.get("carrier") or {}).get("name", "")
|
|
|
|
|
next_carrier = (next_segment.get("carrier") or {}).get("name", "")
|
|
|
|
|
if previous_carrier != next_carrier:
|
|
|
|
|
important.add(path_keys[index])
|
|
|
|
|
return important
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _offset(point, direction, distance):
|
|
|
|
|
return App.Vector(
|
|
|
|
|
float(point.x) + float(direction.x) * float(distance),
|
|
|
|
|
@ -648,46 +553,9 @@ def _set_string(obj, name, value, description="Routing connection property"):
|
|
|
|
|
TerminalObjects.ensure_string_property(obj, name, "QET Routing", description, value)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _clean_endpoint_metadata(endpoint_metadata):
|
|
|
|
|
if not isinstance(endpoint_metadata, dict):
|
|
|
|
|
return {}
|
|
|
|
|
allowed = (
|
|
|
|
|
"start_element_uuid",
|
|
|
|
|
"start_terminal_display",
|
|
|
|
|
"start_device_label",
|
|
|
|
|
"end_element_uuid",
|
|
|
|
|
"end_terminal_display",
|
|
|
|
|
"end_device_label",
|
|
|
|
|
"endpoint_label",
|
|
|
|
|
)
|
|
|
|
|
cleaned = {}
|
|
|
|
|
for key in allowed:
|
|
|
|
|
value = str(endpoint_metadata.get(key, "") or "").strip()
|
|
|
|
|
if value:
|
|
|
|
|
cleaned[key] = value
|
|
|
|
|
return cleaned
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_endpoint_metadata(wire, endpoint_metadata):
|
|
|
|
|
metadata = _clean_endpoint_metadata(endpoint_metadata)
|
|
|
|
|
property_names = {
|
|
|
|
|
"start_element_uuid": "QetStartElementUuid",
|
|
|
|
|
"start_terminal_display": "QetStartTerminalDisplay",
|
|
|
|
|
"start_device_label": "QetStartDeviceLabel",
|
|
|
|
|
"end_element_uuid": "QetEndElementUuid",
|
|
|
|
|
"end_terminal_display": "QetEndTerminalDisplay",
|
|
|
|
|
"end_device_label": "QetEndDeviceLabel",
|
|
|
|
|
"endpoint_label": "QetEndpointLabel",
|
|
|
|
|
}
|
|
|
|
|
for key, prop_name in property_names.items():
|
|
|
|
|
if key in metadata:
|
|
|
|
|
_set_string(wire, prop_name, metadata[key], "QET routed wire endpoint metadata")
|
|
|
|
|
return metadata
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=None):
|
|
|
|
|
def _route_payload(route_data, collisions, wire_style_id=""):
|
|
|
|
|
points = route_data.get("points", [])
|
|
|
|
|
payload = {
|
|
|
|
|
return {
|
|
|
|
|
"algorithm": route_data.get("algorithm", ""),
|
|
|
|
|
"length_mm": _route_length(points),
|
|
|
|
|
"wire_style_id": str(wire_style_id or "").strip(),
|
|
|
|
|
@ -698,15 +566,10 @@ def _route_payload(route_data, collisions, wire_style_id="", endpoint_metadata=N
|
|
|
|
|
"network": route_data.get("network", {}),
|
|
|
|
|
"route_track": route_data.get("route_track", {}),
|
|
|
|
|
}
|
|
|
|
|
metadata = _clean_endpoint_metadata(endpoint_metadata)
|
|
|
|
|
if metadata:
|
|
|
|
|
payload["endpoint_metadata"] = metadata
|
|
|
|
|
return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id="", endpoint_metadata=None):
|
|
|
|
|
def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=""):
|
|
|
|
|
length_mm = _route_length(route_data.get("points", []))
|
|
|
|
|
cleaned_endpoint_metadata = _set_endpoint_metadata(wire, endpoint_metadata)
|
|
|
|
|
_set_string(
|
|
|
|
|
wire,
|
|
|
|
|
"QetRouteAlgorithm",
|
|
|
|
|
@ -728,15 +591,7 @@ def _set_routing_connection_metadata(wire, route_data, collisions, wire_style_id
|
|
|
|
|
_set_string(
|
|
|
|
|
wire,
|
|
|
|
|
"QetRouteDiagnosticsJson",
|
|
|
|
|
json.dumps(
|
|
|
|
|
_route_payload(
|
|
|
|
|
route_data,
|
|
|
|
|
collisions,
|
|
|
|
|
wire_style_id=wire_style_id,
|
|
|
|
|
endpoint_metadata=cleaned_endpoint_metadata,
|
|
|
|
|
),
|
|
|
|
|
ensure_ascii=False,
|
|
|
|
|
),
|
|
|
|
|
json.dumps(_route_payload(route_data, collisions, wire_style_id=wire_style_id), ensure_ascii=False),
|
|
|
|
|
"Routing connection diagnostics",
|
|
|
|
|
)
|
|
|
|
|
if route_data.get("network"):
|
|
|
|
|
@ -774,8 +629,8 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
|
|
|
|
|
if network.get("segment_count", 0) <= 0:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
start_key, start_distance, start_mode = RoutingNetwork.connect_point_to_network(network, start_exit)
|
|
|
|
|
end_key, end_distance, end_mode = RoutingNetwork.connect_point_to_network(network, end_exit)
|
|
|
|
|
start_key, start_distance = RoutingNetwork.nearest_node(network, start_exit)
|
|
|
|
|
end_key, end_distance = RoutingNetwork.nearest_node(network, end_exit)
|
|
|
|
|
if start_key is None or end_key is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
@ -792,8 +647,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
|
|
|
|
|
end_key,
|
|
|
|
|
bend_penalty=float(opts.get("bend_penalty", 0.0) or 0.0),
|
|
|
|
|
kind_cost_factors=opts.get("carrier_kind_cost_factors", {}),
|
|
|
|
|
segment_usage_costs=opts.get("segment_usage_costs", {}),
|
|
|
|
|
segment_reuse_penalty=float(opts.get("segment_reuse_penalty", 0.0) or 0.0),
|
|
|
|
|
)
|
|
|
|
|
path_keys = path_result.get("path", []) if isinstance(path_result, dict) else []
|
|
|
|
|
if not path_keys:
|
|
|
|
|
@ -802,7 +655,7 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
|
|
|
|
|
carrier_points = RoutingNetwork.path_points(network, path_keys)
|
|
|
|
|
if not carrier_points:
|
|
|
|
|
return None
|
|
|
|
|
lane = _lane_payload(route_index, opts, route_points=carrier_points)
|
|
|
|
|
lane = _lane_payload(route_index, opts)
|
|
|
|
|
carrier_points = _apply_lane_offset(carrier_points, lane)
|
|
|
|
|
|
|
|
|
|
points = []
|
|
|
|
|
@ -813,10 +666,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
|
|
|
|
|
_append_unique(points, point)
|
|
|
|
|
_append_orthogonal(points, end_exit)
|
|
|
|
|
_append_unique(points, end_origin)
|
|
|
|
|
points = _simplify_collinear_points(
|
|
|
|
|
points,
|
|
|
|
|
preserved_point_keys=_important_route_node_keys(network, path_keys, path_result),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"algorithm": "network-dijkstra-v1",
|
|
|
|
|
@ -828,8 +677,6 @@ def build_network_route(start_terminal, end_terminal, route_index=0, options=Non
|
|
|
|
|
"nodes": len(network.get("nodes", {})),
|
|
|
|
|
"entry_distance": float(start_distance or 0.0),
|
|
|
|
|
"exit_distance": float(end_distance or 0.0),
|
|
|
|
|
"entry_point_mode": start_mode,
|
|
|
|
|
"exit_point_mode": end_mode,
|
|
|
|
|
"obstacle_aware": bool(obstacle_aware),
|
|
|
|
|
},
|
|
|
|
|
"route_track": path_result,
|
|
|
|
|
@ -1029,19 +876,14 @@ def detect_collisions(points, obstacles, ignored_segment_indices=None):
|
|
|
|
|
end = points[index + 1]
|
|
|
|
|
for obstacle in obstacles:
|
|
|
|
|
if _segment_intersects_bbox(start, end, obstacle["bbox"]):
|
|
|
|
|
raw_bbox = obstacle.get("raw_bbox") or obstacle.get("bbox") or {}
|
|
|
|
|
collision_kind = "HardIntersection"
|
|
|
|
|
if raw_bbox and not _segment_intersects_bbox(start, end, raw_bbox):
|
|
|
|
|
collision_kind = "ClearanceWarning"
|
|
|
|
|
collisions.append(
|
|
|
|
|
{
|
|
|
|
|
"segment_index": index,
|
|
|
|
|
"segment_start": _point_payload(start),
|
|
|
|
|
"segment_end": _point_payload(end),
|
|
|
|
|
"collision_kind": collision_kind,
|
|
|
|
|
"obstacle_name": obstacle.get("name", ""),
|
|
|
|
|
"obstacle_label": obstacle.get("label", ""),
|
|
|
|
|
"obstacle_bbox": dict(raw_bbox),
|
|
|
|
|
"obstacle_bbox": dict(obstacle.get("raw_bbox") or obstacle.get("bbox") or {}),
|
|
|
|
|
"collision_bbox": dict(obstacle.get("bbox", {}) or {}),
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
@ -1077,8 +919,8 @@ def _detach_object_from_groups(doc, obj):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
|
|
|
|
|
matches = []
|
|
|
|
|
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
|
|
|
|
|
removed = 0
|
|
|
|
|
for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
|
|
|
|
|
if (getattr(obj, "RouteType", "") or "").strip() != "RoutedConnection":
|
|
|
|
|
continue
|
|
|
|
|
@ -1096,13 +938,6 @@ def _matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=
|
|
|
|
|
)
|
|
|
|
|
if not same_direction and not reverse_direction:
|
|
|
|
|
continue
|
|
|
|
|
matches.append(obj)
|
|
|
|
|
return matches
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _remove_routing_connection_objects(doc, objects):
|
|
|
|
|
removed = 0
|
|
|
|
|
for obj in list(objects or []):
|
|
|
|
|
try:
|
|
|
|
|
_detach_object_from_groups(doc, obj)
|
|
|
|
|
doc.removeObject(obj.Name)
|
|
|
|
|
@ -1112,13 +947,6 @@ def _remove_routing_connection_objects(doc, objects):
|
|
|
|
|
return removed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=""):
|
|
|
|
|
return _remove_routing_connection_objects(
|
|
|
|
|
doc,
|
|
|
|
|
_matching_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _find_task_by_wire_uuid(doc, wire_uuid):
|
|
|
|
|
if not wire_uuid:
|
|
|
|
|
return None
|
|
|
|
|
@ -1175,7 +1003,6 @@ def route_eplan_connection_between_terminals(
|
|
|
|
|
wire_mark="",
|
|
|
|
|
wire_mark_is_manual=False,
|
|
|
|
|
wire_style_id="",
|
|
|
|
|
endpoint_metadata=None,
|
|
|
|
|
):
|
|
|
|
|
if doc is None:
|
|
|
|
|
raise AutoRoutingError("No FreeCAD document is available.")
|
|
|
|
|
@ -1194,6 +1021,9 @@ def route_eplan_connection_between_terminals(
|
|
|
|
|
if not project_uuid:
|
|
|
|
|
raise AutoRoutingError("Project UUID is required for routing connections.")
|
|
|
|
|
|
|
|
|
|
if opts.get("replace_existing", True):
|
|
|
|
|
_remove_existing_routing_connections(doc, start_uuid, end_uuid, wire_uuid=wire_uuid)
|
|
|
|
|
|
|
|
|
|
route_data = build_network_route(
|
|
|
|
|
start_terminal,
|
|
|
|
|
end_terminal,
|
|
|
|
|
@ -1217,67 +1047,39 @@ def route_eplan_connection_between_terminals(
|
|
|
|
|
collisions = detect_collisions(points, obstacles, ignored_segment_indices=ignored_collision_segments)
|
|
|
|
|
status = "CollisionWarning" if collisions else "Routed"
|
|
|
|
|
|
|
|
|
|
existing_replacements = []
|
|
|
|
|
if opts.get("replace_existing", True):
|
|
|
|
|
existing_replacements = _matching_existing_routing_connections(
|
|
|
|
|
doc,
|
|
|
|
|
start_uuid,
|
|
|
|
|
end_uuid,
|
|
|
|
|
wire_uuid=wire_uuid,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
wire_name = _unique_name(doc, _wire_object_name(start_terminal, end_terminal, wire_uuid))
|
|
|
|
|
wire = None
|
|
|
|
|
try:
|
|
|
|
|
wire = _create_wire_geometry(doc, wire_name, points)
|
|
|
|
|
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
|
|
|
|
|
WiringObjects.set_routed_wire_semantics(
|
|
|
|
|
wire,
|
|
|
|
|
project_uuid,
|
|
|
|
|
wire_uuid,
|
|
|
|
|
wire_label or wire_mark or wire_uuid,
|
|
|
|
|
start_uuid,
|
|
|
|
|
end_uuid,
|
|
|
|
|
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
|
|
|
|
|
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
|
|
|
|
|
route_type="RoutedConnection",
|
|
|
|
|
route_status=status,
|
|
|
|
|
route_mode="EplanRoute",
|
|
|
|
|
net_uuid=net_uuid,
|
|
|
|
|
group_uuid=group_uuid,
|
|
|
|
|
wire_mark=wire_mark,
|
|
|
|
|
wire_mark_is_manual=wire_mark_is_manual,
|
|
|
|
|
)
|
|
|
|
|
_set_routing_connection_metadata(
|
|
|
|
|
wire,
|
|
|
|
|
route_data,
|
|
|
|
|
collisions,
|
|
|
|
|
wire_style_id=effective_wire_style_id,
|
|
|
|
|
endpoint_metadata=endpoint_metadata,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
|
|
|
|
|
if wire not in getattr(routed_group, "Group", []):
|
|
|
|
|
routed_group.addObject(wire)
|
|
|
|
|
try:
|
|
|
|
|
routed_group.ViewObject.Visibility = True
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
_style_wire(wire, collision_count=len(collisions))
|
|
|
|
|
wire = _create_wire_geometry(doc, wire_name, points)
|
|
|
|
|
wire.Label = wire_label or wire_mark or wire_uuid or "QET Routed Connection"
|
|
|
|
|
WiringObjects.set_routed_wire_semantics(
|
|
|
|
|
wire,
|
|
|
|
|
project_uuid,
|
|
|
|
|
wire_uuid,
|
|
|
|
|
wire_label or wire_mark or wire_uuid,
|
|
|
|
|
start_uuid,
|
|
|
|
|
end_uuid,
|
|
|
|
|
(getattr(start_terminal, "QetInstanceId", "") or "").strip(),
|
|
|
|
|
(getattr(end_terminal, "QetInstanceId", "") or "").strip(),
|
|
|
|
|
route_type="RoutedConnection",
|
|
|
|
|
route_status=status,
|
|
|
|
|
route_mode="EplanRoute",
|
|
|
|
|
net_uuid=net_uuid,
|
|
|
|
|
group_uuid=group_uuid,
|
|
|
|
|
wire_mark=wire_mark,
|
|
|
|
|
wire_mark_is_manual=wire_mark_is_manual,
|
|
|
|
|
)
|
|
|
|
|
_set_routing_connection_metadata(wire, route_data, collisions, wire_style_id=effective_wire_style_id)
|
|
|
|
|
|
|
|
|
|
task = _find_task_by_wire_uuid(doc, wire_uuid)
|
|
|
|
|
_set_task_status(task, status)
|
|
|
|
|
routed_group = WiringObjects.ensure_routed_group(doc, project_uuid)
|
|
|
|
|
if wire not in getattr(routed_group, "Group", []):
|
|
|
|
|
routed_group.addObject(wire)
|
|
|
|
|
try:
|
|
|
|
|
routed_group.ViewObject.Visibility = True
|
|
|
|
|
except Exception:
|
|
|
|
|
if wire is not None:
|
|
|
|
|
_remove_routing_connection_objects(doc, [wire])
|
|
|
|
|
raise
|
|
|
|
|
pass
|
|
|
|
|
_style_wire(wire, collision_count=len(collisions))
|
|
|
|
|
|
|
|
|
|
if existing_replacements:
|
|
|
|
|
removed_existing = _remove_routing_connection_objects(doc, existing_replacements)
|
|
|
|
|
if removed_existing != len(existing_replacements):
|
|
|
|
|
if wire is not None:
|
|
|
|
|
_remove_routing_connection_objects(doc, [wire])
|
|
|
|
|
raise AutoRoutingError("Failed to replace existing routed connection.")
|
|
|
|
|
task = _find_task_by_wire_uuid(doc, wire_uuid)
|
|
|
|
|
_set_task_status(task, status)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
doc.recompute()
|
|
|
|
|
@ -1320,65 +1122,6 @@ def _route_lane_key(start_uuid, end_uuid):
|
|
|
|
|
return tuple(endpoints)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _route_segment_key(segment):
|
|
|
|
|
if not isinstance(segment, dict):
|
|
|
|
|
return None
|
|
|
|
|
carrier = segment.get("carrier") or {}
|
|
|
|
|
carrier_name = str(carrier.get("name", "") or "").strip()
|
|
|
|
|
from_key = tuple(segment.get("from_key", []) or [])
|
|
|
|
|
to_key = tuple(segment.get("to_key", []) or [])
|
|
|
|
|
if not from_key or not to_key:
|
|
|
|
|
return None
|
|
|
|
|
return (
|
|
|
|
|
carrier_name,
|
|
|
|
|
tuple(sorted((from_key, to_key))),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _route_segment_keys(result):
|
|
|
|
|
route_track = result.get("route_track", {}) if isinstance(result, dict) else {}
|
|
|
|
|
return _route_track_segment_keys(route_track)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _route_track_segment_keys(route_track):
|
|
|
|
|
segments = route_track.get("segments", []) if isinstance(route_track, dict) else []
|
|
|
|
|
keys = []
|
|
|
|
|
for segment in segments or []:
|
|
|
|
|
key = _route_segment_key(segment)
|
|
|
|
|
if key is not None:
|
|
|
|
|
keys.append(key)
|
|
|
|
|
return keys
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _incoming_wire_uuids(wires):
|
|
|
|
|
wire_uuids = set()
|
|
|
|
|
for item in wires or []:
|
|
|
|
|
if not isinstance(item, dict):
|
|
|
|
|
continue
|
|
|
|
|
wire_uuid = _wire_item_value(item, "wire_id", "wire_uuid", "id")
|
|
|
|
|
if wire_uuid:
|
|
|
|
|
wire_uuids.add(wire_uuid)
|
|
|
|
|
return wire_uuids
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _existing_routed_segment_usage(doc, excluded_wire_uuids=None):
|
|
|
|
|
excluded_wire_uuids = set(excluded_wire_uuids or [])
|
|
|
|
|
usage = {}
|
|
|
|
|
for wire in list(WiringObjects.iter_routed_wire_objects(doc)):
|
|
|
|
|
if (getattr(wire, "RouteType", "") or "").strip() != "RoutedConnection":
|
|
|
|
|
continue
|
|
|
|
|
wire_uuid = (getattr(wire, "QetWireUuid", "") or "").strip()
|
|
|
|
|
if wire_uuid and wire_uuid in excluded_wire_uuids:
|
|
|
|
|
continue
|
|
|
|
|
try:
|
|
|
|
|
route_track = json.loads((getattr(wire, "QetRouteTrackJson", "") or "").strip() or "{}")
|
|
|
|
|
except Exception:
|
|
|
|
|
route_track = {}
|
|
|
|
|
for segment_key in _route_track_segment_keys(route_track):
|
|
|
|
|
usage[segment_key] = usage.get(segment_key, 0) + 1
|
|
|
|
|
return usage
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def bind_wire_task_terminals_from_payload(doc, payload):
|
|
|
|
|
"""Bind local template terminals to QET terminal UUIDs without creating wires."""
|
|
|
|
|
if doc is None:
|
|
|
|
|
@ -1459,36 +1202,11 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
|
|
|
|
|
report["prepared_layout"] = prepared_layout
|
|
|
|
|
missing_endpoint_uuids = set()
|
|
|
|
|
lane_indexes_by_pair = {}
|
|
|
|
|
lane_indexes_by_segment = {}
|
|
|
|
|
segment_usage_costs = _existing_routed_segment_usage(
|
|
|
|
|
doc,
|
|
|
|
|
excluded_wire_uuids=_incoming_wire_uuids(wires),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def add_status(status):
|
|
|
|
|
key = str(status or "").strip() or "Unknown"
|
|
|
|
|
report["route_status_counts"][key] = report["route_status_counts"].get(key, 0) + 1
|
|
|
|
|
|
|
|
|
|
def create_route(route_lane_index, item, start_terminal, end_terminal, endpoint_metadata):
|
|
|
|
|
route_options = dict(options or {})
|
|
|
|
|
if isinstance(item, dict) and "__segment_usage_costs" in item:
|
|
|
|
|
route_options["segment_usage_costs"] = item.get("__segment_usage_costs", {})
|
|
|
|
|
return route_eplan_connection_between_terminals(
|
|
|
|
|
doc,
|
|
|
|
|
start_terminal,
|
|
|
|
|
end_terminal,
|
|
|
|
|
route_index=route_lane_index,
|
|
|
|
|
options=route_options,
|
|
|
|
|
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
|
|
|
|
|
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
|
|
|
|
|
net_uuid=_wire_item_value(item, "net_uuid"),
|
|
|
|
|
group_uuid=_wire_item_value(item, "group_uuid"),
|
|
|
|
|
wire_mark=_wire_item_value(item, "wire_mark"),
|
|
|
|
|
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
|
|
|
|
|
wire_style_id=_wire_item_value(item, "wire_style_id"),
|
|
|
|
|
endpoint_metadata=endpoint_metadata,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for item in wires:
|
|
|
|
|
if not isinstance(item, dict):
|
|
|
|
|
report["skipped_invalid"] += 1
|
|
|
|
|
@ -1524,42 +1242,20 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
|
|
|
|
|
lane_key = _route_lane_key(start_uuid, end_uuid)
|
|
|
|
|
route_lane_index = lane_indexes_by_pair.get(lane_key, 0)
|
|
|
|
|
try:
|
|
|
|
|
endpoint_metadata = {
|
|
|
|
|
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
|
|
|
|
|
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
|
|
|
|
|
"start_device_label": _wire_item_value(item, "start_device_label"),
|
|
|
|
|
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
|
|
|
|
|
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
|
|
|
|
|
"end_device_label": _wire_item_value(item, "end_device_label"),
|
|
|
|
|
"endpoint_label": _wire_item_value(item, "endpoint_label"),
|
|
|
|
|
}
|
|
|
|
|
result = create_route(
|
|
|
|
|
route_lane_index,
|
|
|
|
|
dict(item, __segment_usage_costs=segment_usage_costs),
|
|
|
|
|
result = route_eplan_connection_between_terminals(
|
|
|
|
|
doc,
|
|
|
|
|
start_terminal,
|
|
|
|
|
end_terminal,
|
|
|
|
|
endpoint_metadata,
|
|
|
|
|
route_index=route_lane_index,
|
|
|
|
|
options=options,
|
|
|
|
|
wire_uuid=_wire_item_value(item, "wire_id", "wire_uuid", "id"),
|
|
|
|
|
wire_label=_wire_item_value(item, "wire_label", "wire_mark"),
|
|
|
|
|
net_uuid=_wire_item_value(item, "net_uuid"),
|
|
|
|
|
group_uuid=_wire_item_value(item, "group_uuid"),
|
|
|
|
|
wire_mark=_wire_item_value(item, "wire_mark"),
|
|
|
|
|
wire_mark_is_manual=bool(item.get("wire_mark_is_manual", False)),
|
|
|
|
|
wire_style_id=_wire_item_value(item, "wire_style_id"),
|
|
|
|
|
)
|
|
|
|
|
route_segment_keys = _route_segment_keys(result)
|
|
|
|
|
shared_lane_index = max(
|
|
|
|
|
[lane_indexes_by_segment.get(key, 0) for key in route_segment_keys] or [0]
|
|
|
|
|
)
|
|
|
|
|
final_lane_index = max(route_lane_index, shared_lane_index)
|
|
|
|
|
if final_lane_index != route_lane_index:
|
|
|
|
|
initial_wire = result.get("wire") if isinstance(result, dict) else None
|
|
|
|
|
try:
|
|
|
|
|
result = create_route(
|
|
|
|
|
final_lane_index,
|
|
|
|
|
dict(item, __segment_usage_costs=segment_usage_costs),
|
|
|
|
|
start_terminal,
|
|
|
|
|
end_terminal,
|
|
|
|
|
endpoint_metadata,
|
|
|
|
|
)
|
|
|
|
|
except Exception:
|
|
|
|
|
if initial_wire is not None:
|
|
|
|
|
_remove_routing_connection_objects(doc, [initial_wire])
|
|
|
|
|
raise
|
|
|
|
|
route_segment_keys = _route_segment_keys(result)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
error_text = str(exc)
|
|
|
|
|
report["errors"].append(error_text)
|
|
|
|
|
@ -1570,28 +1266,12 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
|
|
|
|
|
"wire_uuid": _wire_item_value(item, "wire_id", "wire_uuid", "id"),
|
|
|
|
|
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
|
|
|
|
|
"start_terminal_uuid": start_uuid,
|
|
|
|
|
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
|
|
|
|
|
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
|
|
|
|
|
"start_device_label": _wire_item_value(item, "start_device_label"),
|
|
|
|
|
"end_terminal_uuid": end_uuid,
|
|
|
|
|
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
|
|
|
|
|
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
|
|
|
|
|
"end_device_label": _wire_item_value(item, "end_device_label"),
|
|
|
|
|
"endpoint_label": _wire_item_value(item, "endpoint_label"),
|
|
|
|
|
"error": error_text,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
continue
|
|
|
|
|
lane_indexes_by_pair[lane_key] = max(
|
|
|
|
|
lane_indexes_by_pair.get(lane_key, 0),
|
|
|
|
|
int(result.get("lane", {}).get("index", 0) or 0) + 1,
|
|
|
|
|
)
|
|
|
|
|
for segment_key in route_segment_keys:
|
|
|
|
|
lane_indexes_by_segment[segment_key] = max(
|
|
|
|
|
lane_indexes_by_segment.get(segment_key, 0),
|
|
|
|
|
int(result.get("lane", {}).get("index", 0) or 0) + 1,
|
|
|
|
|
)
|
|
|
|
|
segment_usage_costs[segment_key] = segment_usage_costs.get(segment_key, 0) + 1
|
|
|
|
|
lane_indexes_by_pair[lane_key] = route_lane_index + 1
|
|
|
|
|
if result["route_status"] == "CollisionWarning":
|
|
|
|
|
report["collision_warnings"] += 1
|
|
|
|
|
add_status(result["route_status"])
|
|
|
|
|
@ -1618,14 +1298,7 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
|
|
|
|
|
"wire_label": _wire_item_value(item, "wire_label", "wire_mark"),
|
|
|
|
|
"wire_style_id": _wire_item_value(item, "wire_style_id"),
|
|
|
|
|
"start_terminal_uuid": start_uuid,
|
|
|
|
|
"start_element_uuid": _wire_item_value(item, "start_element_uuid"),
|
|
|
|
|
"start_terminal_display": _wire_item_value(item, "start_terminal_display"),
|
|
|
|
|
"start_device_label": _wire_item_value(item, "start_device_label"),
|
|
|
|
|
"end_terminal_uuid": end_uuid,
|
|
|
|
|
"end_element_uuid": _wire_item_value(item, "end_element_uuid"),
|
|
|
|
|
"end_terminal_display": _wire_item_value(item, "end_terminal_display"),
|
|
|
|
|
"end_device_label": _wire_item_value(item, "end_device_label"),
|
|
|
|
|
"endpoint_label": _wire_item_value(item, "endpoint_label"),
|
|
|
|
|
"algorithm": result["algorithm"],
|
|
|
|
|
"route_status": result["route_status"],
|
|
|
|
|
"length_mm": route_length,
|
|
|
|
|
@ -1641,54 +1314,6 @@ def route_eplan_connections_from_payload(doc, payload, options=None, prepared_la
|
|
|
|
|
return report
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _missing_endpoint_label(sample, side):
|
|
|
|
|
terminal_uuid = str(sample.get("{0}_terminal_uuid".format(side), "") or "").strip()
|
|
|
|
|
element_uuid = str(sample.get("{0}_element_uuid".format(side), "") or "").strip()
|
|
|
|
|
terminal_display = str(sample.get("{0}_terminal_display".format(side), "") or "").strip()
|
|
|
|
|
if element_uuid and terminal_display:
|
|
|
|
|
label = "{0}/{1}".format(element_uuid, terminal_display)
|
|
|
|
|
elif terminal_display:
|
|
|
|
|
label = terminal_display
|
|
|
|
|
elif element_uuid:
|
|
|
|
|
label = element_uuid
|
|
|
|
|
else:
|
|
|
|
|
return terminal_uuid
|
|
|
|
|
if terminal_uuid and terminal_uuid != label:
|
|
|
|
|
return "{0} ({1})".format(label, terminal_uuid)
|
|
|
|
|
return label
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _missing_endpoint_side_summary(sample):
|
|
|
|
|
missing_sides = []
|
|
|
|
|
if sample.get("start_found") is False:
|
|
|
|
|
missing_sides.append("起点")
|
|
|
|
|
if sample.get("end_found") is False:
|
|
|
|
|
missing_sides.append("终点")
|
|
|
|
|
if not missing_sides:
|
|
|
|
|
return ""
|
|
|
|
|
if len(missing_sides) == 2:
|
|
|
|
|
return "(缺失:两端)"
|
|
|
|
|
return "(缺失:{0})".format(missing_sides[0])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _wire_sample_text(sample):
|
|
|
|
|
return (
|
|
|
|
|
str(sample.get("wire_label", "") or "").strip()
|
|
|
|
|
or str(sample.get("wire_uuid", "") or "").strip()
|
|
|
|
|
or "未知导线"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _endpoint_pair_text(sample):
|
|
|
|
|
endpoint_label = str(sample.get("endpoint_label", "") or "").strip()
|
|
|
|
|
if endpoint_label:
|
|
|
|
|
return endpoint_label
|
|
|
|
|
return "{0} -> {1}".format(
|
|
|
|
|
_missing_endpoint_label(sample, "start"),
|
|
|
|
|
_missing_endpoint_label(sample, "end"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_eplan_connection_route_report(report):
|
|
|
|
|
message = "批量生成布线连接完成:routed={0}, collision_warnings={1}, missing_terminals={2}".format(
|
|
|
|
|
report.get("routed", 0),
|
|
|
|
|
@ -1734,13 +1359,6 @@ def format_eplan_connection_route_report(report):
|
|
|
|
|
errors = report.get("errors", []) or []
|
|
|
|
|
if errors:
|
|
|
|
|
message += "\n首个错误:{0}".format(str(errors[0]))
|
|
|
|
|
error_sample = (report.get("error_samples") or [None])[0]
|
|
|
|
|
if error_sample:
|
|
|
|
|
message += "\n错误示例:导线 {0},{1}:{2}".format(
|
|
|
|
|
_wire_sample_text(error_sample),
|
|
|
|
|
_endpoint_pair_text(error_sample),
|
|
|
|
|
error_sample.get("error", ""),
|
|
|
|
|
)
|
|
|
|
|
collision_sample = (report.get("collision_samples") or [None])[0]
|
|
|
|
|
if collision_sample:
|
|
|
|
|
obstacle_text = (
|
|
|
|
|
@ -1748,21 +1366,12 @@ def format_eplan_connection_route_report(report):
|
|
|
|
|
or collision_sample.get("obstacle_name")
|
|
|
|
|
or "未知对象"
|
|
|
|
|
)
|
|
|
|
|
wire_text = (
|
|
|
|
|
message += "\n碰撞示例:导线 {0} 碰到 {1}。".format(
|
|
|
|
|
collision_sample.get("wire_label")
|
|
|
|
|
or collision_sample.get("wire_uuid")
|
|
|
|
|
or "未知导线"
|
|
|
|
|
or "未知导线",
|
|
|
|
|
obstacle_text,
|
|
|
|
|
)
|
|
|
|
|
if collision_sample.get("collision_kind") == "ClearanceWarning":
|
|
|
|
|
message += "\n碰撞示例:导线 {0} 进入 {1} 的安全间隙。".format(
|
|
|
|
|
wire_text,
|
|
|
|
|
obstacle_text,
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
message += "\n碰撞示例:导线 {0} 碰到 {1}。".format(
|
|
|
|
|
wire_text,
|
|
|
|
|
obstacle_text,
|
|
|
|
|
)
|
|
|
|
|
auto_bound = report.get("auto_bound_terminals", 0)
|
|
|
|
|
auto_created = report.get("auto_created_terminals", 0)
|
|
|
|
|
if auto_bound or auto_created:
|
|
|
|
|
@ -1782,10 +1391,9 @@ def format_eplan_connection_route_report(report):
|
|
|
|
|
message += " 请先从 QET 重新导入/更新工程端子,使端子 UUID 不再是 local:...。"
|
|
|
|
|
sample = (report.get("missing_endpoint_samples") or [None])[0]
|
|
|
|
|
if sample:
|
|
|
|
|
message += "\n缺失示例:{0} -> {1}{2}".format(
|
|
|
|
|
_missing_endpoint_label(sample, "start"),
|
|
|
|
|
_missing_endpoint_label(sample, "end"),
|
|
|
|
|
_missing_endpoint_side_summary(sample),
|
|
|
|
|
message += "\n缺失示例:{0} -> {1}".format(
|
|
|
|
|
sample.get("start_terminal_uuid", ""),
|
|
|
|
|
sample.get("end_terminal_uuid", ""),
|
|
|
|
|
)
|
|
|
|
|
return message
|
|
|
|
|
|
|
|
|
|
@ -1873,13 +1481,10 @@ def _wire_tasks_payload(doc):
|
|
|
|
|
"start_instance_id": (getattr(task, "QetStartInstanceId", "") or "").strip(),
|
|
|
|
|
"start_terminal_uuid": (getattr(task, "QetStartTerminalUuid", "") or "").strip(),
|
|
|
|
|
"start_terminal_display": (getattr(task, "QetStartTerminalDisplay", "") or "").strip(),
|
|
|
|
|
"start_device_label": (getattr(task, "QetStartDeviceLabel", "") or "").strip(),
|
|
|
|
|
"end_element_uuid": (getattr(task, "QetEndElementUuid", "") or "").strip(),
|
|
|
|
|
"end_instance_id": (getattr(task, "QetEndInstanceId", "") or "").strip(),
|
|
|
|
|
"end_terminal_uuid": (getattr(task, "QetEndTerminalUuid", "") or "").strip(),
|
|
|
|
|
"end_terminal_display": (getattr(task, "QetEndTerminalDisplay", "") or "").strip(),
|
|
|
|
|
"end_device_label": (getattr(task, "QetEndDeviceLabel", "") or "").strip(),
|
|
|
|
|
"endpoint_label": (getattr(task, "QetEndpointLabel", "") or "").strip(),
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
return payload
|
|
|
|
|
@ -1961,92 +1566,6 @@ def check_eplan_routing_path_network(doc, project_uuid="", options=None):
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_distance_mm(value):
|
|
|
|
|
try:
|
|
|
|
|
return "{0:.1f} mm".format(float(value))
|
|
|
|
|
except Exception:
|
|
|
|
|
return "未知距离"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_point_text(point):
|
|
|
|
|
if not isinstance(point, dict):
|
|
|
|
|
return "未知位置"
|
|
|
|
|
try:
|
|
|
|
|
return "({0:.1f}, {1:.1f}, {2:.1f})".format(
|
|
|
|
|
float(point.get("x", 0.0)),
|
|
|
|
|
float(point.get("y", 0.0)),
|
|
|
|
|
float(point.get("z", 0.0)),
|
|
|
|
|
)
|
|
|
|
|
except Exception:
|
|
|
|
|
return "未知位置"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _diagnostic_terminal_text(item):
|
|
|
|
|
if not isinstance(item, dict):
|
|
|
|
|
return "未知端子"
|
|
|
|
|
return (
|
|
|
|
|
item.get("terminal_uuid")
|
|
|
|
|
or item.get("label")
|
|
|
|
|
or item.get("name")
|
|
|
|
|
or "未知端子"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _dict_items(value):
|
|
|
|
|
if not isinstance(value, list):
|
|
|
|
|
return []
|
|
|
|
|
return [item for item in value if isinstance(item, dict)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_routing_path_network_report(diagnostic):
|
|
|
|
|
"""Return an actionable Chinese summary for routing path network diagnostics."""
|
|
|
|
|
if not isinstance(diagnostic, dict):
|
|
|
|
|
return "布线路径网络检查失败:诊断结果无效。"
|
|
|
|
|
|
|
|
|
|
summary = diagnostic.get("summary", {}) if isinstance(diagnostic.get("summary", {}), dict) else {}
|
|
|
|
|
issues = _dict_items(diagnostic.get("issues", []) or [])
|
|
|
|
|
if not issues:
|
|
|
|
|
return "布线路径网络检查通过:{0} 条 carrier / {1} 段 / {2} 个节点。".format(
|
|
|
|
|
summary.get("carriers", 0),
|
|
|
|
|
summary.get("segments", 0),
|
|
|
|
|
summary.get("nodes", 0),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
message = "布线路径网络检查发现 {0} 类问题。".format(len(issues))
|
|
|
|
|
unconnected = _dict_items(diagnostic.get("unconnected_terminals", []) or [])
|
|
|
|
|
if unconnected:
|
|
|
|
|
sample = unconnected[0]
|
|
|
|
|
message += "\n端子未接入:{0},距离最近网络 {1}。请重新生成布线路径网络,或补一段线槽/辅助路径到该端子。".format(
|
|
|
|
|
_diagnostic_terminal_text(sample),
|
|
|
|
|
_format_distance_mm(sample.get("nearest_network_distance_mm")),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
possible_breaks = _dict_items(diagnostic.get("possible_breaks", []) or [])
|
|
|
|
|
if possible_breaks:
|
|
|
|
|
sample = possible_breaks[0]
|
|
|
|
|
carrier = sample.get("carrier", {}) if isinstance(sample.get("carrier", {}), dict) else {}
|
|
|
|
|
carrier_text = carrier.get("label") or carrier.get("name") or "未知线槽"
|
|
|
|
|
message += "\n线槽端点疑似断开:{0} @ {1}。请补齐相邻线槽、开口或辅助路径。".format(
|
|
|
|
|
carrier_text,
|
|
|
|
|
_format_point_text(sample.get("point")),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
isolated = _dict_items(diagnostic.get("isolated_components", []) or [])
|
|
|
|
|
if isolated:
|
|
|
|
|
sample = isolated[0]
|
|
|
|
|
carriers = sample.get("carrier_labels") or sample.get("carrier_names") or []
|
|
|
|
|
carrier_text = "、".join([str(item) for item in carriers[:3]]) if carriers else "未知 carrier"
|
|
|
|
|
message += "\n存在孤立路径网络:{0}。请用线槽/辅助路径把孤立网络接入主网络。".format(carrier_text)
|
|
|
|
|
|
|
|
|
|
if not (unconnected or possible_breaks or isolated):
|
|
|
|
|
first_issue = issues[0]
|
|
|
|
|
message += "\n首个问题:{0} ({1})。".format(
|
|
|
|
|
first_issue.get("code", "unknown"),
|
|
|
|
|
first_issue.get("count", 0),
|
|
|
|
|
)
|
|
|
|
|
return message
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_eplan_routing_path_network(doc, project_uuid="", options=None, selection_ex=None):
|
|
|
|
|
"""Update the routing path network before EPLAN-style Route."""
|
|
|
|
|
return generate_eplan_routing_path_network(
|
|
|
|
|
|