- 打卡等级:初来乍到
- 打卡总天数:6
- 最近打卡:2025-11-17 23:01:03
已绑定手机
注册会员
- 积分
- 55
|
有没有大佬做过利用官方 OTA 实现自定义刷机工具的项目?求分享相关思路或代码!
情况是这样的:我的程序固件大小超过了 64K,用官方刷机工具测试时,超过 64K 的固件能正常刷入,但官方工具没有开源,只能自己开发替代工具。
之前处理 64K 以内的固件时很顺利,因为芯片的写入地址有明显规律,按规律拼接指令、分配地址就能成功。但固件一旦超过 64K,写入的地址分配就变得毫无规律可循,试了很久都没摸清逻辑,这个问题已经困扰我好久了,实在没头绪。
如果有大佬做过类似 的项目,或者了解这类地址分配的底层逻辑,麻烦指点一二!哪怕是核心思路、协议解析要点,或者相关代码片段都万分感激!跪谢各位!
下面是我自己设计的页面!
虽然可以发送bin数据,但是写入的地址没找到规律!以下是python代码,:
import tkinter as tk
from tkinter import ttk, scrolledtext
import threading
import serial
import serial.tools.list_ports
from datetime import datetime
import time
import requests
import io
class FlashingApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("STC32G12K128 专用刷机软件(最终帧头修正版)")
self.geometry("500x450")
self.create_widgets()
self.flashing_thread = None
self.stop_flag = threading.Event()
self.reset_flag = threading.Event()
self.manual_port_selection = False
self.after(1000, self.refresh_ports)
def create_widgets(self):
# 按钮框架
self.button_frame = tk.Frame(self)
self.button_frame.pack(fill=tk.X, padx=5, pady=5)
self.start_button = tk.Button(self.button_frame, text="开始升级", command=self.start_flashing)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = tk.Button(self.button_frame, text="停止", command=self.stop_flashing, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.reset_button = tk.Button(self.button_frame, text="重复升级", command=self.reset_flashing)
self.reset_button.pack(side=tk.LEFT, padx=5)
self.help_button = tk.Button(self.button_frame, text="疑难帮助")
self.help_button.pack(side=tk.LEFT, padx=5)
# 串口选择框架
self.port_frame = tk.Frame(self)
self.port_frame.pack(fill=tk.X, padx=5, pady=5)
self.port_label = tk.Label(self.port_frame, text="串口号")
self.port_label.pack(side=tk.LEFT)
self.com_select = ttk.Combobox(self.port_frame)
self.com_select.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
self.com_select.bind("<<ComboboxSelected>>", self.on_port_selection)
# 日志输出框架
self.log_frame = tk.Frame(self)
self.log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.log_text = scrolledtext.ScrolledText(self.log_frame, height=18, wrap=tk.WORD)
self.log_text.pack(fill=tk.BOTH, expand=True)
self.log_text.config(font=("Consolas", 9))
# 进度条框架
self.progress_frame = tk.Frame(self)
self.progress_frame.pack(fill=tk.X, padx=5, pady=5)
self.progress_label = tk.Label(self.progress_frame, text="进度")
self.progress_label.pack(side=tk.LEFT)
self.progress_bar = ttk.Progressbar(self.progress_frame, orient=tk.HORIZONTAL, length=200, mode='determinate')
self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
def log_message(self, message):
"""线程安全日志输出(带时间戳)"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
self.after(0, lambda: self._log_message_safe(f"[{timestamp}] {message}"))
def _log_message_safe(self, message):
self.log_text.insert(tk.END, message + "\n")
self.log_text.see(tk.END)
def on_port_selection(self, event):
self.manual_port_selection = True
self.log_message(f"手动选择串口:{self.com_select.get()}")
def refresh_ports(self):
"""刷新串口列表(刷机时暂停)"""
if self.flashing_thread and self.flashing_thread.is_alive():
self.after(1000, self.refresh_ports)
return
try:
ports = serial.tools.list_ports.comports()
port_list = [f"{p.device} ({p.description})" for p in ports]
self.com_select['values'] = port_list
if port_list and not self.manual_port_selection:
selected_port = next((p for p in port_list if "CH340" in p), port_list[0])
self.com_select.set(selected_port)
if f"自动检测到串口:{selected_port}" not in self.log_text.get("1.0", tk.END):
self.log_message(f"自动检测到串口:{selected_port}")
except Exception as e:
self.log_message(f"刷新串口失败:{str(e)}")
self.after(1000, self.refresh_ports)
def update_progress(self, sent, total):
"""线程安全进度条更新"""
self.after(0, lambda: self.progress_bar.config(value=(sent/total)*100 if total else 0))
def start_flashing(self):
"""开始升级(初始化状态)"""
self.stop_flag.clear()
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.progress_bar.config(value=0)
selected_port = self.com_select.get()
if not selected_port:
self.log_message("错误:未选择串口!")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
return
com_port = selected_port.split('(')[0].strip()
self.log_message(f"开始升级,串口:{com_port}")
self.log_message("帧头规则:23 86 A2 00 00 XX YY 80(YY=00→80→01→81→02→82...)")
self.flashing_thread = threading.Thread(target=self.flash_firmware, args=(com_port,))
self.flashing_thread.start()
def stop_flashing(self):
"""停止升级"""
self.log_message("正在停止升级...")
self.stop_flag.set()
self.reset_flag.clear()
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
def reset_flashing(self):
"""重复升级"""
self.log_message("启用重复升级模式")
self.reset_flag.set()
threading.Thread(target=self.repeat_flashing).start()
def repeat_flashing(self):
"""重复升级循环"""
while self.reset_flag.is_set() and not self.stop_flag.is_set():
self.start_flashing()
while self.flashing_thread and self.flashing_thread.is_alive():
time.sleep(0.5)
if not self.stop_flag.is_set():
self.log_message("准备重新升级...")
time.sleep(1)
def flash_firmware(self, com_port):
"""核心刷机逻辑(完全匹配实际帧头)"""
file_url = "替换成服务器存放的bin文件地址"#替换成服务器存放的bin文件地址
BAUD_RATE = 115200
TIMEOUT = 2
packet_size = 128 # 每包128字节
def send_command(hex_str, ser):
"""发送命令(握手/擦除/完成)"""
try:
cmd = bytes.fromhex(hex_str)
self.log_message(f"发送命令:{hex_str}({len(cmd)}字节)")
ser.write(cmd)
ser.flush()
return True
except Exception as e:
self.log_message(f"命令发送失败:{str(e)}")
return False
def read_response(ser, exp_len=7):
"""读取设备响应"""
try:
resp = ser.read(exp_len)
if resp:
self.log_message(f"收到响应:{resp.hex().upper()}({len(resp)}字节)")
else:
self.log_message("未收到响应(超时)")
return resp
except Exception as e:
self.log_message(f"读取响应失败:{str(e)}")
return b""
def calculate_checksum(data):
"""计算校验和(与设备端一致)"""
return (256 - (sum(data) & 0xFF)) & 0xFF
def generate_frame_header(packet_index):
"""
生成完整帧头(8字节:23 86 A2 00 00 XX YY 80)
规则:
1. XX = packet_index // 2(每2包XX+1:0→0→1→1→2→2...)
2. YY(控制位):packet_index%4=0→00;%4=1→80;%4=2→01;%4=3→81;%4=4→02;%4=5→82...
3. 阶段切换:XX=A7时,YY从00→81;XX=EE时,YY从01→82(与你的描述完全匹配)
"""
xx = packet_index // 2 # 计算XX值(每2包+1)
# 计算控制位YY:按packet_index%4循环,且按XX值阶梯切换前缀
mod4 = packet_index % 4
if xx < 0xA7:
# 阶段1:XX < 0xA7 → YY=00/80(前缀0)
yy = 0x00 if mod4 in [0,2] else 0x80
elif xx < 0xEE:
# 阶段2:0xA7 ≤ XX < 0xEE → YY=01/81(前缀1)
yy = 0x01 if mod4 in [0,2] else 0x81
else:
# 阶段3:XX ≥ 0xEE → YY=02/82(前缀2)
yy = 0x02 if mod4 in [0,2] else 0x82
# 完整8字节帧头:23 86 A2 00 00 XX YY 80
return [0x23, 0x86, 0xA2, 0x00, 0x00, xx, yy, 0x80]
def send_frame(packet_index, data, ser):
"""发送数据帧(帧头+数据+帧尾+校验和)"""
try:
# 1. 生成8字节帧头
frame_header = generate_frame_header(packet_index)
# 2. 组装帧:帧头(8) + 数据(128) + 帧尾(0x24)
frame = bytes(frame_header) + data + b'\x24'
# 3. 计算并添加校验和
checksum = calculate_checksum(frame)
frame += bytes([checksum])
# 4. 日志输出(验证帧头正确性)
header_hex = bytes(frame_header).hex().upper()
xx = packet_index // 2
yy = frame_header[6]
self.log_message(f"发送帧 {packet_index+1}:[{header_hex}] [数据128字节] [24 {checksum:02X}]")
self.log_message(f"帧头细节:XX=0x{xx:02X} | YY=0x{yy:02X} | 匹配规则:{bytes(frame_header).hex().upper()}")
ser.write(frame)
ser.flush()
return True
except Exception as e:
self.log_message(f"帧 {packet_index+1} 发送失败:{str(e)}")
return False
handshake_ok = False
try:
# 1. 下载固件
self.log_message(f"下载固件:{file_url}")
resp = requests.get(file_url, timeout=30)
resp.raise_for_status()
firmware = bytearray(resp.content)
total_bytes = len(firmware)
total_packets = (total_bytes + packet_size - 1) // packet_size
self.log_message(f"固件下载成功:{total_bytes}字节 → {total_packets}包(每包128字节)")
# 2. 打开串口
self.log_message(f"打开串口:{com_port}(波特率{BAUD_RATE},超时{TIMEOUT}s)")
with serial.Serial(
port=com_port,
baudrate=BAUD_RATE,
timeout=TIMEOUT,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
) as ser:
time.sleep(1)
self.log_message("串口打开成功,开始握手")
# 3. 握手(进入ISP模式)
exp_handshake = bytes.fromhex('40000201002499')
for retry in range(5):
if self.stop_flag.is_set():
self.log_message("握手被停止")
return
self.log_message(f"握手尝试 {retry+1}/5")
if send_command('2306A000000000002413', ser):
time.sleep(0.3)
resp = read_response(ser)
if resp == exp_handshake:
self.log_message("握手成功!设备进入ISP模式")
handshake_ok = True
break
else:
self.log_message(f"握手响应不匹配:期望{exp_handshake.hex()},实际{resp.hex()}")
time.sleep(1)
if not handshake_ok:
self.log_message("错误:握手失败!请检查:")
self.log_message("1. 设备是否拉低P3.3后断电重启")
self.log_message("2. 串口号是否正确(当前:{com_port})")
self.log_message("3. 设备晶振是否为24MHz(匹配115200波特率)")
return
# 4. 擦除Flash
self.log_message("擦除Flash...")
if send_command('2306A300000000002410', ser):
exp_erase = bytes.fromhex('400000249C')
erase_ok = False
for _ in range(10):
if self.stop_flag.is_set():
self.log_message("擦除被停止")
return
resp = read_response(ser, 5)
if resp == exp_erase:
self.log_message("擦除成功!")
erase_ok = True
break
time.sleep(1)
if not erase_ok:
self.log_message("错误:擦除超时")
return
else:
self.log_message("错误:擦除命令发送失败")
return
# 5. 逐包写入固件
self.log_message(f"\n开始写入(共{total_packets}包)")
for idx in range(total_packets):
if self.stop_flag.is_set():
self.log_message("写入被停止")
return
# 截取当前包数据,不足128字节用0xFF填充
offset = idx * packet_size
packet = firmware[offset:offset+packet_size]
if len(packet) < packet_size:
packet += bytes([0xFF]*(packet_size - len(packet)))
# 发送帧
if not send_frame(idx, packet, ser):
self.log_message(f"写入失败:第{idx+1}包出错")
return
# 更新进度
sent_bytes = min(offset+packet_size, total_bytes)
self.update_progress(sent_bytes, total_bytes)
# 每5包打印进度
if (idx+1) % 5 == 0:
progress = (sent_bytes/total_bytes)*100
self.log_message(f"进度:{idx+1}/{total_packets}包({sent_bytes}/{total_bytes}字节,{progress:.1f}%)")
time.sleep(0.4) # 适配设备写入速度
# 6. 发送完成命令
self.log_message("\n发送升级完成命令...")
send_command('2306A40000000000240F', ser)
time.sleep(0.5)
read_response(ser)
# 7. 写入成功标志
self.log_message("写入升级成功标志(0xAA)...")
success_frame = [0x23,0x86,0xA2,0x00,0x1F,0xF5,0x01,0x01,0xAA,0x24]
success_crc = calculate_checksum(success_frame)
ser.write(bytes(success_frame + [success_crc]))
time.sleep(0.3)
read_response(ser)
# 8. 完成
self.log_message(f"\n===== 升级成功!({datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) =====")
self.log_message(f"总写入:{total_bytes}字节({total_packets}包)")
self.log_message("提示:断电重启设备即可运行新固件")
except requests.exceptions.RequestException as e:
self.log_message(f"固件下载失败:{str(e)}")
except serial.SerialException as e:
self.log_message(f"串口错误:{str(e)}")
except Exception as e:
self.log_message(f"未知错误:{str(e)}")
import traceback
self.log_message(f"错误详情:{traceback.format_exc()}")
finally:
# 恢复界面状态
self.after(0, lambda: self.start_button.config(state=tk.NORMAL))
self.after(0, lambda: self.stop_button.config(state=tk.DISABLED))
if handshake_ok:
self.after(0, lambda: self.progress_bar.config(value=100))
if __name__ == "__main__":
app = FlashingApp()
app.mainloop()
|
|