You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
11 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# FreeCADExchange wire task import helpers.
import json
import FreeCAD as App
import TerminalObjects
import WiringObjects
class WiringImportError(RuntimeError):
pass
def _string_value(item, field_name):
value = item.get(field_name, "")
if value is None:
return ""
if not isinstance(value, str):
return str(value).strip()
return value.strip()
def _bool_value(item, field_name):
return bool(item.get(field_name, False))
def _int_text_value(item, field_name):
value = item.get(field_name, "")
if value is None:
return ""
try:
return str(int(value)).strip()
except Exception:
return str(value).strip()
def _conductor_uuids(item):
values = item.get("conductor_uuids", [])
if not isinstance(values, list):
return []
return [str(value).strip() for value in values if str(value).strip()]
def _device_display_map(payload):
labels = {}
for item in payload.get("devices", []) or []:
if not isinstance(item, dict):
continue
display_tag = _string_value(item, "display_tag")
if not display_tag:
continue
# 新交换协议中,一个 3D 设备实例可能合并多个 2D 符号;
# 导线端点仍用 element_uuid所以这里要把组内所有 2D 符号都映射到同一设备标注。
candidate_element_uuids = []
for terminal in item.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = _string_value(terminal, "element_uuid")
if element_uuid:
candidate_element_uuids.append(element_uuid)
for element_uuid in candidate_element_uuids:
labels[element_uuid] = display_tag
return labels
def _endpoint_instance_map(payload):
by_terminal = {}
by_element = {}
for device in payload.get("devices", []) or []:
if not isinstance(device, dict):
continue
device_instance_id = _string_value(device, "device_instance_id")
if not device_instance_id:
continue
for terminal in device.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = _string_value(terminal, "element_uuid")
terminal_uuid = _string_value(terminal, "terminal_uuid")
if element_uuid:
by_element.setdefault(element_uuid, device_instance_id)
if element_uuid and terminal_uuid:
by_terminal[(element_uuid, terminal_uuid)] = device_instance_id
return by_terminal, by_element
def _endpoint_text(device_label, terminal_display, terminal_uuid):
terminal = terminal_display or terminal_uuid
if device_label and terminal:
return "{0}:{1}".format(device_label, terminal)
return device_label or terminal or "未命名端子"
def _normalize_wire_entry(item, index, device_labels=None, endpoint_instances=None):
if not isinstance(item, dict):
raise WiringImportError("Wire entry #{0} must be an object.".format(index))
device_labels = device_labels or {}
endpoint_by_terminal, endpoint_by_element = endpoint_instances or ({}, {})
wire_uuid = (
_string_value(item, "wire_id")
or _string_value(item, "wire_uuid")
or _string_value(item, "id")
)
start_terminal_uuid = _string_value(item, "start_terminal_uuid")
end_terminal_uuid = _string_value(item, "end_terminal_uuid")
if not wire_uuid:
raise WiringImportError("Wire entry #{0} is missing wire_id.".format(index))
if not start_terminal_uuid or not end_terminal_uuid:
raise WiringImportError(
"Wire {0} is missing start_terminal_uuid or end_terminal_uuid.".format(
wire_uuid
)
)
wire_mark = _string_value(item, "wire_mark")
wire_label = _string_value(item, "wire_label") or wire_mark or wire_uuid
start_element_uuid = _string_value(item, "start_element_uuid")
end_element_uuid = _string_value(item, "end_element_uuid")
start_terminal_display = _string_value(item, "start_terminal_display")
end_terminal_display = _string_value(item, "end_terminal_display")
start_instance_id = endpoint_by_terminal.get(
(start_element_uuid, start_terminal_uuid),
endpoint_by_element.get(start_element_uuid, ""),
)
end_instance_id = endpoint_by_terminal.get(
(end_element_uuid, end_terminal_uuid),
endpoint_by_element.get(end_element_uuid, ""),
)
start_device_label = _string_value(item, "start_device_label") or device_labels.get(
start_element_uuid, ""
)
end_device_label = _string_value(item, "end_device_label") or device_labels.get(
end_element_uuid, ""
)
endpoint_label = "{0} -> {1}".format(
_endpoint_text(start_device_label, start_terminal_display, start_terminal_uuid),
_endpoint_text(end_device_label, end_terminal_display, end_terminal_uuid),
)
return {
"wire_uuid": wire_uuid,
"wire_label": wire_label,
"net_uuid": _string_value(item, "net_uuid"),
"group_uuid": _string_value(item, "group_uuid"),
"wire_mark": wire_mark,
"wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"),
"wire_style_id": _int_text_value(item, "wire_style_id"),
"start_element_uuid": start_element_uuid,
"start_terminal_uuid": start_terminal_uuid,
"start_instance_id": start_instance_id,
"start_terminal_display": start_terminal_display,
"start_device_label": start_device_label,
"end_element_uuid": end_element_uuid,
"end_terminal_uuid": end_terminal_uuid,
"end_instance_id": end_instance_id,
"end_terminal_display": end_terminal_display,
"end_device_label": end_device_label,
"endpoint_label": endpoint_label,
"conductor_uuids": _conductor_uuids(item),
}
def _unique_object_name(doc, base_name):
name = TerminalObjects.safe_token(base_name)
if doc.getObject(name) is None:
return name
suffix = 1
while doc.getObject("{0}_{1}".format(name, suffix)) is not None:
suffix += 1
return "{0}_{1}".format(name, suffix)
def _task_label(entry):
mark = entry["wire_mark"] or entry["wire_label"] or entry["wire_uuid"]
return "{0} {1}".format(mark, entry["endpoint_label"])
def _find_task_by_wire_uuid(task_group, wire_uuid):
target = (wire_uuid or "").strip()
if not target:
return None
for candidate in list(getattr(task_group, "Group", []) or []):
if getattr(candidate, "QetWireUuid", "").strip() == target:
return candidate
return None
def _ensure_string_property(obj, prop_name, value, description="QET wire task property"):
TerminalObjects.ensure_string_property(
obj,
prop_name,
"QET Wiring",
description,
value,
)
def _set_task_extra_properties(task, entry):
_ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"])
_ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"])
_ensure_string_property(task, "QetWireStyleId", entry["wire_style_id"])
_ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"])
_ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"])
_ensure_string_property(task, "QetStartDeviceLabel", entry["start_device_label"])
_ensure_string_property(task, "QetEndDeviceLabel", entry["end_device_label"])
_ensure_string_property(task, "QetEndpointLabel", entry["endpoint_label"])
_ensure_string_property(
task,
"QetConductorUuidsJson",
json.dumps(entry["conductor_uuids"], ensure_ascii=False),
)
def _upsert_wire_task(doc, task_group, project_uuid, entry):
task = _find_task_by_wire_uuid(task_group, entry["wire_uuid"])
created = task is None
previous_status = ""
if task is None:
task = doc.addObject(
"App::DocumentObjectGroup",
_unique_object_name(doc, "QETWireTask_{0}".format(entry["wire_uuid"])),
)
task_group.addObject(task)
else:
previous_status = (getattr(task, "RouteStatus", "") or "").strip()
WiringObjects.set_wire_task_semantics(
task,
project_uuid,
entry["wire_uuid"],
entry["wire_label"],
entry["start_terminal_uuid"],
entry["end_terminal_uuid"],
entry["start_instance_id"],
entry["end_instance_id"],
net_uuid=entry["net_uuid"],
group_uuid=entry["group_uuid"],
wire_mark=entry["wire_mark"],
wire_mark_is_manual=entry["wire_mark_is_manual"],
)
_set_task_extra_properties(task, entry)
if previous_status and previous_status != "Task":
TerminalObjects.ensure_string_property(
task,
"RouteStatus",
"QET Wiring",
"Wire task route status",
previous_status,
)
task.Label = _task_label(entry)
return task, created
def import_wire_tasks_from_payload(payload, doc=None):
if doc is None:
doc = getattr(App, "ActiveDocument", None)
if doc is None:
raise WiringImportError("No active FreeCAD document is available.")
if not isinstance(payload, dict):
raise WiringImportError("Exchange payload must be an object.")
project_uuid = _string_value(payload, "project_uuid")
if not project_uuid:
raise WiringImportError("Field 'project_uuid' is required for wire task import.")
wires = payload.get("wires", [])
if wires is None:
wires = []
if not isinstance(wires, list):
raise WiringImportError("Field 'wires' must be a list.")
task_group = WiringObjects.ensure_task_group(doc, project_uuid)
report = {
"project_uuid": project_uuid,
"total_wires": len(wires),
"imported_tasks": 0,
"updated_tasks": 0,
"removed_stale_tasks": 0,
"skipped_invalid": 0,
"warnings": [],
}
device_labels = _device_display_map(payload)
endpoint_instances = _endpoint_instance_map(payload)
for index, item in enumerate(wires):
try:
entry = _normalize_wire_entry(
item,
index,
device_labels=device_labels,
endpoint_instances=endpoint_instances,
)
except WiringImportError as exc:
report["skipped_invalid"] += 1
report["warnings"].append(str(exc))
continue
_task, created = _upsert_wire_task(doc, task_group, project_uuid, entry)
if created:
report["imported_tasks"] += 1
else:
report["updated_tasks"] += 1
return report