|
|
# FreeCADExchange stale object marking helpers.
|
|
|
|
|
|
import FreeCAD as App
|
|
|
|
|
|
import TerminalObjects
|
|
|
|
|
|
try:
|
|
|
import WiringObjects
|
|
|
except Exception:
|
|
|
WiringObjects = None
|
|
|
|
|
|
|
|
|
# Values written to the "QetSyncStatus" property by the marking helpers.
SYNC_STATUS_ACTIVE = "Active"
SYNC_STATUS_STALE = "Stale"

# Passed to TerminalObjects.ensure_string_property together with each sync
# property name (presumably the property group shown in FreeCAD -- confirm).
SYNC_GROUP = "QET Sync"

# Object-name prefix used to recognize cabinet groups in a document.
CABINET_GROUP_PREFIX = "QETCabinet_"
|
|
|
|
|
|
|
|
|
def _string_value(item, field_name):
|
|
|
if not isinstance(item, dict):
|
|
|
return ""
|
|
|
value = item.get(field_name, "")
|
|
|
if value is None:
|
|
|
return ""
|
|
|
return str(value).strip()
|
|
|
|
|
|
|
|
|
def _set_status(obj, status, reason=""):
    """Record the synchronization status and its reason on *obj*.

    Both values are stored through ``TerminalObjects.ensure_string_property``
    as the "QetSyncStatus" and "QetSyncReason" properties, in that order.
    """
    sync_properties = (
        ("QetSyncStatus", "Latest 2D/3D synchronization status", status),
        ("QetSyncReason", "Why the object has this synchronization status", reason),
    )
    for property_name, description, property_value in sync_properties:
        TerminalObjects.ensure_string_property(
            obj,
            property_name,
            SYNC_GROUP,
            description,
            property_value,
        )
|
|
|
|
|
|
|
|
|
def _device_report_label(device_group):
|
|
|
display_tag = (getattr(device_group, "QetDisplayTag", "") or "").strip()
|
|
|
instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
|
|
|
element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
|
|
|
return display_tag or instance_id or element_uuid or getattr(device_group, "Name", "")
|
|
|
|
|
|
|
|
|
def _payload_identity_sets(payload):
    """Collect the identifiers that appear in an exchange payload.

    Args:
        payload: Exchange payload dict. The keys read here are "cabinet",
            "devices" (each entry may carry a "terminals" list) and "wires".

    Returns:
        A dict with four sets of stripped, non-empty strings:
        "cabinet_instance_ids", "device_instance_ids", "terminal_uuids"
        and "wire_uuids".
    """
    cabinet_instance_ids = set()
    device_instance_ids = set()
    terminal_uuids = set()
    wire_uuids = set()

    cabinet = payload.get("cabinet")
    if isinstance(cabinet, dict):
        # Only the first non-empty identifier field is recorded.
        for field_name in ("cabinet_instance_id", "cabinet_uuid", "location_id"):
            value = _string_value(cabinet, field_name)
            if value:
                cabinet_instance_ids.add(value)
                break

    for item in payload.get("devices", []) or []:
        # A malformed (non-dict) entry carries no identifiers; skip it rather
        # than failing on ``item.get`` below with an AttributeError.
        if not isinstance(item, dict):
            continue
        instance_id = _string_value(item, "device_instance_id")
        if instance_id:
            device_instance_ids.add(instance_id)
        for terminal in item.get("terminals", []) or []:
            terminal_uuid = _string_value(terminal, "terminal_uuid")
            if terminal_uuid:
                terminal_uuids.add(terminal_uuid)

    for item in payload.get("wires", []) or []:
        # The first non-empty key among the accepted spellings wins.
        wire_uuid = (
            _string_value(item, "wire_id")
            or _string_value(item, "wire_uuid")
            or _string_value(item, "id")
        )
        if wire_uuid:
            wire_uuids.add(wire_uuid)

    return {
        "cabinet_instance_ids": cabinet_instance_ids,
        "device_instance_ids": device_instance_ids,
        "terminal_uuids": terminal_uuids,
        "wire_uuids": wire_uuids,
    }
