Compare commits

...

3 Commits

@ -53,11 +53,15 @@ STATE_WRITEBACK_REPORT = "_qet_exchange_writeback_report"
STATE_IMPORT_SCHEDULED = "_qet_exchange_import_scheduled"
STATE_TREE_FILTER = "_qet_exchange_tree_filter"
STATE_TREE_SIGNAL_CONNECTIONS = "_qet_exchange_tree_signal_connections"
STATE_SIGNAL_TIMER = "_qet_exchange_signal_timer"
STATE_SIGNAL_REVISION = "_qet_exchange_signal_revision"
STATE_JSON_PATH = "_qet_exchange_json_path"
TREE_FILTER_MARKER = "_qet_exchange_tree_filter_installed"
TREE_SIGNAL_MARKER = "_qet_exchange_tree_signal_installed"
IMPORT_READY_DELAY_MS = 1500
IMPORT_READY_RETRY_DELAY_MS = 1000
IMPORT_READY_MAX_RETRIES = 10
SIGNAL_POLL_INTERVAL_MS = 1500
class ExchangeValidationError(RuntimeError):
@ -661,6 +665,18 @@ def _normalize_conductor_uuids(item, entry_label):
return normalized
def _normalize_wire_style(item, entry_label):
value = item.get("wire_style", {})
if value is None:
return {}
if not isinstance(value, dict):
_append_debug_log(
"Field 'wire_style' in {0} is not an object; ignored.".format(entry_label)
)
return {}
return dict(value)
def _normalize_wires(payload):
wires = payload.get("wires", [])
if wires is None:
@ -692,6 +708,7 @@ def _normalize_wires(payload):
"wire_mark": _optional_string(item, "wire_mark", entry_label),
"wire_mark_is_manual": wire_mark_is_manual,
"wire_style_id": _optional_text(item, "wire_style_id"),
"wire_style": _normalize_wire_style(item, entry_label),
"start_element_uuid": _optional_string(item, "start_element_uuid", entry_label),
"start_terminal_uuid": _optional_string(item, "start_terminal_uuid", entry_label),
"end_element_uuid": _optional_string(item, "end_element_uuid", entry_label),
@ -975,6 +992,161 @@ def _import_wiring_tasks(payload):
return None
def _signal_path_for_json(json_path):
try:
return str(Path(json_path).with_name("qet_exchange_signal.json"))
except Exception:
return ""
def _load_exchange_signal(signal_path):
path = Path(signal_path)
if not path.is_file():
return None
try:
raw_text = path.read_text(encoding="utf-8")
except OSError as exc:
_append_debug_log("exchange signal read failed: {0}".format(exc))
return None
try:
signal_payload = json.loads(raw_text)
except Exception as exc:
_append_debug_log("exchange signal JSON parse failed: {0}".format(exc))
return None
if not isinstance(signal_payload, dict):
_append_debug_log("exchange signal ignored: root is not an object")
return None
return signal_payload
def _signal_revision(signal_payload):
try:
return int(signal_payload.get("revision", 0) or 0)
except Exception:
return 0
def reload_exchange_payload_from_signal(json_path, signal_payload):
if WiringImport is None:
_append_debug_log("signal reload skipped: WiringImport module unavailable")
return None
current_payload = getattr(App, STATE_PAYLOAD, None)
expected_project_uuid = ""
if isinstance(current_payload, dict):
expected_project_uuid = str(current_payload.get("project_uuid", "") or "").strip()
signal_project_uuid = str(signal_payload.get("project_uuid", "") or "").strip()
if expected_project_uuid and signal_project_uuid and signal_project_uuid != expected_project_uuid:
message = "exchange signal project_uuid mismatch: signal={0}, current={1}".format(
signal_project_uuid,
expected_project_uuid,
)
_append_debug_log(message)
App.Console.PrintError("[FreeCADExchange] {0}\n".format(message))
return None
payload = load_exchange_payload(json_path)
summary = _build_summary(payload, json_path)
setattr(App, STATE_PAYLOAD, payload)
setattr(App, STATE_SUMMARY, summary)
wiring_report = WiringImport.import_wire_tasks_from_payload(payload, App.ActiveDocument)
style_report = WiringImport.apply_wire_styles_from_payload(payload, App.ActiveDocument)
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
try:
App.ActiveDocument.recompute()
except Exception as exc:
_append_debug_log("signal reload recompute failed: {0}".format(exc))
revision = _signal_revision(signal_payload)
_append_debug_log(
"exchange signal reload complete: revision={0}, tasks_updated={1}, routed_matched={2}, view_updates={3}, samples={4}".format(
revision,
wiring_report.get("updated_tasks", 0) if isinstance(wiring_report, dict) else 0,
style_report.get("routed_wires_matched", 0) if isinstance(style_report, dict) else 0,
style_report.get("updated_view", 0) if isinstance(style_report, dict) else 0,
style_report.get("updated_samples", []) if isinstance(style_report, dict) else [],
)
)
App.Console.PrintMessage(
"[FreeCADExchange] Reloaded QET wire styles: revision {0}, tasks updated {1}, routed wires matched {2}, view updates {3}\n".format(
revision,
wiring_report.get("updated_tasks", 0) if isinstance(wiring_report, dict) else 0,
style_report.get("routed_wires_matched", 0) if isinstance(style_report, dict) else 0,
style_report.get("updated_view", 0) if isinstance(style_report, dict) else 0,
)
)
return {"wiring_report": wiring_report, "style_report": style_report}
def _check_exchange_signal(json_path):
signal_path = _signal_path_for_json(json_path)
if not signal_path:
return
signal_payload = _load_exchange_signal(signal_path)
if not isinstance(signal_payload, dict):
return
event_name = str(signal_payload.get("event", "") or "").strip()
if event_name and event_name not in {"wire_style_changed"}:
_append_debug_log("exchange signal event treated as reload: {0}".format(event_name))
revision = _signal_revision(signal_payload)
if revision <= 0:
return
last_revision = int(getattr(App, STATE_SIGNAL_REVISION, 0) or 0)
if revision <= last_revision:
return
try:
reload_exchange_payload_from_signal(json_path, signal_payload)
except ExchangeValidationError as exc:
_append_debug_log("exchange signal reload validation failed: {0}".format(exc))
App.Console.PrintError("[FreeCADExchange] Signal reload failed: {0}\n".format(exc))
return
except Exception as exc:
_append_debug_log("exchange signal reload failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
App.Console.PrintError("[FreeCADExchange] Signal reload failed: {0}\n".format(exc))
return
setattr(App, STATE_SIGNAL_REVISION, revision)
def _install_exchange_signal_timer(json_path):
json_path = str(json_path or "").strip()
if not json_path:
return
existing_timer = getattr(App, STATE_SIGNAL_TIMER, None)
if existing_timer is not None:
try:
existing_timer.stop()
except Exception:
pass
signal_path = _signal_path_for_json(json_path)
initial_signal = _load_exchange_signal(signal_path) if signal_path else None
if isinstance(initial_signal, dict):
setattr(App, STATE_SIGNAL_REVISION, _signal_revision(initial_signal))
else:
setattr(App, STATE_SIGNAL_REVISION, 0)
# 中文注释:保持 QTimer 引用,避免 Python 对象被回收FreeCAD 后续收到 QET 信号后
# 只重载 wiring task / routed wire 样式,不重新创建全部设备。
timer = QtCore.QTimer()
timer.setInterval(SIGNAL_POLL_INTERVAL_MS)
timer.timeout.connect(lambda: _check_exchange_signal(json_path))
timer.start()
setattr(App, STATE_SIGNAL_TIMER, timer)
setattr(App, STATE_JSON_PATH, json_path)
_append_debug_log(
"exchange signal timer installed: path={0}, interval_ms={1}".format(
signal_path,
SIGNAL_POLL_INTERVAL_MS,
)
)
DEFAULT_SCENE_FILE_NAME = "QETScene.FCStd"
@ -1627,6 +1799,21 @@ def _run_scheduled_device_import(attempt=0):
wiring_report = _import_wiring_tasks(payload)
if wiring_report is not None:
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
if WiringImport is not None:
try:
style_report = WiringImport.apply_wire_styles_from_payload(
payload,
App.ActiveDocument,
)
_append_debug_log(
"initial routed wire style apply: matched={0}, view_updates={1}".format(
style_report.get("routed_wires_matched", 0),
style_report.get("updated_view", 0),
)
)
except Exception as exc:
_append_debug_log("initial routed wire style apply failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
_log_document_state(
"scheduled device import after wiring import",
App.ActiveDocument,
@ -1747,6 +1934,7 @@ def bootstrap_if_requested():
)
setattr(App, STATE_PAYLOAD, payload)
setattr(App, STATE_SUMMARY, summary)
_install_exchange_signal_timer(json_path)
if not getattr(App, STATE_IMPORT_SCHEDULED, False):
setattr(App, STATE_IMPORT_SCHEDULED, True)
_append_debug_log(

@ -1,6 +1,7 @@
# FreeCADExchange wire task import helpers.
import json
import re
import FreeCAD as App
@ -42,6 +43,100 @@ def _conductor_uuids(item):
return [str(value).strip() for value in values if str(value).strip()]
def _wire_style_payload(item):
wire_style = item.get("wire_style", {})
if not isinstance(wire_style, dict):
return {}
return dict(wire_style)
def _canonical_wire_style_json(wire_style):
if not isinstance(wire_style, dict):
wire_style = {}
return json.dumps(wire_style, ensure_ascii=False, sort_keys=True)
def _style_text(wire_style, key):
if not isinstance(wire_style, dict):
return ""
value = wire_style.get(key, "")
if value is None:
return ""
return str(value).strip()
def _style_float_text(wire_style, key):
text = _style_text(wire_style, key)
if not text:
return ""
try:
return "{0:g}".format(float(text))
except Exception:
return text
def _parse_line_color(value):
text = str(value or "").strip()
if not text:
return None
match = re.fullmatch(r"#?([0-9a-fA-F]{6})", text)
if match:
raw = match.group(1)
return tuple(int(raw[index:index + 2], 16) / 255.0 for index in (0, 2, 4))
if "," in text:
try:
numbers = [float(part.strip()) for part in text.split(",")]
except Exception:
numbers = []
if len(numbers) == 3:
if max(numbers) > 1.0:
numbers = [number / 255.0 for number in numbers]
return tuple(max(0.0, min(1.0, number)) for number in numbers)
return None
def _line_width_value(wire_style):
try:
value = float(_style_text(wire_style, "line_width") or 0)
except Exception:
value = 0.0
if value <= 0.0:
return None
return max(0.1, min(20.0, value))
def _draw_style_value(wire_style):
text = _style_text(wire_style, "line_type").lower()
if not text:
return "Solid"
normalized = text.replace("_", "").replace("-", "").replace(" ", "")
if normalized in {"dashline", "dashed", "dash", "dashes", "虚线"}:
return "Dashed"
if normalized in {"dotline", "dotted", "dot", "dots", "点线"}:
return "Dotted"
if normalized in {"dashdotline", "dashdot", "点划线"}:
return "Dashdot"
return "Solid"
def _geometry_style_fields(wire_style):
return {
key: _style_text(wire_style, key)
for key in ("diameter_mm", "bend_radius_mm", "bend_radius_factor", "area_or_spec")
}
def _json_dict(text):
try:
value = json.loads(text or "{}")
except Exception:
return {}
return value if isinstance(value, dict) else {}
def _device_display_map(payload):
labels = {}
for item in payload.get("devices", []) or []:
@ -141,6 +236,7 @@ def _normalize_wire_entry(item, index, device_labels=None, endpoint_instances=No
_endpoint_text(start_device_label, start_terminal_display, start_terminal_uuid),
_endpoint_text(end_device_label, end_terminal_display, end_terminal_uuid),
)
wire_style = _wire_style_payload(item)
return {
"wire_uuid": wire_uuid,
"wire_label": wire_label,
@ -149,6 +245,7 @@ def _normalize_wire_entry(item, index, device_labels=None, endpoint_instances=No
"wire_mark": wire_mark,
"wire_mark_is_manual": _bool_value(item, "wire_mark_is_manual"),
"wire_style_id": _int_text_value(item, "wire_style_id"),
"wire_style": wire_style,
"start_element_uuid": start_element_uuid,
"start_terminal_uuid": start_terminal_uuid,
"start_instance_id": start_instance_id,
@ -199,10 +296,137 @@ def _ensure_string_property(obj, prop_name, value, description="QET wire task pr
)
def _ensure_bool_property(obj, prop_name, value, description="QET wire task property"):
TerminalObjects.ensure_bool_property(
obj,
prop_name,
"QET Wiring",
description,
value,
)
def _set_wire_style_properties(obj, wire_style_id, wire_style):
style = wire_style if isinstance(wire_style, dict) else {}
new_style_json = _canonical_wire_style_json(style)
old_style_json = (getattr(obj, "QetWireStyleJson", "") or "").strip()
old_style = _json_dict(old_style_json)
changed = old_style_json != new_style_json
_ensure_string_property(obj, "QetWireStyleId", str(wire_style_id or "").strip())
_ensure_string_property(
obj,
"QetWireStyleJson",
new_style_json,
"QET wire style JSON from 2D payload",
)
_ensure_string_property(obj, "QetWireStyleName", _style_text(style, "name"))
_ensure_string_property(obj, "QetWireLineColor", _style_text(style, "line_color"))
_ensure_string_property(obj, "QetWireLineWidth", _style_float_text(style, "line_width"))
_ensure_string_property(obj, "QetWireDiameterMm", _style_float_text(style, "diameter_mm"))
_ensure_string_property(obj, "QetWireBendRadiusMm", _style_float_text(style, "bend_radius_mm"))
if old_style_json and changed and _geometry_style_fields(old_style) != _geometry_style_fields(style):
_ensure_bool_property(
obj,
"QetWireStyleAffectsGeometry",
True,
"Whether latest QET wire style changes geometry-sensitive fields",
)
elif "QetWireStyleAffectsGeometry" not in getattr(obj, "PropertiesList", []):
_ensure_bool_property(
obj,
"QetWireStyleAffectsGeometry",
False,
"Whether latest QET wire style changes geometry-sensitive fields",
)
return changed
def _apply_wire_style_view(obj, wire_style):
changed = False
view = getattr(obj, "ViewObject", None)
if view is None:
return changed
color = _parse_line_color(_style_text(wire_style, "line_color"))
if color is not None:
for prop_name in ("LineColor", "ShapeColor", "PointColor"):
if not hasattr(view, prop_name):
continue
try:
if tuple(getattr(view, prop_name, ())) != tuple(color):
setattr(view, prop_name, color)
changed = True
except Exception:
pass
draw_style = _draw_style_value(wire_style)
if hasattr(view, "DrawStyle"):
try:
if str(getattr(view, "DrawStyle", "") or "") != draw_style:
view.DrawStyle = draw_style
changed = True
except Exception:
pass
if hasattr(view, "DisplayMode"):
try:
if str(getattr(view, "DisplayMode", "") or "") != "Wireframe":
view.DisplayMode = "Wireframe"
changed = True
except Exception:
pass
width = _line_width_value(wire_style)
if width is not None and hasattr(view, "LineWidth"):
try:
if float(getattr(view, "LineWidth", 0.0) or 0.0) != width:
view.LineWidth = width
changed = True
except Exception:
pass
_ensure_bool_property(
obj,
"QetWireStyleApplied",
bool(isinstance(wire_style, dict) and wire_style),
"Whether the QET wire style has been applied to the visible 3D wire",
)
_ensure_string_property(
obj,
"QetAppliedWireLineColor",
_style_text(wire_style, "line_color"),
"Applied QET wire line color text",
)
_ensure_string_property(
obj,
"QetAppliedWireDrawStyle",
draw_style,
"Applied QET wire draw style",
)
if color is not None:
_ensure_string_property(
obj,
"QetAppliedWireLineColorRgb",
",".join(str(value) for value in color),
"Applied QET wire RGB color",
)
if width is not None:
_ensure_string_property(
obj,
"QetAppliedWireLineWidth",
str(width),
"Applied QET wire line width",
)
return changed
def _set_task_extra_properties(task, entry):
_ensure_string_property(task, "QetStartElementUuid", entry["start_element_uuid"])
_ensure_string_property(task, "QetEndElementUuid", entry["end_element_uuid"])
_ensure_string_property(task, "QetWireStyleId", entry["wire_style_id"])
_set_wire_style_properties(task, entry["wire_style_id"], entry["wire_style"])
_ensure_string_property(task, "QetStartTerminalDisplay", entry["start_terminal_display"])
_ensure_string_property(task, "QetEndTerminalDisplay", entry["end_terminal_display"])
_ensure_string_property(task, "QetStartDeviceLabel", entry["start_device_label"])
@ -306,3 +530,67 @@ def import_wire_tasks_from_payload(payload, doc=None):
report["updated_tasks"] += 1
return report
def apply_wire_styles_from_payload(payload, doc=None):
if doc is None:
doc = getattr(App, "ActiveDocument", None)
if doc is None:
raise WiringImportError("No active FreeCAD document is available.")
if not isinstance(payload, dict):
raise WiringImportError("Exchange payload must be an object.")
style_by_wire = {}
for item in payload.get("wires", []) or []:
if not isinstance(item, dict):
continue
wire_uuid = (
_string_value(item, "wire_id")
or _string_value(item, "wire_uuid")
or _string_value(item, "id")
)
if not wire_uuid:
continue
style_by_wire[wire_uuid] = {
"wire_style_id": _int_text_value(item, "wire_style_id"),
"wire_style": _wire_style_payload(item),
}
report = {
"total_styles": len(style_by_wire),
"routed_wires_seen": 0,
"routed_wires_matched": 0,
"updated_properties": 0,
"updated_view": 0,
"updated_samples": [],
"warnings": [],
}
for obj in list(WiringObjects.iter_routed_wire_objects(doc)):
report["routed_wires_seen"] += 1
wire_uuid = (getattr(obj, "QetWireUuid", "") or "").strip()
if not wire_uuid or wire_uuid not in style_by_wire:
continue
report["routed_wires_matched"] += 1
entry = style_by_wire[wire_uuid]
prop_changed = _set_wire_style_properties(
obj,
entry["wire_style_id"],
entry["wire_style"],
)
view_changed = _apply_wire_style_view(obj, entry["wire_style"])
if prop_changed:
report["updated_properties"] += 1
if view_changed:
report["updated_view"] += 1
if len(report["updated_samples"]) < 8:
report["updated_samples"].append(
{
"wire_uuid": wire_uuid,
"line_color": _style_text(entry["wire_style"], "line_color"),
"line_width": _style_text(entry["wire_style"], "line_width"),
"view_changed": bool(view_changed),
}
)
return report

Loading…
Cancel
Save