feature/优化2d-3d数据交换-0612

dev
zhanghao 2 weeks ago
parent 33ae1d8e31
commit 19bf61efa6

@ -44,7 +44,7 @@
<ProjectRoot>/.qet_freecad/
2d_to_3d.json
3d_to_2d.json
scene.FCStd
QETScene.FCStd
logs/
```
@ -52,22 +52,58 @@
- `2d_to_3d.json`QET 导出给 FreeCAD 的输入快照
- `3d_to_2d.json`FreeCAD 回写给 QET 的结果快照
- `scene.FCStd`:该项目对应的 FreeCAD 3D 工程
- `QETScene.FCStd`:该项目对应的 FreeCAD 3D 工程
---
## 3. 第一版 `2d_to_3d.json` 设计原则
### 3.1 最小主键集
### 3.1 设备实例与 2D 符号实例分离
第一版最小主键集只认
当前版协议明确区分三层身份
- `project_uuid`
- `element_uuid`
- `terminal_uuid`
- `instance_id`
- `element_uuid`2D 符号实例 ID
- `device_instance_id`3D 设备实例 ID
- `terminal_instance_id`3D 端子实例 ID
### 3.2 数据来源
这里的关键变化是:
> 多个 `element_uuid` 只要设备实例标注 `display_tag` 相同,就视为同一个 3D 设备实例。
因此:
- `devices[]` 表达的是“3D 设备实例”
- `devices[].terminals[]` 表达的是“该 3D 设备实例下有哪些 2D 端子成员”
- `device_models[]` 表达的是“哪个 3D 设备实例使用哪个模型”
### 3.2 同名设备实例的分组规则
第一版按下面规则分组:
- 同一项目
- 同一图纸绑定机柜上下文
- `display_tag` 相同
则视为同一个 3D 设备实例。
QET 在导出时负责:
1. 先按 `display_tag` 分组
2. 为每组生成或复用一个 `device_instance_id`
3. 把组内所有端子都挂到这个 `device_instance_id`
### 3.3 无端子设备不导出到 3D
甲方当前确认:
> 没有端子的设备,不需要传到 3D 侧。
因此第一版允许:
- `devices[]` 不再直接保存 `element_uuid`
- 设备成员关系直接通过 `devices[].terminals[]` 表达
### 3.4 数据来源
`2d_to_3d.json` 不是数据库整表导出,而是:
@ -79,7 +115,7 @@
- 也可以来自当前内存状态
- 但最终输出是给 FreeCAD 消费的统一协议格式
### 3.3 协议可以比数据库稍微丰富
### 3.5 协议可以比数据库稍微丰富
数据库设计要求尽量去冗余。
@ -101,7 +137,7 @@
```json
{
"schema_version": "1.2",
"schema_version": "2.0",
"project_uuid": "string",
"generated_at": "2026-05-18T10:30:00+08:00",
"source": {
@ -122,7 +158,7 @@
- `generated_at`:导出时间
- `source`:导出来源信息
- `cabinet`:当前图纸属性中绑定的机柜信息
- `devices`设备实例绑定,以及每个设备名下的接线处
- `devices`3D 设备实例快照,每个设备下内嵌自己的端子列表
- `device_models`:设备 3D 模型解析结果
- `wires`:导线起点/终点与标注快照
@ -181,16 +217,20 @@
### 6.1 作用
`devices` 负责表达:
`devices[]` 负责表达:
> 一个合并后的 3D 设备实例。
注意:
> 一个 2D 设备实例,对应哪个 3D 设备实例。
- 它不再表示单个 `element_uuid`
- 它表示“按 `display_tag` 分组后的设备实例”
### 6.2 第一版字段
```json
{
"element_uuid": "string",
"instance_id": "string",
"device_instance_id": "string",
"display_tag": "string",
"terminals": []
}
@ -200,17 +240,16 @@
| 字段 | 中文 | 必需 | 说明 |
| --- | --- | --- | --- |
| `element_uuid` | 2D设备实例UUID | 是 | QET 图纸中的设备实例主键 |
| `instance_id` | 3D实例ID | 是 | FreeCAD 侧设备实例主键 |
| `display_tag` | 2D设备实例标注 | 否 | JSON 显示辅助字段,优先使用 2D 中设备标注作为 FreeCAD 树标签;为空时再退回 `instance_id` / `element_uuid` |
| `terminals` | 设备接线处列表 | 否 | 当前设备名下的 2D 接线处集合FreeCAD 会先按设备再导入接线处 |
| `device_instance_id` | 3D设备实例ID | 是 | 当前 3D 设备实例唯一标识;同一 `display_tag` 分组下的多个 2D 符号共用同一个值 |
| `display_tag` | 设备实例标注 | 是 | 当前设备实例在 2D 中的业务标注QET 以它作为同一设备实例的分组键 |
| `terminals` | 设备端子列表 | 是 | 当前 3D 设备实例下的全部 2D 端子成员 |
### 6.4 说明
- 如果第一次进入 3D 时还没有 `instance_id`,允许先导出空字符串或缺省值
- FreeCAD 创建 3D 实例后,再在回写阶段补齐
- `display_tag` 不进入第一版数据库最小字段集,它只存在于交换 JSON 中,用来让 3D 树视图与 2D 标注更容易对上
- `terminals` 是设备级子结构,不再单独放在顶层
- `devices[]` 不再携带 `element_uuid`
- `devices[]` 下直接内嵌 `terminals[]`
- FreeCAD 导入时可按“先设备、后该设备端子”的顺序处理
- 第一版只导出“至少有一个端子”的设备实例
---
@ -218,17 +257,17 @@
### 7.1 作用
`devices[].terminals` 负责表达:
`devices[].terminals[]` 负责表达:
> 一个 2D 端子实例,属于哪个 3D 设备实例。
> 当前设备实例下面有哪些 2D 端子成员,以及它们可选地对应哪个 3D 端子实例。
### 7.2 第一版字段
```json
{
"terminal_uuid": "string",
"instance_id": "string",
"element_uuid": "string",
"terminal_instance_id": "string",
"terminal_display": "string"
}
```
@ -238,25 +277,16 @@
| 字段 | 中文 | 必需 | 说明 |
| --- | --- | --- | --- |
| `terminal_uuid` | 2D端子UUID | 是 | QET 端子实例主键 |
| `instance_id` | 3D实例ID | 是 | 该端子所属的 3D 设备实例 |
| `element_uuid` | 2D设备实例UUID | 否 | JSON 导入辅助字段,帮助 FreeCAD 在首次没有 `instance_id` 时仍能知道端子属于哪个设备 |
| `element_uuid` | 2D符号实例UUID | 是 | 该端子来自哪个 2D 符号实例 |
| `terminal_instance_id` | 3D端子实例ID | 否 | 3D 端子对象 UUID如果当前尚未绑定到具体 3D 端子对象则允许为空串 |
| `terminal_display` | 接线处标注 | 否 | 2D 端子在图纸上的显示标注,供 FreeCAD 端子对象显示和槽位匹配使用 |
### 7.4 为什么这里允许带 `element_uuid`
注意:
- `element_uuid` **不是**第一版端子绑定表的数据库字段扩张
- 它只是交换 JSON 中的上下文辅助字段
原因:
- 当某些设备第一次进入 3D、暂时还没有 `instance_id`
- FreeCAD 仍需要知道该端子属于哪个 2D 设备实例
所以这里允许 JSON 比数据库稍微丰富一些。
### 7.4 说明
### 7.5 为什么第一版不带更多字段
- 所属 `device_instance_id` 由外层设备对象提供
- `terminal_instance_id` 表示 3D 端子实例身份
- 两者不应继续复用同一个 `instance_id` 字段
- 设备成员关系不再需要从顶层反推
第一版先不强制包含:
@ -343,7 +373,7 @@
---
## 9. `device_models` 结构
## 8. `device_models` 结构
### 9.1 作用
@ -355,7 +385,7 @@
```json
{
"element_uuid": "string",
"device_instance_id": "string",
"device_id": 123,
"parts_3d": "string",
"resolved_model_path": "string"
@ -366,10 +396,10 @@
| 字段 | 中文 | 必需 | 说明 |
| --- | --- | --- | --- |
| `element_uuid` | 2D设备实例UUID | 是 | 与 `devices` 关联 |
| `device_instance_id` | 3D设备实例ID | 是 | 与 `devices[]` 中的设备实例关联 |
| `device_id` | 设备类型ID | 否 | QET 设备主数据 ID |
| `parts_3d` | 3D模型资源URI | 否 | 原始资源引用,来自 `device_3d_asset.uri``device_attribute.parts_3d` |
| `resolved_model_path` | 已解析模型路径 | 是 | QET 已经解析好的本地模型文件路径FreeCAD 第二步直接用它导入 STEP / FCStd |
| `resolved_model_path` | 已解析模型路径 | 是 | QET 已经解析好的本地模型文件路径FreeCAD 直接用它导入 STEP / FCStd |
### 8.4 为什么 `resolved_model_path` 是第二步关键字段
@ -377,7 +407,7 @@
如果没有 `resolved_model_path`FreeCAD 就必须自己理解和回查:
1. `element_uuid`
1. `device_instance_id`
2. `device_id`
3. `device_3d_asset`
4. `device_attribute.parts_3d`
@ -399,7 +429,7 @@ A 方案下,`.FCStd` 是正式可复用设备资产格式。QET 导出时不
```json
{
"element_uuid": "elem-1001",
"device_instance_id": "dev-inst-qf1",
"device_id": 123,
"parts_3d": "models/mccb/MCCB_1P.FCStd",
"resolved_model_path": "C:/Users/Admin/Documents/MingTuProject/models/mccb/MCCB_1P.FCStd"
@ -438,11 +468,11 @@ FreeCAD 根据 `resolved_model_path` 的扩展名导入 `.FCStd`,并在导入
---
## 8. 第一版 `2d_to_3d.json` 完整样例
## 9. 第一版 `2d_to_3d.json` 完整样例
```json
{
"schema_version": "1.0",
"schema_version": "2.0",
"project_uuid": "proj-001",
"generated_at": "2026-05-18T10:30:00+08:00",
"source": {
@ -451,25 +481,25 @@ FreeCAD 根据 `resolved_model_path` 的扩展名导入 `.FCStd`,并在导入
},
"devices": [
{
"element_uuid": "elem-1001",
"instance_id": "fc-inst-0001"
}
],
"terminals": [
{
"terminal_uuid": "term-2001",
"instance_id": "fc-inst-0001",
"element_uuid": "elem-1001"
},
{
"terminal_uuid": "term-2002",
"instance_id": "fc-inst-0001",
"element_uuid": "elem-1001"
"device_instance_id": "dev-inst-qf1",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "term-2001",
"element_uuid": "elem-1001",
"terminal_instance_id": ""
},
{
"terminal_uuid": "term-2002",
"element_uuid": "elem-1002",
"terminal_instance_id": ""
}
]
}
],
"device_models": [
{
"element_uuid": "elem-1001",
"device_instance_id": "dev-inst-qf1",
"device_id": 123,
"parts_3d": "models/mccb/model.step",
"resolved_model_path": "C:/Users/Admin/Documents/MingTuProject/models/mccb/model.step"
@ -480,7 +510,7 @@ FreeCAD 根据 `resolved_model_path` 的扩展名导入 `.FCStd`,并在导入
---
## 9. 第一版 `3d_to_2d.json` 建议
## 10. 第一版 `3d_to_2d.json` 建议
第一版回写建议同样保持最小化。
@ -496,25 +526,30 @@ FreeCAD 根据 `resolved_model_path` 的扩展名导入 `.FCStd`,并在导入
}
```
### 9.1 `instances`
### 10.1 `instances`
```json
{
"element_uuid": "string",
"instance_id": "string"
"device_instance_id": "string"
}
```
### 9.2 `terminals`
### 10.2 `terminals`
```json
{
"terminal_uuid": "string",
"instance_id": "string"
"device_instance_id": "string",
"terminal_instance_id": "string"
}
```
### 9.3 说明
### 10.3 说明
- `instances[]` 继续表达:某个 2D `element_uuid` 绑定到哪个 3D 设备实例
- `terminals[]` 表达:某个 2D `terminal_uuid` 绑定到哪个 3D 设备实例,以及可选的哪个 3D 端子实例
- 如果当前版本 QET 只消费设备实例级绑定,`terminal_instance_id` 可暂时忽略,但字段命名应保留清晰语义
第一版不回写:
@ -527,29 +562,117 @@ FreeCAD 根据 `resolved_model_path` 的扩展名导入 `.FCStd`,并在导入
---
## 10. 第一版推荐交互流程
## 11. 第一版推荐交互流程
1. 用户在 QET 中点击 `3D视图`
2. QET 生成 `2d_to_3d.json`
3. QET 打开 FreeCAD并打开 `scene.FCStd`
4. FreeCAD 读取 `2d_to_3d.json`
5. FreeCAD 创建或更新:
- 3D 设备实例
- 3D 端子对象
6. 用户在 FreeCAD 中完成装配和接线
7. 用户保存 FreeCAD 工程
8. FreeCAD 生成 `3d_to_2d.json`
9. QET 在后续时机读取 `3d_to_2d.json`
2. QET 检查 `<ProjectRoot>/.qet_freecad/3d_to_2d.json` 是否存在
3. 如果存在QET 读取并校验 `project_uuid`
4. QET 将 `instances[]` 写回 `project_2d3d_symbol_binding(project_uuid, element_uuid, instance_id)`,其中表里的 `instance_id` 列承接 `device_instance_id`
5. QET 将 `terminals[]` 写回 `project_2d3d_terminal_binding(project_uuid, terminal_uuid, instance_id)`,其中表里的 `instance_id` 列承接 `terminal_instance_id`
6. QET 重新导出最新的 `2d_to_3d.json`
7. QET 启动 FreeCAD并优先打开 `<ProjectRoot>/.qet_freecad/QETScene.FCStd`
8. FreeCAD 读取 `2d_to_3d.json`,对现有场景做增量更新
9. 用户在 FreeCAD 中完成装配、布线、保存工程
10. FreeCAD 回写最新的 `3d_to_2d.json`
补充说明:
- 第 4、5 步是第一版绑定闭环的关键
- 第一版不要求 QET 在点击 `3D视图` 前实时监听 FreeCAD
- 第一版 3D 工程文件固定为 `<ProjectRoot>/.qet_freecad/QETScene.FCStd`
---
## 11. 当前推荐结论
## 12. 当前推荐结论
第一版协议建议明确分层:
- **数据库设计**:尽量去冗余
- **JSON 协议**:允许带少量已解析结果,方便 FreeCAD 使用
- **数据库设计**:只保存 2D/3D 绑定关系
- **JSON 协议**:保存当前一次交换所需的最小快照
- **FreeCAD 文档**:保存 3D 位姿、装配状态、导线几何、端子空间结果
一句话总结:
> 第一版先把 `2d_to_3d.json` 做成“面向 FreeCAD 的最小项目快照”,而不是数据库整表镜像。
> 第一版先把 `2d_to_3d.json` 做成面向 FreeCAD 的最小项目快照,而不是数据库整表镜像。
---
## 13. 实现约束
### 13.1 场景文件位置
第一版固定使用:
`<ProjectRoot>/.qet_freecad/QETScene.FCStd`
约束如下:
- QET 再次打开 3D 视图时,应优先打开这个文件
- 如果文件不存在,才创建新场景
- 设备位姿、柜内装配、端子空间位置、导线几何都保存在这个 `FCStd`
### 13.2 设备合并规则
`devices[]` 表示的是“要在 3D 中存在的设备实例”不是“2D 符号逐条镜像”。
第一版约束:
- 多个 2D `element_uuid` 如果 `display_tag` 相同,应视为同一个 3D 设备实例
- 该 3D 设备实例使用单独的 `device_instance_id`
- `display_tag` 只作为导出时的设备归并依据
- `display_tag` 不是数据库绑定主键,数据库仍只认 `element_uuid` / `terminal_uuid`
### 13.3 端子与设备成员关系
第一版采用设备下嵌套端子的表达方式。
原因:
- 一个 3D 设备实例可能对应多个 2D `element_uuid`
- 设备顶层再保留单个 `element_uuid` 已经不再准确
- 端子是第一版真正稳定的 2D/3D 绑定粒度
因此:
- `devices[]` 负责描述 3D 设备实例本身
- `devices[].terminals[]` 负责表达“哪些 2D 端子属于该 3D 设备实例”
- 如果某设备没有端子,第一版可不导出到 3D
- `3d_to_2d.json` 回写阶段仍保持顶层拆平,因为它要直接对应 QET 的两张绑定表
### 13.4 资源解析责任
第一版资源解析责任在 QET
- 优先使用 `device_3d_asset`
- `device_attribute.parts_3d` 只作为兼容/回退字段
- 导出给 FreeCAD 的应是 `resolved_model_path`
- FreeCAD 不应再反查 QET 数据库决定该加载哪个模型
### 13.5 失效对象策略
第一版推荐采用“标记失效,不立即物理删除”的策略。
含义:
- 2D 中已删除、但 3D 文档中仍存在的设备或端子,可在 FreeCAD 中标记为 `Stale`
- 是否隐藏、显示、统计,由 FreeCAD 工具侧控制
- 第一版不要求 QET 直接删除 FreeCAD 文档对象
### 13.6 写回文件责任
`3d_to_2d.json` 的责任是把 FreeCAD 当前确认过的绑定结果回写给 QET。
第一版建议:
- FreeCAD 在导入后、保存后或显式同步时生成该文件
- QET 在下次点击 `3D视图`、手动刷新或打开工程时消费该文件
- QET 读取成功后应更新两张绑定表,但不应把 3D 位姿写入数据库
### 13.7 不再采用的旧口径
以下口径在第一版中不再作为协议设计依据:
- 设备顶层 `element_uuid` 唯一代表一个 3D 设备实例
- 仅按 `element_uuid` 查找或创建 3D 设备组
- 用数据库保存 3D 位姿或装配状态
- 让 FreeCAD 直接写 QET 数据库完成绑定闭环

@ -1,8 +1,9 @@
# 2D / 3D 数据传递待办
本文只记录 QET 与 FreeCAD 第一版协同中,和数据传递、持久化、打开流程相关的待办事项。
> **最后更新2026-05-29**
> 本文记录 QET 与 FreeCAD 第一版协同中,和数据传递、持久化、打开流程相关的实施状态。
第一版继续遵守当前约束:
第一版遵守以下约束:
- 2D 电气语义以 QET 为准。
- 3D 空间状态以 FreeCAD 文档为准。
@ -10,418 +11,150 @@
- 数据库只依赖:
- `project_2d3d_symbol_binding`
- `project_2d3d_terminal_binding`
- 设备绑定只依赖:
- `project_uuid`
- `element_uuid`
- `instance_id`
- 端子绑定只依赖:
- `project_uuid`
- `terminal_uuid`
- `instance_id`
- 设备绑定只依赖:`project_uuid` / `element_uuid` / `instance_id`
- 端子绑定只依赖:`project_uuid` / `terminal_uuid` / `instance_id`
- 第一版 3D 端子绑定唯一依据是 `terminal_uuid`
## 1. QET 读取 3d_to_2d.json
---
### 1.1 当前状态
## 1. 当前入口流程
FreeCAD 侧已经可以生成
用户点击 QET 菜单 `3D视图``openThreeDViewWindow()` (`qetdiagrameditor.cpp:17402`)
```text
<ProjectRoot>/.qet_freecad/3d_to_2d.json
```
该文件用于把 FreeCAD 创建或维护的 3D 实例 ID 回传给 QET。
当前最关键缺口是:
> QET 侧还需要读取 `3d_to_2d.json`,并把里面的实例绑定写回项目运行库。
### 1.2 要写回的表
设备实例绑定写入:
```text
project_2d3d_symbol_binding(project_uuid, element_uuid, instance_id)
```
端子实例绑定写入:
```text
project_2d3d_terminal_binding(project_uuid, terminal_uuid, instance_id)
```
### 1.3 为什么要做
第一次打开 FreeCAD 时QET 可能只知道:
```text
element_uuid
terminal_uuid
resolved_model_path
```
此时 `instance_id` 可能为空。FreeCAD 创建 3D 设备实例后,会生成并保存 `QetInstanceId`
QET 读取 `3d_to_2d.json` 后,就能记住:
```text
element_uuid -> instance_id
terminal_uuid -> instance_id
```
这样下次 QET 再导出 `2d_to_3d.json` 时,可以带上已有 `instance_id`FreeCAD 就能复用已有 3D 实例,而不是重复创建。
## 2. 3D 工程文件持久化
### 2.1 当前约定
第一版 3D 工程本体是:
```text
<ProjectRoot>/.qet_freecad/scene.FCStd
```
交换目录建议保持为:
```text
<ProjectRoot>/.qet_freecad/
2d_to_3d.json
3d_to_2d.json
scene.FCStd
logs/
```
### 2.2 FreeCAD 文档负责保存
`scene.FCStd` 保存:
- 设备 3D 实例对象
- 设备位姿
- 柜内装配关系
- 端子 LCS 和空间位置
- 手动导线几何
- 自动布线结果
- 走线路径和路由网络
这些内容不进入数据库。
### 2.3 QET 要做什么
QET 下次打开同一个工程或同一个机柜时,应优先打开已有:
```text
<ProjectRoot>/.qet_freecad/scene.FCStd
```
不要重新创建空场景。
如果后续支持一个项目多个机柜,则 QET 还需要保存或推导:
```text
某个机柜 -> 对应哪个 scene.FCStd
```
第一版可以先使用默认工程文件:
```text
.qet_freecad/scene.FCStd
```
## 3. 2D 原理图更新后同步到 3D
### 3.1 当前推荐方式
第一版不做实时同步。
推荐在用户点击 QET 的 `3D视图` 时执行一次同步:
```text
QET 重新导出 2d_to_3d.json
FreeCAD 打开已有 scene.FCStd
FreeCAD 根据最新 JSON 增量更新 3D 文档
```
### 3.2 FreeCAD 更新依据
FreeCAD 应按下面字段识别对象:
```text
设备element_uuid / instance_id
端子terminal_uuid / instance_id
```
处理策略:
- `instance_id` 已存在,并且 FreeCAD 文档中找到对象:复用并更新语义。
- `instance_id` 为空:创建新的 3D 实例。
- 2D 新增设备或端子FreeCAD 新增对应 3D 对象。
- 2D 已删除但 FreeCAD 仍存在的对象:需要定义失效处理策略。
### 3.3 删除或失效对象策略
后续需要明确:
- 直接删除 3D 对象
- 标记为失效
- 隐藏但保留
- 提示用户确认后删除
第一版建议先采用保守策略:
```text
标记失效或提示用户,不自动删除已有 3D 装配和接线。
```
## 4. 3D 保存后 2D 再打开时打开保存的工程
这和“2D 原理图更新后同步到 3D”不是一件事。
### 4.1 打开保存的工程解决的问题
它解决的是:
```text
QET 应该打开哪个 FreeCAD 文件?
```
第一版默认打开:
```text
<ProjectRoot>/.qet_freecad/scene.FCStd
```
### 4.2 2D 更新同步解决的问题
它解决的是:
```text
打开已有 scene.FCStd 后FreeCAD 应该按最新 2D 原理图新增、更新或标记哪些设备、端子、导线任务?
```
### 4.3 两者关系
完整动作通常发生在同一个入口中:
```text
用户点击 3D视图
QET 读取上一轮 3d_to_2d.json
QET 更新绑定表
QET 重新导出 2d_to_3d.json
QET 启动 FreeCAD 并打开已有 scene.FCStd
FreeCAD 根据 2d_to_3d.json 增量更新文档
```
### 4.4 3D视图入口确认流程
当前确认的 `3D视图` 入口顺序为:
```text
用户点击 3D视图
1. QET 检查 .qet_freecad/3d_to_2d.json 是否存在
2. 如果存在,读取并校验 project_uuid
3. 把 instances[] 写入 project_2d3d_symbol_binding
4. 把 terminals[] 写入 project_2d3d_terminal_binding
5. 然后重新导出最新 2d_to_3d.json
6. 启动 FreeCAD打开已有 scene.FCStd
```
这个顺序的关键点是:
```text
先读取 3D 回写并更新绑定表
再导出新的 2D -> 3D 快照
```
否则本次导出的 `2d_to_3d.json` 仍然拿不到上一轮 FreeCAD 生成的 `instance_id`
## 5. 设备 3D 资产传递
### 5.1 当前状态
QET -> FreeCAD 方向已经具备设备资产传递。
`2d_to_3d.json` 中已有:
1. exportForDiagram() // FreeCADExchangeExportService.cpp:1091
├─ consumeFreeCadWriteBackIfPresent() // 读取上一轮 3d_to_2d.json → 写绑定表 ✅
├─ loadSymbolBindingInstanceIds() // 从库读回设备 instance_id ✅
├─ loadTerminalBindingInstanceIds() // 从库读回端子 instance_id ✅
├─ 构建 devices[] / device_models[] / wires[] // 全量导出当前图纸所有设备 ⚠️ 全量非增量
└─ 写入 2d_to_3d.json
2. openScene() // FreeCADLaunchService.cpp:352
├─ 检查 QETScene.FCStd 是否存在 // 存在则传文件参数 + 设环境变量
├─ 设置 QET_2D_TO_3D_JSON / QET_FREECAD_SCENE_FILE
└─ 启动 FreeCAD.exe
```text
device_models[]
element_uuid
device_id
parts_3d
resolved_model_path
3. FreeCAD ExchangeBootstrap::bootstrap_if_requested()
├─ 读取 2d_to_3d.json
├─ 判断 QETScene.FCStd 是否存在 → is_first_open
├─ 导入设备 / 端子 / 导线任务(按 QetElementUuid 匹配复用)
├─ 强制保存场景文件(确保下次打开能检测到已有文件)
├─ 标记失效对象 (StaleObjectSync) // 首次打开跳过,再次打开执行
├─ 写回 3d_to_2d.json (ExchangeWriteBack) // 仅含 instances[] + terminals[]
└─ 弹出同步结果摘要对话框(含同步模式、增删数量)
```
FreeCAD 使用 `resolved_model_path` 导入 3D 模型。
### 5.2 后续要求
后续不是从零实现,而是继续稳定以下规则:
---
- 优先使用 `device_3d_asset`
- `device_attribute.parts_3d` 只作为兼容或回退字段。
- QET 导出给 FreeCAD 的关键字段是 `resolved_model_path`
- FreeCAD 不反查 QET 数据库来寻找模型路径。
- `.FCStd` 应作为正式可复用设备资产格式。
- STEP / STP / STE 更适合作为制作 FCStd 模板的原始几何输入。
## 2. 已完成项
## 6. 导线数据传递
### 2.1 基础闭环QET ↔ FreeCAD
### 6.1 当前状态
| 功能 | QET 侧实现 | FreeCAD 侧实现 |
|------|-----------|---------------|
| 生成 `2d_to_3d.json` | `exportForDiagram()` | — |
| 读取 `2d_to_3d.json` | — | `ExchangeBootstrap.load_exchange_payload()` |
| 生成 `3d_to_2d.json` | — | `ExchangeWriteBack.write_back_document()` |
| 读取 `3d_to_2d.json` 并写绑定表 | `consumeFreeCadWriteBackIfPresent()` | — |
| 绑定表 upsert | `upsertFreeCadSymbolBinding()` / `upsertFreeCadTerminalBinding()` | — |
| 绑定表自动建表 | `ensureMinimalFreeCadBindingTables()` | — |
| 启动 FreeCAD | `openScene()` 设环境变量 | — |
| 打开已有 `QETScene.FCStd` | 传文件路径 | `_scene_path_from_exchange_context()` |
| 按 element_uuid 匹配复用 3D 实例 | — | `DeviceImport._find_device_group()` |
| 新建设备自动生成 QetInstanceId | — | `DeviceImport._ensure_child_group()` |
| 文档保存后自动写回 | — | `_WriteBackObserver` |
| 场景文件存在性检查与首次/再次分支 | `openScene()` 检查 + 传参 | `_is_scene_first_open()` 判断 |
| 导入后强制保存场景文件 | — | `ExchangeBootstrap._run_scheduled_device_import()` |
| 失效对象标记 (StaleObjectSync) | — | `StaleObjectSync.mark_stale_objects_from_payload()` |
| 同步结果控制台横幅 + 对话框 | — | `ExchangeBootstrap._run_scheduled_device_import()` |
| 诊断日志 | — | `%LOCALAPPDATA%/QETDeps/freecad_exchange_bootstrap.log` |
QET -> FreeCAD 方向已经具备导线任务传递。
### 2.2 设备资产传递
`2d_to_3d.json` 中已有:
- `device_models[]` 包含 `resolved_model_path` → FreeCAD 导入 STEP/STP/FCStd
- 优先使用 `device_3d_asset``parts_3d` 作为回退
```text
wires[]
wire_id
net_uuid
group_uuid
wire_mark
wire_mark_is_manual
start_element_uuid
start_terminal_uuid
end_element_uuid
end_terminal_uuid
start_terminal_display
end_terminal_display
```
### 2.3 导线任务传递
FreeCAD 侧已经可以把 `wires[]` 导入为导线任务。
### 6.2 后续可能扩展
如果后续 QET 需要读取 3D 布线状态,可以扩展 `3d_to_2d.json`,例如:
```text
wire_id
route_status
route_type
length
diagnostics
```
- `wires[]` 包含所有端点信息 → FreeCAD `WiringImport.py` 导入为布线任务
- 自动布线 (`AutoRouting.py`) + 手动布线 (`ManualWiring.py`)
### 6.3 第一版不做
### 2.4 机柜维度
第一版不把下面内容写进数据库:
- `cabinet` 字段包含 `resolved_scene_path`、`location_id` 等
- FreeCAD 支持按机柜导入柜体模型
- 3D 路径点
- 导线空间几何
- 线束位姿
- 线槽内具体排布
---
这些仍保存在:
## 3. 待实现
```text
scene.FCStd
```
### 3.1 QET 增量导出
## 7. 推荐实施步骤
**当前行为:** `exportForDiagram()` 始终遍历当前图纸的全部 `elements()`,全量导出到 `2d_to_3d.json`。新设备因没有 `instance_id` 而留空,已删除的设备从 JSON 中消失但 FreeCAD 侧的对象仍存在。
### 步骤 1补 QET 读取 3d_to_2d.json
**需要做:**
在 QET 侧新增或接入一个读取流程
QET 侧提供增量导出模式,在 `2d_to_3d.json` 中增加变更标记:
```text
读取 <ProjectRoot>/.qet_freecad/3d_to_2d.json
校验 project_uuid
读取 instances[]
读取 terminals[]
```json
{
"devices": [
{"element_uuid": "...", "instance_id": "...", "change_type": "unchanged"},
{"element_uuid": "...", "instance_id": "", "change_type": "added"},
{"element_uuid": "...", "instance_id": "...", "change_type": "modified"}
],
"removed_devices": [
{"element_uuid": "...", "instance_id": "..."}
]
}
```
校验失败时不写库,并给出提示或日志。
### 步骤 2写回两张绑定表
`instances[]` 执行 upsert
```text
project_2d3d_symbol_binding:
project_uuid
element_uuid
instance_id
```
这样 FreeCAD 侧可以更精确地处理变更,而不是依赖"不在 payload 中的就是失效"的兜底逻辑。
`terminals[]` 执行 upsert
### 3.2 失效设备统计回传 ✅(已完成 2026-05-29
```text
project_2d3d_terminal_binding:
project_uuid
terminal_uuid
instance_id
**当前行为:** QET `exportForDiagram()` 中计算失效设备:
```
第一版不要写入旧 3D 场景表,也不要写入位姿字段。
### 步骤 3把读取回写接到 3D视图入口前
用户点击 `3D视图` 时,建议先执行:
```text
读取上一轮 3d_to_2d.json
更新绑定表
重新导出 2d_to_3d.json
启动 FreeCAD
失效 = project_2d3d_symbol_binding 的所有 element_uuid
Diagram::elements() 中存在的 element_uuid
```
结果写入 `2d_to_3d.json``stale_devices` 字段。FreeCAD 侧 `StaleObjectSync` 直接使用该权威列表标记 Stale无需再做反向对比。
这样新导出的 `2d_to_3d.json` 就能带上最新 `instance_id`
### 步骤 4确认 scene.FCStd 打开策略
QET 启动 FreeCAD 时:
- 如果 `.qet_freecad/scene.FCStd` 已存在,打开它。
- 如果不存在,允许 FreeCAD 创建新的工程文档。
- 后续如果接入机柜维度,则按机柜映射选择对应 FCStd。
### 3.3 QET 侧 FreeCAD 进程管理
### 步骤 5确认 FreeCAD 增量更新策略
**当前行为:** `openScene()` 使用 `QProcess::startDetached()` 启动 FreeCAD启动后即断开不跟踪进程状态。
FreeCAD 打开 `scene.FCStd` 并读取新的 `2d_to_3d.json` 后:
**可能需要:**
- 检测 FreeCAD 是否已在运行
- FreeCAD 退出后通知 QET 刷新状态
- 防止重复启动多个 FreeCAD 实例操作同一 scene 文件
- 已存在实例:复用。
- 新设备:创建。
- 新端子:创建。
- 失效设备或端子:先标记或提示,不建议第一版自动删除。
---
### 步骤 6保留设备资产和导线任务现有链路
## 4. 实施优先级
设备 3D 资产继续走:
| 优先级 | 条目 | 说明 |
|--------|------|------|
| P2 | 3.1 QET 增量导出 | 当前全量导出 + QET 侧权威失效计算已可工作,增量是性能优化 |
| P3 | 3.3 进程管理 | 不影响数据正确性,提升用户体验 |
```text
device_3d_asset -> resolved_model_path -> FreeCAD
```
导线任务继续走:
---
```text
QET wires[] -> FreeCAD QETWiring_01_Tasks
```
## 5. FreeCAD 侧后续可做
第一版暂不要求 QET 读取 3D 导线几何。
- 扩展 `ExchangeWriteBack` 支持 stale 段输出(配合 3.2
- 优化机柜级别的场景映射和切换 UI
- 增强失效对象的用户交互(批量清理、恢复、导出报告)
- 自动布线算法持续优化
### 步骤 7后续再扩展 3D 布线状态回传
等最小绑定闭环稳定后,再考虑在 `3d_to_2d.json` 中增加:
```text
routed_wires[]
wire_id
route_status
route_type
length
diagnostics
```
---
该扩展只作为状态、统计、诊断回传,不作为第一版数据库几何持久化。
## 6. 已修复问题记录
## 8. 第一版完成标准
### 6.1 失效设备统计始终为 02026-05-29
第一版数据传递闭环完成后,应满足:
**根因**`_is_scene_first_open()` 原实现检查 `QETScene.FCStd` 文件是否存在来判断首次/再次打开。但该文件仅在用户手动保存后才落盘,而 `ExchangeWriteBack.write_back_document()` 每次导入后必定写入 `3d_to_2d.json`。若用户未手动保存,`QETScene.FCStd` 不存在,每次打开都被判定为"首次打开"从而跳过 `StaleObjectSync`
1. 第一次从 QET 打开 FreeCAD`instance_id` 可以为空。
2. FreeCAD 创建 3D 实例并保存 `scene.FCStd`
3. FreeCAD 生成 `3d_to_2d.json`
4. QET 能读取 `3d_to_2d.json`
5. QET 能写入两张绑定表。
6. 第二次点击 `3D视图`QET 导出的 `2d_to_3d.json` 中已有 `instance_id`
7. FreeCAD 打开已有 `scene.FCStd`,复用已有 3D 实例。
8. 3D 位姿、装配、导线几何仍只保存在 `scene.FCStd`
**修复**
- FreeCAD `ExchangeBootstrap._is_scene_first_open()`:改为检查 `3d_to_2d.json` 是否存在(每次 write-back 必定写入),而非检查 `.FCStd` 文件
- 注QET 侧 `scene.FCStd` 与 FreeCAD 侧 `QETScene.FCStd` 的文件名不一致是已知的,但 FreeCAD 通过 `_scene_path_from_exchange_context()` 自行在 exchange 目录中查找 `QETScene.FCStd`,不依赖 QET 传参,因此不影响功能

@ -1,6 +1,7 @@
import os
from pathlib import Path
import uuid
from datetime import datetime
import FreeCAD as App
@ -45,7 +46,12 @@ def _append_debug_log(message):
log_path = _debug_log_path()
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as handle:
handle.write(message + "\n")
handle.write(
"[{0}] {1}\n".format(
datetime.now().astimezone().isoformat(timespec="seconds"),
message,
)
)
except Exception:
pass
@ -183,11 +189,24 @@ def _ensure_child_group(doc, parent_group, element_uuid, instance_id, name_prefi
def _ensure_document(scene_path):
preferred_name = _safe_token(Path(scene_path).stem if scene_path else "QETScene")[:48] or "QETScene"
normalized_scene_path = _native_path(scene_path)
_append_debug_log(
"DeviceImport _ensure_document: preferred_name={0}, normalized_scene_path={1}".format(
preferred_name,
normalized_scene_path or "<empty>",
)
)
if normalized_scene_path and os.path.isfile(normalized_scene_path):
normalized_target = os.path.normcase(os.path.normpath(normalized_scene_path))
for candidate in App.listDocuments().values():
candidate_path = getattr(candidate, "FileName", "") or ""
if candidate_path and os.path.normcase(os.path.normpath(candidate_path)) == normalized_target:
_append_debug_log(
"DeviceImport _ensure_document reusing already open scene doc: name={0}, path={1}, objects={2}".format(
getattr(candidate, "Name", ""),
candidate_path,
len(list(getattr(candidate, "Objects", []) or [])),
)
)
_activate_document(candidate)
return candidate
@ -203,15 +222,36 @@ def _ensure_document(scene_path):
"Cannot open existing FreeCAD scene file: {0}".format(normalized_scene_path)
)
_append_debug_log(
"DeviceImport _ensure_document opened existing scene doc: name={0}, path={1}, objects={2}".format(
getattr(doc, "Name", ""),
getattr(doc, "FileName", "") or normalized_scene_path,
len(list(getattr(doc, "Objects", []) or [])),
)
)
_activate_document(doc)
return doc
existing_doc = DevicePreview.find_main_exchange_document(preferred_name)
if existing_doc is not None:
_append_debug_log(
"DeviceImport _ensure_document reusing unsaved exchange doc: name={0}, path={1}, objects={2}".format(
getattr(existing_doc, "Name", ""),
getattr(existing_doc, "FileName", "") or "<unsaved>",
len(list(getattr(existing_doc, "Objects", []) or [])),
)
)
_activate_document(existing_doc)
return existing_doc
doc = App.newDocument(preferred_name)
_append_debug_log(
"DeviceImport _ensure_document created new scene doc: name={0}, path={1}, objects={2}".format(
getattr(doc, "Name", ""),
getattr(doc, "FileName", "") or "<unsaved>",
len(list(getattr(doc, "Objects", []) or [])),
)
)
_activate_document(doc)
return doc
@ -220,6 +260,24 @@ def _activate_document(doc):
if doc is None:
return
current_doc = getattr(App, "ActiveDocument", None)
if current_doc is doc:
_append_debug_log(
"DeviceImport _activate_document skipped: already active name={0}, path={1}".format(
getattr(doc, "Name", ""),
getattr(doc, "FileName", "") or "<unsaved>",
)
)
return
_append_debug_log(
"DeviceImport _activate_document: name={0}, path={1}, objects={2}".format(
getattr(doc, "Name", ""),
getattr(doc, "FileName", "") or "<unsaved>",
len(list(getattr(doc, "Objects", []) or [])),
)
)
setter = getattr(App, "setActiveDocument", None)
if callable(setter):
try:
@ -418,6 +476,21 @@ def _find_device_group(doc, element_uuid):
return candidate
return None
def _find_device_group_by_instance_id(doc, instance_id):
target_instance_id = (instance_id or "").strip()
if not target_instance_id:
return None
for candidate in doc.Objects:
if not getattr(candidate, "Name", "").startswith(DEVICE_GROUP_PREFIX):
continue
if "QetInstanceId" not in getattr(candidate, "PropertiesList", []):
continue
if getattr(candidate, "QetInstanceId", "").strip() == target_instance_id:
return candidate
return None
def _device_label_text(display_tag, instance_id, element_uuid):
label = (display_tag or "").strip()
if label:
@ -441,50 +514,176 @@ def _device_warning_subject(display_tag, element_uuid):
return "设备"
def _ensure_device_group(doc, root_group, element_uuid, instance_id, model_path, display_tag, layout_index):
created_now = False
device_group = _find_device_group(doc, element_uuid)
if device_group is not None and getattr(device_group, "TypeId", "") != "App::Part":
_remove_object_tree(doc, device_group)
device_group = None
def _device_report_label(display_tag, instance_id, element_uuid=""):
label = (display_tag or "").strip()
if label:
return label
fallback = (instance_id or "").strip() or (element_uuid or "").strip()
return fallback or "未命名设备"
def _payload_device_instance_id(device):
if not isinstance(device, dict):
return ""
return (
(device.get("device_instance_id") or "").strip()
or (device.get("instance_id") or "").strip()
)
def _payload_device_element_uuid(device):
if not isinstance(device, dict):
return ""
element_uuid = (device.get("element_uuid") or "").strip()
if element_uuid:
return element_uuid
for terminal in device.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = (terminal.get("element_uuid") or "").strip()
if element_uuid:
return element_uuid
return ""
def _payload_terminal_uuid_set(device):
result = set()
if not isinstance(device, dict):
return result
for terminal in device.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
terminal_uuid = (terminal.get("terminal_uuid") or "").strip()
if terminal_uuid:
result.add(terminal_uuid)
return result
def _existing_qet_terminal_uuids(device_group):
terminal_group = TerminalObjects.find_child_group_by_kind(
device_group,
TerminalObjects.TERMINAL_GROUP_KIND,
)
result = set()
for terminal_obj in TerminalObjects.collect_terminal_objects(terminal_group):
terminal_uuid = (getattr(terminal_obj, "QetTerminalUuid", "") or "").strip()
if not terminal_uuid or TerminalObjects.is_local_terminal_uuid(terminal_uuid):
continue
result.add(terminal_uuid)
return result
def _device_change_detail(
display_tag,
instance_id,
element_uuid="",
change_types=None,
added_terminal_uuids=None,
removed_terminal_uuids=None,
previous_display_tag="",
previous_model_path="",
resolved_model_path="",
):
return {
"display_tag": (display_tag or "").strip(),
"instance_id": (instance_id or "").strip(),
"element_uuid": (element_uuid or "").strip(),
"label": _device_report_label(display_tag, instance_id, element_uuid),
"change_types": list(change_types or []),
"added_terminal_uuids": list(added_terminal_uuids or []),
"removed_terminal_uuids": list(removed_terminal_uuids or []),
"previous_display_tag": (previous_display_tag or "").strip(),
"previous_model_path": (previous_model_path or "").strip(),
"resolved_model_path": (resolved_model_path or "").strip(),
}
def _update_device_group_metadata(device_group, root_group, element_uuid, instance_id, model_path, display_tag):
if device_group is None:
device_group = doc.addObject(
"App::Part",
DEVICE_GROUP_PREFIX + _safe_token(element_uuid),
)
created_now = True
return
if device_group not in getattr(root_group, "Group", []):
root_group.addObject(device_group)
current_element_uuid = getattr(device_group, "QetElementUuid", "").strip()
current_instance_id = getattr(device_group, "QetInstanceId", "").strip()
current_model_path = getattr(device_group, "QetResolvedModelPath", "").strip()
device_group.Label = _device_label_text(display_tag, instance_id, element_uuid)
final_element_uuid = (element_uuid or "").strip() or current_element_uuid
final_instance_id = (instance_id or "").strip() or current_instance_id
final_model_path = (model_path or "").strip() or current_model_path
final_display_tag = (display_tag or "").strip()
device_group.Label = _device_label_text(
final_display_tag,
final_instance_id,
final_element_uuid,
)
_ensure_string_property(
device_group,
"QetElementUuid",
"QET Exchange",
"2D element UUID from QET",
element_uuid,
final_element_uuid,
)
_ensure_string_property(
device_group,
"QetInstanceId",
"QET Exchange",
"3D instance id from QET/FreeCAD exchange",
instance_id,
final_instance_id,
)
_ensure_string_property(
device_group,
"QetResolvedModelPath",
"QET Exchange",
"Resolved local model path from QET exchange",
model_path,
final_model_path,
)
_ensure_string_property(
device_group,
"QetDisplayTag",
"QET Exchange",
"2D display tag from QET exchange",
final_display_tag,
)
_ensure_string_property(
device_group,
"QetProjectUuid",
"QET Exchange",
"Project UUID from QET exchange",
getattr(root_group, "QetProjectUuid", "").strip(),
)
def _ensure_device_group(doc, root_group, element_uuid, instance_id, model_path, display_tag, layout_index):
created_now = False
device_group = _find_device_group_by_instance_id(doc, instance_id)
if device_group is None:
device_group = _find_device_group(doc, element_uuid)
if device_group is not None and getattr(device_group, "TypeId", "") != "App::Part":
_remove_object_tree(doc, device_group)
device_group = None
if device_group is None:
group_token = (
(element_uuid or "").strip()
or (instance_id or "").strip()
or (display_tag or "").strip()
or "device-{0}".format(layout_index)
)
device_group = doc.addObject(
"App::Part",
DEVICE_GROUP_PREFIX + _safe_token(group_token),
)
created_now = True
if device_group not in getattr(root_group, "Group", []):
root_group.addObject(device_group)
_update_device_group_metadata(
device_group,
root_group,
element_uuid,
instance_id,
model_path,
display_tag,
)
_ensure_bool_property(
@ -647,6 +846,14 @@ def _existing_group_objects(doc, group):
return result
def _existing_model_objects(doc, group):
return [
child
for child in _existing_group_objects(doc, group)
if not _is_exchange_sidecar_group(child)
]
def _is_exchange_sidecar_group(obj):
child_name = _object_name(obj)
if child_name.startswith(TERMINAL_GROUP_PREFIX) or child_name.startswith(WIRE_GROUP_PREFIX):
@ -785,9 +992,21 @@ def _supported_for_import(model_path):
}
def _import_model_into_group(doc, device_group, model_path, merge=False, use_link_group=True):
def _import_model_into_group(
doc,
device_group,
model_path,
merge=False,
use_link_group=True,
source_doc_cache=None,
):
if Path(model_path).suffix.lower() == ".fcstd":
return _import_fcstd_into_group(doc, device_group, model_path)
return _import_fcstd_into_group(
doc,
device_group,
model_path,
source_doc_cache=source_doc_cache,
)
before_names = _existing_object_names(doc)
try:
@ -812,26 +1031,84 @@ def _import_model_into_group(doc, device_group, model_path, merge=False, use_lin
return top_level_objects
def _open_fcstd_source_document(model_path):
def _open_fcstd_source_document(model_path, source_doc_cache=None):
normalized_target = os.path.normcase(os.path.normpath(model_path))
if source_doc_cache is not None:
cached_entry = source_doc_cache.get(normalized_target)
if cached_entry is not None:
cached_doc = cached_entry.get("doc")
if cached_doc is not None:
_append_debug_log(
"DeviceImport _open_fcstd_source_document cache hit: name={0}, path={1}, objects={2}, should_close={3}".format(
getattr(cached_doc, "Name", ""),
getattr(cached_doc, "FileName", "") or model_path,
len(list(getattr(cached_doc, "Objects", []) or [])),
bool(cached_entry.get("should_close")),
)
)
return cached_doc, False
for candidate in App.listDocuments().values():
candidate_path = getattr(candidate, "FileName", "") or ""
if candidate_path and os.path.normcase(os.path.normpath(candidate_path)) == normalized_target:
_append_debug_log(
"DeviceImport _open_fcstd_source_document reusing open source doc: name={0}, path={1}, objects={2}".format(
getattr(candidate, "Name", ""),
candidate_path,
len(list(getattr(candidate, "Objects", []) or [])),
)
)
if source_doc_cache is not None:
source_doc_cache[normalized_target] = {
"doc": candidate,
"should_close": False,
}
return candidate, False
source_doc = App.openDocument(model_path, hidden=True, temporary=True)
_append_debug_log(
"DeviceImport _open_fcstd_source_document opened temp source doc: name={0}, path={1}, objects={2}".format(
getattr(source_doc, "Name", "") if source_doc is not None else "<none>",
getattr(source_doc, "FileName", "") if source_doc is not None else model_path,
len(list(getattr(source_doc, "Objects", []) or [])) if source_doc is not None else -1,
)
)
if source_doc_cache is not None and source_doc is not None:
source_doc_cache[normalized_target] = {
"doc": source_doc,
"should_close": True,
}
return source_doc, False
return source_doc, True
def _import_fcstd_into_group(doc, device_group, model_path):
def _import_fcstd_into_group(doc, device_group, model_path, source_doc_cache=None):
source_doc = None
should_close = False
try:
source_doc, should_close = _open_fcstd_source_document(model_path)
_append_debug_log(
"DeviceImport _import_fcstd_into_group start: target_doc={0}, target_objects={1}, device_group={2}, model_path={3}".format(
getattr(doc, "Name", ""),
len(list(getattr(doc, "Objects", []) or [])),
getattr(device_group, "Name", ""),
model_path,
)
)
source_doc, should_close = _open_fcstd_source_document(
model_path,
source_doc_cache=source_doc_cache,
)
if source_doc is None:
raise DeviceImportError("Cannot open FCStd file")
TemplateSemantics.clear_stored_template_slot_hints(device_group)
top_level_objects = _top_level_document_objects(source_doc)
_append_debug_log(
"DeviceImport _import_fcstd_into_group source ready: source_doc={0}, top_level_objects={1}, should_close={2}".format(
getattr(source_doc, "Name", ""),
len(top_level_objects),
should_close,
)
)
copied_objects = []
for source_obj in top_level_objects:
copied_obj = doc.copyObject(source_obj, True)
@ -853,26 +1130,81 @@ def _import_fcstd_into_group(doc, device_group, model_path):
copied_model_objects,
)
_keep_only_direct_model_children(device_group, direct_model_objects)
_append_debug_log(
"DeviceImport _import_fcstd_into_group completed: copied_objects={0}, direct_model_objects={1}, target_doc_objects={2}".format(
len(copied_objects),
len(direct_model_objects),
len(list(getattr(doc, "Objects", []) or [])),
)
)
return direct_model_objects
finally:
if should_close and source_doc is not None:
try:
_append_debug_log(
"DeviceImport _import_fcstd_into_group closing temp source doc: name={0}, path={1}".format(
getattr(source_doc, "Name", ""),
getattr(source_doc, "FileName", "") or model_path,
)
)
App.closeDocument(source_doc.Name)
except Exception:
pass
_activate_document(doc)
def _close_cached_source_documents(source_doc_cache, target_doc=None):
if not source_doc_cache:
return
closed_count = 0
for normalized_target, cached_entry in list(source_doc_cache.items()):
cached_doc = cached_entry.get("doc")
should_close = bool(cached_entry.get("should_close"))
if not should_close or cached_doc is None:
continue
try:
_append_debug_log(
"DeviceImport _close_cached_source_documents closing cached doc: name={0}, path={1}".format(
getattr(cached_doc, "Name", ""),
getattr(cached_doc, "FileName", "") or normalized_target,
)
)
App.closeDocument(cached_doc.Name)
closed_count += 1
except Exception as exc:
_append_debug_log(
"DeviceImport _close_cached_source_documents failed: name={0}, error={1}".format(
getattr(cached_doc, "Name", ""),
exc,
)
)
source_doc_cache.clear()
_append_debug_log(
"DeviceImport _close_cached_source_documents completed: closed_count={0}".format(
closed_count
)
)
if target_doc is not None:
_activate_document(target_doc)
def _model_index(payload):
index = {}
for item in payload.get("device_models", []):
element_uuid = item.get("element_uuid", "").strip()
instance_id = (
(item.get("device_instance_id") or "").strip()
or (item.get("instance_id") or "").strip()
)
element_uuid = (item.get("element_uuid") or "").strip()
if instance_id and instance_id not in index:
index[instance_id] = item
if element_uuid and element_uuid not in index:
index[element_uuid] = item
return index
def _import_cabinet_model(doc, root_group, cabinet, report):
def _import_cabinet_model(doc, root_group, cabinet, report, source_doc_cache=None):
if not isinstance(cabinet, dict):
return
@ -934,6 +1266,7 @@ def _import_cabinet_model(doc, root_group, cabinet, report):
resolved_scene_path,
merge=False,
use_link_group=True,
source_doc_cache=source_doc_cache,
)
report["cabinet_imported"] += 1
if had_existing_model:
@ -960,6 +1293,7 @@ def import_devices_from_payload(payload, scene_path=""):
project_uuid = (payload.get("project_uuid") or "").strip()
root_group = _ensure_root_group(doc, cabinet, project_uuid)
models_by_element = _model_index(payload)
source_doc_cache = {}
report = {
"document_name": doc.Name,
@ -967,6 +1301,10 @@ def import_devices_from_payload(payload, scene_path=""):
"total_devices": 0,
"imported_devices": 0,
"updated_devices": 0,
"reused_devices": 0,
"added_device_details": [],
"updated_device_details": [],
"reused_device_details": [],
"imported_without_instance_id": 0,
"skipped_missing_model": 0,
"skipped_missing_file": 0,
@ -983,114 +1321,281 @@ def import_devices_from_payload(payload, scene_path=""):
"warnings": [],
}
_import_cabinet_model(doc, root_group, cabinet, report)
for index, device in enumerate(payload.get("devices", [])):
report["total_devices"] += 1
element_uuid = device.get("element_uuid", "").strip()
instance_id = (device.get("instance_id") or "").strip()
display_tag = (device.get("display_tag") or "").strip()
model_info = models_by_element.get(element_uuid, {})
resolved_model_path = _native_path(model_info.get("resolved_model_path", ""))
_append_debug_log(
"DeviceImport device element_uuid={0}, instance_id={1}, display_tag={2}, resolved_model_path={3}".format(
element_uuid, instance_id, display_tag, resolved_model_path
)
try:
_import_cabinet_model(
doc,
root_group,
cabinet,
report,
source_doc_cache=source_doc_cache,
)
if not resolved_model_path:
report["skipped_missing_model"] += 1
report["warnings"].append(
"{0} 缺少 resolved_model_path已跳过。".format(
_device_warning_subject(display_tag, element_uuid)
for index, device in enumerate(payload.get("devices", [])):
report["total_devices"] += 1
original_instance_id = _payload_device_instance_id(device)
instance_id = original_instance_id
element_uuid = _payload_device_element_uuid(device)
display_tag = (device.get("display_tag") or "").strip()
payload_terminal_uuids = _payload_terminal_uuid_set(device)
existing_device_group = _find_device_group_by_instance_id(doc, instance_id)
if existing_device_group is None:
existing_device_group = _find_device_group(doc, element_uuid)
previous_display_tag = ""
previous_path = ""
existing_terminal_uuids = set()
existing_model_objects = []
if existing_device_group is not None:
previous_display_tag = getattr(
existing_device_group,
"QetDisplayTag",
"",
).strip()
previous_path = getattr(
existing_device_group,
"QetResolvedModelPath",
"",
).strip()
existing_terminal_uuids = _existing_qet_terminal_uuids(
existing_device_group
)
existing_model_objects = _existing_model_objects(
doc, existing_device_group
)
model_info = models_by_element.get(instance_id or element_uuid, {})
resolved_model_path = _native_path(model_info.get("resolved_model_path", ""))
_append_debug_log(
"DeviceImport device instance_id={0}, display_tag={1}, resolved_model_path={2}".format(
instance_id, display_tag, resolved_model_path
)
)
continue
if not os.path.isfile(resolved_model_path):
report["skipped_missing_file"] += 1
report["warnings"].append(
"{0} 的模型文件不存在:{1}".format(
_device_warning_subject(display_tag, element_uuid),
resolved_model_path,
if not resolved_model_path:
display_tag_changed = bool(
existing_device_group is not None
and previous_display_tag != display_tag
)
)
continue
if existing_device_group is not None:
_update_device_group_metadata(
existing_device_group,
root_group,
element_uuid,
instance_id,
previous_path,
display_tag,
)
if display_tag_changed:
report["updated_devices"] += 1
report["updated_device_details"].append(
_device_change_detail(
display_tag,
(instance_id or getattr(existing_device_group, "QetInstanceId", "")).strip(),
element_uuid=element_uuid,
change_types=["标注"],
previous_display_tag=previous_display_tag,
previous_model_path=previous_path,
resolved_model_path=previous_path,
)
)
report["skipped_missing_model"] += 1
report["warnings"].append(
"{0} 缺少 resolved_model_path已跳过。".format(
_device_warning_subject(display_tag, instance_id)
)
)
continue
if not _supported_for_import(resolved_model_path):
report["skipped_unsupported_format"] += 1
report["warnings"].append(
"{0} 的模型格式暂不支持:{1}".format(
_device_warning_subject(display_tag, element_uuid),
resolved_model_path,
if not os.path.isfile(resolved_model_path):
report["skipped_missing_file"] += 1
report["warnings"].append(
"{0} 的模型文件不存在:{1}".format(
_device_warning_subject(display_tag, instance_id),
resolved_model_path,
)
)
)
continue
continue
existing_group = _find_device_group(doc, element_uuid)
if not instance_id:
existing_instance_id = ""
if existing_group is not None:
existing_instance_id = getattr(existing_group, "QetInstanceId", "").strip()
instance_id = existing_instance_id or _generate_instance_id(project_uuid, element_uuid)
report.setdefault("generated_instance_ids", 0)
report["generated_instance_ids"] += 1
device_group, created_now = _ensure_device_group(
doc,
root_group,
element_uuid,
instance_id,
resolved_model_path,
display_tag,
index,
)
_clear_group_contents(doc, device_group)
if not _supported_for_import(resolved_model_path):
report["skipped_unsupported_format"] += 1
report["warnings"].append(
"{0} 的模型格式暂不支持:{1}".format(
_device_warning_subject(display_tag, instance_id),
resolved_model_path,
)
)
continue
try:
_append_debug_log(
"DeviceImport importing model for element_uuid={0}: {1}".format(
element_uuid, resolved_model_path
if not instance_id:
instance_id = _generate_instance_id(
project_uuid, display_tag or element_uuid or "device-{0}".format(index)
)
report.setdefault("generated_instance_ids", 0)
report["generated_instance_ids"] += 1
device_group, created_now = _ensure_device_group(
doc,
root_group,
element_uuid,
instance_id,
resolved_model_path,
display_tag,
index,
)
_import_model_into_group(doc, device_group, resolved_model_path)
_append_debug_log(
"DeviceImport import succeeded for element_uuid={0}".format(element_uuid)
same_source = (
_normalized_path_key(previous_path)
== _normalized_path_key(resolved_model_path)
)
except Exception as exc:
report["skipped_import_error"] += 1
report["warnings"].append(
"{0} 导入失败:{1}".format(
_device_warning_subject(display_tag, element_uuid),
exc,
)
added_terminal_uuids = sorted(
payload_terminal_uuids - existing_terminal_uuids
)
_append_debug_log(
"DeviceImport import failed for element_uuid={0}: {1}".format(
element_uuid, exc
)
removed_terminal_uuids = sorted(
existing_terminal_uuids - payload_terminal_uuids
)
continue
terminals_changed = bool(
added_terminal_uuids or removed_terminal_uuids
)
display_tag_changed = (
not created_now and previous_display_tag != display_tag
)
model_changed = (
not created_now
and (not existing_model_objects or not same_source)
)
if existing_model_objects and same_source:
if display_tag_changed or terminals_changed:
change_types = []
if display_tag_changed:
change_types.append("标注")
if terminals_changed:
change_types.append("端子")
report["updated_devices"] += 1
report["updated_device_details"].append(
_device_change_detail(
display_tag,
instance_id,
element_uuid=element_uuid,
change_types=change_types,
added_terminal_uuids=added_terminal_uuids,
removed_terminal_uuids=removed_terminal_uuids,
previous_display_tag=previous_display_tag,
previous_model_path=previous_path,
resolved_model_path=resolved_model_path,
)
)
_append_debug_log(
"DeviceImport import skipped: metadata-only change for instance_id={0}, display_tag_changed={1}, added_terminals={2}, removed_terminals={3}".format(
instance_id,
display_tag_changed,
len(added_terminal_uuids),
len(removed_terminal_uuids),
)
)
continue
report["reused_devices"] += 1
report["reused_device_details"].append(
_device_change_detail(
display_tag,
instance_id,
element_uuid=element_uuid,
previous_display_tag=previous_display_tag,
previous_model_path=previous_path,
resolved_model_path=resolved_model_path,
)
)
_append_debug_log(
"DeviceImport import skipped: reused existing device group for instance_id={0}, model_path={1}, existing_model_objects={2}".format(
instance_id,
resolved_model_path,
len(existing_model_objects),
)
)
continue
if created_now or existing_group is None:
report["imported_devices"] += 1
else:
report["updated_devices"] += 1
_clear_group_contents(doc, device_group)
try:
_append_debug_log(
"DeviceImport importing model for device_instance_id={0}: {1}".format(
instance_id, resolved_model_path
)
)
_import_model_into_group(
doc,
device_group,
resolved_model_path,
source_doc_cache=source_doc_cache,
)
_append_debug_log(
"DeviceImport import succeeded for device_instance_id={0}".format(
instance_id
)
)
except Exception as exc:
report["skipped_import_error"] += 1
report["warnings"].append(
"{0} 导入失败:{1}".format(
_device_warning_subject(display_tag, element_uuid or instance_id),
exc,
)
)
_append_debug_log(
"DeviceImport import failed for device_instance_id={0}: {1}".format(
instance_id, exc
)
)
continue
if not instance_id:
report["imported_without_instance_id"] += 1
if created_now:
report["imported_devices"] += 1
report["added_device_details"].append(
_device_change_detail(
display_tag,
instance_id,
element_uuid=element_uuid,
previous_display_tag=previous_display_tag,
previous_model_path=previous_path,
resolved_model_path=resolved_model_path,
)
)
else:
report["updated_devices"] += 1
change_types = []
if display_tag_changed:
change_types.append("标注")
if model_changed:
change_types.append("3D模型")
if terminals_changed:
change_types.append("端子")
if not change_types:
change_types.append("3D模型")
report["updated_device_details"].append(
_device_change_detail(
display_tag,
instance_id,
element_uuid=element_uuid,
change_types=change_types,
added_terminal_uuids=added_terminal_uuids,
removed_terminal_uuids=removed_terminal_uuids,
previous_display_tag=previous_display_tag,
previous_model_path=previous_path,
resolved_model_path=resolved_model_path,
)
)
if not original_instance_id:
report["imported_without_instance_id"] += 1
finally:
_close_cached_source_documents(source_doc_cache, target_doc=doc)
doc.recompute()
try:
Gui.SendMsgToActiveView("ViewFit")
except Exception:
pass
_append_debug_log("DeviceImport ViewFit skipped during exchange import")
_append_debug_log(
"DeviceImport finished: cabinet_imported={0}, imported={1}, updated={2}, skipped_missing_model={3}, skipped_missing_file={4}, skipped_import_error={5}".format(
"DeviceImport finished: cabinet_imported={0}, imported={1}, updated={2}, reused={3}, skipped_missing_model={4}, skipped_missing_file={5}, skipped_import_error={6}".format(
report["cabinet_imported"],
report["imported_devices"],
report["updated_devices"],
report["reused_devices"],
report["skipped_missing_model"],
report["skipped_missing_file"],
report["skipped_import_error"],

@ -2,6 +2,7 @@ import json
import traceback
import os
from pathlib import Path
from datetime import datetime
import FreeCAD as App
import FreeCADGui as Gui
@ -74,7 +75,12 @@ def _append_debug_log(message):
log_path = _debug_log_path()
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as handle:
handle.write(message + "\n")
handle.write(
"[{0}] {1}\n".format(
datetime.now().astimezone().isoformat(timespec="seconds"),
message,
)
)
except Exception:
pass
@ -97,7 +103,84 @@ def _get_main_window():
def _show_info(title, message):
QtWidgets.QMessageBox.information(_get_main_window(), title, message)
_append_debug_log(
"_show_info requested: title={0}, message_length={1}".format(
title, len(message or "")
)
)
parent = _get_main_window()
try:
if all(
hasattr(QtWidgets, attr)
for attr in (
"QDialog",
"QVBoxLayout",
"QLabel",
"QPlainTextEdit",
"QDialogButtonBox",
)
):
dialog = QtWidgets.QDialog(parent)
dialog.setWindowTitle(title)
dialog.setModal(False)
dialog.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
dialog.resize(980, 760)
layout = QtWidgets.QVBoxLayout(dialog)
title_label = QtWidgets.QLabel(title, dialog)
layout.addWidget(title_label)
info_label = QtWidgets.QLabel("同步完成,详细信息如下。", dialog)
layout.addWidget(info_label)
details_box = QtWidgets.QPlainTextEdit(dialog)
details_box.setReadOnly(True)
details_box.setPlainText(message or "")
details_box.setMinimumSize(920, 640)
layout.addWidget(details_box)
button_box = QtWidgets.QDialogButtonBox(
QtWidgets.QDialogButtonBox.Ok,
parent=dialog,
)
button_box.accepted.connect(dialog.accept)
layout.addWidget(button_box)
dialog.show()
dialog.raise_()
dialog.activateWindow()
_append_debug_log("_show_info displayed as resizable non-modal dialog")
return
dialog = QtWidgets.QMessageBox(parent)
dialog.setIcon(QtWidgets.QMessageBox.Information)
dialog.setWindowTitle(title)
dialog.setText(title)
dialog.setInformativeText("同步完成,详细信息见下方。")
dialog.setDetailedText(message or "")
dialog.setStandardButtons(QtWidgets.QMessageBox.Ok)
dialog.setModal(False)
dialog.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
dialog.show()
dialog.raise_()
dialog.activateWindow()
try:
details_box = dialog.findChild(QtWidgets.QTextEdit)
if details_box is not None:
details_box.setMinimumSize(860, 560)
dialog.resize(960, 720)
except Exception:
pass
_append_debug_log("_show_info displayed as fallback message box")
except Exception as exc:
_append_debug_log("_show_info failed: {0}".format(exc))
try:
App.Console.PrintMessage(
"[FreeCADExchange] {0}\n{1}\n".format(title, message or "")
)
except Exception:
pass
def _show_error(title, message):
@ -131,6 +214,85 @@ def _has_tree_widget_parent(widget):
return False
def _doc_name(doc):
if doc is None:
return "<none>"
return getattr(doc, "Name", "") or "<unnamed>"
def _doc_path(doc):
if doc is None:
return ""
return getattr(doc, "FileName", "") or ""
def _doc_object_count(doc):
if doc is None:
return -1
try:
return len(list(getattr(doc, "Objects", []) or []))
except Exception:
return -1
def _open_document_descriptions():
descriptions = []
try:
documents = App.listDocuments()
except Exception as exc:
return ["<failed to list documents: {0}>".format(exc)]
for doc in documents.values():
descriptions.append(
"{0}|objects={1}|path={2}".format(
_doc_name(doc),
_doc_object_count(doc),
_doc_path(doc) or "<unsaved>",
)
)
return descriptions
def _log_document_state(stage, doc=None, include_open_docs=False):
active_doc = None
try:
active_doc = App.ActiveDocument
except Exception:
active_doc = None
target_doc = doc if doc is not None else active_doc
_append_debug_log(
"{0}: active_doc={1}, active_path={2}, active_objects={3}, target_doc={4}, target_path={5}, target_objects={6}".format(
stage,
_doc_name(active_doc),
_doc_path(active_doc) or "<unsaved>",
_doc_object_count(active_doc),
_doc_name(target_doc),
_doc_path(target_doc) or "<unsaved>",
_doc_object_count(target_doc),
)
)
if include_open_docs:
_append_debug_log(
"{0}: open_docs={1}".format(
stage,
"; ".join(_open_document_descriptions()) or "<none>",
)
)
if target_doc is not None:
try:
root = target_doc.getObject("QETExchangeDevices")
except Exception:
root = None
_append_debug_log(
"{0}: target_root_group={1}, root_children={2}".format(
stage,
getattr(root, "Name", "") if root is not None else "<missing>",
len(list(getattr(root, "Group", []) or [])) if root is not None else 0,
)
)
class _DeviceTreeDoubleClickFilter(QtCore.QObject):
def eventFilter(self, watched, event):
try:
@ -312,13 +474,22 @@ def _require_string(payload, field_name):
return value.strip()
def _normalize_instance_id(item):
value = item.get("instance_id", "")
def _normalize_instance_id(item, *field_names):
if not field_names:
field_names = ("instance_id",)
value = ""
matched_field = field_names[0]
for field_name in field_names:
if field_name in item:
matched_field = field_name
value = item.get(field_name, "")
break
if value is None:
return ""
if not isinstance(value, str):
raise ExchangeValidationError(
"Field 'instance_id' must be a string when present."
"Field '{0}' must be a string when present.".format(matched_field)
)
return value.strip()
@ -334,7 +505,21 @@ def _normalize_devices(payload):
raise ExchangeValidationError(
"Device entry #{0} must be an object.".format(index)
)
element_uuid = _require_string(item, "element_uuid")
entry_label = "device entry #{0}".format(index)
if "instance_id" in item:
raise ExchangeValidationError(
"Field 'instance_id' in {0} is no longer supported. Use 'device_instance_id'.".format(
entry_label
)
)
if "element_uuid" in item:
raise ExchangeValidationError(
"Field 'element_uuid' in {0} is no longer supported at device level. Put element_uuid in devices[].terminals[].".format(
entry_label
)
)
element_uuid = ""
device_instance_id = _require_string(item, "device_instance_id")
display_tag = item.get("display_tag", "")
if display_tag and not isinstance(display_tag, str):
raise ExchangeValidationError(
@ -351,6 +536,7 @@ def _normalize_devices(payload):
)
normalized_terminals = []
device_element_uuids = []
for terminal_index, terminal_item in enumerate(device_terminals):
terminal_entry_label = "device entry #{0} terminal entry #{1}".format(
index, terminal_index
@ -359,26 +545,52 @@ def _normalize_devices(payload):
raise ExchangeValidationError(
"{0} must be an object.".format(terminal_entry_label.capitalize())
)
if "instance_id" in terminal_item or "device_instance_id" in terminal_item:
raise ExchangeValidationError(
"{0} must not carry device instance fields. The parent device's device_instance_id is authoritative.".format(
terminal_entry_label.capitalize()
)
)
terminal_uuid = _require_string(terminal_item, "terminal_uuid")
terminal_element_uuid = _optional_string(
terminal_item, "element_uuid", terminal_entry_label
) or element_uuid
)
if not terminal_element_uuid:
raise ExchangeValidationError(
"{0} is missing element_uuid.".format(
terminal_entry_label.capitalize()
)
)
if terminal_element_uuid and terminal_element_uuid not in device_element_uuids:
device_element_uuids.append(terminal_element_uuid)
normalized_terminals.append(
{
"terminal_uuid": terminal_uuid,
"instance_id": _normalize_instance_id(terminal_item)
or _normalize_instance_id(item),
"instance_id": device_instance_id,
"element_uuid": terminal_element_uuid,
"terminal_display": _optional_string(
terminal_item, "terminal_display", terminal_entry_label
),
"slot_name_hint": _optional_string(
terminal_item, "slot_name_hint", terminal_entry_label
),
"terminal_label": _optional_string(
terminal_item, "terminal_label", terminal_entry_label
),
"terminal_instance_id": _optional_string(
terminal_item, "terminal_instance_id", terminal_entry_label
),
}
)
if not element_uuid and device_element_uuids:
element_uuid = device_element_uuids[0]
normalized.append(
{
"element_uuid": element_uuid,
"instance_id": _normalize_instance_id(item),
"element_uuids": list(device_element_uuids),
"instance_id": device_instance_id,
"display_tag": display_tag.strip() if isinstance(display_tag, str) else "",
"terminals": normalized_terminals,
}
@ -390,10 +602,38 @@ def _normalize_terminals(devices):
normalized = []
for device in devices:
for terminal in device.get("terminals", []) or []:
normalized.append(dict(terminal))
entry = dict(terminal)
if not entry.get("instance_id"):
entry["instance_id"] = device.get("instance_id", "")
normalized.append(entry)
return normalized
def _normalize_top_level_terminals(payload):
if "terminals" in payload:
raise ExchangeValidationError(
"Field 'terminals' at the JSON root is no longer supported. Use devices[].terminals[]."
)
return []
def _merge_terminal_entries(*terminal_groups):
merged = []
seen = set()
for terminal_group in terminal_groups:
for item in terminal_group:
key = (
item.get("terminal_uuid", ""),
item.get("element_uuid", ""),
item.get("instance_id", ""),
)
if key in seen:
continue
seen.add(key)
merged.append(item)
return merged
def _optional_string(item, field_name, entry_label):
value = item.get(field_name, "")
if value is None:
@ -482,7 +722,21 @@ def _normalize_device_models(payload):
raise ExchangeValidationError(
"Device model entry #{0} must be an object.".format(index)
)
element_uuid = _require_string(item, "element_uuid")
entry_label = "device model entry #{0}".format(index)
if "instance_id" in item:
raise ExchangeValidationError(
"Field 'instance_id' in {0} is no longer supported. Use 'device_instance_id'.".format(
entry_label
)
)
if "element_uuid" in item:
raise ExchangeValidationError(
"Field 'element_uuid' in {0} is no longer supported. Use 'device_instance_id'.".format(
entry_label
)
)
element_uuid = ""
instance_id = _require_string(item, "device_instance_id")
parts_3d = item.get("parts_3d", "")
if parts_3d and not isinstance(parts_3d, str):
raise ExchangeValidationError(
@ -509,6 +763,7 @@ def _normalize_device_models(payload):
normalized.append(
{
"element_uuid": element_uuid,
"instance_id": instance_id,
"device_id": device_id,
"parts_3d": parts_3d.strip() if isinstance(parts_3d, str) else "",
"resolved_model_path": (
@ -583,6 +838,11 @@ def load_exchange_payload(json_path):
normalized_devices = _normalize_devices(payload)
normalized_terminals = _merge_terminal_entries(
_normalize_terminals(normalized_devices),
_normalize_top_level_terminals(payload),
)
normalized = {
"schema_version": schema_version.strip(),
"project_uuid": project_uuid,
@ -590,7 +850,7 @@ def load_exchange_payload(json_path):
"source": payload.get("source", {}),
"cabinet": _normalize_cabinet(payload),
"devices": normalized_devices,
"terminals": _normalize_terminals(normalized_devices),
"terminals": normalized_terminals,
"device_models": _normalize_device_models(payload),
"wires": _normalize_wires(payload),
}
@ -658,9 +918,15 @@ def _import_wiring_tasks(payload):
return None
DEFAULT_SCENE_FILE_NAME = "QETScene.FCStd"
def _scene_path_from_exchange_context():
scene_path = os.environ.get(ENV_SCENE_PATH, "").strip()
if scene_path:
_append_debug_log(
"_scene_path_from_exchange_context using env path: {0}".format(scene_path)
)
return scene_path
json_path = os.environ.get(ENV_JSON_PATH, "").strip()
@ -668,18 +934,42 @@ def _scene_path_from_exchange_context():
return ""
exchange_dir = Path(json_path).parent
for file_name in ("QETScene.FCStd", "scene.FCStd"):
candidate = exchange_dir / file_name
if candidate.is_file():
resolved = str(candidate)
os.environ[ENV_SCENE_PATH] = resolved
_append_debug_log(
"QET_FREECAD_SCENE_FILE inferred from exchange directory: {0}".format(
resolved
)
)
return resolved
return ""
candidate = exchange_dir / DEFAULT_SCENE_FILE_NAME
if candidate.is_file():
os.environ[ENV_SCENE_PATH] = str(candidate)
_append_debug_log(
"QET_FREECAD_SCENE_FILE found: {0}".format(str(candidate))
)
return str(candidate)
# No existing scene file -> first time open
default_scene = str(exchange_dir / DEFAULT_SCENE_FILE_NAME)
_append_debug_log(
"No existing scene file, first open mode: default_scene={0}".format(default_scene)
)
return default_scene
def _is_scene_first_open(scene_path):
"""Return True if this is the first time the 2D/3D exchange has run for this project.
Uses 3d_to_2d.json (always written by ExchangeWriteBack after every import)
rather than the .FCStd file, since the user might not have saved the scene.
"""
if not scene_path:
_append_debug_log("_is_scene_first_open: scene_path empty -> True")
return True
writeback_path = Path(scene_path).parent / "3d_to_2d.json"
is_first_open = not writeback_path.is_file()
_append_debug_log(
"_is_scene_first_open: scene_path={0}, writeback_path={1}, exists={2}, result={3}".format(
scene_path,
str(writeback_path),
writeback_path.is_file(),
is_first_open,
)
)
return is_first_open
def _mark_stale_objects(payload):
@ -687,11 +977,43 @@ def _mark_stale_objects(payload):
_append_debug_log("stale object sync skipped: StaleObjectSync module unavailable")
return None
# Diagnostic: count device groups in doc and element_uuids in payload
doc = App.ActiveDocument
if doc is not None:
try:
doc_device_count = sum(
1 for _ in StaleObjectSync._iter_device_groups(doc)
)
except Exception:
doc_device_count = -1
payload_device_count = len(payload.get("devices", []) or [])
_append_debug_log(
"stale sync diagnostic: doc_device_groups={0}, payload_devices={1}".format(
doc_device_count, payload_device_count
)
)
# Log each payload device element_uuid for comparison
for item in (payload.get("devices", []) or [])[:10]:
_append_debug_log(
" payload device: element_uuid={0}, instance_id={1}".format(
item.get("element_uuid", ""), item.get("instance_id", "")
)
)
try:
return StaleObjectSync.mark_stale_objects_from_payload(
result = StaleObjectSync.mark_stale_objects_from_payload(
payload,
App.ActiveDocument,
doc,
)
_append_debug_log(
"stale sync result: active_devices={0}, stale_devices={1}, active_cabinets={2}, stale_cabinets={3}".format(
result.get("active_devices", 0),
result.get("stale_devices", 0),
result.get("active_cabinets", 0),
result.get("stale_cabinets", 0),
)
)
return result
except Exception as exc:
_append_debug_log("stale object sync failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
@ -704,16 +1026,56 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
]
if import_report or stale_report:
lines.extend(
[
"",
"同步结果:",
"新增机柜:{0}".format(import_report.get("cabinet_added", 0) if import_report else 0),
"失效机柜:{0}".format(stale_report.get("stale_cabinets", 0) if stale_report else 0),
"新增设备:{0}".format(import_report.get("imported_devices", 0) if import_report else 0),
"失效设备:{0}".format(stale_report.get("stale_devices", 0) if stale_report else 0),
]
)
lines.append("")
updated_device_details = import_report.get("updated_device_details", []) if import_report else []
if summary.get("is_first_open"):
lines.extend(
[
"同步模式:首次打开(全量导入)",
"新增机柜:{0}".format(import_report.get("cabinet_added", 0) if import_report else 0),
"新增设备:{0}".format(import_report.get("imported_devices", 0) if import_report else 0),
"更新设备:{0}".format(import_report.get("updated_devices", 0) if import_report else 0),
]
)
else:
lines.extend(
[
"同步模式:再次打开(增量更新)",
"新增机柜:{0}".format(import_report.get("cabinet_added", 0) if import_report else 0),
"失效机柜:{0}".format(stale_report.get("stale_cabinets", 0) if stale_report else 0),
"新增设备:{0}".format(import_report.get("imported_devices", 0) if import_report else 0),
"更新设备:{0}".format(import_report.get("updated_devices", 0) if import_report else 0),
"失效设备:{0}".format(stale_report.get("stale_devices", 0) if stale_report else 0),
]
)
if updated_device_details:
lines.append("修改设备:")
for item in updated_device_details[:5]:
change_types = " + ".join(item.get("change_types", []) or []) or "未知变化"
detail_bits = []
if "标注" in (item.get("change_types", []) or []):
previous_display_tag = item.get("previous_display_tag", "") or "<empty>"
current_display_tag = item.get("display_tag", "") or "<empty>"
detail_bits.append(
"标注 {0} -> {1}".format(previous_display_tag, current_display_tag)
)
if "端子" in (item.get("change_types", []) or []):
added_terms = item.get("added_terminal_uuids", []) or []
removed_terms = item.get("removed_terminal_uuids", []) or []
detail_bits.append("+{0}/-{1} 端子".format(len(added_terms), len(removed_terms)))
detail_suffix = ""
if detail_bits:
detail_suffix = " ({0})".format(", ".join(detail_bits))
lines.append(
"- {0} [{1}] -> {2}{3}".format(
item.get("label", "") or item.get("display_tag", "") or item.get("instance_id", ""),
item.get("instance_id", "") or "<empty>",
change_types,
detail_suffix,
)
)
if len(updated_device_details) > 5:
lines.append("- ... ({0} more)".format(len(updated_device_details) - 5))
lines.extend(
[
@ -761,7 +1123,10 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
)
)
if summary["scene_path"]:
lines.append("Scene file: {0}".format(summary["scene_path"]))
if summary.get("is_first_open"):
lines.append("Scene file (new): {0}".format(summary["scene_path"]))
else:
lines.append("Scene file (existing): {0}".format(summary["scene_path"]))
if import_report:
lines.extend(
@ -775,8 +1140,53 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
"Reused cabinets: {0}".format(import_report.get("cabinet_reused", 0)),
"Imported devices: {0}".format(import_report["imported_devices"]),
"Updated devices: {0}".format(import_report["updated_devices"]),
"Reused devices: {0}".format(import_report.get("reused_devices", 0)),
]
)
added_device_details = import_report.get("added_device_details", [])
if added_device_details:
lines.append("Added device details:")
for item in added_device_details[:10]:
lines.append(
"- {0} [{1}]".format(
item.get("label", "") or item.get("display_tag", "") or item.get("instance_id", ""),
item.get("instance_id", "") or "<empty>",
)
)
if len(added_device_details) > 10:
lines.append("- ... ({0} more)".format(len(added_device_details) - 10))
updated_device_details = import_report.get("updated_device_details", [])
if updated_device_details:
lines.append("Updated device details:")
for item in updated_device_details[:10]:
change_types = " + ".join(item.get("change_types", []) or []) or "未知变化"
terminal_bits = []
added_terms = item.get("added_terminal_uuids", []) or []
removed_terms = item.get("removed_terminal_uuids", []) or []
if "端子" in (item.get("change_types", []) or []):
terminal_bits.append("+{0}".format(len(added_terms)))
terminal_bits.append("-{0}".format(len(removed_terms)))
if "标注" in (item.get("change_types", []) or []):
previous_display_tag = item.get("previous_display_tag", "") or "<empty>"
current_display_tag = item.get("display_tag", "") or "<empty>"
terminal_bits.append(
"标注 {0} -> {1}".format(previous_display_tag, current_display_tag)
)
detail_suffix = ""
if terminal_bits:
detail_suffix = " ({0})".format(", ".join(terminal_bits))
lines.append(
"- {0} [{1}] -> {2}{3}".format(
item.get("label", "") or item.get("display_tag", "") or item.get("instance_id", ""),
item.get("instance_id", "") or "<empty>",
change_types,
detail_suffix,
)
)
if len(updated_device_details) > 10:
lines.append("- ... ({0} more)".format(len(updated_device_details) - 10))
if import_report["imported_without_instance_id"]:
lines.append(
"Imported without instance_id yet: {0}".format(
@ -877,6 +1287,18 @@ def _summary_message(summary, import_report=None, terminal_report=None, writebac
"Stale routed wires: {0}".format(stale_report.get("stale_routed_wires", 0)),
]
)
stale_device_details = stale_report.get("stale_device_details", [])
if stale_device_details:
lines.append("Stale device details:")
for item in stale_device_details[:10]:
lines.append(
"- {0} [{1}]".format(
item.get("label", "") or item.get("display_tag", "") or item.get("instance_id", ""),
item.get("instance_id", "") or "<empty>",
)
)
if len(stale_device_details) > 10:
lines.append("- ... ({0} more)".format(len(stale_device_details) - 10))
lines.append("")
lines.append("This step validates the exchange payload and imports devices with valid resolved model paths.")
@ -888,6 +1310,10 @@ def _run_scheduled_device_import(attempt=0):
_append_debug_log(
"scheduled device import invoked: attempt={0}".format(attempt)
)
_log_document_state(
"scheduled device import state before gui readiness check",
include_open_docs=True,
)
if not _is_gui_ready():
if attempt < IMPORT_READY_MAX_RETRIES:
@ -916,8 +1342,17 @@ def _run_scheduled_device_import(attempt=0):
return
scene_path = _scene_path_from_exchange_context()
is_first_open = _is_scene_first_open(scene_path)
summary["scene_path"] = scene_path
summary["is_first_open"] = is_first_open
_append_debug_log(
"scheduled device import starting with scene_path={0}".format(scene_path)
"scheduled device import: scene_path={0}, is_first_open={1}".format(
scene_path, is_first_open
)
)
_log_document_state(
"scheduled device import before DeviceImport",
include_open_docs=True,
)
try:
import_report = DeviceImport.import_devices_from_payload(payload, scene_path)
@ -946,6 +1381,11 @@ def _run_scheduled_device_import(attempt=0):
import_report["skipped_import_error"],
)
)
_log_document_state(
"scheduled device import after DeviceImport",
App.ActiveDocument,
include_open_docs=True,
)
App.Console.PrintMessage(
"[FreeCADExchange] Loaded exchange payload from {0}\n".format(
@ -973,10 +1413,62 @@ def _run_scheduled_device_import(attempt=0):
)
)
# ── Sync banner ──
cabinet_added = import_report.get("cabinet_added", 0)
if is_first_open:
App.Console.PrintMessage(
"[FreeCADExchange] ========================================\n"
)
App.Console.PrintMessage(
"[FreeCADExchange] 同步模式:首次打开(全量导入)\n"
)
App.Console.PrintMessage(
"[FreeCADExchange] 新增机柜:{0}\n".format(cabinet_added)
)
App.Console.PrintMessage(
"[FreeCADExchange] 新增设备:{0}\n".format(import_report["imported_devices"])
)
App.Console.PrintMessage(
"[FreeCADExchange] ========================================\n"
)
_append_debug_log(
"sync banner: first open, cabinets={0}, devices={1}".format(
cabinet_added, import_report["imported_devices"]
)
)
else:
App.Console.PrintMessage(
"[FreeCADExchange] ========================================\n"
)
App.Console.PrintMessage(
"[FreeCADExchange] 同步模式:再次打开(增量更新)\n"
)
App.Console.PrintMessage(
"[FreeCADExchange] 新增机柜:{0}\n".format(cabinet_added)
)
App.Console.PrintMessage(
"[FreeCADExchange] 新增设备:{0}\n".format(import_report["imported_devices"])
)
App.Console.PrintMessage(
"[FreeCADExchange] 更新设备:{0}\n".format(import_report["updated_devices"])
)
App.Console.PrintMessage(
"[FreeCADExchange] ========================================\n"
)
_append_debug_log(
"sync banner: reopen, cabinets={0}, imported={1}, updated={2}".format(
cabinet_added, import_report["imported_devices"], import_report["updated_devices"]
)
)
if TerminalImport is None:
_append_debug_log("terminal import skipped: TerminalImport module unavailable")
terminal_report = _terminal_report_not_available()
else:
_log_document_state(
"scheduled device import before TerminalImport",
App.ActiveDocument,
)
try:
terminal_report = TerminalImport.import_terminals_from_payload(payload, scene_path)
except TerminalImport.TerminalImportError as exc:
@ -992,22 +1484,50 @@ def _run_scheduled_device_import(attempt=0):
"[FreeCADExchange] Failed to import terminals: {0}\n".format(exc)
)
return
_log_document_state(
"scheduled device import after TerminalImport",
App.ActiveDocument,
)
setattr(App, STATE_TERMINAL_IMPORT_REPORT, terminal_report)
_log_document_state(
"scheduled device import before wiring init",
App.ActiveDocument,
)
_initialize_wiring_scene(payload)
wiring_report = _import_wiring_tasks(payload)
if wiring_report is not None:
setattr(App, STATE_WIRING_IMPORT_REPORT, wiring_report)
_log_document_state(
"scheduled device import after wiring import",
App.ActiveDocument,
)
stale_report = _mark_stale_objects(payload)
if stale_report is not None:
setattr(App, STATE_STALE_SYNC_REPORT, stale_report)
if is_first_open:
_append_debug_log("stale object sync skipped: first open (no prior 3D state to compare)")
stale_report = None
else:
_log_document_state(
"scheduled device import before stale sync",
App.ActiveDocument,
)
stale_report = _mark_stale_objects(payload)
if stale_report is not None:
setattr(App, STATE_STALE_SYNC_REPORT, stale_report)
_log_document_state(
"scheduled device import after stale sync",
App.ActiveDocument,
)
if ExchangeWriteBack is None:
_append_debug_log("write-back skipped: ExchangeWriteBack module unavailable")
writeback_report = None
else:
_log_document_state(
"scheduled device import before write-back",
App.ActiveDocument,
)
try:
writeback_report = ExchangeWriteBack.write_back_document(
App.ActiveDocument, scene_path=scene_path, payload=payload
@ -1018,6 +1538,11 @@ def _run_scheduled_device_import(attempt=0):
writeback_report = None
else:
setattr(App, STATE_WRITEBACK_REPORT, writeback_report)
_log_document_state(
"scheduled device import after write-back",
App.ActiveDocument,
include_open_docs=True,
)
App.Console.PrintMessage(
"[FreeCADExchange] Imported terminals: {0}, updated: {1}, removed: {2}\n".format(
@ -1032,12 +1557,18 @@ def _run_scheduled_device_import(attempt=0):
_summary_message(summary, import_report, terminal_report, writeback_report, wiring_report, stale_report),
)
_append_debug_log("summary dialog shown")
_log_document_state(
"scheduled device import completed",
App.ActiveDocument,
include_open_docs=True,
)
def bootstrap_if_requested():
if not getattr(App, STATE_FLAG, False):
_reset_debug_log()
_append_debug_log("bootstrap_if_requested entered")
_log_document_state("bootstrap_if_requested initial state", include_open_docs=True)
_install_tree_double_click_filter()
if getattr(App, STATE_FLAG, False):
_append_debug_log("bootstrap_if_requested skipped: already bootstrapped")
@ -1052,6 +1583,7 @@ def bootstrap_if_requested():
setattr(App, STATE_FLAG, True)
_append_debug_log("STATE_FLAG set")
_log_document_state("bootstrap_if_requested after STATE_FLAG", include_open_docs=True)
if not os.path.isfile(json_path):
_append_debug_log("exchange file missing: {0}".format(json_path))
@ -1097,3 +1629,7 @@ def bootstrap_if_requested():
QtCore.QTimer.singleShot(
IMPORT_READY_DELAY_MS, lambda: _run_scheduled_device_import(0)
)
_log_document_state(
"bootstrap_if_requested after scheduling import",
include_open_docs=True,
)

@ -4,6 +4,7 @@ import json
import os
from datetime import datetime
from pathlib import Path
import traceback
import FreeCAD as App
@ -31,6 +32,27 @@ def _append_debug_log(message):
pass
def _doc_name(doc):
if doc is None:
return "<none>"
return getattr(doc, "Name", "") or "<unnamed>"
def _doc_path(doc):
if doc is None:
return ""
return getattr(doc, "FileName", "") or ""
def _doc_object_count(doc):
if doc is None:
return -1
try:
return len(list(getattr(doc, "Objects", []) or []))
except Exception:
return -1
def _project_uuid_from_payload(payload):
if isinstance(payload, dict):
value = (payload.get("project_uuid") or "").strip()
@ -126,20 +148,28 @@ def _collect_instance_bindings(doc):
bindings = []
seen = set()
for device_group in _iter_device_groups(doc):
element_uuid = getattr(device_group, "QetElementUuid", "").strip()
instance_id = getattr(device_group, "QetInstanceId", "").strip()
if not element_uuid or not instance_id:
continue
key = (element_uuid, instance_id)
if key in seen:
if not instance_id:
continue
seen.add(key)
bindings.append(
{
"element_uuid": element_uuid,
"instance_id": instance_id,
}
)
element_uuids = set()
group_element_uuid = getattr(device_group, "QetElementUuid", "").strip()
if group_element_uuid:
element_uuids.add(group_element_uuid)
for terminal_obj in _iter_terminal_objects(device_group):
terminal_element_uuid = getattr(terminal_obj, "QetElementUuid", "").strip()
if terminal_element_uuid:
element_uuids.add(terminal_element_uuid)
for element_uuid in sorted(element_uuids):
key = (element_uuid, instance_id)
if key in seen:
continue
seen.add(key)
bindings.append(
{
"element_uuid": element_uuid,
"device_instance_id": instance_id,
}
)
return bindings
@ -166,7 +196,8 @@ def _collect_terminal_bindings(doc):
bindings.append(
{
"terminal_uuid": terminal_uuid,
"instance_id": terminal_instance_id,
"device_instance_id": instance_id,
"terminal_instance_id": terminal_instance_id,
}
)
return bindings
@ -187,6 +218,16 @@ def write_back_document(doc=None, scene_path="", payload=None):
if doc is None:
raise ExchangeWriteBackError("No active FreeCAD document is available.")
_append_debug_log(
"write_back_document starting: doc={0}, path={1}, objects={2}, requested_scene_path={3}, env_json={4}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
_doc_object_count(doc),
scene_path or "<empty>",
os.environ.get(ENV_JSON_PATH, "").strip() or "<empty>",
)
)
scene_path = _scene_path_from_doc(doc, scene_path)
output_path = _output_path_for_exchange_json() or _output_path_for_scene(scene_path)
if not output_path:
@ -201,7 +242,7 @@ def write_back_document(doc=None, scene_path="", payload=None):
)
report = {
"schema_version": "1.0",
"schema_version": "2.0",
"project_uuid": project_uuid,
"generated_at": _format_timestamp(),
"instances": _collect_instance_bindings(doc),
@ -211,6 +252,15 @@ def write_back_document(doc=None, scene_path="", payload=None):
output_dir = str(Path(output_path).parent)
os.makedirs(output_dir, exist_ok=True)
_append_debug_log(
"write_back_document collected: project_uuid={0}, scene_path={1}, output_path={2}, instance_count={3}, terminal_count={4}".format(
project_uuid,
scene_path or "<empty>",
output_path,
len(report["instances"]),
len(report["terminals"]),
)
)
Path(output_path).write_text(
json.dumps(
{
@ -256,13 +306,61 @@ def _is_exchange_document(doc):
class _WriteBackObserver:
def slotCreatedDocument(self, doc):
_append_debug_log(
"write-back observer slotCreatedDocument: doc={0}, path={1}, objects={2}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
_doc_object_count(doc),
)
)
def slotDeletedDocument(self, doc):
_append_debug_log(
"write-back observer slotDeletedDocument: doc={0}, path={1}, objects={2}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
_doc_object_count(doc),
)
)
def slotActivateDocument(self, doc):
_append_debug_log(
"write-back observer slotActivateDocument: doc={0}, path={1}, objects={2}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
_doc_object_count(doc),
)
)
def slotStartSaveDocument(self, doc, name):
_append_debug_log(
"write-back observer slotStartSaveDocument: doc={0}, doc_path={1}, target_name={2}, objects={3}, exchange_doc={4}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
name or "<empty>",
_doc_object_count(doc),
_is_exchange_document(doc),
)
)
def slotFinishSaveDocument(self, doc, name):
_append_debug_log(
"write-back observer slotFinishSaveDocument: doc={0}, doc_path={1}, target_name={2}, objects={3}, exchange_doc={4}".format(
_doc_name(doc),
_doc_path(doc) or "<unsaved>",
name or "<empty>",
_doc_object_count(doc),
_is_exchange_document(doc),
)
)
if not _is_exchange_document(doc):
return
try:
write_back_document(doc, scene_path=name)
except Exception as exc:
_append_debug_log("write-back after save failed: {0}".format(exc))
_append_debug_log(traceback.format_exc())
try:
App.Console.PrintError(
"[FreeCADExchange] write-back after save failed: {0}\n".format(exc)
@ -273,6 +371,7 @@ class _WriteBackObserver:
def ensure_document_observer_installed():
if getattr(App, STATE_WRITEBACK_OBSERVER, None) is not None:
_append_debug_log("write-back observer already installed")
return getattr(App, STATE_WRITEBACK_OBSERVER)
observer = _WriteBackObserver()
@ -283,6 +382,9 @@ def ensure_document_observer_installed():
return None
setattr(App, STATE_WRITEBACK_OBSERVER, observer)
_append_debug_log(
"write-back observer installed: observer_id={0}".format(id(observer))
)
return observer

@ -42,9 +42,15 @@ def _set_status(obj, status, reason=""):
)
def _device_report_label(device_group):
display_tag = (getattr(device_group, "QetDisplayTag", "") or "").strip()
instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
return display_tag or instance_id or element_uuid or getattr(device_group, "Name", "")
def _payload_identity_sets(payload):
cabinet_instance_ids = set()
device_element_uuids = set()
device_instance_ids = set()
terminal_uuids = set()
wire_uuids = set()
@ -58,12 +64,15 @@ def _payload_identity_sets(payload):
break
for item in payload.get("devices", []) or []:
element_uuid = _string_value(item, "element_uuid")
instance_id = _string_value(item, "instance_id")
if element_uuid:
device_element_uuids.add(element_uuid)
instance_id = _string_value(item, "device_instance_id") or _string_value(
item, "instance_id"
)
if instance_id:
device_instance_ids.add(instance_id)
for terminal in item.get("terminals", []) or []:
terminal_uuid = _string_value(terminal, "terminal_uuid")
if terminal_uuid:
terminal_uuids.add(terminal_uuid)
for item in payload.get("terminals", []) or []:
terminal_uuid = _string_value(item, "terminal_uuid")
@ -81,7 +90,6 @@ def _payload_identity_sets(payload):
return {
"cabinet_instance_ids": cabinet_instance_ids,
"device_element_uuids": device_element_uuids,
"device_instance_ids": device_instance_ids,
"terminal_uuids": terminal_uuids,
"wire_uuids": wire_uuids,
@ -133,11 +141,19 @@ def _iter_device_groups(doc):
def _mark_device(device_group, identity_sets):
element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
active = False
if element_uuid and element_uuid in identity_sets["device_element_uuids"]:
active = True
if instance_id and instance_id in identity_sets["device_instance_ids"]:
active = True
active = bool(
instance_id and instance_id in identity_sets["device_instance_ids"]
)
try:
import FreeCAD as _App
_App.Console.PrintMessage(
"[FreeCADExchange] stale check device: name={0}, element_uuid={1}, instance_id={2}, active={3}\n".format(
getattr(device_group, "Name", ""), element_uuid, instance_id, active
)
)
except Exception:
pass
if active:
_set_status(device_group, SYNC_STATUS_ACTIVE)
@ -166,7 +182,11 @@ def _mark_cabinet(cabinet_group, identity_sets):
return "stale"
def _mark_terminals(device_group, identity_sets):
def _mark_terminals(device_group, identity_sets, authoritative_stale_terminal_uuids=None):
"""authoritative_stale_terminal_uuids: QET-side stale list (binding_table - diagram_elements)."""
if authoritative_stale_terminal_uuids is None:
authoritative_stale_terminal_uuids = set()
report = {"active": 0, "stale": 0}
terminal_group = TerminalObjects.find_child_group_by_kind(
device_group,
@ -178,6 +198,14 @@ def _mark_terminals(device_group, identity_sets):
_set_status(terminal, SYNC_STATUS_ACTIVE)
report["active"] += 1
continue
if terminal_uuid and terminal_uuid in authoritative_stale_terminal_uuids:
_set_status(
terminal,
SYNC_STATUS_STALE,
"This 3D terminal was removed from the 2D schematic (QET authoritative).",
)
report["stale"] += 1
continue
if terminal_uuid and terminal_uuid in identity_sets["terminal_uuids"]:
_set_status(terminal, SYNC_STATUS_ACTIVE)
report["active"] += 1
@ -234,11 +262,24 @@ def mark_stale_objects_from_payload(payload, doc=None):
raise RuntimeError("Exchange payload must be an object.")
identity_sets = _payload_identity_sets(payload)
# QET 侧权威失效列表stale = binding_table - diagram_elements
authoritative_stale_instance_ids = set()
authoritative_stale_terminal_uuids = set()
for item in payload.get("stale_devices", []) or []:
instance_id = _string_value(item, "instance_id")
tu = _string_value(item, "terminal_uuid")
if instance_id:
authoritative_stale_instance_ids.add(instance_id)
if tu:
authoritative_stale_terminal_uuids.add(tu)
report = {
"active_cabinets": 0,
"stale_cabinets": 0,
"active_devices": 0,
"stale_devices": 0,
"stale_device_details": [],
"active_terminals": 0,
"stale_terminals": 0,
"active_wire_tasks": 0,
@ -256,13 +297,47 @@ def mark_stale_objects_from_payload(payload, doc=None):
report["stale_cabinets"] += 1
for device_group in _iter_device_groups(doc):
device_status = _mark_device(device_group, identity_sets)
if device_status == "active":
report["active_devices"] += 1
else:
element_uuid = (getattr(device_group, "QetElementUuid", "") or "").strip()
instance_id = (getattr(device_group, "QetInstanceId", "") or "").strip()
instance_is_active = bool(
instance_id and instance_id in identity_sets["device_instance_ids"]
)
if (
not instance_is_active
and (instance_id and instance_id in authoritative_stale_instance_ids)
):
_set_status(
device_group,
SYNC_STATUS_STALE,
"This 3D device was removed from the 2D schematic (QET authoritative).",
)
report["stale_devices"] += 1
terminal_report = _mark_terminals(device_group, identity_sets)
report["stale_device_details"].append(
{
"display_tag": (getattr(device_group, "QetDisplayTag", "") or "").strip(),
"instance_id": instance_id,
"element_uuid": element_uuid,
"label": _device_report_label(device_group),
}
)
else:
device_status = _mark_device(device_group, identity_sets)
if device_status == "active":
report["active_devices"] += 1
else:
report["stale_devices"] += 1
report["stale_device_details"].append(
{
"display_tag": (getattr(device_group, "QetDisplayTag", "") or "").strip(),
"instance_id": instance_id,
"element_uuid": element_uuid,
"label": _device_report_label(device_group),
}
)
terminal_report = _mark_terminals(
device_group, identity_sets, authoritative_stale_terminal_uuids
)
report["active_terminals"] += terminal_report["active"]
report["stale_terminals"] += terminal_report["stale"]

@ -37,7 +37,10 @@ def _normalize_terminal_entry(item, index):
"Terminal entry #{0} is missing terminal_uuid.".format(index)
)
instance_id = (item.get("instance_id") or "").strip()
instance_id = (
(item.get("device_instance_id") or "").strip()
or (item.get("instance_id") or "").strip()
)
element_uuid = (item.get("element_uuid") or "").strip()
terminal_display = (item.get("terminal_display") or "").strip()
slot_name_hint = (
@ -66,13 +69,23 @@ def _payload_device_lookup(payload):
if not isinstance(item, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
instance_id = (item.get("instance_id") or "").strip()
instance_id = (
(item.get("device_instance_id") or "").strip()
or (item.get("instance_id") or "").strip()
)
if instance_id:
by_instance_id.add(instance_id)
element_uuid = (item.get("element_uuid") or "").strip()
if element_uuid:
by_element_uuid.add(element_uuid)
if instance_id:
by_instance_id.add(instance_id)
for terminal in item.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = (terminal.get("element_uuid") or "").strip()
if element_uuid:
by_element_uuid.add(element_uuid)
return {
"element_uuids": by_element_uuid,
@ -82,16 +95,62 @@ def _payload_device_lookup(payload):
def _payload_device_instance_by_element(payload):
result = {}
for item in payload.get("devices", []) or []:
if not isinstance(item, dict):
for device in payload.get("devices", []) or []:
if not isinstance(device, dict):
continue
element_uuid = (item.get("element_uuid") or "").strip()
instance_id = (item.get("instance_id") or "").strip()
if element_uuid and instance_id and element_uuid not in result:
result[element_uuid] = instance_id
device_instance_id = (
(device.get("device_instance_id") or "").strip()
or (device.get("instance_id") or "").strip()
)
if not device_instance_id:
continue
for terminal in device.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = (terminal.get("element_uuid") or "").strip()
if element_uuid and element_uuid not in result:
result[element_uuid] = device_instance_id
for terminal in payload.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
element_uuid = (terminal.get("element_uuid") or "").strip()
device_instance_id = (
(terminal.get("device_instance_id") or "").strip()
or (terminal.get("instance_id") or "").strip()
)
if element_uuid and device_instance_id and element_uuid not in result:
result[element_uuid] = device_instance_id
return result
def _payload_terminal_entries(payload):
if "terminals" in payload and payload.get("terminals") is not None:
terminal_entries = payload.get("terminals", [])
if not isinstance(terminal_entries, list):
raise TerminalImportError("Field 'terminals' must be a list.")
return list(terminal_entries)
terminal_entries = []
for device in payload.get("devices", []) or []:
if not isinstance(device, dict):
continue
device_instance_id = (
(device.get("device_instance_id") or "").strip()
or (device.get("instance_id") or "").strip()
)
for terminal in device.get("terminals", []) or []:
if not isinstance(terminal, dict):
continue
entry = dict(terminal)
if device_instance_id and not (
(entry.get("device_instance_id") or "").strip()
or (entry.get("instance_id") or "").strip()
):
entry["instance_id"] = device_instance_id
terminal_entries.append(entry)
return terminal_entries
def _wire_endpoint_terminal_entries(payload, existing_keys):
wires = payload.get("wires", []) or []
if not isinstance(wires, list):
@ -357,10 +416,7 @@ def import_terminals_from_payload(payload, scene_path=""):
root_group = DeviceImport._ensure_root_group(doc, project_uuid)
_ = root_group
terminal_entries = payload.get("terminals", [])
if not isinstance(terminal_entries, list):
raise TerminalImportError("Field 'terminals' must be a list.")
terminal_entries = list(terminal_entries)
terminal_entries = _payload_terminal_entries(payload)
terminal_entry_keys = set()
for item in terminal_entries:
if not isinstance(item, dict):
@ -462,12 +518,6 @@ def import_terminals_from_payload(payload, scene_path=""):
for index, entry in enumerate(entries):
terminal_uuid = entry["terminal_uuid"]
payload_instance_id = entry["instance_id"]
if payload_instance_id and payload_instance_id != device_instance_id:
report["warnings"].append(
"Terminal {0} references instance_id {1} but device {2} uses {3}. The device value was kept."
.format(terminal_uuid, payload_instance_id, device_element_uuid, device_instance_id)
)
slot = None
slot_hint = _normalize_slot_name(entry.get("slot_name_hint", ""))
@ -557,11 +607,12 @@ def import_terminals_from_payload(payload, scene_path=""):
report["reused_template_hints"] += 1
doc.recompute()
if Gui is not None:
try:
Gui.SendMsgToActiveView("ViewFit")
except Exception:
pass
try:
DeviceImport._append_debug_log(
"TerminalImport ViewFit skipped during exchange import"
)
except Exception:
pass
_append_debug_log(
"TerminalImport finished: imported={0}, updated={1}, removed={2}, skipped_unmatched_parent={3}, skipped_missing_slot={4}".format(

@ -89,8 +89,12 @@ def _normalize_wire_entry(item, index, device_labels=None):
end_element_uuid = _string_value(item, "end_element_uuid")
start_terminal_display = _string_value(item, "start_terminal_display")
end_terminal_display = _string_value(item, "end_terminal_display")
start_device_label = device_labels.get(start_element_uuid, "")
end_device_label = device_labels.get(end_element_uuid, "")
start_device_label = _string_value(item, "start_device_label") or device_labels.get(
start_element_uuid, ""
)
end_device_label = _string_value(item, "end_device_label") or device_labels.get(
end_element_uuid, ""
)
endpoint_label = "{0} -> {1}".format(
_endpoint_text(start_device_label, start_terminal_display, start_terminal_uuid),
_endpoint_text(end_device_label, end_terminal_display, end_terminal_uuid),

@ -111,7 +111,6 @@ class ExchangeBootstrapWiringTest(unittest.TestCase):
"schema_version": "1.2",
"project_uuid": "project-1",
"devices": [],
"terminals": [],
"device_models": [],
"wires": [
{
@ -132,6 +131,121 @@ class ExchangeBootstrapWiringTest(unittest.TestCase):
self.assertEqual("wire-1", normalized["wires"][0]["wire_id"])
self.assertEqual("W001", normalized["wires"][0]["wire_mark"])
def test_load_exchange_payload_flattens_nested_device_terminals(self):
_install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
payload = {
"schema_version": "2.0",
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
"terminal_display": "P1",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": r"D:\models\qf1.step",
}
],
"wires": [],
}
with tempfile.TemporaryDirectory() as temp_dir:
path = Path(temp_dir) / "2d_to_3d.json"
path.write_text(json.dumps(payload), encoding="utf-8")
normalized = bootstrap.load_exchange_payload(str(path))
self.assertEqual("device-inst-1", normalized["devices"][0]["instance_id"])
self.assertEqual("element-a", normalized["devices"][0]["element_uuid"])
self.assertEqual(["element-a"], normalized["devices"][0]["element_uuids"])
self.assertEqual(1, len(normalized["terminals"]))
self.assertEqual("terminal-a", normalized["terminals"][0]["terminal_uuid"])
self.assertEqual("device-inst-1", normalized["terminals"][0]["instance_id"])
self.assertEqual("device-inst-1", normalized["device_models"][0]["instance_id"])
def test_load_exchange_payload_rejects_legacy_root_terminals(self):
_install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
payload = {
"schema_version": "2.0",
"project_uuid": "project-1",
"devices": [],
"terminals": [],
"device_models": [],
"wires": [],
}
with tempfile.TemporaryDirectory() as temp_dir:
path = Path(temp_dir) / "2d_to_3d.json"
path.write_text(json.dumps(payload), encoding="utf-8")
with self.assertRaises(bootstrap.ExchangeValidationError):
bootstrap.load_exchange_payload(str(path))
def test_summary_message_includes_updated_device_label_change_details(self):
_install_fake_modules()
sys.modules.pop("ExchangeBootstrap", None)
bootstrap = importlib.import_module("ExchangeBootstrap")
message = bootstrap._summary_message(
{
"project_uuid": "project-1",
"json_path": r"D:\project\example\.qet_freecad\2d_to_3d.json",
"device_count": 1,
"terminal_count": 2,
"wire_count": 0,
"device_model_count": 1,
"device_models_with_parts": 1,
"missing_device_instances": 0,
"missing_terminal_instances": 0,
"scene_path": r"D:\project\example\.qet_freecad\QETScene.FCStd",
"is_first_open": False,
},
import_report={
"document_name": "QETScene",
"cabinet_imported": 0,
"cabinet_added": 0,
"cabinet_reimported": 0,
"cabinet_reused": 1,
"imported_devices": 0,
"updated_devices": 1,
"reused_devices": 0,
"added_device_details": [],
"updated_device_details": [
{
"label": "J3",
"display_tag": "J3",
"previous_display_tag": "J1",
"instance_id": "device-inst-1",
"change_types": ["标注"],
"added_terminal_uuids": [],
"removed_terminal_uuids": [],
}
],
"imported_without_instance_id": 0,
"skipped_missing_model": 0,
"skipped_missing_file": 0,
"skipped_unsupported_format": 0,
"skipped_import_error": 0,
"warnings": [],
},
)
self.assertIn("更新设备1", message)
self.assertIn("修改设备:", message)
self.assertIn("Updated device details:", message)
self.assertIn("J3 [device-inst-1] -> 标注 (标注 J1 -> J3)", message)
if __name__ == "__main__":
unittest.main()

@ -41,13 +41,16 @@ def _install_fake_freecad(source_doc):
fake_freecad.ActiveDocument = None
fake_freecad.set_active_document_calls = []
fake_freecad.open_document_calls = []
fake_freecad.close_document_calls = []
fake_freecad.new_document_calls = []
fake_freecad.documents = {}
def set_active_document(name):
fake_freecad.set_active_document_calls.append(name)
def close_document(*args, **kwargs):
def close_document(name, *args, **kwargs):
fake_freecad.close_document_calls.append(name)
fake_freecad.documents.pop(name, None)
fake_freecad.ActiveDocument = None
def open_document(*args, **kwargs):
@ -266,7 +269,7 @@ class FcstdDeviceImportTest(unittest.TestCase):
self.assertEqual(str(scene_path), app.open_document_calls[0][0][0])
self.assertEqual([], app.new_document_calls)
self.assertIs(app.ActiveDocument, scene_doc)
self.assertIn("QETScene", app.set_active_document_calls)
self.assertEqual([], app.set_active_document_calls)
def test_legacy_singleton_cabinet_group_is_reused_without_reimport(self):
with tempfile.TemporaryDirectory() as temp_dir:
@ -602,6 +605,434 @@ class FcstdDeviceImportTest(unittest.TestCase):
self.assertEqual([body.Name], [obj.Name for obj in copied_objects])
self.assertEqual(["Body"], [obj.Name for obj in device_group.Group])
def test_import_devices_from_payload_accepts_nested_terminal_payload(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "device.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
device_import._import_model_into_group = lambda *args, **kwargs: []
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": str(model_path),
}
],
}
)
self.assertEqual(1, report["imported_devices"])
root = doc.getObject(device_import.ROOT_GROUP_NAME)
self.assertIsNotNone(root)
devices = [
obj
for obj in root.Group
if getattr(obj, "Name", "").startswith(device_import.DEVICE_GROUP_PREFIX)
]
self.assertEqual(1, len(devices))
self.assertEqual("device-inst-1", devices[0].QetInstanceId)
self.assertEqual("element-a", devices[0].QetElementUuid)
def test_import_devices_from_payload_reuses_fcstd_source_document_within_one_sync(self):
source = FakeDocument("TerminalSlice", r"D:\models\qet_terminal_slice.FCStd")
source.addObject("Part::Feature", "Body")
_install_fake_freecad(source)
app = sys.modules["FreeCAD"]
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
original_isfile = device_import.os.path.isfile
try:
device_import.os.path.isfile = lambda path: True
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "ID:1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
}
],
},
{
"device_instance_id": "device-inst-2",
"display_tag": "ID:2",
"terminals": [
{
"terminal_uuid": "terminal-b",
"element_uuid": "element-b",
}
],
},
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": source.FileName,
},
{
"device_instance_id": "device-inst-2",
"resolved_model_path": source.FileName,
},
],
}
)
finally:
device_import.os.path.isfile = original_isfile
self.assertEqual(2, report["imported_devices"])
self.assertEqual(1, len(app.open_document_calls))
self.assertEqual(["TerminalSlice"], app.close_document_calls)
def test_import_devices_from_payload_skips_reimport_when_existing_device_matches_model_path(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "device.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, None, "project-1")
device_group, created_now = device_import._ensure_device_group(
doc,
root,
"element-a",
"device-inst-1",
str(model_path),
"QF1",
0,
)
self.assertTrue(created_now)
existing_body = doc.addObject("Part::Feature", "ExistingBody")
device_group.addObject(existing_body)
terminal_objects = importlib.import_module("TerminalObjects")
terminal_group = terminal_objects.ensure_terminal_group(
doc,
device_group,
project_uuid="project-1",
instance_id="device-inst-1",
)
existing_terminal = terminal_objects.create_lcs_object(
doc,
"QETTerminal_terminal_a",
label="terminal-a",
)
terminal_group.addObject(existing_terminal)
terminal_objects.set_terminal_semantics(
existing_terminal,
"project-1",
"element-a",
"terminal-a",
"device-inst-1",
label="terminal-a",
slot_name="terminal-a",
)
import_calls = []
def fake_import_model(*args, **kwargs):
import_calls.append((args, kwargs))
return []
device_import._import_model_into_group = fake_import_model
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": str(model_path),
}
],
}
)
self.assertEqual([], import_calls)
self.assertEqual(0, report["imported_devices"])
self.assertEqual(0, report["updated_devices"])
self.assertEqual(1, report["reused_devices"])
self.assertIn(existing_body, device_group.Group)
def test_import_devices_from_payload_reports_terminal_only_change_without_model_reimport(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "device.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
terminal_objects = importlib.import_module("TerminalObjects")
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, None, "project-1")
device_group, _ = device_import._ensure_device_group(
doc,
root,
"element-a",
"device-inst-1",
str(model_path),
"QF1",
0,
)
existing_body = doc.addObject("Part::Feature", "ExistingBody")
device_group.addObject(existing_body)
terminal_group = terminal_objects.ensure_terminal_group(
doc,
device_group,
project_uuid="project-1",
instance_id="device-inst-1",
)
old_terminal = terminal_objects.create_lcs_object(
doc,
"QETTerminal_old_terminal",
label="old",
)
terminal_group.addObject(old_terminal)
terminal_objects.set_terminal_semantics(
old_terminal,
"project-1",
"element-a",
"terminal-old",
"device-inst-1",
label="old",
slot_name="old",
)
import_calls = []
def fake_import_model(*args, **kwargs):
import_calls.append((args, kwargs))
return []
device_import._import_model_into_group = fake_import_model
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-new",
"element_uuid": "element-a",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": str(model_path),
}
],
}
)
self.assertEqual([], import_calls)
self.assertEqual(0, report["imported_devices"])
self.assertEqual(1, report["updated_devices"])
self.assertEqual(0, report["reused_devices"])
self.assertEqual(1, len(report["updated_device_details"]))
self.assertEqual(["端子"], report["updated_device_details"][0]["change_types"])
self.assertEqual(["terminal-new"], report["updated_device_details"][0]["added_terminal_uuids"])
self.assertEqual(["terminal-old"], report["updated_device_details"][0]["removed_terminal_uuids"])
def test_import_devices_from_payload_reports_display_tag_only_change_without_model_reimport(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "device.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
terminal_objects = importlib.import_module("TerminalObjects")
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, None, "project-1")
device_group, _ = device_import._ensure_device_group(
doc,
root,
"element-a",
"device-inst-1",
str(model_path),
"J1",
0,
)
existing_body = doc.addObject("Part::Feature", "ExistingBody")
device_group.addObject(existing_body)
terminal_group = terminal_objects.ensure_terminal_group(
doc,
device_group,
project_uuid="project-1",
instance_id="device-inst-1",
)
existing_terminal = terminal_objects.create_lcs_object(
doc,
"QETTerminal_terminal_a",
label="terminal-a",
)
terminal_group.addObject(existing_terminal)
terminal_objects.set_terminal_semantics(
existing_terminal,
"project-1",
"element-a",
"terminal-a",
"device-inst-1",
label="terminal-a",
slot_name="terminal-a",
)
import_calls = []
def fake_import_model(*args, **kwargs):
import_calls.append((args, kwargs))
return []
device_import._import_model_into_group = fake_import_model
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "J3",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": str(model_path),
}
],
}
)
self.assertEqual([], import_calls)
self.assertEqual("J3", device_group.Label)
self.assertEqual("J3", device_group.QetDisplayTag)
self.assertEqual(0, report["imported_devices"])
self.assertEqual(1, report["updated_devices"])
self.assertEqual(0, report["reused_devices"])
self.assertEqual(["标注"], report["updated_device_details"][0]["change_types"])
self.assertEqual("J1", report["updated_device_details"][0]["previous_display_tag"])
self.assertEqual("J3", report["updated_device_details"][0]["display_tag"])
def test_import_devices_from_payload_updates_existing_display_tag_even_when_model_path_missing(self):
with tempfile.TemporaryDirectory() as temp_dir:
model_path = Path(temp_dir) / "device.step"
model_path.write_text("fake step placeholder", encoding="utf-8")
_install_fake_freecad(None)
device_import, _ = _reload_modules()
doc = FakeDocument("QETScene")
doc.recompute = lambda: None
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, None, "project-1")
device_group, _ = device_import._ensure_device_group(
doc,
root,
"element-a",
"device-inst-1",
str(model_path),
"J1",
0,
)
existing_body = doc.addObject("Part::Feature", "ExistingBody")
device_group.addObject(existing_body)
report = device_import.import_devices_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "device-inst-1",
"display_tag": "J3",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "element-a",
}
],
}
],
"device_models": [
{
"device_instance_id": "device-inst-1",
"resolved_model_path": "",
}
],
}
)
self.assertEqual("J3", device_group.Label)
self.assertEqual("J3", device_group.QetDisplayTag)
self.assertEqual(str(model_path), device_group.QetResolvedModelPath)
self.assertEqual(1, report["updated_devices"])
self.assertEqual(1, report["skipped_missing_model"])
self.assertEqual(["标注"], report["updated_device_details"][0]["change_types"])
if __name__ == "__main__":
unittest.main()

