mirror of
https://github.com/3minbe/DBC_Converter.git
synced 2026-05-17 01:23:58 +09:00
DBC 파일 로드 기능을 위한 새로운 파일 추가 및 신호 패턴에 RX ECU 이름 필드 추가
This commit is contained in:
parent
4178fd686a
commit
42707cd97e
0
DBC_Converter_RX.py
Normal file
0
DBC_Converter_RX.py
Normal file
0
DBC_Converter_TX.py
Normal file
0
DBC_Converter_TX.py
Normal file
@ -3,6 +3,7 @@ import sys
|
|||||||
import cantools
|
import cantools
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
def parse_dbc_file(file_path):
|
def parse_dbc_file(file_path):
|
||||||
signals = []
|
signals = []
|
||||||
@ -95,12 +96,12 @@ def parse_dbc_file(file_path):
|
|||||||
return signals
|
return signals
|
||||||
|
|
||||||
|
|
||||||
def generate_vcu_rx_function_with_factors(dbc_path, output_file, channel_info):
|
def generate_vcu_rx_function_with_factors(dbc_path, output_file):
|
||||||
# Load the DBC file
|
# Load the DBC file
|
||||||
db = cantools.database.load_file(dbc_path)
|
db = cantools.database.load_file(dbc_path)
|
||||||
|
|
||||||
# Initialize the header of the C file
|
# Initialize the header of the C file
|
||||||
c_file_content = f"""
|
c_file_content = """
|
||||||
#include "can.h"
|
#include "can.h"
|
||||||
|
|
||||||
// Declare Factors and Offsets for signals
|
// Declare Factors and Offsets for signals
|
||||||
@ -130,10 +131,9 @@ def generate_vcu_rx_function_with_factors(dbc_path, output_file, channel_info):
|
|||||||
# Check if "VCU" is in the receivers list
|
# Check if "VCU" is in the receivers list
|
||||||
if "VCU" in message.receivers:
|
if "VCU" in message.receivers:
|
||||||
# Define the temporary struct
|
# Define the temporary struct
|
||||||
channel = channel_info.get(str(message.frame_id), "CH0") # 기본 채널 설정
|
temp_struct_name = f"CH0_MV1_0x{message.frame_id:X}_temp"
|
||||||
temp_struct_name = f"{channel}_MV1_0x{message.frame_id:X}_temp"
|
|
||||||
c_file_content += f"""
|
c_file_content += f"""
|
||||||
void Receive_{message.name}_{channel}_0x{message.frame_id:X}(void)
|
void Receive_{message.name}_CH0_0x{message.frame_id:X}(void)
|
||||||
{{
|
{{
|
||||||
struct {{
|
struct {{
|
||||||
"""
|
"""
|
||||||
@ -153,16 +153,16 @@ void Receive_{message.name}_{channel}_0x{message.frame_id:X}(void)
|
|||||||
start_byte = signal.start // 8
|
start_byte = signal.start // 8
|
||||||
start_bit = signal.start % 8
|
start_bit = signal.start % 8
|
||||||
signal_length = signal.length
|
signal_length = signal.length
|
||||||
shift_expr = f"(CAN_ch[{channel}].rx.buf[{start_byte}] >> shift{start_bit})"
|
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||||
if signal_length > 8:
|
if signal_length > 8:
|
||||||
# Handle multi-byte signals
|
# Handle multi-byte signals
|
||||||
multi_byte_expr = []
|
multi_byte_expr = []
|
||||||
for i in range((signal_length + 7) // 8):
|
for i in range((signal_length + 7) // 8):
|
||||||
byte_shift = i * 8
|
byte_shift = i * 8
|
||||||
if i == 0:
|
if i == 0:
|
||||||
multi_byte_expr.append(f"(CAN_ch[{channel}].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||||
else:
|
else:
|
||||||
multi_byte_expr.append(f"(CAN_ch[{channel}].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||||
shift_expr = " | ".join(multi_byte_expr)
|
shift_expr = " | ".join(multi_byte_expr)
|
||||||
c_file_content += f" {temp_struct_name}.{signal.name}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
c_file_content += f" {temp_struct_name}.{signal.name}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
||||||
|
|
||||||
@ -171,14 +171,14 @@ void Receive_{message.name}_{channel}_0x{message.frame_id:X}(void)
|
|||||||
# Assign to final ECU variables
|
# Assign to final ECU variables
|
||||||
for signal in message.signals:
|
for signal in message.signals:
|
||||||
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
||||||
offset_name = f"Offset_m_{abs(signal.offset)::.0f}"
|
offset_name = f"Offset_m_{abs(signal.offset):.0f}"
|
||||||
factor = f" * {factor_name}" if signal.scale != 1 else ""
|
factor = f" * {factor_name}" if signal.scale != 1 else ""
|
||||||
offset = f" + {offset_name}" if signal.offset != 0 else ""
|
offset = f" + {offset_name}" if signal.offset != 0 else ""
|
||||||
temp_var = f"{temp_struct_name}.{signal.name}_temp"
|
temp_var = f"{temp_struct_name}.{signal.name}_temp"
|
||||||
if signal.is_signed:
|
if signal.is_signed:
|
||||||
c_file_content += f" VCU.RX.{channel}_{message.name}_0x{message.frame_id:X}.{signal.name} = ({temp_var}{factor}){offset};\n"
|
c_file_content += f" ECU3.RX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name} = ({temp_var}{factor}){offset};\n"
|
||||||
else:
|
else:
|
||||||
c_file_content += f" VCU.RX.{channel}_{message.name}_0x{message.frame_id:X}.{signal.name} = {temp_var}{factor}{offset};\n"
|
c_file_content += f" ECU3.RX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name} = {temp_var}{factor}{offset};\n"
|
||||||
|
|
||||||
c_file_content += "}\n"
|
c_file_content += "}\n"
|
||||||
|
|
||||||
@ -188,7 +188,7 @@ void Receive_{message.name}_{channel}_0x{message.frame_id:X}(void)
|
|||||||
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
||||||
|
|
||||||
|
|
||||||
def generate_input_functions(signals, output_file, channel_info):
|
def generate_input_functions(signals, output_file):
|
||||||
if not signals:
|
if not signals:
|
||||||
print("[WARNING] No signals to generate Input functions for.")
|
print("[WARNING] No signals to generate Input functions for.")
|
||||||
return
|
return
|
||||||
@ -196,16 +196,15 @@ def generate_input_functions(signals, output_file, channel_info):
|
|||||||
with open(output_file, 'w') as f:
|
with open(output_file, 'w') as f:
|
||||||
for message in signals:
|
for message in signals:
|
||||||
hex_id = f"0x{int(message['id']):X}"
|
hex_id = f"0x{int(message['id']):X}"
|
||||||
channel = channel_info.get(str(message['id']), "CH0")
|
function_name = f"void Input_Data_Set_{message['name']}_CH0_{hex_id}(void)"
|
||||||
function_name = f"void Input_Data_Set_{message['name']}_{channel}_{hex_id}(void)"
|
|
||||||
f.write(f"{function_name}\n{{\n")
|
f.write(f"{function_name}\n{{\n")
|
||||||
for signal in message["signals"]:
|
for signal in message["signals"]:
|
||||||
f.write(f" GV_{signal['name']} = VCU.RX.{channel}_RX_{message['name']}_{hex_id}.{signal['name']};\n")
|
f.write(f" GV_{signal['name']} = ECU3.RX.CH0_RX_{message['name']}_{hex_id}.{signal['name']};\n")
|
||||||
f.write("}\n\n")
|
f.write("}\n\n")
|
||||||
print(f"[INFO] Input functions written to {output_file}")
|
print(f"[INFO] Input functions written to {output_file}")
|
||||||
|
|
||||||
|
|
||||||
def generate_structs(signals, output_file, channel_info):
|
def generate_structs(signals, output_file):
|
||||||
if not signals:
|
if not signals:
|
||||||
print("[WARNING] No signals to generate structs for.")
|
print("[WARNING] No signals to generate structs for.")
|
||||||
return
|
return
|
||||||
@ -217,19 +216,18 @@ def generate_structs(signals, output_file, channel_info):
|
|||||||
|
|
||||||
for message in signals:
|
for message in signals:
|
||||||
hex_id = f"0x{int(message['id']):X}"
|
hex_id = f"0x{int(message['id']):X}"
|
||||||
channel = channel_info.get(str(message['id']), "CH0")
|
|
||||||
f.write(f"typedef struct\n{{\n")
|
f.write(f"typedef struct\n{{\n")
|
||||||
for signal in message["signals"]:
|
for signal in message["signals"]:
|
||||||
if signal["size"] <= 32:
|
if signal["size"] <= 32:
|
||||||
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
||||||
else:
|
else:
|
||||||
f.write(f" float {signal['name']};\n")
|
f.write(f" float {signal['name']};\n")
|
||||||
f.write(f"}} {channel}_RX_{message['name']}_{hex_id};\n\n")
|
f.write(f"}} CH0_RX_{message['name']}_{hex_id};\n\n")
|
||||||
f.write("#endif // GENERATED_STRUCTS_H\n")
|
f.write("#endif // GENERATED_STRUCTS_H\n")
|
||||||
print(f"[INFO] Structs written to {output_file}")
|
print(f"[INFO] Structs written to {output_file}")
|
||||||
|
|
||||||
|
|
||||||
def generate_globals(signals, output_file, header_file, channel_info):
|
def generate_globals(signals, output_file, header_file):
|
||||||
if not signals:
|
if not signals:
|
||||||
print("[WARNING] No signals to generate globals for.")
|
print("[WARNING] No signals to generate globals for.")
|
||||||
return
|
return
|
||||||
@ -238,7 +236,6 @@ def generate_globals(signals, output_file, header_file, channel_info):
|
|||||||
f.write("#include <generated_globals.h>\n")
|
f.write("#include <generated_globals.h>\n")
|
||||||
|
|
||||||
for message in signals:
|
for message in signals:
|
||||||
channel = channel_info.get(str(message['id']), "CH0")
|
|
||||||
for signal in message["signals"]:
|
for signal in message["signals"]:
|
||||||
if signal["size"] > 32:
|
if signal["size"] > 32:
|
||||||
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
||||||
@ -250,7 +247,6 @@ def generate_globals(signals, output_file, header_file, channel_info):
|
|||||||
f.write("#define GENERATED_GLOBALS_H\n\n")
|
f.write("#define GENERATED_GLOBALS_H\n\n")
|
||||||
f.write("#include <stdint.h>\n\n")
|
f.write("#include <stdint.h>\n\n")
|
||||||
for message in signals:
|
for message in signals:
|
||||||
channel = channel_info.get(str(message['id']), "CH0")
|
|
||||||
for signal in message["signals"]:
|
for signal in message["signals"]:
|
||||||
if signal["size"] > 32:
|
if signal["size"] > 32:
|
||||||
f.write(f"extern float GV_{signal['name']};\n")
|
f.write(f"extern float GV_{signal['name']};\n")
|
||||||
@ -260,17 +256,16 @@ def generate_globals(signals, output_file, header_file, channel_info):
|
|||||||
print(f"[INFO] Globals and extern declarations written to {output_file} and {header_file}")
|
print(f"[INFO] Globals and extern declarations written to {output_file} and {header_file}")
|
||||||
|
|
||||||
|
|
||||||
def generate_initialization(signals, output_file, channel_info):
|
def generate_initialization(signals, output_file):
|
||||||
if not signals:
|
if not signals:
|
||||||
print("[WARNING] No signals to generate initialization for.")
|
print("[WARNING] No signals to generate initialization for.")
|
||||||
return
|
return
|
||||||
|
|
||||||
with open(output_file, 'w') as f:
|
with open(output_file, 'w') as f:
|
||||||
f.write("void VCU_Data_Init(void)\n{\n")
|
f.write("void ECU3_Data_Init(void)\n{\n")
|
||||||
for message in signals:
|
for message in signals:
|
||||||
hex_id = f"0x{int(message['id']):X}"
|
hex_id = f"0x{int(message['id']):X}"
|
||||||
channel = channel_info.get(str(message['id']), "CH0")
|
struct_prefix = f"ECU3.RX.CH0_RX_{message['name']}_{hex_id}"
|
||||||
struct_prefix = f"VCU.RX.{channel}_RX_{message['name']}_{hex_id}"
|
|
||||||
for signal in message["signals"]:
|
for signal in message["signals"]:
|
||||||
if signal["offset"] != 0.0:
|
if signal["offset"] != 0.0:
|
||||||
if signal["size"] <= 32:
|
if signal["size"] <= 32:
|
||||||
@ -290,20 +285,6 @@ def generate_initialization(signals, output_file, channel_info):
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
dbc_file_path = sys.argv[1]
|
dbc_file_path = sys.argv[1]
|
||||||
output_dir = sys.argv[2]
|
output_dir = sys.argv[2]
|
||||||
channel_info_path = sys.argv[3]
|
|
||||||
|
|
||||||
with open(channel_info_path, 'r', encoding='cp1252') as f:
|
|
||||||
try:
|
|
||||||
channel_info = json.load(f)
|
|
||||||
# Convert channel info to int
|
|
||||||
for key in channel_info:
|
|
||||||
channel_info[key] = int(channel_info[key].replace("CH", ""))
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
print(f"[ERROR] Error decoding JSON: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[ERROR] Error reading channel info file: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
output_c_file = f"{output_dir}/generated_receive.c"
|
output_c_file = f"{output_dir}/generated_receive.c"
|
||||||
output_input_file = f"{output_dir}/generated_input.c"
|
output_input_file = f"{output_dir}/generated_input.c"
|
||||||
@ -313,8 +294,8 @@ if __name__ == "__main__":
|
|||||||
output_initialization_file = f"{output_dir}/generated_init.c"
|
output_initialization_file = f"{output_dir}/generated_init.c"
|
||||||
|
|
||||||
signals = parse_dbc_file(dbc_file_path)
|
signals = parse_dbc_file(dbc_file_path)
|
||||||
generate_vcu_rx_function_with_factors(dbc_file_path, output_c_file, channel_info)
|
generate_vcu_rx_function_with_factors(dbc_file_path, output_c_file)
|
||||||
generate_input_functions(signals, output_input_file, channel_info)
|
generate_input_functions(signals, output_input_file)
|
||||||
generate_structs(signals, output_structs_file, channel_info)
|
generate_structs(signals, output_structs_file)
|
||||||
generate_globals(signals, output_globals_file, output_globals_header, channel_info)
|
generate_globals(signals, output_globals_file, output_globals_header)
|
||||||
generate_initialization(signals, output_initialization_file, channel_info)
|
generate_initialization(signals, output_initialization_file)
|
||||||
@ -8,7 +8,7 @@ def load_dbc_file(file_path):
|
|||||||
|
|
||||||
# 메시지와 시그널을 추출하는 정규 표현식
|
# 메시지와 시그널을 추출하는 정규 표현식
|
||||||
message_pattern = re.compile(r'BO_\s+(\d+)\s+(\w+)\s*:\s*(\d+)\s+(\w+)')
|
message_pattern = re.compile(r'BO_\s+(\d+)\s+(\w+)\s*:\s*(\d+)\s+(\w+)')
|
||||||
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)\+\s*\(([^,]+),\s*([^)]+)\)')
|
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)\+\s*\(([^,]+),\s*([^)]+)\)\s*\[[^\]]+\]\s*\"[^\"]*\"\s+(\w+)')
|
||||||
|
|
||||||
# 메시지와 시그널 매칭
|
# 메시지와 시그널 매칭
|
||||||
messages = {}
|
messages = {}
|
||||||
@ -43,13 +43,15 @@ def load_dbc_file(file_path):
|
|||||||
byte_order = int(signal_match.group(4))
|
byte_order = int(signal_match.group(4))
|
||||||
factor = float(signal_match.group(5))
|
factor = float(signal_match.group(5))
|
||||||
offset = float(signal_match.group(6))
|
offset = float(signal_match.group(6))
|
||||||
|
rx_ecu_name = signal_match.group(7)
|
||||||
messages[current_message]["Signals"].append({
|
messages[current_message]["Signals"].append({
|
||||||
"Signal name": signal_name,
|
"Signal name": signal_name,
|
||||||
"msb": msb,
|
"msb": msb,
|
||||||
"Length": length,
|
"Length": length,
|
||||||
"Byte order": byte_order,
|
"Byte order": byte_order,
|
||||||
"Factor": factor,
|
"Factor": factor,
|
||||||
"Offset": offset
|
"Offset": offset,
|
||||||
|
"RX ECU name": rx_ecu_name
|
||||||
})
|
})
|
||||||
|
|
||||||
return messages
|
return messages
|
||||||
@ -59,7 +61,8 @@ def load_dbc_file(file_path):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
file_path = sys.argv[1]
|
# file_path = sys.argv[1]
|
||||||
|
file_path = 'C:/Users/MSI/SynologyDrive/3min_be/한자연/!과제/초안전/#Debug/DBC/241007_primary_HyperSafe4.dbc'
|
||||||
messages = load_dbc_file(file_path)
|
messages = load_dbc_file(file_path)
|
||||||
|
|
||||||
if messages:
|
if messages:
|
||||||
|
|||||||
BIN
__pycache__/Data_Parsing.cpython-313.pyc
Normal file
BIN
__pycache__/Data_Parsing.cpython-313.pyc
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user