找回密码
 立即注册
查看: 234|回复: 0

【开源分享】STC8H 触摸传感器实时监控系统(Python 可视化 + 串口通信)

[复制链接]
  • 打卡等级:常住居民III
  • 打卡总天数:129
  • 最近打卡:2026-05-08 14:48:41

15

主题

97

回帖

144

积分

注册会员

积分
144
发表于 2026-1-24 11:25:29 | 显示全部楼层 |阅读模式
前因:修改“STC8H4K64Txx-触摸按键校验检测工程”至 STC8H1K08T-33I未果
索性自己问AI写了一个可视化监控程序。
程序功能:
  • 支持 3 路触摸通道的实时波形显示(采样值 + 基线值)
  • 单独展示所有通道的差值波形
  • 串口自动扫描、一键连接 / 断开,状态可视化提示
  • 实时显示通道最新数值,差值实时更新
  • 纵轴自适应数据范围,X 轴固定 200 个数据点
  • 效果图片:
截图202601241118104763.jpg

截图202601241118356472.jpg
单片机代码:
  1.   while(1) {
  2.     touch_scan();
  3.     // 发送:基线0 基线1 基线2 当前0 当前1 当前2
  4.     printf("%u %u %u %u %u %u %x\r\n",
  5.            TK_zero[0], TK_zero[1], TK_zero[2],
  6.            TK_cnt[0],  TK_cnt[1],  TK_cnt[2],);
  7.     delay_ms(50); // ~20Hz 发送频率
  8.   }
复制代码