@ -180,15 +180,14 @@ class StaleObjectSyncTest(unittest.TestCase):
},
"devices": [
{
"element_uuid": "device-active",
"instance_id": "instance-active",
}
],
"terminals": [
{
"terminal_uuid": "terminal-active",
"instance_id": "instance-active",
"element_uuid": "device-active",
"device_instance_id": "instance-active",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-active",
"element_uuid": "device-active",
}
],
}
],
"wires": [
@ -219,6 +218,47 @@ class StaleObjectSyncTest(unittest.TestCase):
self.assertEqual("Active", active_task.QetSyncStatus)
self.assertEqual("Stale", stale_task.QetSyncStatus)
def test_device_stale_check_uses_instance_id_not_element_uuid(self):
_install_fake_freecad()
terminal_objects, _, stale_object_sync = _reload_modules()
doc = FakeDocument()
root = terminal_objects.ensure_root_group(doc, "project-1")
device = doc.addObject("App::Part", "QETDevice_device_a")
device.addProperty("App::PropertyString", "QetElementUuid", "QET Exchange", "")
device.QetElementUuid = "shared-element"
device.addProperty("App::PropertyString", "QetInstanceId", "QET Exchange", "")
device.QetInstanceId = "instance-old"
device.addProperty("App::PropertyString", "QetDisplayTag", "QET Exchange", "")
device.QetDisplayTag = "QF1"
root.addObject(device)
payload = {
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "instance-new",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "shared-element",
}
],
}
],
}
report = stale_object_sync.mark_stale_objects_from_payload(payload, doc)
self.assertEqual(0, report["active_devices"])
self.assertEqual(1, report["stale_devices"])
self.assertEqual("Stale", device.QetSyncStatus)
self.assertEqual(1, len(report["stale_device_details"]))
self.assertEqual("QF1", report["stale_device_details"][0]["label"])
self.assertEqual("instance-old", report["stale_device_details"][0]["instance_id"])
if __name__ == "__main__":
unittest.main()

