You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

597 lines
20 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# FreeCADExchange wire task import helpers.
import json
import re
import FreeCAD as App
import TerminalObjects
import WiringObjects
class WiringImportError(RuntimeError):
    """Error raised by the wire import helpers for an unusable payload or document."""
def _string_value(item, field_name):
    """Return ``item[field_name]`` as whitespace-stripped text.

    A missing key or an explicit ``None`` yields ``""``; values that are not
    already strings are converted with ``str()`` before stripping.
    """
    raw = item.get(field_name, "")
    if raw is None:
        return ""
    text = raw if isinstance(raw, str) else str(raw)
    return text.strip()
def _bool_value(item, field_name):
    """Return ``item[field_name]`` interpreted as a boolean.

    A missing key yields ``False``. Non-string values use ordinary Python
    truthiness. String values are compared case-insensitively after
    stripping: ``""``, ``"0"``, ``"false"``, ``"no"`` and ``"off"`` are
    ``False``; any other text is ``True``.
    """
    value = item.get(field_name, False)
    if isinstance(value, str):
        # bool("false") is True in Python, so textual flags need explicit
        # parsing instead of plain truthiness.
        return value.strip().lower() not in ("", "0", "false", "no", "off")
    return bool(value)
def _int_text_value(item, field_name):
    """Return ``item[field_name]`` as integer text when it is integral.

    A missing key or ``None`` yields ``""``. Values that ``int()`` accepts are
    rendered as their decimal integer text (``" 12 "`` -> ``"12"``,
    ``3.0`` -> ``"3"``). Anything else, including a non-integral float, is
    returned as its stripped ``str()`` form so no information is lost.
    """
    value = item.get(field_name, "")
    if value is None:
        return ""
    if isinstance(value, float) and not value.is_integer():
        # int(3.7) would silently truncate to 3, whereas the string "3.7" is
        # kept verbatim below; treat both spellings the same way.
        return str(value).strip()
    try:
        return str(int(value))
    except (TypeError, ValueError, OverflowError):
        return str(value).strip()
def _conductor_uuids(item):
    """Return the non-empty, stripped entries of ``item["conductor_uuids"]``.

    Returns ``[]`` when the field is missing or is not a list. ``None``
    elements and elements that are blank after stripping are dropped.
    """
    values = item.get("conductor_uuids", [])
    if not isinstance(values, list):
        return []
    # Skip None explicitly: str(None) would otherwise leak the literal text
    # "None" into the result as if it were an identifier.
    stripped = (str(value).strip() for value in values if value is not None)
    return [text for text in stripped if text]
def _wire_style_payload(item):
    """Return a shallow copy of ``item["wire_style"]``, or ``{}`` if it is not a dict."""
    candidate = item.get("wire_style", {})
    return dict(candidate) if isinstance(candidate, dict) else {}
def _canonical_wire_style_json(wire_style):
    """Serialize a wire style dict to JSON with sorted keys.

    Anything that is not a dict is serialized as an empty object. Non-ASCII
    characters are written unescaped.
    """
    payload = wire_style if isinstance(wire_style, dict) else {}
    return json.dumps(payload, sort_keys=True, ensure_ascii=False)
def _style_text(wire_style, key):
    """Return ``wire_style[key]`` as stripped text.

    Yields ``""`` when ``wire_style`` is not a dict, the key is missing, or
    the stored value is ``None``.
    """
    if isinstance(wire_style, dict):
        raw = wire_style.get(key, "")
        if raw is not None:
            return str(raw).strip()
    return ""
def _style_float_text(wire_style, key):
    """Return a style field formatted as a compact number where possible.

    The field text is parsed with ``float()`` and rendered with the ``g``
    format; if that fails the original text is returned unchanged. An empty
    field yields ``""``.
    """
    raw_text = _style_text(wire_style, key)
    if raw_text:
        try:
            return "{0:g}".format(float(raw_text))
        except Exception:
            return raw_text
    return ""