上位机代码:
  1. import serial
  2. import serial.tools.list_ports
  3. import matplotlib.pyplot as plt
  4. from matplotlib.animation import FuncAnimation
  5. from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
  6. import numpy as np
  7. import threading
  8. import time
  9. import tkinter as tk
  10. from tkinter import ttk, messagebox
  11. from collections import deque
  12. import queue
  13. # # 中文字体设置(放在导入 matplotlib 之  后)
  14. # plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS', 'DejaVu Sans']
  15. # # 优先尝试常见的中文字体,如果都没有会自动回退
  16. # plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
  17. # ================== 配置与全局变量 ==================
  18. # 优先使用Windows原生中文等宽字体,兜底兼容
  19. plt.rcParams['font.sans-serif'] = [
  20.     'Microsoft YaHei Mono',  # 核心:Win10+ 中文等宽(优先)
  21.     'SimSun-ExtB',           # 兜底:Win全版本中文等宽
  22.     'Courier New'            # 备用:跨平台等宽(防止前两者缺失)
  23. ]
  24. plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
  25. plt.rcParams['font.family'] = 'monospace'   # 强制启用等宽模式(关键)
  26. plt.rcParams['font.size'] = 10              # 可选:统一字体大小
  27. DATA_POINTS = 200
  28. BAUDRATE = 9600
  29. ANIMATE_INTERVAL = 50  # 动画刷新间隔(ms)
  30. SERIAL_READ_INTERVAL = 0.01  # 串口读取间隔(s)
  31. Y_AXIS_MARGIN = 50  # Y轴边距(缩小边距,适配更精准)
  32. class SerialApp:
  33.     def __init__(self, root):
  34.         self.root = root
  35.         self.root.title("STC8H 触摸监控系统")
  36.         # 串口对象及状态
  37.         self.ser = serial.Serial()
  38.         self.is_running = False
  39.         self.read_thread = None
  40.         self.data_lock = threading.Lock()  # 线程安全锁
  41.         self.data_queue = queue.Queue(maxsize=100)  # 数据缓冲队列
  42.         # 数据存储 - 使用deque替代numpy数组,优化数据更新
  43.         self.cnt_data = [deque([0] * DATA_POINTS, maxlen=DATA_POINTS) for _ in range(3)]
  44.         self.zero_data = [deque([0] * DATA_POINTS, maxlen=DATA_POINTS) for _ in range(3)]
  45.         self.current_delta = [0, 0, 0]
  46.         # 预计算X轴数据
  47.         self.x_data = np.arange(DATA_POINTS)
  48.         # 绘图对象缓存
  49.         self.line_objects = []
  50.         self.baseline_objects = []
  51.         self.delta_line_objects = []
  52.         # 动画控制标志
  53.         self.animation_running = False
  54.         self.ani = None  # 延迟初始化动画
  55.         self.setup_ui()
  56.         self.setup_plot()
  57.     def setup_ui(self):
  58.         """创建控制面板"""
  59.         control_frame = ttk.Frame(self.root)
  60.         control_frame.pack(side=tk.TOP, fill=tk.X, padx=10, pady=5)
  61.         # 1. 串口选择
  62.         ttk.Label(control_frame, text="串口选择:").grid(row=0, column=0, padx=5)
  63.         self.port_combo = ttk.Combobox(control_frame, width=15)
  64.         self.port_combo.grid(row=0, column=1, padx=5)
  65.         self.refresh_ports()  # 初始化扫描
  66.         # 2. 刷新串口列表按钮
  67.         ttk.Button(control_frame, text="刷新串口", command=self.refresh_ports).grid(row=0, column=2, padx=5)
  68.         # 3. 开始/断开按钮
  69.         self.btn_start = ttk.Button(control_frame, text="开始连接", command=self.toggle_serial)
  70.         self.btn_start.grid(row=0, column=3, padx=10)
  71.         self.status_label = ttk.Label(control_frame, text="状态: 未连接", foreground="red")
  72.         self.status_label.grid(row=0, column=4, padx=10)
  73.     def setup_plot(self):
  74.         """创建嵌入式 Matplotlib 图表 - 优化版本"""
  75.         # 创建图表时减少不必要的参数,优化布局
  76.         self.fig, (self.ax1, self.ax2, self.ax3, self.ax4) = plt.subplots(
  77.             4, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [1, 1, 1, 1.2]},
  78.             tight_layout=True  # 自动优化布局,替代tight_layout调用
  79.         )
  80.         self.fig.suptitle('Touch Channel Monitor', y=0.98)
  81.         # 将图表嵌入 Tkinter 窗口
  82.         self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)
  83.         self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=1)
  84.         # 初始化绘图对象(只创建一次,后续更新数据)
  85.         colors = ['blue', 'green', 'red']
  86.         axes = [self.ax1, self.ax2, self.ax3]
  87.         # 初始化通道波形图
  88.         for i, (ax, color) in enumerate(zip(axes, colors)):
  89.             # 主数据线条
  90.             line, = ax.plot(self.x_data, list(self.cnt_data[i]),
  91.                             label=f'CH{i} Count', color=color, linewidth=1)
  92.             self.line_objects.append(line)
  93.             # 基线线条
  94.             baseline, = ax.plot(self.x_data, list(self.zero_data[i]),
  95.                                 color='black', linestyle='--', alpha=0.6,
  96.                                 label='Baseline', linewidth=1)
  97.             self.baseline_objects.append(baseline)
  98.             # 初始化坐标轴设置(只做一次)
  99.             ax.set_title(f'Channel {i} (Delta: 0)')
  100.             ax.set_ylim(-Y_AXIS_MARGIN, Y_AXIS_MARGIN)
  101.             ax.grid(True, alpha=0.3)
  102.             ax.legend(loc='upper right', fontsize='small')
  103.             # 关闭x轴自动缩放,避免布局抖动
  104.             ax.set_xlim(0, DATA_POINTS - 1)
  105.         # 初始化差值图
  106.         for i, color in enumerate(colors):
  107.             delta_line, = self.ax4.plot(self.x_data, [0] * DATA_POINTS,
  108.                                         label=f'CH{i} Δ (0)', color=color, linewidth=1)
  109.             self.delta_line_objects.append(delta_line)
  110.         self.ax4.set_title('Real-time Channel Differences (Baseline - Count)')
  111.         self.ax4.set_ylim(-50, 50)  # 初始值,后续动态更新
  112.         self.ax4.legend(loc='upper right', ncol=3)
  113.         self.ax4.grid(True, alpha=0.3)
  114.         self.ax4.set_xlim(0, DATA_POINTS - 1)  # 固定x轴范围
  115.     def start_animation(self):
  116.         """启动动画(仅在连接时)"""
  117.         if not self.animation_running:
  118.             self.ani = FuncAnimation(
  119.                 self.fig,
  120.                 self.animate,
  121.                 interval=ANIMATE_INTERVAL,
  122.                 cache_frame_data=False,
  123.                 blit=True,  # 开启blit优化
  124.                 repeat=True
  125.             )
  126.             self.animation_running = True
  127.     def stop_animation(self):
  128.         """停止动画(断开连接时)"""
  129.         if self.animation_running and self.ani:
  130.             self.ani.event_source.stop()
  131.             self.animation_running = False
  132.     def refresh_ports(self):
  133.         """扫描当前可用串口 - 优化:减少不必要的刷新"""
  134.         ports = serial.tools.list_ports.comports()
  135.         port_list = [p.device for p in ports]
  136.         self.port_combo['values'] = port_list
  137.         if port_list and not self.port_combo.get():
  138.             self.port_combo.current(0)
  139.     def toggle_serial(self):
  140.         """切换开始/断开状态"""
  141.         if not self.is_running:
  142.             self.start_serial()
  143.         else:
  144.             self.stop_serial()
  145.     def start_serial(self):
  146.         """打开串口并启动读取线程"""
  147.         port = self.port_combo.get()
  148.         if not port:
  149.             messagebox.showwarning("警告", "请先选择一个串口!")
  150.             return
  151.         try:
  152.             self.ser.port = port
  153.             self.ser.baudrate = BAUDRATE
  154.             self.ser.timeout = 0.5  # 优化超时时间
  155.             self.ser.open()
  156.             self.is_running = True
  157.             self.btn_start.config(text="断开连接")
  158.             self.status_label.config(text=f"状态: 已连接 {port}", foreground="green")
  159.             self.port_combo.config(state="disabled")
  160.             # 启动动画
  161.             self.start_animation()
  162.             # 启动线程
  163.             self.read_thread = threading.Thread(target=self.read_serial_data, daemon=True)
  164.             self.read_thread.start()
  165.         except Exception as e:
  166.             messagebox.showerror("错误", f"无法打开串口: {e}")
  167.     def stop_serial(self):
  168.         """停止读取并关闭串口 - 优化:更安全的线程退出"""
  169.         self.is_running = False
  170.         # 停止动画,减少资源占用
  171.         self.stop_animation()
  172.         # 清空数据队列
  173.         while not self.data_queue.empty():
  174.             try:
  175.                 self.data_queue.get_nowait()
  176.             except queue.Empty:
  177.                 break
  178.         # 等待线程退出
  179.         if self.read_thread and self.read_thread.is_alive():
  180.             self.read_thread.join(timeout=0.5)
  181.         if self.ser.is_open:
  182.             self.ser.close()
  183.         self.btn_start.config(text="开始连接")
  184.         self.status_label.config(text="状态: 未连接", foreground="red")
  185.         self.port_combo.config(state="normal")
  186.     def read_serial_data(self):
  187.         """后台读取线程 - 大幅优化"""
  188.         buffer = ""  # 串口数据缓冲
  189.         while self.is_running:
  190.             if self.ser.is_open:
  191.                 try:
  192.                     # 读取可用数据(而非整行),减少阻塞
  193.                     if self.ser.in_waiting > 0:
  194.                         data = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
  195.                         buffer += data
  196.                         # 按行分割处理
  197.                         while '\n' in buffer:
  198.                             line, buffer = buffer.split('\n', 1)
  199.                             line = line.strip()
  200.                             if line:
  201.                                 parts = line.split()
  202.                                 if len(parts) >= 6:
  203.                                     try:
  204.                                         zeros = [int(parts[0]), int(parts[1]), int(parts[2])]
  205.                                         cnts = [int(parts[3]), int(parts[4]), int(parts[5])]
  206.                                         # 使用队列传递数据,避免直接操作共享变量
  207.                                         if not self.data_queue.full():
  208.                                             self.data_queue.put((zeros, cnts))
  209.                                     except ValueError:
  210.                                         # 数据格式错误时忽略,不中断线程
  211.                                         continue
  212.                     # 优化休眠时间,减少CPU占用
  213.                     time.sleep(SERIAL_READ_INTERVAL)
  214.                 except Exception as e:
  215.                     print(f"读取异常: {e}")
  216.                     time.sleep(0.1)
  217.             else:
  218.                 time.sleep(0.1)
  219.     def update_data(self):
  220.         """更新数据缓存 - 线程安全"""
  221.         if self.data_queue.empty():
  222.             return False
  223.         try:
  224.             # 批量处理队列中的数据(最多处理10条,避免阻塞)
  225.             for _ in range(min(10, self.data_queue.qsize())):
  226.                 zeros, cnts = self.data_queue.get_nowait()
  227.                 with self.data_lock:
  228.                     # 更新数据 - 使用deque的append自动维护长度,无需roll
  229.                     for i in range(3):
  230.                         self.zero_data[i].append(zeros[i])
  231.                         self.cnt_data[i].append(cnts[i])
  232.                         self.current_delta[i] = zeros[i] - cnts[i]
  233.             return True
  234.         except queue.Empty:
  235.             return False
  236.     def animate(self, i):
  237.         """图表刷新逻辑 - 核心优化(解决卡顿+纵轴自适应)"""
  238.         # 只在有数据更新时才处理绘图逻辑
  239.         data_updated = self.update_data()
  240.         if not data_updated:
  241.             return []
  242.         with self.data_lock:
  243.             # 转换为numpy数组用于绘图
  244.             cnt_arrays = [np.array(list(d)) for d in self.cnt_data]
  245.             zero_arrays = [np.array(list(d)) for d in self.zero_data]
  246.             # 更新通道波形 + 纵轴自适应
  247.             axes = [self.ax1, self.ax2, self.ax3]
  248.             for i in range(3):
  249.                 # 更新线条数据(不重建,只更新)
  250.                 self.line_objects[i].set_ydata(cnt_arrays[i])
  251.                 self.baseline_objects[i].set_ydata(zero_arrays[i])
  252.                 # 【关键修复】强制更新Y轴范围,确保自适应
  253.                 ax = axes[i]
  254.                 all_vals = np.concatenate([cnt_arrays[i], zero_arrays[i]])
  255.                 val_min, val_max = all_vals.min(), all_vals.max()
  256.                 # 计算新的Y轴范围(加边距)
  257.                 new_ylim = (val_min - Y_AXIS_MARGIN, val_max + Y_AXIS_MARGIN)
  258.                 ax.set_ylim(new_ylim)
  259.                 # 更新标题(只更新文本,不重建)- 显示当前通道最新值(6位补零)
  260.                 latest_value = self.cnt_data[i][-1]  # 获取deque最后一个元素(最新值)
  261.                 ax.set_title(f'Channel {i} ({latest_value:06d})')
  262.             # 更新差值图 + 纵轴自适应
  263.             delta_arrays = []
  264.             for i in range(3):
  265.                 delta = zero_arrays[i] - cnt_arrays[i]
  266.                 delta_arrays.append(delta)
  267.                 self.delta_line_objects[i].set_ydata(delta)
  268.                 # self.delta_line_objects[i].set_label(f'CH{i}Δ:({self.current_delta[i]:06d})')
  269.             self.ax4.set_title(f'CH0Δ:({self.current_delta[0]:06d}) CH1Δ:({self.current_delta[1]:06d}) CH2Δ:({self.current_delta[2]:06d})')
  270.             # 差值图纵轴自适应
  271.             all_delta = np.concatenate(delta_arrays)
  272.             delta_min, delta_max = all_delta.min(), all_delta.max()
  273.             self.ax4.set_ylim(delta_min - 20, delta_max + 20)
  274.             self.ax4.legend(loc='upper right', ncol=3)
  275.         # 返回需要更新的对象(blit优化)
  276.         return self.line_objects + self.baseline_objects + self.delta_line_objects
  277. if __name__ == "__main__":
  278.     root = tk.Tk()
  279.     # 优化Tkinter渲染优先级
  280.     root.attributes('-topmost', False)
  281.     root.update_idletasks()
  282.     # 减少Tkinter事件循环负载
  283.     root.after(100, lambda: root.update())
  284.     app = SerialApp(root)
  285.     root.mainloop()
复制代码


回复

使用道具 举报 送花

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|手机版|深圳国芯人工智能有限公司 ( 粤ICP备2022108929号-2 )

GMT+8, 2026-5-10 23:30 , Processed in 0.101517 second(s), 45 queries .

Powered by Discuz! X3.5

© 2001-2026 Discuz! Team.

快速回复 返回顶部 返回列表