@ -265,6 +265,65 @@ class TerminalImportTemplateSlotPolicyTest(unittest.TestCase):
self.assertEqual([local_terminal], terminals)
self.assertEqual("local:instance-a:P1", local_terminal.QetTerminalUuid)
def test_import_accepts_nested_device_terminals_without_top_level_terminals(self):
_install_fake_freecad()
terminal_import, terminal_objects, device_import = _reload_modules()
doc = FakeDocument()
device_import._ensure_document = lambda scene_path: doc
root = device_import._ensure_root_group(doc, project_uuid="project-1")
device = doc.addObject("App::Part", "QETDevice_device_a")
root.addObject(device)
terminal_objects.ensure_string_property(
device,
"QetProjectUuid",
"QET Exchange",
"Project UUID",
"project-1",
)
terminal_objects.ensure_string_property(
device,
"QetElementUuid",
"QET Exchange",
"Element UUID",
"device-a",
)
terminal_objects.ensure_string_property(
device,
"QetInstanceId",
"QET Exchange",
"Instance ID",
"instance-a",
)
report = terminal_import.import_terminals_from_payload(
{
"project_uuid": "project-1",
"devices": [
{
"device_instance_id": "instance-a",
"display_tag": "QF1",
"terminals": [
{
"terminal_uuid": "terminal-a",
"element_uuid": "device-a",
}
],
}
],
}
)
terminal_group = terminal_objects.find_child_group_by_kind(
device,
terminal_objects.TERMINAL_GROUP_KIND,
)
terminals = terminal_objects.collect_terminal_objects(terminal_group)
self.assertEqual(1, report["imported_terminals"])
self.assertEqual(1, len(terminals))
self.assertEqual("terminal-a", terminals[0].QetTerminalUuid)
def test_import_synthesizes_missing_terminal_entries_from_wire_endpoints(self):
_install_fake_freecad()
terminal_import, terminal_objects, device_import = _reload_modules()