def _parse_line_color(value):
    """Parse a colour description into an ``(r, g, b)`` tuple of 0..1 floats.

    Accepted forms:

    * six hex digits with an optional leading ``#`` (``"#ff8000"``);
    * three hex digits with a mandatory leading ``#`` (``"#f80"``), expanded
      by doubling each digit;
    * three comma-separated numbers. If any component exceeds 1.0 the triple
      is treated as 0..255 and scaled; components are then clamped to 0..1.

    Returns ``None`` for empty input, unrecognised text, a comma list that
    does not have exactly three numeric parts, or any non-finite component.
    """
    text = str(value or "").strip()
    if not text:
        return None

    hex_match = re.fullmatch(r"#?([0-9a-fA-F]{6})|#([0-9a-fA-F]{3})", text)
    if hex_match:
        digits = hex_match.group(1)
        if digits is None:
            # Shorthand "#abc" means "#aabbcc".
            digits = "".join(char * 2 for char in hex_match.group(2))
        return tuple(int(digits[index:index + 2], 16) / 255.0 for index in (0, 2, 4))

    if "," in text:
        try:
            numbers = [float(part.strip()) for part in text.split(",")]
        except ValueError:
            return None
        infinity = float("inf")
        # NaN fails both comparisons, so this rejects NaN and +/-inf; without
        # it the clamp below would turn "nan" into a full-intensity channel.
        if len(numbers) != 3 or not all(-infinity < number < infinity for number in numbers):
            return None
        if max(numbers) > 1.0:
            numbers = [number / 255.0 for number in numbers]
        return tuple(max(0.0, min(1.0, number)) for number in numbers)

    return None
def _line_width_value(wire_style):
    """Return the style's ``line_width`` clamped to 0.1..20.0, or ``None``.

    ``None`` is returned when the field is missing, unparsable, or not
    greater than zero.
    """
    smallest, largest = 0.1, 20.0
    try:
        width = float(_style_text(wire_style, "line_width") or 0)
    except Exception:
        width = 0.0
    if width <= 0.0:
        return None
    return max(smallest, min(largest, width))
def _draw_style_value(wire_style):
    """Map the style's ``line_type`` text to a draw style name.

    The text is lower-cased and has ``_``, ``-`` and spaces removed before
    being matched against known aliases. The result is one of ``"Dashed"``,
    ``"Dotted"``, ``"Dashdot"`` or, for empty or unknown text, ``"Solid"``.
    """
    raw = _style_text(wire_style, "line_type").lower()
    if not raw:
        return "Solid"
    compact = "".join(char for char in raw if char not in "_- ")
    # Each entry pairs a draw style with its accepted spellings; the last
    # alias in each group is the Chinese term for that line type.
    alias_table = (
        ("Dashed", ("dashline", "dashed", "dash", "dashes", "虚线")),
        ("Dotted", ("dotline", "dotted", "dot", "dots", "点线")),
        ("Dashdot", ("dashdotline", "dashdot", "点划线")),
    )
    for draw_style, aliases in alias_table:
        if compact in aliases:
            return draw_style
    return "Solid"
def _geometry_style_fields(wire_style):
    """Return the stripped text of the style fields treated as geometry-sensitive.

    The result maps ``diameter_mm``, ``bend_radius_mm``, ``bend_radius_factor``
    and ``area_or_spec`` to their text values (``""`` when absent).
    """
    field_names = ("diameter_mm", "bend_radius_mm", "bend_radius_factor", "area_or_spec")
    fields = {}
    for name in field_names:
        fields[name] = _style_text(wire_style, name)
    return fields
def _json_dict(text):
    """Parse ``text`` as JSON and return it only if it is an object.

    Empty or ``None`` input is treated as ``"{}"``. Parse failures and
    non-object JSON values both yield ``{}``.
    """
    try:
        parsed = json.loads(text or "{}")
    except Exception:
        parsed = None
    if isinstance(parsed, dict):
        return parsed
    return {}
def _device_display_map(payload):
    """Build a mapping from element UUID to the owning device's display tag.

    Devices that are not dicts or that have an empty ``display_tag`` are
    skipped. When the same element UUID appears under several devices, the
    last one in the payload wins.
    """
    labels = {}
    for device in payload.get("devices", []) or []:
        if not isinstance(device, dict):
            continue
        tag = _string_value(device, "display_tag")
        if not tag:
            continue
        # In the new exchange protocol a single 3D device instance may merge
        # several 2D symbols. Wire endpoints still refer to element_uuid, so
        # every 2D symbol in the group is mapped to the same device label.
        element_uuids = [
            _string_value(terminal, "element_uuid")
            for terminal in device.get("terminals", []) or []
            if isinstance(terminal, dict)
        ]
        labels.update((element_uuid, tag) for element_uuid in element_uuids if element_uuid)
    return labels
