You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

576 lines
20 KiB
Python

# FreeCADExchange terminal import helpers.
from collections import OrderedDict
import FreeCAD as App
try:
import FreeCADGui as Gui
except ImportError:
Gui = None
import DeviceImport
import TerminalObjects as TerminalObjects
import TemplateSemantics
class TerminalImportError(RuntimeError):
    """Raised when an exchange payload cannot be used for terminal import."""
def _append_debug_log(message):
try:
DeviceImport._append_debug_log(message)
except Exception:
pass
def _normalize_terminal_entry(item, index):
if not isinstance(item, dict):
raise TerminalImportError(
"Terminal entry #{0} must be an object.".format(index)
)
terminal_uuid = (item.get("terminal_uuid") or "").strip()
if not terminal_uuid:
raise TerminalImportError(
"Terminal entry #{0} is missing terminal_uuid.".format(index)
)
instance_id = (item.get("instance_id") or "").strip()
element_uuid = (item.get("element_uuid") or "").strip()
terminal_display = (item.get("terminal_display") or "").strip()
slot_name_hint = (
item.get("slot_name_hint")
or item.get("terminal_display")
or item.get("terminal_label")
or item.get("slot_name")
or item.get("display_tag")
or ""
).strip()
return {
"terminal_uuid": terminal_uuid,
"instance_id": instance_id,
"element_uuid": element_uuid,
"terminal_display": terminal_display,
"slot_name_hint": slot_name_hint,
}
def _payload_device_lookup(payload):
by_element_uuid = set()
by_instance_id = set()
for item in payload.get("devices", []) or []:
if not isinstance(item, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
instance_id = (item.get("instance_id") or "").strip()
if element_uuid:
by_element_uuid.add(element_uuid)
if instance_id:
by_instance_id.add(instance_id)
return {
"element_uuids": by_element_uuid,
"instance_ids": by_instance_id,
}
def _payload_device_instance_by_element(payload):
result = {}
for item in payload.get("devices", []) or []:
if not isinstance(item, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
instance_id = (item.get("instance_id") or "").strip()
if element_uuid and instance_id and element_uuid not in result:
result[element_uuid] = instance_id
return result
def _wire_endpoint_terminal_entries(payload, existing_keys):
wires = payload.get("wires", []) or []
if not isinstance(wires, list):
return []
instance_by_element = _payload_device_instance_by_element(payload)
seen = set(existing_keys or set())
entries = []
for wire in wires:
if not isinstance(wire, dict):
continue
for side in ("start", "end"):
terminal_uuid = (wire.get("{0}_terminal_uuid".format(side)) or "").strip()
element_uuid = (wire.get("{0}_element_uuid".format(side)) or "").strip()
instance_id = (wire.get("{0}_instance_id".format(side)) or "").strip()
if not instance_id and element_uuid:
instance_id = instance_by_element.get(element_uuid, "")
if not terminal_uuid or not (element_uuid or instance_id):
continue
key = (element_uuid, terminal_uuid)
if key in seen:
continue
seen.add(key)
terminal_display = (
wire.get("{0}_terminal_display".format(side))
or wire.get("{0}_terminal_label".format(side))
or ""
)
entries.append(
{
"terminal_uuid": terminal_uuid,
"element_uuid": element_uuid,
"instance_id": instance_id,
"terminal_display": terminal_display,
"slot_name_hint": terminal_display,
}
)
return entries
def _terminal_belongs_to_payload_devices(entry, device_lookup):
instance_id = entry["instance_id"]
element_uuid = entry["element_uuid"]
if instance_id and instance_id in device_lookup["instance_ids"]:
return True
if element_uuid and element_uuid in device_lookup["element_uuids"]:
return True
return False
def _ensure_visible(obj):
try:
if getattr(obj, "ViewObject", None) is not None:
obj.ViewObject.Visibility = True
except Exception:
pass
def _set_terminal_geometry_source(obj, source):
    """Store on *obj* how its terminal geometry was resolved.

    *source* is stripped first; ``"fallback"`` is recorded as
    ``"generated_bbox_fallback"`` and a blank value as ``"template"``. The
    text is written through ``TerminalObjects.ensure_string_property`` to the
    ``QetTerminalGeometrySource`` property.
    """
    cleaned = (source or "").strip()
    if cleaned == "fallback":
        recorded = "generated_bbox_fallback"
    elif cleaned:
        recorded = cleaned
    else:
        recorded = "template"
    TerminalObjects.ensure_string_property(
        obj,
        "QetTerminalGeometrySource",
        "QET Exchange",
        "How this engineering terminal geometry was resolved",
        recorded,
    )
def _hide_object(obj):
try:
if getattr(obj, "ViewObject", None) is not None:
obj.ViewObject.Visibility = False
except Exception:
pass
def _terminal_existing_index(container):
    """Index the terminal objects in *container* by ``QetTerminalUuid``.

    Returns an OrderedDict; objects with a blank UUID are left out and the
    first object seen for a UUID wins.
    """
    by_uuid = OrderedDict()
    for candidate in TerminalObjects.collect_terminal_objects(container):
        uuid_text = getattr(candidate, "QetTerminalUuid", "").strip()
        if uuid_text:
            by_uuid.setdefault(uuid_text, candidate)
    return by_uuid
def _terminal_existing_local_by_slot(container):
    """Index terminals with a local UUID by normalized template slot name.

    Only objects whose ``QetTerminalUuid`` passes
    ``TerminalObjects.is_local_terminal_uuid`` are considered. Objects with a
    blank slot name are left out and the first object per slot wins.
    """
    by_slot = {}
    for candidate in TerminalObjects.collect_terminal_objects(container):
        uuid_text = getattr(candidate, "QetTerminalUuid", "").strip()
        if TerminalObjects.is_local_terminal_uuid(uuid_text):
            slot_key = _normalize_slot_name(
                getattr(candidate, "QetTemplateSlotName", "")
            )
            if slot_key:
                by_slot.setdefault(slot_key, candidate)
    return by_slot
def _device_key(device_group):
return getattr(device_group, "QetInstanceId", "").strip() or getattr(
device_group, "QetElementUuid", ""
).strip()
def _locate_device_group(doc, entry):
instance_id = entry["instance_id"]
element_uuid = entry["element_uuid"]
device_group = None
if element_uuid:
device_group = DeviceImport._find_device_group(doc, element_uuid)
if device_group is None and instance_id:
device_group = TerminalObjects.find_device_group_by_instance_id(doc, instance_id)
return device_group
def _terminal_container_for_device(doc, device_group, project_uuid):
    """Return the terminal group for *device_group*.

    Delegates to ``TerminalObjects.ensure_terminal_group``, passing the
    device's stripped ``QetInstanceId`` as ``instance_id``.
    """
    instance_text = getattr(device_group, "QetInstanceId", "").strip()
    group_options = {"project_uuid": project_uuid, "instance_id": instance_text}
    return TerminalObjects.ensure_terminal_group(doc, device_group, **group_options)
def _is_device_group(obj):
try:
return (
obj is not None
and getattr(obj, "Name", "").startswith(DeviceImport.DEVICE_GROUP_PREFIX)
and "QetElementUuid" in getattr(obj, "PropertiesList", [])
)
except Exception:
return False
def _terminal_slot_label(slot, terminal_uuid):
label = (slot.get("label") or "").strip()
if label:
return label
return terminal_uuid
def _terminal_entry_label(entry, slot, terminal_uuid):
entry_label = (entry.get("terminal_display") or "").strip()
if entry_label:
return entry_label
return _terminal_slot_label(slot, terminal_uuid)
def _normalize_slot_name(value):
return (value or "").strip().lower()
def _slot_lookup_key(slot):
    """Return the normalized ``(name, label)`` pair of *slot* as a tuple."""
    name_key = _normalize_slot_name(slot.get("name", ""))
    label_key = _normalize_slot_name(slot.get("label", ""))
    return (name_key, label_key)
def _build_slot_lookup(slots):
lookup = {}
for slot in slots or []:
for key in _slot_lookup_key(slot):
if key and key not in lookup:
lookup[key] = slot
return lookup
def _resolve_entry_slot(entry, slots, fallback_slots, used_slot_names, index):
    """Choose a slot for *entry*, or return None when none is free.

    A slot counts as free when its normalized name is blank or not in
    *used_slot_names*. Candidates are tried in this order:

    1. the slot in *slots* matching the entry's ``slot_name_hint``;
    2. ``fallback_slots[index]`` when that index exists;
    3. the first free slot in *slots*;
    4. the first free slot in *fallback_slots*.
    """

    def _is_free(candidate):
        candidate_name = _normalize_slot_name(candidate.get("name", ""))
        return not candidate_name or candidate_name not in used_slot_names

    hint = _normalize_slot_name(entry.get("slot_name_hint", ""))
    if hint:
        hinted = _build_slot_lookup(slots).get(hint)
        if hinted is not None and _is_free(hinted):
            return hinted

    if index < len(fallback_slots):
        positional = fallback_slots[index]
        if _is_free(positional):
            return positional

    for pool in (slots, fallback_slots):
        for candidate in pool:
            if _is_free(candidate):
                return candidate
    return None
def _slot_base(slot):
    """Return ``slot["base"]`` when it is an ``App.Vector``, else a zero vector."""
    candidate = slot.get("base")
    return candidate if isinstance(candidate, App.Vector) else App.Vector(0, 0, 0)
def _slot_placement(slot):
    """Build an ``App.Placement`` for *slot*.

    The position comes from ``_slot_base``. The rotation is taken from
    ``slot["rotation"]`` when that is a dict holding an ``App.Vector`` under
    ``axis`` and a non-None ``angle``; otherwise, or when building the
    rotation fails, a default ``App.Rotation()`` is used.
    """
    position = _slot_base(slot)
    orientation = App.Rotation()
    spec = slot.get("rotation")
    if isinstance(spec, dict):
        axis = spec.get("axis")
        angle = spec.get("angle")
        has_axis_angle = isinstance(axis, App.Vector) and angle is not None
        if has_axis_angle:
            try:
                orientation = App.Rotation(axis, float(angle))
            except Exception:
                # Unusable axis/angle: fall back to the default rotation.
                orientation = App.Rotation()
    return App.Placement(position, orientation)
def _create_terminal_object(doc, terminal_uuid, entry, slot, terminal_group, project_uuid, element_uuid, instance_id):
    """Create a terminal object for *terminal_uuid* and add it to *terminal_group*.

    The object is created with ``TerminalObjects.create_lcs_object`` at the
    slot's placement, then tagged with terminal semantics and its geometry
    source, and finally made visible. Returns the created object.
    """
    display_label = _terminal_entry_label(entry, slot, terminal_uuid)
    object_name = "QETTerminal_{0}".format(TerminalObjects.safe_token(terminal_uuid))
    created = TerminalObjects.create_lcs_object(
        doc,
        object_name,
        placement=_slot_placement(slot),
        label=display_label,
    )
    terminal_group.addObject(created)

    semantics_args = (created, project_uuid, element_uuid, terminal_uuid, instance_id)
    TerminalObjects.set_terminal_semantics(
        *semantics_args,
        label=display_label,
        slot_name=slot.get("name", ""),
    )
    _set_terminal_geometry_source(created, slot.get("source", "template"))
    _ensure_visible(created)
    return created
def _refresh_terminal_object(terminal_obj, entry, slot, terminal_uuid, project_uuid, element_uuid, instance_id):
    """Re-apply payload semantics, geometry source and placement to an existing terminal."""
    TerminalObjects.set_terminal_semantics(
        terminal_obj,
        project_uuid,
        element_uuid,
        terminal_uuid,
        instance_id,
        label=_terminal_entry_label(entry, slot, terminal_uuid),
        slot_name=slot.get("name", ""),
    )
    _set_terminal_geometry_source(terminal_obj, slot.get("source", "template"))
    try:
        terminal_obj.Placement = _slot_placement(slot)
    except Exception as exc:
        # Placement stays best effort, but the failure is now logged instead
        # of disappearing silently.
        _append_debug_log(
            "TerminalImport could not place terminal {0}: {1}".format(terminal_uuid, exc)
        )
    _ensure_visible(terminal_obj)


def import_terminals_from_payload(payload, scene_path=""):
    """Create or update terminal objects in the document from an exchange payload.

    Terminal entries come from ``payload["terminals"]`` plus entries
    synthesized from wire endpoints. Valid entries are grouped by their parent
    device group; for each device group found in the project root group (or,
    when that yields nothing, in ``doc.Objects``) every entry is matched to a
    slot and an existing terminal object is updated or a new one is created.

    Args:
        payload: Exchange payload dict. ``project_uuid`` is required and
            ``terminals``, when present, must be a list.
        scene_path: Passed to ``DeviceImport._ensure_document`` and echoed in
            the report.

    Returns:
        A report dict with the document name, counters for imported, updated
        and skipped terminals, and a ``warnings`` list. ``removed_terminals``
        is reported but never incremented by this function.

    Raises:
        TerminalImportError: If *payload* is not a dict, ``project_uuid`` is
            missing, or ``terminals`` is not a list.
    """
    _append_debug_log("TerminalImport.import_terminals_from_payload entered")
    if not isinstance(payload, dict):
        raise TerminalImportError("Exchange payload must be an object.")
    project_uuid = (payload.get("project_uuid") or "").strip()
    if not project_uuid:
        raise TerminalImportError("Field 'project_uuid' is required for terminal import.")

    doc = DeviceImport._ensure_document(scene_path)
    root_group = DeviceImport._ensure_root_group(doc, project_uuid)

    terminal_entries = payload.get("terminals", [])
    if not isinstance(terminal_entries, list):
        raise TerminalImportError("Field 'terminals' must be a list.")
    terminal_entries = list(terminal_entries)

    # Keys of the explicitly listed terminals, so wire endpoints that refer to
    # the same terminal are not synthesized a second time.
    terminal_entry_keys = set()
    for item in terminal_entries:
        if not isinstance(item, dict):
            continue
        # str() keeps a numeric id from crashing the scan; invalid entries
        # are reported per entry further down.
        element_uuid = str(item.get("element_uuid") or "").strip()
        terminal_uuid = str(item.get("terminal_uuid") or "").strip()
        if element_uuid and terminal_uuid:
            terminal_entry_keys.add((element_uuid, terminal_uuid))

    synthesized_entries = _wire_endpoint_terminal_entries(payload, terminal_entry_keys)
    terminal_entries.extend(synthesized_entries)
    device_lookup = _payload_device_lookup(payload)

    report = {
        "document_name": doc.Name,
        "scene_path": scene_path or "",
        "project_uuid": project_uuid,
        "total_terminals": 0,
        "imported_terminals": 0,
        "updated_terminals": 0,
        "removed_terminals": 0,
        "reused_template_hints": 0,
        "matched_by_slot_hint": 0,
        "generated_fallback_slots": 0,
        "synthesized_wire_endpoint_terminals": len(synthesized_entries),
        "skipped_missing_slot": 0,
        "skipped_missing_device": 0,
        "skipped_invalid_entry": 0,
        "skipped_unmatched_parent": 0,
        "skipped_duplicate_terminal": 0,
        "warnings": [],
    }

    # Pass 1: validate the entries and group them by parent device group.
    grouped = OrderedDict()
    for index, item in enumerate(terminal_entries):
        report["total_terminals"] += 1
        try:
            entry = _normalize_terminal_entry(item, index)
        except TerminalImportError as exc:
            report["skipped_invalid_entry"] += 1
            report["warnings"].append(str(exc))
            continue
        if not _terminal_belongs_to_payload_devices(entry, device_lookup):
            report["skipped_unmatched_parent"] += 1
            continue
        device_group = _locate_device_group(doc, entry)
        if device_group is None:
            report["skipped_missing_device"] += 1
            report["warnings"].append(
                "Terminal {0} could not find its parent device.".format(
                    entry["terminal_uuid"]
                )
            )
            continue
        key = device_group.Name
        if key not in grouped:
            grouped[key] = {"device_group": device_group, "entries": []}
        grouped[key]["entries"].append(entry)

    # Pass 2: walk the device groups. Prefer the children of the project root
    # group; fall back to every document object when that yields nothing.
    device_candidates = []
    if root_group is not None:
        device_candidates.extend(list(getattr(root_group, "Group", []) or []))
    if not device_candidates:
        device_candidates.extend(doc.Objects)

    # NOTE(review): entries grouped under a device group that is not among
    # device_candidates are never processed and are not counted as skipped —
    # confirm whether that is intended.
    for device_group in device_candidates:
        if not _is_device_group(device_group):
            continue
        item = grouped.get(device_group.Name, {"entries": []})
        entries = item["entries"]
        device_element_uuid = getattr(device_group, "QetElementUuid", "").strip()
        device_instance_id = getattr(device_group, "QetInstanceId", "").strip()
        resolved_model_path = getattr(device_group, "QetResolvedModelPath", "").strip()
        terminal_group = _terminal_container_for_device(doc, device_group, project_uuid)
        # Both indexes are snapshots taken before this device's entries are
        # processed; objects created below are not added to them.
        existing_by_uuid = _terminal_existing_index(terminal_group)
        existing_local_by_slot = _terminal_existing_local_by_slot(terminal_group)
        used_uuids = set()
        used_slot_names = set()
        template_slots = TemplateSemantics.collect_terminal_hints(device_group)
        fallback_slots = TemplateSemantics.resolve_terminal_slots(
            device_group,
            resolved_model_path,
            len(entries),
        )
        if len(fallback_slots) < len(entries):
            generated_slots = TemplateSemantics.build_fallback_terminal_slots(
                device_group,
                len(entries),
            )
            if generated_slots:
                fallback_slots = generated_slots
                report["generated_fallback_slots"] += len(generated_slots)
        slot_lookup = _build_slot_lookup(template_slots)

        for index, entry in enumerate(entries):
            terminal_uuid = entry["terminal_uuid"]
            if terminal_uuid in used_uuids:
                # Because the indexes above are snapshots, a repeated UUID
                # would otherwise create a second object or move the first
                # one to another slot.
                report["skipped_duplicate_terminal"] += 1
                report["warnings"].append(
                    "Terminal {0} is listed more than once for device {1}; the duplicate was ignored.".format(
                        terminal_uuid,
                        device_element_uuid or device_instance_id or device_group.Name,
                    )
                )
                continue

            payload_instance_id = entry["instance_id"]
            if payload_instance_id and payload_instance_id != device_instance_id:
                report["warnings"].append(
                    "Terminal {0} references instance_id {1} but device {2} uses {3}. The device value was kept."
                    .format(terminal_uuid, payload_instance_id, device_element_uuid, device_instance_id)
                )

            # Slot resolution: an explicit hint against the template slots
            # first, then the generic resolver.
            slot = None
            slot_hint = _normalize_slot_name(entry.get("slot_name_hint", ""))
            if slot_hint:
                hinted_slot = slot_lookup.get(slot_hint)
                if hinted_slot is not None:
                    hinted_name = _normalize_slot_name(hinted_slot.get("name", ""))
                    if not hinted_name or hinted_name not in used_slot_names:
                        slot = hinted_slot
                        report["matched_by_slot_hint"] += 1
            if slot is None:
                slot = _resolve_entry_slot(entry, template_slots, fallback_slots, used_slot_names, index)
            if slot is None:
                report["skipped_missing_slot"] += 1
                report["warnings"].append(
                    "Terminal {0} was skipped because device {1} has no matching FCStd template terminal slot.".format(
                        terminal_uuid,
                        device_element_uuid or device_instance_id or device_group.Name,
                    )
                )
                continue
            slot_name = _normalize_slot_name(slot.get("name", ""))
            if slot_name:
                used_slot_names.add(slot_name)

            # Reuse an object with the same UUID, else a local terminal that
            # already sits on this slot, else create a new object.
            terminal_obj = existing_by_uuid.get(terminal_uuid)
            if terminal_obj is None:
                terminal_obj = existing_local_by_slot.get(slot_name)
            if terminal_obj is None:
                terminal_obj = _create_terminal_object(
                    doc,
                    terminal_uuid,
                    entry,
                    slot,
                    terminal_group,
                    project_uuid,
                    device_element_uuid,
                    device_instance_id,
                )
                report["imported_terminals"] += 1
            else:
                _refresh_terminal_object(
                    terminal_obj,
                    entry,
                    slot,
                    terminal_uuid,
                    project_uuid,
                    device_element_uuid,
                    device_instance_id,
                )
                report["updated_terminals"] += 1

            if terminal_obj not in getattr(terminal_group, "Group", []):
                terminal_group.addObject(terminal_obj)
            used_uuids.add(terminal_uuid)

            # Hide the template object the slot came from, if it has one.
            source_obj = slot.get("source_object")
            if source_obj is not None:
                _hide_object(source_obj)
                report["reused_template_hints"] += 1

    doc.recompute()
    if Gui is not None:
        try:
            Gui.SendMsgToActiveView("ViewFit")
        except Exception:
            # Fitting the view is cosmetic and may fail without an active view.
            pass
    _append_debug_log(
        "TerminalImport finished: imported={0}, updated={1}, removed={2}, skipped_unmatched_parent={3}, skipped_missing_slot={4}".format(
            report["imported_terminals"],
            report["updated_terminals"],
            report["removed_terminals"],
            report["skipped_unmatched_parent"],
            report["skipped_missing_slot"],
        )
    )
    return report