|
|
|
|
|
|
|
|
|
def _iter_cabinet_groups(doc):
    """Yield each cabinet group object of *doc* once.

    Candidates are the members of the root group named
    ``TerminalObjects.ROOT_GROUP_NAME``; when that group is missing or has
    no members, every object of the document is considered instead. A
    candidate qualifies when its name starts with ``CABINET_GROUP_PREFIX``
    or equals "QETCabinetModel", and it has a "QetCabinetInstanceId"
    property.
    """
    root_group = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
    pool = [] if root_group is None else list(getattr(root_group, "Group", []) or [])
    if not pool:
        pool = list(getattr(doc, "Objects", []) or [])

    yielded_ids = set()
    for candidate in pool:
        candidate_name = getattr(candidate, "Name", "")
        has_cabinet_name = (
            candidate_name.startswith(CABINET_GROUP_PREFIX)
            or candidate_name == "QETCabinetModel"
        )
        if (
            has_cabinet_name
            and "QetCabinetInstanceId" in getattr(candidate, "PropertiesList", [])
            and id(candidate) not in yielded_ids
        ):
            # Remember the object so a duplicate entry is not yielded twice.
            yielded_ids.add(id(candidate))
            yield candidate
|
|
|
|
|
|
|
|
|
def _iter_device_groups(doc):
    """Yield each device group object of *doc* once.

    Candidates are the members of the root group named
    ``TerminalObjects.ROOT_GROUP_NAME``; when that group is missing or has
    no members, every object of the document is considered instead. A
    candidate qualifies when its name starts with
    ``TerminalObjects.DEVICE_GROUP_PREFIX`` and it has a "QetElementUuid"
    property.
    """
    root_group = doc.getObject(TerminalObjects.ROOT_GROUP_NAME)
    pool = []
    if root_group is not None:
        pool += list(getattr(root_group, "Group", []) or [])
    if not pool:
        pool += list(getattr(doc, "Objects", []) or [])

    yielded_ids = set()
    for candidate in pool:
        candidate_name = getattr(candidate, "Name", "")
        qualifies = candidate_name.startswith(
            TerminalObjects.DEVICE_GROUP_PREFIX
        ) and "QetElementUuid" in getattr(candidate, "PropertiesList", [])
        if not qualifies or id(candidate) in yielded_ids:
            continue
        # Remember the object so a duplicate entry is not yielded twice.
        yielded_ids.add(id(candidate))
        yield candidate
|
|
|
|
|
|
|
|
|
def _mark_device(device_group, identity_sets):
    """Mark a device group as active or stale and return the outcome.

    The device is active when its stripped ``QetInstanceId`` is non-empty
    and contained in ``identity_sets["device_instance_ids"]``. The decision
    is also written to the FreeCAD console on a best-effort basis.

    Returns:
        "active" or "stale".
    """
    element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
    instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
    active = bool(instance_id) and instance_id in identity_sets["device_instance_ids"]

    # Diagnostic output only: a logging failure must never block the marking.
    try:
        import FreeCAD as _App

        message = (
            "[FreeCADExchange] stale check device: name={0}, element_uuid={1}, instance_id={2}, active={3}\n"
        ).format(getattr(device_group, "Name", ""), element_uuid, instance_id, active)
        _App.Console.PrintMessage(message)
    except Exception:
        pass

    if not active:
        _set_status(
            device_group,
            SYNC_STATUS_STALE,
            "This 3D device is not present in the latest 2D exchange payload.",
        )
        return "stale"

    _set_status(device_group, SYNC_STATUS_ACTIVE)
    return "active"
|
|
|
|
|
|
|
|
|
def _mark_cabinet(cabinet_group, identity_sets):
    """Mark a cabinet group as active or stale and return the outcome.

    The cabinet is active when its stripped ``QetCabinetInstanceId`` is
    non-empty and contained in ``identity_sets["cabinet_instance_ids"]``.

    Returns:
        "active" or "stale".
    """
    cabinet_instance_id = (getattr(cabinet_group, "QetCabinetInstanceId", "") or "").strip()
    in_payload = (
        bool(cabinet_instance_id)
        and cabinet_instance_id in identity_sets["cabinet_instance_ids"]
    )
    if not in_payload:
        _set_status(
            cabinet_group,
            SYNC_STATUS_STALE,
            "This 3D cabinet is not present in the latest 2D exchange payload.",
        )
        return "stale"

    _set_status(cabinet_group, SYNC_STATUS_ACTIVE)
    return "active"
