CANanlystII 基于python的二次开发实践
CANanlystII 基于python的二次开发实践
·
前期,我已经编写过一篇《CANanlystII 基于linux的二次开发实践》这篇博客承接上一篇博客,所以背景知识和测试场景,就不再赘述。
背景知识和测试场景,可以查阅如下:
CANanlystII 基于linux的二次开发实践_晓翔仔的博客-CSDN博客_can分析仪二次开发
基于python的二次开发实践
设计步骤如下:
step0xf1: wakeup step0x01: enter extension diagConversation step0x03: update condition step0x05: DTC OFF step0x06: forbid transfer step0x07: ask seed step0x71: send key step0x08: enter codeConversation step0x09: ask seed step0x91: send key 之后可以进行0x34 download步骤,也可以进行0x35 upload步骤
OK,步骤已经清楚了,那我们直接上代码:
# python3.8.0 64位(python 32位要用32位的DLL)
#
# 结构体数组类
import ctypes
from ctypes import *
import threading
import time
VCI_USBCAN2 = 4
STATUS_OK = 1
NO_RES_COUNT_MAX = 5 # 等待响应的时间(周期)上限
STEP = 0xf1 # 添加了一个唤醒步骤
CURRENT_STEP = 0X00 # 当前所处的CAN数据发送和接收的步骤,“接收线程“会根据”当前步骤”和“CAN数据内容”判断是否是正确响应
can_id = 0x777 # ECU的CANID,每个ECU不一样
fun_id = 0x7df # 功能寻址的CANID,固定值
seed = 0 # UDS 0X27,ECU返回过来的seed
seed_size = 0 # UDS 0X27,ECU返回过来的seed位数,一般是2或者4
# 全局变量
TestMode = 'true' # 是否是测试模式,测试模式时,CAN1 CAN2直连。非测试模式时CAN2不使用。
# TestMode = 'false'
class VCI_INIT_CONFIG(Structure):
_fields_ = [("AccCode", c_uint),
("AccMask", c_uint),
("Reserved", c_uint),
("Filter", c_ubyte),
("Timing0", c_ubyte),
("Timing1", c_ubyte),
("Mode", c_ubyte)
]
class VCI_CAN_OBJ(Structure):
_fields_ = [("ID", c_uint),
("TimeStamp", c_uint),
("TimeFlag", c_ubyte),
("SendType", c_ubyte),
("RemoteFlag", c_ubyte),
("ExternFlag", c_ubyte),
("DataLen", c_ubyte),
("Data", c_ubyte * 8),
("Reserved", c_ubyte * 3)
]
class VCI_CAN_OBJ_ARRAY(Structure):
_fields_ = [('SIZE', ctypes.c_uint16), ('STRUCT_ARRAY', ctypes.POINTER(VCI_CAN_OBJ))]
def __init__(self, num_of_structs):
# 这个括号不能少
self.STRUCT_ARRAY = ctypes.cast((VCI_CAN_OBJ * num_of_structs)(), ctypes.POINTER(VCI_CAN_OBJ)) # 结构体数组
self.SIZE = num_of_structs # 结构体长度
self.ADDR = self.STRUCT_ARRAY[0] # 结构体数组地址 byref()转c地址
def list_to_hex_string(list_data):
list_str = '[ '
for x in list_data:
list_str += '0x{:02X},'.format(x)
list_str += ' ]'
return list_str
def can_init_and_start():
ret_open_device = canDLL.VCI_OpenDevice(VCI_USBCAN2, 0, 0)
if ret_open_device == STATUS_OK:
print('调用 VCI_OpenDevice成功')
if ret_open_device != STATUS_OK:
print('调用 VCI_OpenDevice出错')
# 初始0通道
vci_init_config = VCI_INIT_CONFIG((can_id + 8) << 21, 0x00000000, 0,
1, 0x00, 0x1C, 0) # 波特率125 Kbps 0x03 0x1C ||| 波特率500 Kbps 0x00 0x1C
ret_can0_init = canDLL.VCI_InitCAN(VCI_USBCAN2, 0, 0, byref(vci_init_config))
if ret_can0_init == STATUS_OK:
print('调用 VCI_InitCAN1成功')
if ret_can0_init != STATUS_OK:
print('调用 VCI_InitCAN1出错')
ret_can0_start = canDLL.VCI_StartCAN(VCI_USBCAN2, 0, 0)
if ret_can0_start == STATUS_OK:
print('调用 VCI_StartCAN1成功')
if ret_can0_start != STATUS_OK:
print('调用 VCI_StartCAN1出错')
if TestMode == 'true':
# 初始1通道
ret_can1_init = canDLL.VCI_InitCAN(VCI_USBCAN2, 0, 1, byref(vci_init_config))
if ret_can1_init == STATUS_OK:
print('调用 VCI_InitCAN2 成功')
if ret_can1_init != STATUS_OK:
print('调用 VCI_InitCAN2 出错')
ret_can1_start = canDLL.VCI_StartCAN(VCI_USBCAN2, 0, 1)
if ret_can1_start == STATUS_OK:
print('调用 VCI_StartCAN2 成功')
if ret_can1_start != STATUS_OK:
print('调用 VCI_StartCAN2 出错')
# 通道1发送数据
def send_can1():
ubyte_array = c_ubyte * 8
ubyte_3array = c_ubyte * 3
NoResCount = 0
global STEP
global CURRENT_STEP
global can_id
global seed_size
global key_k
while 'true':
# step0xf1: wakeup
if STEP == 0xF1:
a = ubyte_array(0x50, 0x14, 0x02, 0x20, 0x44, 0xF1, 0x2F, 0x15)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(0x121, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret1 = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
ret2 = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
ret3 = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
ret4 = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
# 唤醒信号 是各厂家自定义的。这里是随便写的
if ret1 == STATUS_OK and ret2 == STATUS_OK and ret3 == STATUS_OK and ret4 == STATUS_OK:
print('\r\nCAN1通道发送4 wake up messages成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x01
time.sleep(0.3)
continue
# step0x01: enter extension diagConversation
if STEP == 0x01:
a = ubyte_array(0x02, 0x10, 0x03, 0x55, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('\r\nCAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x03: update condition
elif STEP == 0x03:
a = ubyte_array(0x04, 0x31, 0x01, 0x02, 0x03, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
# STEP = 0x05
time.sleep(0.3)
# continue
if ret != STATUS_OK:
print('CAN1通道发送失败')
break
# step0x05: DTC OFF
elif STEP == 0x05:
a = ubyte_array(0x02, 0x85, 0x82, 0x55, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(fun_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x06
time.sleep(0.3)
if ret != STATUS_OK:
print('CAN1通道发送失败')
break
# step0x06: forbid transfer
elif STEP == 0x06:
a = ubyte_array(0x03, 0x28, 0x81, 0x01, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(fun_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x07
time.sleep(0.3)
if ret != STATUS_OK:
print('CAN1通道发送失败')
break
# step0x07: ask seed
elif STEP == 0x07:
a = ubyte_array(0x02, 0x27, 0x01, 0x55, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print('CAN1通道发送失败')
break
# step0x71: send key
elif STEP == 0x71:
if seed_size == 4:
key = seed # 这里是厂商提供的算法
print('after calculate,key is: %#x' % key)
key0 = (key >> 24) % 256
key1 = (key >> 16) % 256
key2 = (key >> 8) % 256
key3 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
print('key2:%#x' % key2)
print('key3:%#x' % key3)
a = ubyte_array(0x06, 0x27, 0x04, key0, key1, key2, key3, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
elif seed_size == 2:
key = seed # 这里是厂商提供的算法
print('after calculate,key is: %#x' % key)
key0 = (key >> 8) % 256
key1 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
a = ubyte_array(0x06, 0x27, 0x02, key0, key1, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
else:
print("seed size unknown")
STEP = 0x00
break
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x08: enter codeConversation
elif STEP == 0x08:
a = ubyte_array(0x02, 0x10, 0x02, 0x55, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x09: ask seed
elif STEP == 0x09:
a = ubyte_array(0x02, 0x27, 0x01, 0x55, 0x55, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x91: send key
elif STEP == 0x91:
if seed_size == 4:
key = seed # 这里是厂商提供的算法
print('after calculate,key is: %#x' % key)
key0 = (key >> 24) % 256
key1 = (key >> 16) % 256
key2 = (key >> 8) % 256
key3 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
print('key2:%#x' % key2)
print('key3:%#x' % key3)
a = ubyte_array(0x06, 0x27, 0x04, key0, key1, key2, key3, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
elif seed_size == 2:
key = seed # 这里是厂商提供的算法
print('after calculate,key is: %#x' % key)
key0 = (key >> 8) % 256
key1 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
a = ubyte_array(0x06, 0x27, 0x02, key0, key1, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
else:
print("seed size unknown")
STEP = 0x00
break
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print('CAN1通道发送失败')
break
# step0x10: ask to UPLOAD flash firstframe
elif STEP == 0x10:
a = ubyte_array(0x10, 0x0b, 0x35, 0x00, 0x44, 0x00, 0xc1, 0x00) # 这里有期望upload的起始地址
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x11: ask to UPLOAD flash second frame
elif STEP == 0x11:
a = ubyte_array(0x21, 0x00, 0x00, 0x00, 0x00, 0x40, 0x55, 0x55) # 这里有期望upload的长度信息
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# step0x12: transfer flash traffic control frame
elif STEP == 0x12:
a = ubyte_array(0x30, 0x00, 0x00, 0x03, 0xd2, 0x55, 0x55, 0x55)
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 0, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('CAN1通道发送成功')
print('STEP is %#x' % STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
CURRENT_STEP = STEP
STEP = 0x00
time.sleep(0.3)
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# STEP0x00: 还没有收到ECU回复
elif STEP == 0x00:
# print("[WAITING RESPONSE]VCI_Receive have not found corresponding ECU response!\n")
if NoResCount < NO_RES_COUNT_MAX:
NoResCount = NoResCount + 1
time.sleep(1)
else:
print("VCI_Receive have not found corresponding ECU response many times,test failed!!!")
break
# step0xFF: 所有流程结束
elif STEP == 0xFF:
CURRENT_STEP = STEP
time.sleep(1)
print("VCI_Receive have found all corresponding ECU response!\n test SUCESS!!!\n")
break
# 通道1接收数据
def receive_can1():
global STEP
global CURRENT_STEP
global seed
global seed_size
rx_vci_can_obj = VCI_CAN_OBJ_ARRAY(2500) # 结构体数组
while 'true':
time.sleep(0.03)
ret = canDLL.VCI_Receive(VCI_USBCAN2, 0, 0, byref(rx_vci_can_obj.ADDR), 2500, 0)
while ret > 0: # 接收到数目为 ret帧 的数据,一个等待周期的缓存数据可能不止一条
print('\r\nCAN1通道接收成功', ret)
for i in range(0, ret):
print("%#x" % rx_vci_can_obj.STRUCT_ARRAY[i].ID,
list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
if CURRENT_STEP == 0x01 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x50 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x03:
print("STEP[0X01] extension diagConversation resp received!\n")
STEP = 0X03
ret = 0
break
elif CURRENT_STEP == 0x03 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x71 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x01:
print("STEP[0X03] update condition resp received!\n")
STEP = 0X05
ret = 0
break
elif CURRENT_STEP == 0x07 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x06 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[
2] == 0x01:
print("STEP[0X07] ask seed resp 4 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed2 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[5]
seed3 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[6]
seed = (seed0 << 24) + (seed1 << 16) + (seed2 << 8) + seed3
print('seed:%#x' % seed)
seed_size = 4
STEP = 0X71
ret = 0
break
elif CURRENT_STEP == 0x07 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x04 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[
2] == 0x01:
print("STEP[0X07] ask seed resp 2 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed = (seed1 << 8) + seed2
print('seed:%#x' % seed)
seed_size = 2
STEP = 0X71
ret = 0
break
elif CURRENT_STEP == 0x71 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
print("STEP[0X71] send key resp received!\n")
STEP = 0X08
ret = 0
break
elif CURRENT_STEP == 0x08 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x50 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
print("STEP[0X08] enter codeConversation resp received!\n")
STEP = 0X09
ret = 0
break
elif CURRENT_STEP == 0x09 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x06 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[
2] == 0x01:
print("STEP[0X09] ask seed resp 4 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed2 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[5]
seed3 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[6]
seed = (seed0 << 24) + (seed1 << 16) + (seed2 << 8) + seed3
print('seed:%#x' % seed)
seed_size = 4
STEP = 0X91
ret = 0
break
elif CURRENT_STEP == 0x09 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x04 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[
2] == 0x01:
print("STEP[0X09] ask seed resp 2 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed = (seed1 << 8) + seed2
print('seed:%#x' % seed)
seed_size = 2
STEP = 0X91
ret = 0
break
elif CURRENT_STEP == 0x91 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
print("STEP[0X91] send key resp received!\n")
STEP = 0X10
ret = 0
break
elif CURRENT_STEP == 0x10 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x30:
print("STEP[0X10] ask to UPLOAD flash stream control frame resp received!!\n")
STEP = 0X11
ret = 0
break
elif CURRENT_STEP == 0x11 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x36:
print("STEP[0X11] flash transport resp(flash data 1th frame) received!\n")
STEP = 0X12
ret = 0
break
elif CURRENT_STEP == 0x12 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x21:
print("STEP[0X12]flash transport resp(flash data 2th frame) received!\n")
ret = 0
STEP = 0XFF
break
else:
print("CAN1 received unknown data")
time.sleep(0.2)
ret = ret - 1
# 通道2作为模拟器接收数据
def receive_can2():
global CURRENT_STEP
rx_vci_can_obj = VCI_CAN_OBJ_ARRAY(2500) # 结构体数组
while 'true':
ret = canDLL.VCI_Receive(VCI_USBCAN2, 0, 1, byref(rx_vci_can_obj.ADDR), 2500, 0)
while ret > 0: # 接收到一帧数据
# print('\r\n【模拟器】CAN2通道接收成功')
# print("%#x" % rx_vci_can_obj.STRUCT_ARRAY[0].ID, list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[0].Data))
time.sleep(0.1)
ubyte_array1 = c_ubyte * 8
ubyte_3array1 = c_ubyte * 3
if CURRENT_STEP == 0x01:
a = ubyte_array1(0x06, 0x50, 0x03, 0x00, 0x32, 0x01, 0xf4, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0xF1:
break
if CURRENT_STEP == 0x03:
a_pend = ubyte_array1(0x03, 0x7f, 0x31, 0x78, 0xaa, 0xaa, 0xaa, 0xaa)
a = ubyte_array1(0x04, 0x71, 0x01, 0x02, 0x03, 0xaa, 0xaa, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a_pend, b) # 单次发送
canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
time.sleep(0.05)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0x05:
# 7DF消息不需要响应
break
if CURRENT_STEP == 0x06:
# 7DF消息不需要响应
break
if CURRENT_STEP == 0X07:
a = ubyte_array1(0x06, 0x67, 0x01, 0xff, 0xff, 0xff, 0xff, 0xaa)
# a = ubyte_array1(0x04, 0x67, 0x01, 0xff, 0xff, 0xaa, 0xaa, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X71:
a = ubyte_array1(0x02, 0x67, 0x02, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X08:
a = ubyte_array1(0x06, 0x50, 0x02, 0x00, 0x32, 0x01, 0xf4, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X09:
# a = ubyte_array1(0x06, 0x67, 0x01, 0xff, 0xff, 0xff, 0xff, 0xaa)
a = ubyte_array1(0x04, 0x67, 0x01, 0xff, 0xff, 0xaa, 0xaa, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X91:
a = ubyte_array1(0x02, 0x67, 0x02, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X10:
a = ubyte_array1(0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0x11:
a = ubyte_array1(0x10, 0x42, 0x36, 0x01, 0xcc, 0xcc, 0xcc, 0xcc)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
if CURRENT_STEP == 0X12:
a = ubyte_array1(0x21, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc)
b = ubyte_3array1(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(can_id + 8, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, 1, byref(vci_can_obj), 1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP%#x' % CURRENT_STEP)
# print("%#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
elif CURRENT_STEP == 0XFF:
break
else:
print('【模拟器】CURRENT_STEP unknown!%#x' % CURRENT_STEP)
break
if __name__ == "__main__":
CanDLLName = './ControlCAN.dll' # 把DLL放到对应的目录下
canDLL = windll.LoadLibrary('./ControlCAN.dll')
print(CanDLLName)
can_init_and_start()
t1 = threading.Thread(target=send_can1)
t2 = threading.Thread(target=receive_can1)
t3 = threading.Thread(target=receive_can2)
t1.start()
t2.start()
if TestMode == 'true':
t3.start()
t1.join()
t2.join()
if TestMode == 'true':
t3.join()
# 关闭
canDLL.VCI_CloseDevice(VCI_USBCAN2, 0)
调试结果:
D:\Users\PycharmProjects\CX_CANTEST\venv\Scripts\python.exe D:/Users//PycharmProjects/CX_CANTEST/main.py
./ControlCAN.dll
调用 VCI_OpenDevice成功
调用 VCI_InitCAN1成功
调用 VCI_StartCAN1成功
调用 VCI_InitCAN2 成功
调用 VCI_StartCAN2 成功
CAN1通道发送4 wake up messages成功
STEP is 0xf1
0x12d [ 0x50,0x14,0x02,0x20,0x44,0xF1,0x2F,0x15, ]
CAN1通道发送成功
STEP is 0x1
0x777 [ 0x02,0x10,0x03,0x55,0x55,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x06,0x50,0x03,0x00,0x32,0x01,0xF4,0xAA, ]
STEP[0X01] extension diagConversation resp received!
CAN1通道发送成功
STEP is 0x3
0x777 [ 0x04,0x31,0x01,0x02,0x03,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x03,0x7F,0x31,0x78,0xAA,0xAA,0xAA,0xAA, ]
CAN1 received unknown data
CAN1通道接收成功 1
0x77f [ 0x04,0x71,0x01,0x02,0x03,0xAA,0xAA,0xAA, ]
STEP[0X03] update condition resp received!
CAN1通道发送成功
STEP is 0x5
0x7df [ 0x02,0x85,0x82,0x55,0x55,0x55,0x55,0x55, ]
CAN1通道发送成功
STEP is 0x6
0x7df [ 0x03,0x28,0x81,0x01,0x55,0x55,0x55,0x55, ]
CAN1通道发送成功
STEP is 0x7
0x777 [ 0x02,0x27,0x01,0x55,0x55,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x06,0x67,0x01,0xFF,0xFF,0xFF,0xFF,0xAA, ]
STEP[0X07] ask seed resp 4 seeds received!
seed:0xffffffff
after calculate,key is: 0xffffffff
key0:0xff
key1:0xff
key2:0xff
key3:0xff
CAN1通道发送成功
STEP is 0x71
0x777 [ 0x06,0x27,0x04,0xFF,0xFF,0xFF,0xFF,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x02,0x67,0x02,0xAA,0xAA,0xAA,0xAA,0xAA, ]
STEP[0X71] send key resp received!
CAN1通道发送成功
STEP is 0x8
0x777 [ 0x02,0x10,0x02,0x55,0x55,0x55,0x55,0x55, ]
【模拟器】CAN2通道发送成功 STEP0X0x8
0x77f [ 0x06,0x50,0x02,0x00,0x32,0x01,0xF4,0xAA, ]
CAN1通道接收成功 1
0x77f [ 0x06,0x50,0x02,0x00,0x32,0x01,0xF4,0xAA, ]
STEP[0X08] enter codeConversation resp received!
CAN1通道发送成功
STEP is 0x9
0x777 [ 0x02,0x27,0x01,0x55,0x55,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x04,0x67,0x01,0xFF,0xFF,0xAA,0xAA,0xAA, ]
STEP[0X09] ask seed resp 2 seeds received!
seed:0xffff
after calculate,key is: 0xffff
key0:0xff
key1:0xff
CAN1通道发送成功
STEP is 0x91
0x777 [ 0x06,0x27,0x02,0xFF,0xFF,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x02,0x67,0x02,0xAA,0xAA,0xAA,0xAA,0xAA, ]
STEP[0X91] send key resp received!
CAN1通道发送成功
STEP is 0x10
0x777 [ 0x10,0x0B,0x35,0x00,0x44,0x00,0xC1,0x00, ]
CAN1通道接收成功 1
0x77f [ 0x30,0x00,0x00,0x00,0x00,0x00,0x00,0x00, ]
STEP[0X10] ask to UPLOAD flash stream control frame resp received!!
CAN1通道发送成功
STEP is 0x11
0x777 [ 0x21,0x00,0x00,0x00,0x00,0x40,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x10,0x42,0x36,0x01,0xCC,0xCC,0xCC,0xCC, ]
STEP[0X11] flash transport resp(flash data 1th frame) received!
CAN1通道发送成功
STEP is 0x12
0x777 [ 0x30,0x00,0x00,0x03,0xD2,0x55,0x55,0x55, ]
CAN1通道接收成功 1
0x77f [ 0x21,0xCC,0xCC,0xCC,0xCC,0xCC,0xCC,0xCC, ]
STEP[0X12]flash transport resp(flash data 2th frame) received!
VCI_Receive have found all corresponding ECU response!
test SUCESS!!!
附:
一些可以稍作修改用于测试的代码记录:
# python3.8.0 64位(python 32位要用32位的DLL)
#
# 结构体数组类
import ctypes
from ctypes import *
import threading
import time
import re
VCI_USBCAN2 = 4
STATUS_OK = 1
NO_RES_COUNT_MAX = 3 # 等待响应的时间(周期)上限
STEP = 0xB5 # 起始步骤
# step0xA0: 判断有哪些ECUID主动向外汇报状态
# step0xA1: 判断有多少ECU存在
# step0xA2: 获取ECU信息 vin、ecu序列号、硬件版本号、软件版本号、ECU安装日期
# step0xA3: 判断有多少ECU可以提升到扩展会话
# step0xA4: 判断有多少ECU可以请求安全解锁
# step0xB1: ECU信息遍历获取
# step0xB2: 判断是否有ECU可以直接提升到编程会话
# step0xB3: 判断ECU安全解锁种子是否有规律
# step0xB4: 判断ECU安全解锁尝试暴力破解
# step0xB5: 发现了安全解锁算法后对ECU安全解锁1
# step0xB5: 发现了安全解锁算法后对ECU安全解锁2
# step0xB6: ECU REST
# step0xB7: ECU REST
# step0xB8: 对单个ECU发送遍历报文
# step0xB9: ECU RESET测试
# STEP0x00: 还没有收到ECU回复
# step0xFF: 所有流程结束
CURRENT_STEP = 0X00 # 当前所处的CAN数据发送和接收的步骤,“接收线程“会根据”当前步骤”和“CAN数据内容”判断是否是正确响应
can_id = 0x720 # ECU的CANID,每个ECU不一样
key_k = 29 # keyK is used to calculate key from seed
fun_id = 0x7df # 功能寻址的CANID,固定值
B_STOP_RECEIVE = 0
seed = 0 # UDS 0X27,ECU返回过来的seed
seed_size = 0 # UDS 0X27,ECU返回过来的seed位数,一般是2或者4
currentBlock = 2
blockCount = 3
canid_list_report = []
canid_list = []
canid_list_askvin = []
canid_list_extendsession = []
canid_list_respseed = []
# 全局变量
# TestMode = 'true' # 是否是测试模式,测试模式时,CAN1 CAN2直连。非测试模式时CAN2不使用。
TestMode = 'false'
class VCI_INIT_CONFIG(Structure):
_fields_ = [("AccCode", c_uint),
("AccMask", c_uint),
("Reserved", c_uint),
("Filter", c_ubyte),
("Timing0", c_ubyte),
("Timing1", c_ubyte),
("Mode", c_ubyte)
]
class VCI_CAN_OBJ(Structure):
_fields_ = [("ID", c_uint),
("TimeStamp", c_uint),
("TimeFlag", c_ubyte),
("SendType", c_ubyte),
("RemoteFlag", c_ubyte),
("ExternFlag", c_ubyte),
("DataLen", c_ubyte),
("Data", c_ubyte * 8),
("Reserved", c_ubyte * 3)
]
class VCI_CAN_OBJ_ARRAY(Structure):
_fields_ = [('SIZE', ctypes.c_uint16), ('STRUCT_ARRAY', ctypes.POINTER(VCI_CAN_OBJ))]
def __init__(self, num_of_structs):
# 这个括号不能少
self.STRUCT_ARRAY = ctypes.cast((VCI_CAN_OBJ * num_of_structs)(), ctypes.POINTER(VCI_CAN_OBJ)) # 结构体数组
self.SIZE = num_of_structs # 结构体长度
self.ADDR = self.STRUCT_ARRAY[0] # 结构体数组地址 byref()转c地址
def list_to_hex_string(list_data):
list_str = '[ '
for x in list_data:
list_str += '0x{:02X},'.format(x)
list_str += ' ]'
return list_str
def vci_transmit(canid, data, channel):
ubyte_array = c_ubyte * 8
ubyte_3array = c_ubyte * 3
a = ubyte_array(data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7])
b = ubyte_3array(0, 0, 0)
vci_can_obj = VCI_CAN_OBJ(canid, 0, 0, 0, 0, 0, 8, a, b) # 单次发送
print("send %#x" % vci_can_obj.ID, list_to_hex_string(vci_can_obj.Data))
ret = canDLL.VCI_Transmit(VCI_USBCAN2, 0, channel, byref(vci_can_obj), 1)
return ret
def readfile_transmit(filename, canid):
f = open(filename, 'r')
all_data = f.readlines()
for everyline in all_data:
everyline = everyline.replace("\n", "")
everyline = everyline.replace(" ", "")
everyline = everyline[1:-1]
eveyline1 = []
everyline1 = re.split(',', everyline)
for i in range(0, 8):
everyline1[i] = int(everyline1[i], 16)
if everyline1[2] == 0x36:
print("sleep0.5")
ret = vci_transmit(canid=canid, data=everyline1, channel=0)
if ret == 0:
print("vci_transmit fail!")
return
return ret
def can_init_and_start():
ret_open_device = canDLL.VCI_OpenDevice(VCI_USBCAN2, 0, 0)
if ret_open_device == STATUS_OK:
print('调用 VCI_OpenDevice成功')
if ret_open_device != STATUS_OK:
print('调用 VCI_OpenDevice出错')
if STEP == 0xA2 or STEP >= 0xB1:
# 初始0通道
vci_init_config = VCI_INIT_CONFIG((can_id + 8) << 21, 0x00000000, 0,
1, 0x00, 0x1C, 0) # 波特率125 Kbps 0x03 0x1C ||| 波特率500 Kbps 0x00 0x1C
else:
vci_init_config = VCI_INIT_CONFIG((can_id + 8) << 21, 0xFFFFFFFF, 0,
1, 0x00, 0x1C, 0) # 波特率125 Kbps 0x03 0x1C ||| 波特率500 Kbps 0x00 0x1C
ret_can0_init = canDLL.VCI_InitCAN(VCI_USBCAN2, 0, 0, byref(vci_init_config))
if ret_can0_init == STATUS_OK:
print('调用 VCI_InitCAN1成功')
if ret_can0_init != STATUS_OK:
print('调用 VCI_InitCAN1出错')
ret_can0_start = canDLL.VCI_StartCAN(VCI_USBCAN2, 0, 0)
if ret_can0_start == STATUS_OK:
print('调用 VCI_StartCAN1成功')
if ret_can0_start != STATUS_OK:
print('调用 VCI_StartCAN1出错')
if TestMode == 'true':
# 初始1通道
ret_can1_init = canDLL.VCI_InitCAN(VCI_USBCAN2, 0, 1, byref(vci_init_config))
if ret_can1_init == STATUS_OK:
print('调用 VCI_InitCAN2 成功')
if ret_can1_init != STATUS_OK:
print('调用 VCI_InitCAN2 出错')
ret_can1_start = canDLL.VCI_StartCAN(VCI_USBCAN2, 0, 1)
if ret_can1_start == STATUS_OK:
print('调用 VCI_StartCAN2 成功')
if ret_can1_start != STATUS_OK:
print('调用 VCI_StartCAN2 出错')
# 通道1发送数据
def send_can1():
ubyte_array = c_ubyte * 8
ubyte_3array = c_ubyte * 3
NoResCount = 0
global STEP
global CURRENT_STEP
global can_id
global seed_size
global key_k
global B_STOP_RECEIVE
data_detect = [0x02, 0x10, 0x01, 0x55, 0x55, 0x55, 0x55, 0x55]
data_handshake = [0x02, 0x3e, 0x00, 0x55, 0x55, 0x55, 0x55, 0x55]
data_askvin = [0x03, 0x22, 0xF1, 0x90, 0x55, 0x55, 0x55, 0x55]
data_askecunumber = [0x03, 0x22, 0xF1, 0x8C, 0x55, 0x55, 0x55, 0x55]
data_askecusoftversion = [0x03, 0x22, 0xF1, 0x93, 0x55, 0x55, 0x55, 0x55]
data_askecuhardversion = [0x03, 0x22, 0xF1, 0x95, 0x55, 0x55, 0x55, 0x55]
data_askecuinstalldata = [0x03, 0x22, 0xF1, 0x9D, 0x55, 0x55, 0x55, 0x55]
data_enter03session = [0x02, 0x10, 0x03, 0x55, 0x55, 0x55, 0x55, 0x55]
data_entercodesession = [0x02, 0x10, 0x02, 0x55, 0x55, 0x55, 0x55, 0x55]
data_askseed = [0x02, 0x27, 0x01, 0x55, 0x55, 0x55, 0x55, 0x55]
data_fluidcontrol = [0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
while 'true':
# step0xA0:判断有哪些ECUID主动向外汇报状态
if STEP == 0xA0:
data = [0x00, 0x00, 0x00, 0x55, 0x55, 0x55, 0x55, 0x55]
vci_transmit(canid=0x700, data=data, channel=0)
CURRENT_STEP = STEP
time.sleep(20)
print('canid received is:', list_to_hex_string(canid_list_report))
canid_list.sort()
print('canid sorted is:', list_to_hex_string(canid_list_report))
with open("eculist_report.txt", "w+") as file:
for j in canid_list_report:
file.write(hex(j))
file.write("\n")
STEP = 0xFF
# step0xA1: 判断有多少ECU存在
if STEP == 0xA1:
vci_transmit(canid=fun_id, data=data_detect, channel=0)
ret = vci_transmit(canid=fun_id, data=data_handshake, channel=0)
if ret == STATUS_OK:
print('STEP[%#x] CAN1通道广播发送handshake成功' % STEP)
CURRENT_STEP = STEP
time.sleep(5)
print('canid received is:', list_to_hex_string(canid_list))
canid_list.sort()
print('canid sorted is:', list_to_hex_string(canid_list))
with open("eculist.txt", "w+") as file:
for j in canid_list:
file.write(hex(j))
file.write("\n")
STEP = 0xFF
# STEP = 0xA2
if ret != STATUS_OK:
print('STEP[%#x] transmit fail!' % STEP)
return
# step0xA2: 获取ECU信息
elif STEP == 0xA2:
ecuinfo_list = [0x720]
CURRENT_STEP = STEP
for id in ecuinfo_list:
print("***************************************ask vin")
vci_transmit(canid=id, data=data_askvin, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(3)
print("***************************************ask ecunumber")
vci_transmit(canid=id, data=data_askecunumber, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(3)
print("***************************************ask ecusoftversion")
vci_transmit(canid=id, data=data_askecusoftversion, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(3)
print("***************************************ask askecuhardversion")
vci_transmit(canid=id, data=data_askecuhardversion, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(3)
print("***************************************ask ecuintalldata")
vci_transmit(canid=id, data=data_askecuinstalldata, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(3)
time.sleep(2)
STEP = 0xFF
# STEP = 0xA3
# step0xA3: 判断有多少ECU可以提升到扩展会话
elif STEP == 0xA3:
ret = vci_transmit(canid=fun_id, data=data_enter03session, channel=0)
if ret == STATUS_OK:
print('STEP[%#x] CAN1通道广播发送请求进入扩展会话成功' % STEP)
CURRENT_STEP = STEP
time.sleep(5)
print('canid received is:', list_to_hex_string(canid_list_extendsession))
canid_list_extendsession.sort()
print('canid sorted is:', list_to_hex_string(canid_list_extendsession))
with open("eculist_extendsession.txt", "w+") as file:
for j in canid_list_extendsession:
file.write(hex(j - 8))
file.write("/")
file.write(hex(j))
file.write("\n")
STEP = 0xFF
# STEP = 0xA4
if ret != STATUS_OK:
print('STEP[%#x] transmit fail!' % STEP)
return
# step0xA4: 判断有多少ECU可以请求安全解锁
elif STEP == 0xA4:
CURRENT_STEP = STEP
for each_canid in [0x721]:
vci_transmit(canid=each_canid, data=data_enter03session, channel=0)
time.sleep(0.2)
ret = vci_transmit(canid=each_canid, data=data_askseed, channel=0)
if ret == STATUS_OK:
print('STEP[%#x] CAN1通道广播发送请求种子成功' % STEP)
CURRENT_STEP = STEP
time.sleep(2)
print('canid received is:', list_to_hex_string(canid_list_respseed))
canid_list_respseed.sort()
print('canid sorted is:', list_to_hex_string(canid_list_respseed))
with open("eculist_respseed.txt", "w+") as file:
for j in canid_list_respseed:
file.write(hex(j))
file.write("\n")
STEP = 0xFF
if ret != STATUS_OK:
print('STEP[%#x] transmit fail!' % STEP)
return
# step0xB1: 遍历获取ECU信息
elif STEP == 0xB1:
ecuinfo_list = [0x721]
CURRENT_STEP = STEP
for id in ecuinfo_list:
for i in range(0xF1, 0xF2, 1):
for j in range(255):
data = [0x03, 0x22, i, j, 0x55, 0x55, 0x55, 0x55]
vci_transmit(canid=id, data=data, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_fluidcontrol, channel=0)
time.sleep(0.1)
time.sleep(5)
STEP = 0xFF
# STEP = 0xA3
# step0xB2: 判断是否有ECU可以直接提升到编程会话
elif STEP == 0xB2:
ecuinfo_list = [0X721]
CURRENT_STEP = STEP
for id in ecuinfo_list:
vci_transmit(canid=id, data=data_enter03session, channel=0)
time.sleep(0.2)
vci_transmit(canid=id, data=data_entercodesession, channel=0)
time.sleep(0.1)
time.sleep(3)
STEP = 0xFF
# step0xB3:判断ECU安全解锁种子是否有规律
elif STEP == 0xB3:
ecuinfo_list = [0x721]
CURRENT_STEP = STEP
for id in ecuinfo_list:
vci_transmit(canid=id, data=data_enter03session, channel=0)
time.sleep(0.2)
for i in range(200):
vci_transmit(canid=id, data=data_askseed, channel=0)
time.sleep(0.1)
time.sleep(0.2)
time.sleep(3)
STEP = 0xFF
# step0xB4:判断ECU安全解锁尝试暴力破解
elif STEP == 0xB4:
CURRENT_STEP = STEP
canid_test = 0x721
vci_transmit(canid=canid_test, data=data_enter03session, channel=0)
time.sleep(0.2)
vci_transmit(canid=canid_test, data=data_askseed, channel=0)
for i in range(255):
for j in range(255):
data = [0x04, 0x27, 0x02, i, j, 0x55, 0x55, 0x55]
vci_transmit(canid=canid_test, data=data, channel=0)
# time.sleep(0.1)
time.sleep(3)
STEP = 0x00
# step0xB5:发现了安全解锁算法后对ECU安全解锁1
elif STEP == 0xB5:
canid_test = 0x720
vci_transmit(canid=canid_test, data=data_enter03session, channel=0)
time.sleep(0.2)
vci_transmit(canid=canid_test, data=data_askseed, channel=0)
CURRENT_STEP = STEP
STEP = 0x00
# step0xB6:发现了安全解锁算法后对ECU安全解锁2
elif STEP == 0xB6:
if seed_size == 4:
key = key_k ^ seed
key0 = (key >> 24) % 256
key1 = (key >> 16) % 256
key2 = (key >> 8) % 256
key3 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
print('key2:%#x' % key2)
print('key3:%#x' % key3)
data = [0x06, 0x27, 0x04, key0, key1, key2, key3, 0x55]
elif seed_size == 2:
key = key_k ^ seed
key0 = (key >> 8) % 256
key1 = key % 256
print('key0:%#x' % key0)
print('key1:%#x' % key1)
data = [0x06, 0x27, 0x02, key0, key1, 0x55, 0x55, 0x55]
else:
print("seed size unknown")
STEP = 0xFF
break
ret = vci_transmit(canid=can_id, data=data, channel=0)
if ret == STATUS_OK:
print('STEP[%#x] CAN1通道发送send key成功' % STEP)
CURRENT_STEP = STEP
time.sleep(0.3)
STEP = 0xFF
continue
if ret != STATUS_OK:
print(STEP, 'CAN1通道发送失败')
break
# STEP0x00: 还没有收到ECU回复
elif STEP == 0x00:
print("[WAITING RESPONSE]VCI_Receive have not found corresponding ECU response!\n")
if NoResCount < NO_RES_COUNT_MAX:
NoResCount = NoResCount + 1
time.sleep(1)
else:
print("VCI_Receive have not found corresponding ECU response many times,test failed!!!")
break
# step0xFF: 所有流程结束
elif STEP == 0xFF:
CURRENT_STEP = STEP
time.sleep(1)
print("VCI_Receive have found all corresponding ECU response!\n test SUCESS!!!\n")
B_STOP_RECEIVE = 1
return
# 通道1接收数据
def receive_can1():
global STEP
global CURRENT_STEP
global seed
global seed_size
global B_STOP_RECEIVE
rx_vci_can_obj = VCI_CAN_OBJ_ARRAY(2500) # 结构体数组
while 'true':
time.sleep(0.1)
if B_STOP_RECEIVE == 1:
print("stop receive")
return
ret = canDLL.VCI_Receive(VCI_USBCAN2, 0, 0, byref(rx_vci_can_obj.ADDR), 2500, 0)
while ret > 0: # 接收到数目为 ret帧 的数据,一个等待周期的缓存数据可能不止一条
print('\r\nCAN1通道接收成功,数目是:', ret)
for i in range(0, ret):
print("%#x" % rx_vci_can_obj.STRUCT_ARRAY[i].ID,
list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
with open("all_receive.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
if CURRENT_STEP == 0xA0:
if rx_vci_can_obj.STRUCT_ARRAY[i].ID not in canid_list_report:
canid_list_report.append(rx_vci_can_obj.STRUCT_ARRAY[i].ID)
if CURRENT_STEP == 0xA1:
if rx_vci_can_obj.STRUCT_ARRAY[i].ID not in canid_list:
canid_list.append(rx_vci_can_obj.STRUCT_ARRAY[i].ID)
elif CURRENT_STEP == 0xA2 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0xF1:
with open("ECU_info.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
elif CURRENT_STEP == 0xA3 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x50 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x03:
if rx_vci_can_obj.STRUCT_ARRAY[i].ID not in canid_list_extendsession:
canid_list_extendsession.append(rx_vci_can_obj.STRUCT_ARRAY[i].ID)
elif CURRENT_STEP == 0xA4 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x01:
if rx_vci_can_obj.STRUCT_ARRAY[i].ID not in canid_list_respseed:
canid_list_respseed.append(rx_vci_can_obj.STRUCT_ARRAY[i].ID)
elif CURRENT_STEP == 0xB1 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0xF1:
with open("ECU_info_all.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
elif CURRENT_STEP == 0xB2 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x50 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
with open("ECU_info_code_session.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
elif CURRENT_STEP == 0xB3 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x01:
with open("ECU_info_respseed.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
elif CURRENT_STEP == 0xB4 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
with open("ECU_info_respkey.txt", "a") as file:
file.write(str(hex(rx_vci_can_obj.STRUCT_ARRAY[i].ID)))
file.write(list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[i].Data))
file.write("\n")
elif CURRENT_STEP == 0xB5 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x01:
if rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x06:
print("STEP[0X07] ask seed resp 4 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed2 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[5]
seed3 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[6]
seed = (seed0 << 24) + (seed1 << 16) + (seed2 << 8) + seed3
print('seed:%#x' % seed)
seed_size = 4
elif rx_vci_can_obj.STRUCT_ARRAY[i].Data[0] == 0x04:
print("STEP[0X07] ask seed resp 2 seeds received!\n")
seed0 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[3]
seed1 = rx_vci_can_obj.STRUCT_ARRAY[i].Data[4]
seed = (seed0 << 8) + seed1
print('seed:%#x' % seed)
seed_size = 2
STEP = 0xB6
elif CURRENT_STEP == 0xB6 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x67 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x02:
print("STEP[0X07] key check sucess received!\n")
STEP = 0xFF
elif CURRENT_STEP == 0x01 and rx_vci_can_obj.STRUCT_ARRAY[i].Data[1] == 0x50 and \
rx_vci_can_obj.STRUCT_ARRAY[i].Data[2] == 0x03:
print("STEP[0X01] 03 diagConversation resp received!\n")
STEP = 0X03
ret = 0
break
else:
# print("CAN1 received unknown data")
# time.sleep(0.2)
pass
ret = ret - 1
# 通道2作为模拟器接收数据
def receive_can2():
global CURRENT_STEP
rx_vci_can_obj = VCI_CAN_OBJ_ARRAY(2500) # 结构体数组
while 'true':
ret = canDLL.VCI_Receive(VCI_USBCAN2, 0, 1, byref(rx_vci_can_obj.ADDR), 2500, 0)
while ret > 0: # 接收到一帧数据
# print('\r\n【模拟器】CAN2通道接收成功')
# print("%#x" % rx_vci_can_obj.STRUCT_ARRAY[0].ID, list_to_hex_string(rx_vci_can_obj.STRUCT_ARRAY[0].Data))
time.sleep(0.1)
if CURRENT_STEP == 0xA1 or CURRENT_STEP == 0xA0:
data = [0x02, 0x7E, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00]
for i in range(10):
vci_transmit(canid=0x720 - i + 8, data=data, channel=1)
for i in range(10):
vci_transmit(canid=0x730 - i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xA2:
for i in range(5):
data = [0x14, 0xF1, 0x90, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
data = [0x21, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
data = [0x22, 0xD, 0xCE, 0xCF, 0xC0, 0xC1, 0x00, 0x00]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
for i in range(5):
data = [0x14, 0xF1, 0x93, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
data = [0x21, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
data = [0x22, 0xD, 0xCE, 0xCF, 0xC0, 0xC1, 0x00, 0x00]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xA3:
for i in range(5):
data = [0x06, 0x50, 0x03, 0x00, 0x32, 0x00, 0xC8, 0xAA]
vci_transmit(canid=can_id - i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xA4:
for i in range(3):
data = [0x06, 0x67, 0x01, 0xCC, 0xCC, 0xCC, 0xC0 + i, 0x00]
vci_transmit(canid=can_id - i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xB2:
for i in range(3):
data = [0x02, 0x50, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00]
vci_transmit(canid=can_id - i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xB3:
for i in range(3):
for j in range(10):
data = [0x06, 0x67, 0x01, 0xCC, 0xCC, 0xCC, 0xC0 + j, 0x00]
vci_transmit(canid=can_id + i + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xB4:
time.sleep(3)
data = [0x04, 0x67, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00]
vci_transmit(canid=can_id + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xB5:
time.sleep(0.1)
data = [0x04, 0x67, 0x01, 0x12, 0x34, 0x00, 0x00, 0x00]
vci_transmit(canid=can_id + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0xB6:
time.sleep(0.1)
data = [0x02, 0x67, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00]
vci_transmit(canid=can_id + 8, data=data, channel=1)
ret = 0
break
elif CURRENT_STEP == 0x01:
data = [0x06, 0x50, 0x03, 0x00, 0x32, 0x01, 0xf4, 0xaa]
ret = vci_transmit(canid=can_id + 8, data=data, channel=1)
if ret == STATUS_OK:
# print('【模拟器】CAN2通道发送成功 STEP0X%#x' % CURRENT_STEP)
time.sleep(0.3)
break
if ret != STATUS_OK:
print('【模拟器】CAN2通道发送失败')
break
elif CURRENT_STEP == 0XFF:
return
else:
print('【模拟器】CURRENT_STEP unknown!%#x' % CURRENT_STEP)
break
if __name__ == "__main__":
CanDLLName = './ControlCAN.dll' # 把DLL放到对应的目录下
canDLL = windll.LoadLibrary('./ControlCAN.dll')
print(CanDLLName)
can_init_and_start()
t1 = threading.Thread(target=send_can1)
t2 = threading.Thread(target=receive_can1)
t3 = threading.Thread(target=receive_can2)
t1.start()
t2.start()
if TestMode == 'true':
t3.start()
t1.join()
t2.join()
if TestMode == 'true':
t3.join()
# 关闭
canDLL.VCI_CloseDevice(VCI_USBCAN2, 0)

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)