def _endpoint_instance_map(payload):
    """Index device instance ids by terminal and by element.

    Returns a ``(by_terminal, by_element)`` pair. ``by_terminal`` is keyed by
    ``(element_uuid, terminal_uuid)`` and keeps the last device seen for a
    key; ``by_element`` is keyed by ``element_uuid`` and keeps the first.
    Devices without a ``device_instance_id`` contribute nothing.
    """
    terminal_index = {}
    element_index = {}
    devices = payload.get("devices", []) or []
    for device in (entry for entry in devices if isinstance(entry, dict)):
        instance_id = _string_value(device, "device_instance_id")
        if not instance_id:
            continue
        for terminal in device.get("terminals", []) or []:
            if not isinstance(terminal, dict):
                continue
            element_uuid = _string_value(terminal, "element_uuid")
            terminal_uuid = _string_value(terminal, "terminal_uuid")
            if not element_uuid:
                continue
            element_index.setdefault(element_uuid, instance_id)
            if terminal_uuid:
                terminal_index[(element_uuid, terminal_uuid)] = instance_id
    return terminal_index, element_index
def _endpoint_text(device_label, terminal_display, terminal_uuid):
    """Format a human-readable endpoint as ``device:terminal``.

    The terminal part prefers ``terminal_display`` and falls back to
    ``terminal_uuid``. If only one of device and terminal is available it is
    returned alone; if neither is, a fixed placeholder string is returned.
    """
    terminal_name = terminal_display if terminal_display else terminal_uuid
    if not device_label:
        return terminal_name or "未命名端子"
    if not terminal_name:
        return device_label
    return "{0}:{1}".format(device_label, terminal_name)
def _normalize_wire_entry(item, index, device_labels=None, endpoint_instances=None):
    """Validate one raw wire record and flatten it into an import entry dict.

    Args:
        item: Raw wire record from the payload; must be a dict.
        index: Position of the record, used only in error messages.
        device_labels: Optional mapping of element UUID to device label, used
            when the record carries no explicit device label.
        endpoint_instances: Optional ``(by_terminal, by_element)`` pair of
            mappings used to resolve each endpoint's device instance id.

    Returns:
        A dict with the wire identity, style, both endpoints and a combined
        ``endpoint_label``.

    Raises:
        WiringImportError: If the record is not a dict, has no identifier, or
            is missing either terminal UUID.
    """
    if not isinstance(item, dict):
        raise WiringImportError("Wire entry #{0} must be an object.".format(index))

    labels_by_element = device_labels or {}
    instance_by_terminal, instance_by_element = endpoint_instances or ({}, {})

    # The first non-empty identifier wins: wire_id, then wire_uuid, then id.
    wire_uuid = ""
    for id_field in ("wire_id", "wire_uuid", "id"):
        wire_uuid = _string_value(item, id_field)
        if wire_uuid:
            break

    start_terminal_uuid = _string_value(item, "start_terminal_uuid")
    end_terminal_uuid = _string_value(item, "end_terminal_uuid")
    if not wire_uuid:
        raise WiringImportError("Wire entry #{0} is missing wire_id.".format(index))
    if not (start_terminal_uuid and end_terminal_uuid):
        raise WiringImportError(
            "Wire {0} is missing start_terminal_uuid or end_terminal_uuid.".format(
                wire_uuid
            )
        )

    def resolve_endpoint(element_uuid, terminal_uuid, display_field, label_field):
        # The exact (element, terminal) match is preferred for the instance
        # id, with the element-level match as fallback.
        display = _string_value(item, display_field)
        instance_id = instance_by_terminal.get(
            (element_uuid, terminal_uuid),
            instance_by_element.get(element_uuid, ""),
        )
        device_label = _string_value(item, label_field) or labels_by_element.get(
            element_uuid, ""
        )
        text = _endpoint_text(device_label, display, terminal_uuid)
        return display, instance_id, device_label, text

    wire_mark = _string_value(item, "wire_mark")
    wire_label = _string_value(item, "wire_label") or wire_mark or wire_uuid

    start_element_uuid = _string_value(item, "start_element_uuid")
    end_element_uuid = _string_value(item, "end_element_uuid")
    start_display, start_instance_id, start_device_label, start_text = resolve_endpoint(
        start_element_uuid,
        start_terminal_uuid,
        "start_terminal_display",
        "start_device_label",
    )
    end_display, end_instance_id, end_device_label, end_text = resolve_endpoint(
        end_element_uuid,
        end_terminal_uuid,
        "end_terminal_display",
        "end_device_label",
    )

    return {
        "wire_uuid": wire_uuid,
        "wire_label": wire_label,
        "net_uuid": _string_value(item, "net_uuid"),
        "group_uuid": _string_value(item, "group_uuid"),
        "wire_mark": wire_mark,
        "wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"),
        "wire_style_id": _int_text_value(item, "wire_style_id"),
        "wire_style": _wire_style_payload(item),
        "start_element_uuid": start_element_uuid,
        "start_terminal_uuid": start_terminal_uuid,
        "start_instance_id": start_instance_id,
        "start_terminal_display": start_display,
        "start_device_label": start_device_label,
        "end_element_uuid": end_element_uuid,
        "end_terminal_uuid": end_terminal_uuid,
        "end_instance_id": end_instance_id,
        "end_terminal_display": end_display,
        "end_device_label": end_device_label,
        "endpoint_label": "{0} -> {1}".format(start_text, end_text),
        "conductor_uuids": _conductor_uuids(item),
    }