|
|
|
|
|
|
|
|
|
def _mark_terminals(device_group, identity_sets, authoritative_stale_terminal_uuids=None):
    """Mark every terminal of *device_group* as active or stale.

    Per terminal, the first matching rule wins:

    1. ``TerminalObjects.is_local_terminal_uuid`` accepts the UUID: active.
    2. The UUID is in *authoritative_stale_terminal_uuids*, the QET-side
       stale list (binding_table - diagram_elements): stale.
    3. The UUID is in ``identity_sets["terminal_uuids"]``: active.
    4. Otherwise: stale.

    Returns:
        A dict ``{"active": int, "stale": int}`` with the terminal counts.
    """
    if authoritative_stale_terminal_uuids is None:
        qet_stale_uuids = set()
    else:
        qet_stale_uuids = authoritative_stale_terminal_uuids

    counts = {"active": 0, "stale": 0}
    terminal_group = TerminalObjects.find_child_group_by_kind(
        device_group,
        TerminalObjects.TERMINAL_GROUP_KIND,
    )
    for terminal in TerminalObjects.collect_terminal_objects(terminal_group):
        terminal_uuid = (getattr(terminal, "QetTerminalUuid", "") or "").strip()

        if TerminalObjects.is_local_terminal_uuid(terminal_uuid):
            outcome, reason = "active", ""
        elif terminal_uuid and terminal_uuid in qet_stale_uuids:
            outcome = "stale"
            reason = "This 3D terminal was removed from the 2D schematic (QET authoritative)."
        elif terminal_uuid and terminal_uuid in identity_sets["terminal_uuids"]:
            outcome, reason = "active", ""
        else:
            outcome = "stale"
            reason = "This 3D terminal is not present in the latest 2D exchange payload."

        status = SYNC_STATUS_ACTIVE if outcome == "active" else SYNC_STATUS_STALE
        _set_status(terminal, status, reason)
        counts[outcome] += 1

    return counts
|
|
|
|
|
|
|
|
|
def _iter_wire_task_objects(doc):
|
|
|
task_group = doc.getObject("QETWiring_01_Tasks")
|
|
|
if task_group is None:
|
|
|
return []
|
|
|
|
|
|
result = []
|
|
|
for obj in list(getattr(task_group, "Group", []) or []):
|
|
|
if (getattr(obj, "RouteType", "") or "").strip() != "Task":
|
|
|
continue
|
|
|
if "QetWireUuid" not in getattr(obj, "PropertiesList", []):
|
|
|
continue
|
|
|
result.append(obj)
|
|
|
return result
|
|
|
|
|
|
|
|
|
def _iter_routed_wire_objects(doc):
    """Return the routed wire objects of *doc*, or an empty list.

    The lookup is delegated to ``WiringObjects.iter_routed_wire_objects``.
    An empty list is returned when the optional ``WiringObjects`` module
    could not be imported or when the call itself raises.
    """
    if WiringObjects is not None:
        try:
            return WiringObjects.iter_routed_wire_objects(doc)
        except Exception:
            # Best effort: fall through to the empty result below.
            pass
    return []
|
|
|
|
|
|
|
|
|
def _mark_wire_object(obj, identity_sets, stale_reason):
    """Mark a wire-related object as active or stale and return the outcome.

    The object is active when its stripped ``QetWireUuid`` is non-empty and
    contained in ``identity_sets["wire_uuids"]``; otherwise it is marked
    stale with *stale_reason* as the recorded reason.

    Returns:
        "active" or "stale".
    """
    wire_uuid = (getattr(obj, "QetWireUuid", "") or "").strip()
    in_payload = bool(wire_uuid) and wire_uuid in identity_sets["wire_uuids"]
    if not in_payload:
        _set_status(obj, SYNC_STATUS_STALE, stale_reason)
        return "stale"

    _set_status(obj, SYNC_STATUS_ACTIVE)
    return "active"