@ -458,10 +458,80 @@ class WiringTest(unittest.TestCase):
)
self.assertEqual(
[{"terminal_uuid": "terminal-a", "instance_id": "instance-a"}],
[{
"terminal_uuid": "terminal-a",
"device_instance_id": "instance-a",
"terminal_instance_id": "instance-a",
}],
report["terminals"],
)
def test_writeback_collects_all_element_uuid_bindings_from_terminals(self):
_install_fake_freecad()
terminal_objects, _wiring_objects, _manual_wiring, write_back = _reload_modules()
doc = FakeDocument()
root = terminal_objects.ensure_root_group(doc, "project-1")
device = doc.addObject("App::DocumentObjectGroup", "QETDevice_device_a")
root.addObject(device)
terminal_objects.ensure_string_property(
device,
"QetElementUuid",
"QET Exchange",
"Element UUID",
"device-a",
)
terminal_objects.ensure_string_property(
device,
"QetInstanceId",
"QET Exchange",
"Instance ID",
"instance-a",
)
terminal_group = terminal_objects.ensure_terminal_group(
doc,
device,
project_uuid="project-1",
instance_id="instance-a",
)
first_terminal = terminal_objects.create_lcs_object(doc, "QETTerminal_A")
terminal_group.addObject(first_terminal)
terminal_objects.set_terminal_semantics(
first_terminal,
"project-1",
"device-a",
"terminal-a",
"instance-a",
label="A",
)
second_terminal = terminal_objects.create_lcs_object(doc, "QETTerminal_B")
terminal_group.addObject(second_terminal)
terminal_objects.set_terminal_semantics(
second_terminal,
"project-1",
"device-b",
"terminal-b",
"instance-a",
label="B",
)
with tempfile.TemporaryDirectory() as tmp_dir:
report = write_back.write_back_document(
doc,
scene_path=str(Path(tmp_dir) / "scene.FCStd"),
payload={"project_uuid": "project-1"},
)
self.assertEqual(
[
{"element_uuid": "device-a", "device_instance_id": "instance-a"},
{"element_uuid": "device-b", "device_instance_id": "instance-a"},
],
report["instances"],
)
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save