def _unique_object_name(doc, base_name):
    """Return an object name derived from ``base_name`` that is unused in ``doc``.

    The base is first passed through ``TerminalObjects.safe_token``. If that
    name is taken, ``_1``, ``_2``, ... are appended until ``doc.getObject``
    reports no existing object.
    """
    base = TerminalObjects.safe_token(base_name)
    candidate = base
    counter = 0
    while doc.getObject(candidate) is not None:
        counter += 1
        candidate = "{0}_{1}".format(base, counter)
    return candidate
def _task_label(entry):
    """Compose the task label: wire mark (or label, or UUID) plus the endpoint label."""
    title = entry["wire_mark"]
    if not title:
        title = entry["wire_label"] or entry["wire_uuid"]
    return "{0} {1}".format(title, entry["endpoint_label"])
def _find_task_by_wire_uuid(task_group, wire_uuid):
    """Return the first member of ``task_group.Group`` with a matching wire UUID.

    Both the requested UUID and each member's ``QetWireUuid`` are compared
    after stripping whitespace. Returns ``None`` when ``wire_uuid`` is empty,
    the group has no members, or nothing matches.
    """
    target = str(wire_uuid or "").strip()
    if not target:
        return None
    for candidate in getattr(task_group, "Group", None) or []:
        # A member may expose QetWireUuid as None; fall back to "" so the
        # lookup skips it instead of raising AttributeError on .strip().
        candidate_uuid = getattr(candidate, "QetWireUuid", "") or ""
        if str(candidate_uuid).strip() == target:
            return candidate
    return None
def _ensure_string_property(obj, prop_name, value, description="QET wire task property"):
    """Forward to ``TerminalObjects.ensure_string_property`` for this module.

    ``"QET Wiring"`` is always passed as the third argument (presumably the
    property group; confirm against TerminalObjects).
    """
    property_group = "QET Wiring"
    TerminalObjects.ensure_string_property(obj, prop_name, property_group, description, value)
def _ensure_bool_property(obj, prop_name, value, description="QET wire task property"):
    """Forward to ``TerminalObjects.ensure_bool_property`` for this module.

    ``"QET Wiring"`` is always passed as the third argument (presumably the
    property group; confirm against TerminalObjects).
    """
    property_group = "QET Wiring"
    TerminalObjects.ensure_bool_property(obj, prop_name, property_group, description, value)
