雷达驱动代码
# RadarDriver - By: CaiZhipeng - 周二 6月 27 2023
import time, math,image
from fpioa_manager import fm
from machine import Timer,PWM,UART
class RadarData:
def __init__(self,angle, distance):
self.angle = angle
self.distance = distance
def PrintRadarData(self):
print("angle:{:0.2f} distance:{:0.2f}".format(self.angle,self.distance))
def ToRectangularCoordinates(self):
angle_rad = math.radians(self.angle)
x = self.distance * math.cos(angle_rad)
y = self.distance * math.sin(angle_rad)
return x, y
def ToBytes(self):
angle_int = int(self.angle)
distance_int = int(self.distance)
angle_byte_H = (angle_int >> 8) & 0xFF
angle_byte_L = angle_int & 0xFF
distance_byte_H = (distance_int >> 8) & 0xFF
distance_byte_L = distance_int & 0xFF
return [angle_byte_H, angle_byte_L, distance_byte_H, distance_byte_L]
class RadarClusterData:
def __init__(self,ClusterList):
self.ClusterList = ClusterList
self.length = len(ClusterList)
angles = [data.angle for data in ClusterList]
distances = [data.distance for data in ClusterList]
average_angle = sum(angles) / self.length
average_distance = sum(distances) / self.length
self.AverageRadarData = RadarData(average_angle,average_distance)
self.VarianceAngle = sum((x - average_angle) ** 2 for x in angles) / self.length
self.VarianceDistance = sum((x - average_distance) ** 2 for x in distances) / self.length
def PrintRadarClusterData(self):
print("AverageRadarData:")
self.AverageRadarData.PrintRadarData()
print("VarianceAngle:{:0.2f} VarianceDistance:{:0.2f}".format(self.VarianceAngle,self.VarianceDistance))
class RadarDbscan:
def __init__(self,DataList,Eps,MinPts):
self.DataList = DataList
self.Eps = Eps
self.MinPts = MinPts
self.ClusterList = []
self.NoiseList = []
def GetDistance(self, Data1, Data2):
# 定义极坐标的距离公式
theta_diff = Data1.angle - Data2.angle
return math.sqrt(Data1.distance**2 + Data2.distance**2 - 2*Data1.distance*Data2.distance*math.cos(theta_diff))
def GetNeighbour(self, Data):
# 获取邻居
NeighbourList = []
for data in self.DataList:
if self.GetDistance(Data,data) <= self.Eps:
NeighbourList.append(data)
return NeighbourList
def GetCoreObject(self):
# 获取核心对象
CoreObjectList = []
for data in self.DataList:
if len(self.GetNeighbour(data)) >= self.MinPts:
CoreObjectList.append(data)
return CoreObjectList
def GetCluster(self):
# 获取聚类
CoreObjectList = self.GetCoreObject()
ClusterList = []
NoiseList = []
OkList = []
for data in CoreObjectList:
if data not in OkList:
Cluster = []
Cluster.append(data)
OkList.append(data)
NeighbourList = self.GetNeighbour(data)
for Neighbour in NeighbourList:
if Neighbour not in Cluster:
Cluster.append(Neighbour)
OkList.append(Neighbour)
NewClusterData = RadarClusterData(Cluster)
ClusterList.append(NewClusterData)
for data in self.DataList:
if data not in OkList:
NoiseList.append(data)
self.ClusterList = ClusterList
self.NoiseList = NoiseList
return ClusterList, NoiseList
def SortByCluster_size(self):
self.ClusterList.sort(key=lambda x: x.length)
def SortByVariance_distance(self):
self.ClusterList.sort(key=lambda x: x.VarianceDistance)
def SortByVariance_angle(self):
self.ClusterList.sort(key=lambda x: x.VarianceAngle)
class RadarDriver:
# 雷达驱动类
def __init__(self,port_RX, port_TX,PWM_pin,PWM_freq,PWM_duty):
# 进行初始化设置和准备工作
fm.register(port_RX, fm.fpioa.UART2_RX, force=True)
fm.register(port_TX, fm.fpioa.UART2_TX, force=True)
tim = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
self.Time_interupt = Timer(Timer.TIMER1, Timer.CHANNEL0, mode=Timer.MODE_PERIODIC, period=4000, callback=self.ClearFlash)
self.MotorPWM = PWM(tim, freq=PWM_freq, duty=PWM_duty, pin=PWM_pin)
self.RplidarA1uart = UART(UART.UART2, 115200, read_buf_len=1024)
self.RadarDataList = []
self.SwitchFlag = 0
self.MinAngle = 0
self.MaxAngle = 360
self.MaxDistance = 500
self.MapImg = image.Image(size=(224,224))
def CheckHealthy(self):
CheckData = [0xA5, 0x52]
Res = []
self.RplidarA1uart.write(bytes(CheckData))
text = self.RplidarA1uart.read()
if text:
for txt in text:
Res.append(txt)
if len(Res) >= 8:
if(Res[0] == 0xA5 and
Res[1] == 0x5A and
Res[2] == 0x03 and
Res[3] == 0x00 and
Res[4] == 0x00 and
Res[5] == 0x00 and
Res[6] == 0x06):
if Res[7] == 0x00:
return 1
else:
return Res[-2:]
return 0
def EnableRplidar(self):
if(self.CheckHealthy()):
self.Time_interupt.start()
self.MotorPWM.enable()
return 1
return 0
def DisableRplidar(self):
self.PauseScan()
self.MotorPWM.disable()
def PauseScan(self):
if self.SwitchFlag == 1:
self.Time_interupt.stop()
EndData = [0xA5, 0x25]
self.RplidarA1uart.write(bytes(EndData))
self.SwitchFlag = 0
def ContinueScan(self):
Res = []
StartData = [0xA5, 0x82, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x22]
if self.SwitchFlag == 0:
self.Time_interupt.start()
self.RplidarA1uart.write(bytes(StartData))
text = self.RplidarA1uart.read()
if text:
for txt in text:
Res.append(txt)
if len(Res) >= 7:
if(Res[0] == 0xA5 and
Res[1] == 0x5A and
Res[2] == 0x54 and
Res[3] == 0x00 and
Res[4] == 0x00 and
Res[5] == 0x40 and
Res[6] == 0x82):
self.SwitchFlag = 1
return 1
return 0
def Decode_q3_6bit(self, number):
if number >= 32:
number = number - 64
return number / 8.0
def validate_packet(self,packet):
TrueChkSum = (packet[0]&0x0F) | ((packet[1]&0x0F)<<4)
data = packet[2:]
ChkSum = 0
for byte in data:
ChkSum ^= byte
return TrueChkSum == ChkSum
def ExpressAnalyseByte(self, Byte, Start_angle, Delta_angle, index):
distance1 = ((Byte[0]&0xFC) >> 2 | Byte[1] << 6)/10
distance2 = ((Byte[2]&0xFC) >> 2 | Byte[3] << 6)/10
theta1 = (Byte[0]&0x03)<<4 | (Byte[4]&0x0F)
theta2 = (Byte[2]&0x03)<<4 | ((Byte[4]&0xF0) >> 4)
angle1 = Start_angle + Delta_angle / 32.0 * index - self.Decode_q3_6bit(theta1)
angle2 = Start_angle + Delta_angle / 32.0 * (index+1) - self.Decode_q3_6bit(theta2)
if angle1 > 360:
angle1 = angle1 - 360
if angle2 > 360:
angle2 = angle2 - 360
if distance1>0:
self.RadarDataList.append(RadarData(angle1,distance1))
if distance2>0:
self.RadarDataList.append(RadarData(angle2,distance2))
def AngleDiff(self, Start_angle, new_Start_angle):
if Start_angle <= new_Start_angle:
result = new_Start_angle - Start_angle
else:
result = 360 + new_Start_angle - Start_angle
return result
def ExpressAnalyseData(self, data):
Start_angle = (data[0][2] | (data[0][3]&0x7F) << 8)/64.0
new_Start_angle = (data[1][2] | (data[1][3]&0x7F) << 8)/64.0
Delta_angle = self.AngleDiff(Start_angle,new_Start_angle)
for i in range(16):
self.ExpressAnalyseByte(data[0][4+i*5:9+i*5],Start_angle,Delta_angle,2*i+1)
def ExpressGetData(self, batch):
temp = []
RplidarA1Data = []
AnalyseRadarDataList = []
self.RadarDataList.clear()
while len(self.RadarDataList) < 32:
self.ContinueScan()
ResFlag = 0
text = self.RplidarA1uart.read(84*batch)
if text:
#self.PauseScan()
for txt in text:
if(txt & 0xF0) == 0xA0 and ResFlag == 0:# 验证头帧
temp.append(txt)
ResFlag = 1
elif(txt & 0xF0) == 0x50 and ResFlag == 1:# 验证头帧
temp.append(txt)
ResFlag = 2
elif ResFlag < 2:# 有冗余数据舍去
temp.clear()
ResFlag = 0
if ResFlag == 2:
RplidarA1Data.extend(temp)# 存放头数据并开始接收
temp.clear()
ResFlag = 3
elif ResFlag == 3 and len(RplidarA1Data) < 84:
RplidarA1Data.append(txt)
if len(RplidarA1Data) == 84:# 一次接收完成
if self.validate_packet(RplidarA1Data):# 验证数据
AnalyseRadarDataList.append(RplidarA1Data[:])# 存入解析队列(一个数据解析需要它的下一个数据的一个表象)
if len(AnalyseRadarDataList) == 2:# 开始解析
self.ExpressAnalyseData(AnalyseRadarDataList)
AnalyseRadarDataList.pop(0)
RplidarA1Data.clear()
temp.clear()
ResFlag = 0
def ClearFlash(self,Time_interupt):
self.RplidarA1uart.read(1024)
def ReadRadarData(self):
if self.MaxAngle > 360:
temp = self.MaxAngle
while self.MaxAngle > 360:
self.MaxAngle = self.MaxAngle - 360
self.RadarDataList = [x for x in self.RadarDataList if self.MinAngle <= x.angle <= 360 or 0 <= x.angle <= self.MaxAngle and x.distance <= self.MaxDistance]
self.MaxAngle = temp
else:
self.RadarDataList = [x for x in self.RadarDataList if self.MinAngle <= x.angle <= self.MaxAngle and x.distance <= self.MaxDistance]
self.RadarDataList.sort(key=lambda x: x.angle)
return self.RadarDataList
def CreatMapping(self,weight=224,height=224):
self.MapImg.clear()
X_max = self.MaxDistance * math.cos(math.radians(45))
Y_max = self.MaxDistance * math.cos(math.radians(45))
self.ReadRadarData()
for data in self.RadarDataList:
x,y = data.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_line(x,y,x,y,(225,225,255),thickness = 3)
return self.MapImg
def CreatDbscanMapping(self,ClusterTotalList,NoiseList,weight=224,height=224):
self.MapImg.clear()
X_max = self.MaxDistance * math.cos(math.radians(45))
Y_max = self.MaxDistance * math.cos(math.radians(45))
for ClusterInedex in range(len(ClusterTotalList)):
g = int((ClusterInedex / len(ClusterTotalList)) * 255)
b = int(((len(ClusterTotalList) - ClusterInedex) / len(ClusterTotalList)) * 255)
for data in ClusterTotalList[ClusterInedex].ClusterList:
x,y = data.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_line(x,y,x,y,(255,g,b),thickness = 3)
for data in NoiseList:
x,y = data.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_line(x,y,x,y,(255,0,0),thickness = 3)
return self.MapImg
def CreatAveragRadarDataMapping(self,ClusterTotalList,Dbscan,weight=224,height=224):
MaxIndex = 2
self.MapImg.clear()
X_max = self.MaxDistance * math.cos(math.radians(45))
Y_max = self.MaxDistance * math.cos(math.radians(45))
for Cluster in ClusterTotalList:
x,y = Cluster.AverageRadarData.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_line(x,y,x,y,(255,255,255),thickness = 3)
Dbscan.SortByVariance_angle()
if len(ClusterTotalList) < 2:
MaxIndex = len(ClusterTotalList)
for i in range(MaxIndex):
x,y = ClusterTotalList[i].AverageRadarData.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_circle(x,y,10,(255,0,0),thickness = 3)
Dbscan.SortByVariance_distance()
for i in range(MaxIndex):
x,y = ClusterTotalList[i].AverageRadarData.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_circle(x,y,10,(0,255,0),thickness = 3)
Dbscan.SortByCluster_size()
for i in range(MaxIndex):
x,y = ClusterTotalList[i].AverageRadarData.ToRectangularCoordinates()
x = int(x / X_max * weight / 2 + weight / 2)
y = int(y / Y_max * height / 2 + height / 2)
self.MapImg.draw_circle(x,y,10,(0,0,255),thickness = 3)
return self.MapImg
def SetScope(self,minangle,maxangle,distance=500):
minangle = int(minangle)
maxangle = int(maxangle)
distance = int(distance)
if minangle < maxangle and minangle < 360 and maxangle < 720:
if minangle < 0:
minangle = 0
if maxangle > 360 and maxangle-360 > minangle:
maxangle = 360
self.MinAngle = minangle
self.MaxAngle = maxangle
if distance > 0 :
self.MaxDistance = distance
return self.MinAngle,self.MaxAngle,self.MaxDistance
def GetScope(self):
return self.MinAngle,self.MaxAngle,self.MaxDistance
#Radar = RadarDriver(10,11,31,50000,69)
#Radar.EnableRplidar()
#while True:
#Radar.ExpressGetData(10)
#DataList = Radar.ReadRadarData()
#for data in DataList:
#data.PrintRadarData()
#print(data.ToBytes())
复制代码