|
|
|
|
|
|
|
|
|
def mark_stale_objects_from_payload(payload, doc=None):
    """Mark cabinets, devices, terminals and wires of *doc* as active or stale.

    Args:
        payload: Exchange payload dict. Besides the identifiers gathered by
            ``_payload_identity_sets``, its optional "stale_devices" list
            supplies the QET-side authoritative stale instance ids and
            terminal UUIDs.
        doc: Document to process; defaults to ``App.ActiveDocument``.

    Returns:
        A report dict with active/stale counters per object category, a
        "stale_device_details" list describing each stale device, and a
        "warnings" list (never populated by this function).

    Raises:
        RuntimeError: If no document is available or *payload* is not a dict.
    """
    if doc is None:
        doc = getattr(App, "ActiveDocument", None)
    if doc is None:
        raise RuntimeError("No active FreeCAD document is available.")
    if not isinstance(payload, dict):
        raise RuntimeError("Exchange payload must be an object.")

    identity_sets = _payload_identity_sets(payload)

    # QET-side authoritative stale list (stale = binding_table - diagram_elements).
    authoritative_stale_instance_ids = set()
    authoritative_stale_terminal_uuids = set()
    for entry in payload.get("stale_devices", []) or []:
        stale_instance_id = _string_value(entry, "instance_id")
        stale_terminal_uuid = _string_value(entry, "terminal_uuid")
        if stale_instance_id:
            authoritative_stale_instance_ids.add(stale_instance_id)
        if stale_terminal_uuid:
            authoritative_stale_terminal_uuids.add(stale_terminal_uuid)

    report = {
        "active_cabinets": 0,
        "stale_cabinets": 0,
        "active_devices": 0,
        "stale_devices": 0,
        "stale_device_details": [],
        "active_terminals": 0,
        "stale_terminals": 0,
        "active_wire_tasks": 0,
        "stale_wire_tasks": 0,
        "active_routed_wires": 0,
        "stale_routed_wires": 0,
        "warnings": [],
    }

    for cabinet_group in _iter_cabinet_groups(doc):
        cabinet_outcome = _mark_cabinet(cabinet_group, identity_sets)
        cabinet_key = "active_cabinets" if cabinet_outcome == "active" else "stale_cabinets"
        report[cabinet_key] += 1

    for device_group in _iter_device_groups(doc):
        element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
        instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
        in_payload = bool(instance_id) and instance_id in identity_sets["device_instance_ids"]
        declared_stale = bool(instance_id) and instance_id in authoritative_stale_instance_ids

        if declared_stale and not in_payload:
            # The QET-side list takes over only when the payload does not
            # list the instance; this path bypasses _mark_device entirely.
            _set_status(
                device_group,
                SYNC_STATUS_STALE,
                "This 3D device was removed from the 2D schematic (QET authoritative).",
            )
            device_outcome = "stale"
        else:
            device_outcome = _mark_device(device_group, identity_sets)

        if device_outcome == "active":
            report["active_devices"] += 1
        else:
            report["stale_devices"] += 1
            report["stale_device_details"].append(
                {
                    "display_tag": (getattr(device_group, "QetDisplayTag", "") or "").strip(),
                    "instance_id": instance_id,
                    "element_uuid": element_uuid,
                    "label": _device_report_label(device_group),
                }
            )

        # Terminals are evaluated for every device, whatever its own status.
        terminal_report = _mark_terminals(
            device_group, identity_sets, authoritative_stale_terminal_uuids
        )
        report["active_terminals"] += terminal_report["active"]
        report["stale_terminals"] += terminal_report["stale"]

    # Wire tasks are processed completely before routed wires are collected.
    wire_passes = (
        (
            _iter_wire_task_objects,
            "active_wire_tasks",
            "stale_wire_tasks",
            "This 3D wire task is not present in the latest 2D exchange payload.",
        ),
        (
            _iter_routed_wire_objects,
            "active_routed_wires",
            "stale_routed_wires",
            "This routed 3D wire is not present in the latest 2D exchange payload.",
        ),
    )
    for collect, active_key, stale_key, stale_reason in wire_passes:
        for wire_obj in collect(doc):
            wire_outcome = _mark_wire_object(wire_obj, identity_sets, stale_reason)
            report[active_key if wire_outcome == "active" else stale_key] += 1

    return report
|