def _set_wire_style_properties(obj, wire_style_id, wire_style):
    """Store the wire style on ``obj`` as ``QetWire*`` properties.

    Writes the style id, the canonical style JSON and several individual
    style fields. ``QetWireStyleAffectsGeometry`` is set to ``True`` when a
    previous style existed and its geometry-sensitive fields differ from the
    new ones; otherwise the flag is only created (as ``False``) if ``obj``
    does not have it yet, and an existing value is left alone.

    Returns:
        ``True`` when the stored style JSON differs from the new one.
    """
    style = wire_style if isinstance(wire_style, dict) else {}
    new_style_json = _canonical_wire_style_json(style)

    # Capture the previous style before the property is overwritten below.
    previous_json = (getattr(obj, "QetWireStyleJson", "") or "").strip()
    previous_style = _json_dict(previous_json)
    changed = previous_json != new_style_json

    _ensure_string_property(obj, "QetWireStyleId", str(wire_style_id or "").strip())
    _ensure_string_property(
        obj,
        "QetWireStyleJson",
        new_style_json,
        "QET wire style JSON from 2D payload",
    )
    for prop_name, text in (
        ("QetWireStyleName", _style_text(style, "name")),
        ("QetWireLineColor", _style_text(style, "line_color")),
        ("QetWireLineWidth", _style_float_text(style, "line_width")),
        ("QetWireDiameterMm", _style_float_text(style, "diameter_mm")),
        ("QetWireBendRadiusMm", _style_float_text(style, "bend_radius_mm")),
    ):
        _ensure_string_property(obj, prop_name, text)

    flag_name = "QetWireStyleAffectsGeometry"
    flag_description = "Whether latest QET wire style changes geometry-sensitive fields"
    geometry_changed = (
        bool(previous_json)
        and changed
        and _geometry_style_fields(previous_style) != _geometry_style_fields(style)
    )
    if geometry_changed:
        _ensure_bool_property(obj, flag_name, True, flag_description)
    elif flag_name not in getattr(obj, "PropertiesList", []):
        _ensure_bool_property(obj, flag_name, False, flag_description)
    return changed
def _rgb_matches(current, target, tolerance=1e-6):
    """Return True when the first three components of ``current`` equal ``target``.

    ``current`` may carry extra components (for example an alpha channel) and
    small rounding differences; both are ignored. Values that cannot be read
    as at least three numbers never match.
    """
    try:
        components = [float(component) for component in tuple(current)[:3]]
    except (TypeError, ValueError):
        return False
    if len(components) != len(target):
        return False
    return all(abs(have - want) <= tolerance for have, want in zip(components, target))


def _apply_wire_style_view(obj, wire_style):
    """Apply a wire style to ``obj.ViewObject`` and record what was applied.

    Colour, draw style, display mode (always ``"Wireframe"``) and line width
    are each written only when the view has the property and its current
    value differs. Failures on individual view properties are ignored so one
    unsupported property does not stop the rest. The applied values are then
    stored on ``obj`` as ``QetWireStyleApplied`` / ``QetAppliedWire*``
    properties.

    Returns:
        ``True`` when at least one view property was modified; ``False`` when
        nothing changed or ``obj`` has no ``ViewObject``.
    """
    changed = False
    view = getattr(obj, "ViewObject", None)
    if view is None:
        return changed

    color = _parse_line_color(_style_text(wire_style, "line_color"))
    if color is not None:
        for prop_name in ("LineColor", "ShapeColor", "PointColor"):
            if not hasattr(view, prop_name):
                continue
            try:
                # FreeCAD view colours are expected to read back as RGBA
                # tuples with single-precision rounding, so exact equality
                # against the parsed RGB triple would report a change on
                # every call. Compare RGB with a tolerance instead.
                if not _rgb_matches(getattr(view, prop_name, ()), color):
                    setattr(view, prop_name, color)
                    changed = True
            except Exception:
                # Best effort: an unsupported property must not abort styling.
                pass

    draw_style = _draw_style_value(wire_style)
    if hasattr(view, "DrawStyle"):
        try:
            if str(getattr(view, "DrawStyle", "") or "") != draw_style:
                view.DrawStyle = draw_style
                changed = True
        except Exception:
            pass

    if hasattr(view, "DisplayMode"):
        try:
            if str(getattr(view, "DisplayMode", "") or "") != "Wireframe":
                view.DisplayMode = "Wireframe"
                changed = True
        except Exception:
            pass

    width = _line_width_value(wire_style)
    if width is not None and hasattr(view, "LineWidth"):
        try:
            if float(getattr(view, "LineWidth", 0.0) or 0.0) != width:
                view.LineWidth = width
                changed = True
        except Exception:
            pass

    _ensure_bool_property(
        obj,
        "QetWireStyleApplied",
        bool(isinstance(wire_style, dict) and wire_style),
        "Whether the QET wire style has been applied to the visible 3D wire",
    )
    _ensure_string_property(
        obj,
        "QetAppliedWireLineColor",
        _style_text(wire_style, "line_color"),
        "Applied QET wire line color text",
    )
    _ensure_string_property(
        obj,
        "QetAppliedWireDrawStyle",
        draw_style,
        "Applied QET wire draw style",
    )
    if color is not None:
        _ensure_string_property(
            obj,
            "QetAppliedWireLineColorRgb",
            ",".join(str(value) for value in color),
            "Applied QET wire RGB color",
        )
    if width is not None:
        _ensure_string_property(
            obj,
            "QetAppliedWireLineWidth",
            str(width),
            "Applied QET wire line width",
        )
    return changed
def _set_task_extra_properties(task, entry):
    """Copy endpoint, style and conductor details from ``entry`` onto ``task``.

    The element UUIDs are written first, then the wire style properties, then
    the display/label fields, and finally the conductor UUID list as JSON.
    """
    element_fields = (
        ("QetStartElementUuid", "start_element_uuid"),
        ("QetEndElementUuid", "end_element_uuid"),
    )
    label_fields = (
        ("QetStartTerminalDisplay", "start_terminal_display"),
        ("QetEndTerminalDisplay", "end_terminal_display"),
        ("QetStartDeviceLabel", "start_device_label"),
        ("QetEndDeviceLabel", "end_device_label"),
        ("QetEndpointLabel", "endpoint_label"),
    )

    for prop_name, entry_key in element_fields:
        _ensure_string_property(task, prop_name, entry[entry_key])
    _set_wire_style_properties(task, entry["wire_style_id"], entry["wire_style"])
    for prop_name, entry_key in label_fields:
        _ensure_string_property(task, prop_name, entry[entry_key])

    conductor_json = json.dumps(entry["conductor_uuids"], ensure_ascii=False)
    _ensure_string_property(task, "QetConductorUuidsJson", conductor_json)
def _upsert_wire_task(doc, task_group, project_uuid, entry):
    """Create or update the wire task object for ``entry``.

    An existing task is located by wire UUID inside ``task_group``; if none
    exists a new ``App::DocumentObjectGroup`` is added to ``doc`` and to the
    group. For an existing task whose ``RouteStatus`` was non-empty and not
    ``"Task"``, that status is written back after the task semantics are
    re-applied (presumably because ``set_wire_task_semantics`` resets it;
    confirm against WiringObjects).

    Returns:
        ``(task, created)`` where ``created`` is ``True`` for a new object.
    """
    wire_uuid = entry["wire_uuid"]
    task = _find_task_by_wire_uuid(task_group, wire_uuid)
    created = task is None
    if created:
        previous_status = ""
        object_name = _unique_object_name(doc, "QETWireTask_{0}".format(wire_uuid))
        task = doc.addObject("App::DocumentObjectGroup", object_name)
        task_group.addObject(task)
    else:
        previous_status = (getattr(task, "RouteStatus", "") or "").strip()

    WiringObjects.set_wire_task_semantics(
        task,
        project_uuid,
        wire_uuid,
        entry["wire_label"],
        entry["start_terminal_uuid"],
        entry["end_terminal_uuid"],
        entry["start_instance_id"],
        entry["end_instance_id"],
        net_uuid=entry["net_uuid"],
        group_uuid=entry["group_uuid"],
        wire_mark=entry["wire_mark"],
        wire_mark_is_manual=entry["wire_mark_is_manual"],
    )
    _set_task_extra_properties(task, entry)

    if previous_status not in ("", "Task"):
        TerminalObjects.ensure_string_property(
            task,
            "RouteStatus",
            "QET Wiring",
            "Wire task route status",
            previous_status,
        )
    task.Label = _task_label(entry)
    return task, created
def import_wire_tasks_from_payload(payload, doc=None):
    """Create or update one wire task per entry in ``payload["wires"]``.

    Args:
        payload: Exchange payload dict; must contain ``project_uuid`` and may
            contain ``wires`` (a list) and ``devices``.
        doc: Target document; defaults to ``App.ActiveDocument``.

    Returns:
        A report dict with counts of imported, updated and skipped entries
        and a list of warning messages for the skipped ones.

    Raises:
        WiringImportError: If no document is available, the payload is not a
            dict, ``project_uuid`` is missing, or ``wires`` is not a list.
    """
    doc = getattr(App, "ActiveDocument", None) if doc is None else doc
    if doc is None:
        raise WiringImportError("No active FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise WiringImportError("Exchange payload must be an object.")

    project_uuid = _string_value(payload, "project_uuid")
    if not project_uuid:
        raise WiringImportError("Field 'project_uuid' is required for wire task import.")

    wires = payload.get("wires", [])
    wires = [] if wires is None else wires
    if not isinstance(wires, list):
        raise WiringImportError("Field 'wires' must be a list.")

    task_group = WiringObjects.ensure_task_group(doc, project_uuid)
    report = {
        "project_uuid": project_uuid,
        "total_wires": len(wires),
        "imported_tasks": 0,
        "updated_tasks": 0,
        "removed_stale_tasks": 0,
        "skipped_invalid": 0,
        "warnings": [],
    }
    device_labels = _device_display_map(payload)
    endpoint_instances = _endpoint_instance_map(payload)

    for index, item in enumerate(wires):
        try:
            entry = _normalize_wire_entry(
                item,
                index,
                device_labels=device_labels,
                endpoint_instances=endpoint_instances,
            )
        except WiringImportError as exc:
            # An invalid entry is reported and skipped; the import continues.
            report["skipped_invalid"] += 1
            report["warnings"].append(str(exc))
        else:
            _task, created = _upsert_wire_task(doc, task_group, project_uuid, entry)
            counter = "imported_tasks" if created else "updated_tasks"
            report[counter] += 1
    return report
def apply_wire_styles_from_payload(payload, doc=None):
    """Apply the payload's wire styles to matching routed wire objects.

    Styles are indexed by wire identifier (``wire_id``, else ``wire_uuid``,
    else ``id``). Every object yielded by
    ``WiringObjects.iter_routed_wire_objects(doc)`` whose ``QetWireUuid`` is
    in that index gets its style properties and view updated.

    Args:
        payload: Exchange payload dict; ``wires`` entries that are not dicts
            or have no identifier are ignored.
        doc: Target document; defaults to ``App.ActiveDocument``.

    Returns:
        A report dict with seen/matched/updated counters and up to eight
        samples of wires whose view changed.

    Raises:
        WiringImportError: If no document is available or the payload is not
            a dict.
    """
    doc = getattr(App, "ActiveDocument", None) if doc is None else doc
    if doc is None:
        raise WiringImportError("No active FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise WiringImportError("Exchange payload must be an object.")

    style_by_wire = {}
    for item in payload.get("wires", []) or []:
        if not isinstance(item, dict):
            continue
        wire_uuid = ""
        for id_field in ("wire_id", "wire_uuid", "id"):
            wire_uuid = _string_value(item, id_field)
            if wire_uuid:
                break
        if wire_uuid:
            style_by_wire[wire_uuid] = {
                "wire_style_id": _int_text_value(item, "wire_style_id"),
                "wire_style": _wire_style_payload(item),
            }

    report = {
        "total_styles": len(style_by_wire),
        "routed_wires_seen": 0,
        "routed_wires_matched": 0,
        "updated_properties": 0,
        "updated_view": 0,
        "updated_samples": [],
        "warnings": [],
    }
    sample_limit = 8

    for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
        report["routed_wires_seen"] += 1
        wire_uuid = (getattr(obj, "QetWireUuid", "") or "").strip()
        entry = style_by_wire.get(wire_uuid) if wire_uuid else None
        if entry is None:
            continue
        report["routed_wires_matched"] += 1

        style = entry["wire_style"]
        prop_changed = _set_wire_style_properties(obj, entry["wire_style_id"], style)
        view_changed = _apply_wire_style_view(obj, style)
        if prop_changed:
            report["updated_properties"] += 1
        if not view_changed:
            continue
        report["updated_view"] += 1
        if len(report["updated_samples"]) < sample_limit:
            report["updated_samples"].append(
                {
                    "wire_uuid": wire_uuid,
                    "line_color": _style_text(style, "line_color"),
                    "line_width": _style_text(style, "line_width"),
                    "view_changed": bool(view_changed),
                }
            )
    return report