mirror of
https://github.com/3minbe/DBC_Converter.git
synced 2026-05-17 01:23:58 +09:00
DBC 변환 기능 개선: 공통 출력 디렉토리 추가 및 변환 스크립트 이름 변경
This commit is contained in:
parent
42707cd97e
commit
f372bf5bca
@ -489,15 +489,18 @@ class MainView(QtWidgets.QMainWindow):
|
||||
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
rx_output_dir = os.path.join(base_output_dir, file_name, "RX")
|
||||
tx_output_dir = os.path.join(base_output_dir, file_name, "TX")
|
||||
common_output_dir = os.path.join(base_output_dir, file_name, "Common")
|
||||
dbc_output_dir = os.path.join(base_output_dir, "#DBC") # DBC 파일 저장 경로 설정
|
||||
os.makedirs(rx_output_dir, exist_ok=True)
|
||||
os.makedirs(tx_output_dir, exist_ok=True)
|
||||
os.makedirs(common_output_dir, exist_ok=True)
|
||||
os.makedirs(dbc_output_dir, exist_ok=True) # DBC 파일 저장 경로 생성
|
||||
channel_info = self.settings.get("channel_info", {}).get(os.path.basename(file_path), "CH0")
|
||||
try:
|
||||
shutil.copy(file_path, dbc_output_dir) # DBC 파일 복사
|
||||
subprocess.run(["python", "DBC_to_C_RX.py", file_path, rx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_to_C_TX.py", file_path, tx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_RX.py", file_path, rx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_TX.py", file_path, tx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_Common.py", file_path, common_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
self.updateAlertText(f"변환 성공", [file_path])
|
||||
except subprocess.CalledProcessError as e:
|
||||
self.updateAlertText(f"변환 실패: {e}", [file_path])
|
||||
|
||||
77
DBC_Converter_Common.py
Normal file
77
DBC_Converter_Common.py
Normal file
@ -0,0 +1,77 @@
|
||||
import os
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
def generate_structs(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate structs for.")
|
||||
return
|
||||
|
||||
# Ensure the output directory exists
|
||||
output_dir = os.path.dirname(output_file)
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
tx_structs = {}
|
||||
rx_structs = {}
|
||||
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
|
||||
if message_info["TX ECU name"] == "VCU":
|
||||
struct_name = f"{message_name}_{hex_id}"
|
||||
tx_structs[struct_name] = []
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] <= 32:
|
||||
tx_structs[struct_name].append(f" uint32_t {signal['Signal name']} : {signal['Length']};")
|
||||
else:
|
||||
tx_structs[struct_name].append(f" float {signal['Signal name']};")
|
||||
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] == "VCU":
|
||||
struct_name = f"{message_name}_{hex_id}"
|
||||
if struct_name not in rx_structs:
|
||||
rx_structs[struct_name] = []
|
||||
if signal["Length"] <= 32:
|
||||
rx_structs[struct_name].append(f" uint32_t {signal['Signal name']} : {signal['Length']};")
|
||||
else:
|
||||
rx_structs[struct_name].append(f" float {signal['Signal name']};")
|
||||
|
||||
f.write("typedef struct {\n")
|
||||
f.write(" typedef struct {\n")
|
||||
for struct_name, fields in tx_structs.items():
|
||||
f.write(f" typedef struct {{\n")
|
||||
for field in fields:
|
||||
f.write(f" {field}\n")
|
||||
f.write(f" }} {struct_name};\n\n")
|
||||
f.write(" } TX;\n\n")
|
||||
|
||||
f.write(" typedef struct {\n")
|
||||
for struct_name, fields in rx_structs.items():
|
||||
f.write(f" typedef struct {{\n")
|
||||
for field in fields:
|
||||
f.write(f" {field}\n")
|
||||
f.write(f" }} {struct_name};\n\n")
|
||||
f.write(" } RX;\n")
|
||||
f.write("} VCU;\n\n")
|
||||
|
||||
f.write("#endif // GENERATED_STRUCTS_H\n")
|
||||
print(f"[INFO] Structs written to {output_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_structs.h"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
if signals is None:
|
||||
print(f"[ERROR] Failed to load DBC file: {dbc_file_path}")
|
||||
sys.exit(1)
|
||||
|
||||
generate_structs(signals, output_structs_file)
|
||||
@ -8,7 +8,7 @@ def load_dbc_file(file_path):
|
||||
|
||||
# 메시지와 시그널을 추출하는 정규 표현식
|
||||
message_pattern = re.compile(r'BO_\s+(\d+)\s+(\w+)\s*:\s*(\d+)\s+(\w+)')
|
||||
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)\+\s*\(([^,]+),\s*([^)]+)\)\s*\[[^\]]+\]\s*\"[^\"]*\"\s+(\w+)')
|
||||
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)([+-])\s*\(([^,]+),\s*([^)]+)\)\s*\[[^\]]+\]\s*\"[^\"]*\"\s+(\w+)')
|
||||
|
||||
# 메시지와 시그널 매칭
|
||||
messages = {}
|
||||
@ -41,14 +41,16 @@ def load_dbc_file(file_path):
|
||||
msb = int(signal_match.group(2))
|
||||
length = int(signal_match.group(3))
|
||||
byte_order = int(signal_match.group(4))
|
||||
factor = float(signal_match.group(5))
|
||||
offset = float(signal_match.group(6))
|
||||
rx_ecu_name = signal_match.group(7)
|
||||
sign = signal_match.group(5)
|
||||
factor = float(signal_match.group(6))
|
||||
offset = float(signal_match.group(7))
|
||||
rx_ecu_name = signal_match.group(8)
|
||||
messages[current_message]["Signals"].append({
|
||||
"Signal name": signal_name,
|
||||
"msb": msb,
|
||||
"Length": length,
|
||||
"Byte order": byte_order,
|
||||
"Sign": sign,
|
||||
"Factor": factor,
|
||||
"Offset": offset,
|
||||
"RX ECU name": rx_ecu_name
|
||||
@ -61,8 +63,7 @@ def load_dbc_file(file_path):
|
||||
return None
|
||||
|
||||
if __name__ == "__main__":
|
||||
# file_path = sys.argv[1]
|
||||
file_path = 'C:/Users/MSI/SynologyDrive/3min_be/한자연/!과제/초안전/#Debug/DBC/241007_primary_HyperSafe4.dbc'
|
||||
file_path = sys.argv[1]
|
||||
messages = load_dbc_file(file_path)
|
||||
|
||||
if messages:
|
||||
@ -0,0 +1,212 @@
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
|
||||
#============================== Generate Globals ==============================#
|
||||
def generate_globals(signals, C_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate globals for.")
|
||||
return
|
||||
|
||||
with open(C_file, 'w') as f:
|
||||
f.write("#include <generated_globals.h>\n")
|
||||
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"float GV_{signal['Signal name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['Signal name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"extern float GV_{signal['Signal name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['Signal name']};\n")
|
||||
f.write("\n#endif // GENERATED_GLOBALS_H\n")
|
||||
print(f"[INFO] Globals and extern declarations written to {C_file} and {header_file}")
|
||||
|
||||
|
||||
#============================== Generate VCU RX Function ==============================#
|
||||
def generate_vcu_rx_function_with_factors(signals, output_file):
|
||||
if not signals:
|
||||
print("[ERROR] No signals to generate VCU RX functions for.")
|
||||
return
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
|
||||
// Declare Factors and Offsets for signals
|
||||
"""
|
||||
|
||||
# Collect unique Factors and Offsets
|
||||
factors_offsets = {}
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
factor_name = f"Factor_{str(signal['Factor']).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal['Offset']):.0f}"
|
||||
if signal['Factor'] != 1:
|
||||
factors_offsets[factor_name] = signal['Factor']
|
||||
if signal['Offset'] != 0:
|
||||
factors_offsets[offset_name] = signal['Offset']
|
||||
|
||||
# Add Factor and Offset variable declarations
|
||||
for name, value in factors_offsets.items():
|
||||
c_file_content += f"const float {name} = {value};\n"
|
||||
|
||||
# Add a newline for separation
|
||||
c_file_content += "\n"
|
||||
|
||||
# Iterate through all messages in the signals
|
||||
for message_name, message_info in signals.items():
|
||||
# Define the temporary struct
|
||||
temp_struct_name = f"{message_name}_temp"
|
||||
|
||||
# Check if any signal in the message has RX ECU name as VCU
|
||||
has_vcu_signal = any(signal["RX ECU name"] == "VCU" for signal in message_info["Signals"])
|
||||
if not has_vcu_signal:
|
||||
continue # Skip this message if no signal has RX ECU name as VCU
|
||||
|
||||
c_file_content += f"""
|
||||
void {message_name}_RX_{message_info['ID']}(void)
|
||||
{{
|
||||
struct {{
|
||||
"""
|
||||
|
||||
# Add temporary variables to the struct
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
if signal["Sign"] == "-":
|
||||
signal_type = "signed int"
|
||||
elif signal["Sign"] == "+":
|
||||
signal_type = "unsigned int"
|
||||
|
||||
c_file_content += f" {signal_type} {signal['Signal name']}_temp : {signal['Length']};\n"
|
||||
|
||||
c_file_content += f" }} {temp_struct_name};\n\n"
|
||||
|
||||
# Add temp assignments
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
start_byte = signal["msb"] // 8
|
||||
start_bit = signal["msb"] % 8
|
||||
signal_length = signal["Length"]
|
||||
if signal["Byte order"] == 0: # Motorola (Big Endian)
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = i * 8
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
else: # Intel (Little Endian)
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = i * 8
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
c_file_content += f" {temp_struct_name}.{signal['Signal name']}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
||||
|
||||
c_file_content += "\n"
|
||||
|
||||
# Assign to final ECU variables
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
factor_name = f"Factor_{str(signal['Factor']).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal['Offset']):.0f}"
|
||||
factor = f" * {factor_name}" if signal['Factor'] != 1 else ""
|
||||
offset = f" + {offset_name}" if signal['Offset'] != 0 else ""
|
||||
temp_var = f"{temp_struct_name}.{signal['Signal name']}_temp"
|
||||
if signal["Byte order"] == 1:
|
||||
c_file_content += f" VCU.RX.{message_name}_{message_info['ID']}.{signal['Signal name']} = ({temp_var}{factor}){offset};\n"
|
||||
else:
|
||||
c_file_content += f" VCU.RX.{message_name}_{message_info['ID']}.{signal['Signal name']} = {temp_var}{factor}{offset};\n"
|
||||
|
||||
c_file_content += "}\n"
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
||||
|
||||
|
||||
#============================== Generate Input Function ==============================#
|
||||
def generate_input_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate Input functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
function_name = f"void Input_Data_Set_{message_name}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message_info["Signals"]:
|
||||
f.write(f" GV_{signal['Signal name']} = VCU.RX.CH0_RX_{message_name}_{hex_id}.{signal['Signal name']};\n")
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] Input functions written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate Initialization ==============================#
|
||||
def generate_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate initialization for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void VCU_Data_Init(void)\n{\n")
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
struct_prefix = f"VCU.RX.CH0_RX_{message_name}_{hex_id}"
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Offset"] != 0.0:
|
||||
if signal["Length"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = {signal['Offset']};\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = {signal['Offset']}f;\n")
|
||||
else:
|
||||
if signal["Length"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = 0;\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = 0.0f;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] Initialization function written to {output_file}")
|
||||
|
||||
|
||||
#============================== Main ==============================#
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_globals_C_file = f"{output_dir}/generated_globals.c"
|
||||
output_globals_header_file = f"{output_dir}/generated_globals.h"
|
||||
output_c_file = f"{output_dir}/generated_receive.c"
|
||||
output_input_file = f"{output_dir}/generated_input.c"
|
||||
output_initialization_file = f"{output_dir}/generated_init.c"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
|
||||
generate_globals(signals, output_globals_C_file, output_globals_header_file)
|
||||
generate_vcu_rx_function_with_factors(signals, output_c_file)
|
||||
generate_input_functions(signals, output_input_file)
|
||||
generate_initialization(signals, output_initialization_file)
|
||||
@ -0,0 +1,186 @@
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
|
||||
|
||||
#============================== Generate TX Globals ==============================#
|
||||
def generate_tx_globals(signals, output_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"float GV_{signal['Signal name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['Signal name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_TX_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"extern float GV_{signal['Signal name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['Signal name']};\n")
|
||||
f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
|
||||
print(f"[INFO] TX globals written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Functions ==============================#
|
||||
def generate_tx_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
function_name = f"void Output_Data_Set_{message_name}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message_info["Signals"]:
|
||||
factor_str = f"/ {signal['Factor']}" if signal["Factor"] != 1.0 else ""
|
||||
offset_str = f"- {signal['Offset']}" if signal["Offset"] != 0.0 else ""
|
||||
mask = f"_{signal['Length']}bit"
|
||||
f.write(
|
||||
f" VCU.TX.CH0_{message_name}_{hex_id}.{signal['Signal name']} = "
|
||||
f"(int)((GV_{signal['Signal name']} {offset_str}) {factor_str}) & {mask};\n"
|
||||
)
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] TX functions written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Initialization ==============================#
|
||||
def generate_tx_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No TX signals found for initialization generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void Initialize_TX_Signals(void)\n{\n")
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
f.write(f" // {message_name} ({hex_id})\n")
|
||||
for signal in message_info["Signals"]:
|
||||
f.write(f" VCU.TX.CH0_{message_name}_{hex_id}.{signal['Signal name']} = 0;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] TX initialization function written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Enum ==============================#
|
||||
def generate_tx_enum(signals, output_file, cycle):
|
||||
if not signals:
|
||||
print("[WARNING] No TX messages found for enum generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("typedef enum {\n")
|
||||
for idx, message_name in enumerate(signals.keys()):
|
||||
enum_name = f"VCU_CH0_TX_{message_name}_{cycle}"
|
||||
f.write(f" {enum_name} = {idx},\n")
|
||||
f.write(" NUMBER_OF_VCU_CH0_TX_MESSAGE,\n")
|
||||
f.write("} VCU_CH0_TX;\n")
|
||||
print(f"[INFO] TX enum written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX C File ==============================#
|
||||
def generate_vcu_can_transmit_single_c_file(signals, output_file):
|
||||
if not signals:
|
||||
print("[ERROR] No signals to generate VCU CAN transmit functions for.")
|
||||
return
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
"""
|
||||
|
||||
# Iterate through all messages in the signals
|
||||
for message_name, message_info in signals.items():
|
||||
# Check if any signal in the message has RX ECU name as VCU
|
||||
has_vcu_signal = any(signal["RX ECU name"] == "VCU" for signal in message_info["Signals"])
|
||||
if not has_vcu_signal:
|
||||
continue # Skip this message if no signal has RX ECU name as VCU
|
||||
|
||||
# Add the function definition for the message
|
||||
c_file_content += f"""
|
||||
void Transmit_{message_name}_CH0_{message_info['ID']}(void)
|
||||
{{
|
||||
"""
|
||||
|
||||
# Iterate through signals and generate bit-packing logic
|
||||
buffer_assignments = [""] * message_info["DLC"]
|
||||
for signal in message_info["Signals"]:
|
||||
start_byte = signal["msb"] // 8
|
||||
start_bit = signal["msb"] % 8
|
||||
signal_length = signal["Length"]
|
||||
signal_name = f"VCU.TX.CH0_{message_name}_{message_info['ID']}.{signal['Signal name']}"
|
||||
|
||||
# Handle 8-bit chunks
|
||||
while signal_length > 0:
|
||||
bits_in_byte = min(8 - start_bit, signal_length)
|
||||
shift_amount = (signal["Length"] - signal_length)
|
||||
shift_expr = f"({signal_name} >> shift{shift_amount}) << shift{start_bit}" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
|
||||
buffer_index = start_byte
|
||||
|
||||
# Ensure buffer_index is within the range of buffer_assignments
|
||||
if buffer_index >= len(buffer_assignments):
|
||||
print(f"[ERROR] Buffer index {buffer_index} out of range for message {message_name}")
|
||||
break
|
||||
|
||||
# Add to the buffer assignments
|
||||
if buffer_assignments[buffer_index]:
|
||||
buffer_assignments[buffer_index] += f"\n | {shift_expr}"
|
||||
else:
|
||||
buffer_assignments[buffer_index] = f"{shift_expr}"
|
||||
|
||||
# Update pointers
|
||||
signal_length -= bits_in_byte
|
||||
start_byte += 1
|
||||
start_bit = 0
|
||||
|
||||
# Write buffer assignments to the function
|
||||
for i, assignment in enumerate(buffer_assignments):
|
||||
if assignment:
|
||||
c_file_content += f" CAN_ch[0].tx.buf[{i}] = ({assignment}) & _8bit;\n"
|
||||
|
||||
# Add the send function call
|
||||
c_file_content += f"""
|
||||
can_send_config(CAN_INST_0, g_messageObjectConf_VCU_0ch_TX[VCU_CH0_TX_{message_name}_10ms]);
|
||||
}}
|
||||
"""
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated C file with all VCU messages: {output_file}")
|
||||
|
||||
|
||||
#============================== Main ==============================#
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_tx_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_tx_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_tx_globals.h"
|
||||
output_tx_functions_file = f"{output_dir}/generated_tx_functions.c"
|
||||
output_tx_initialization = f"{output_dir}/generated_tx_initialization.c"
|
||||
output_enum_file = f"{output_dir}/generated_tx_enum.h"
|
||||
output_c_file = f"{output_dir}/Transmit_All_VCU_Messages.c" # Replace with your desired output file name
|
||||
cycle_time = "10ms"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
if signals is None:
|
||||
print(f"[ERROR] Failed to load DBC file: {dbc_file_path}")
|
||||
sys.exit(1)
|
||||
|
||||
generate_tx_globals(signals, output_globals_file, output_globals_header)
|
||||
generate_tx_functions(signals, output_tx_functions_file)
|
||||
generate_tx_initialization(signals, output_tx_initialization)
|
||||
generate_tx_enum(signals, output_enum_file, cycle_time)
|
||||
generate_vcu_can_transmit_single_c_file(signals, output_c_file)
|
||||
301
DBC_to_C_RX.py
301
DBC_to_C_RX.py
@ -1,301 +0,0 @@
|
||||
import re
|
||||
import sys
|
||||
import cantools
|
||||
from datetime import datetime
|
||||
import json
|
||||
import os
|
||||
|
||||
def parse_dbc_file(file_path):
|
||||
signals = []
|
||||
current_message = None
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
start_parsing = False
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if not start_parsing:
|
||||
if line.startswith("BO_"):
|
||||
start_parsing = True
|
||||
else:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
current_message = {
|
||||
"id": parts[1],
|
||||
"name": parts[2].replace(":", ""),
|
||||
"dlc": parts[3],
|
||||
"transmitter": parts[4],
|
||||
"signals": []
|
||||
}
|
||||
print(f"[INFO] Found message: {current_message['name']}")
|
||||
|
||||
elif line.startswith("SG_") and current_message:
|
||||
try:
|
||||
parts = line.split(":")
|
||||
if len(parts) < 2:
|
||||
print(f"[WARNING] Skipping malformed SG_ line: {line}")
|
||||
continue
|
||||
|
||||
signal_name = parts[0].split()[1].strip()
|
||||
bit_info = parts[1].split()[0]
|
||||
receiver = parts[1].split()[-1]
|
||||
|
||||
if receiver != "VCU":
|
||||
continue
|
||||
|
||||
byte_offset, rest = bit_info.split("|")
|
||||
bit_offset = int(rest.split("@")[0])
|
||||
size = int(re.search(r'\d+', rest.split("@")[0]).group())
|
||||
factor_offset_match = re.search(r'\(([^)]+)\)', line)
|
||||
if not factor_offset_match:
|
||||
print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
|
||||
continue
|
||||
|
||||
factor_offset = factor_offset_match.group(1).split(",")
|
||||
factor = float(factor_offset[0])
|
||||
offset = float(factor_offset[1])
|
||||
|
||||
current_message["signals"].append({
|
||||
"name": signal_name,
|
||||
"byte_offset": int(byte_offset) // 8,
|
||||
"bit_offset": bit_offset,
|
||||
"size": size,
|
||||
"factor": factor,
|
||||
"offset": offset,
|
||||
"signed": '-' in rest.split("@")[1]
|
||||
})
|
||||
print(f"[INFO] Added signal: {signal_name}")
|
||||
except (ValueError, IndexError, AttributeError) as e:
|
||||
print(f"[ERROR] Error parsing signal: {line}, {e}")
|
||||
continue
|
||||
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
return []
|
||||
|
||||
print(f"[INFO] Parsed {len(signals)} messages.")
|
||||
return signals
|
||||
|
||||
|
||||
def generate_vcu_rx_function_with_factors(dbc_path, output_file):
|
||||
# Load the DBC file
|
||||
db = cantools.database.load_file(dbc_path)
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
|
||||
// Declare Factors and Offsets for signals
|
||||
"""
|
||||
|
||||
# Collect unique Factors and Offsets
|
||||
factors_offsets = {}
|
||||
for message in db.messages:
|
||||
if "VCU" in message.receivers:
|
||||
for signal in message.signals:
|
||||
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal.offset):.0f}"
|
||||
if signal.scale != 1:
|
||||
factors_offsets[factor_name] = signal.scale
|
||||
if signal.offset != 0:
|
||||
factors_offsets[offset_name] = signal.offset
|
||||
|
||||
# Add Factor and Offset variable declarations
|
||||
for name, value in factors_offsets.items():
|
||||
c_file_content += f"const float {name} = {value};\n"
|
||||
|
||||
# Add a newline for separation
|
||||
c_file_content += "\n"
|
||||
|
||||
# Iterate through all messages in the DBC file
|
||||
for message in db.messages:
|
||||
# Check if "VCU" is in the receivers list
|
||||
if "VCU" in message.receivers:
|
||||
# Define the temporary struct
|
||||
temp_struct_name = f"CH0_MV1_0x{message.frame_id:X}_temp"
|
||||
c_file_content += f"""
|
||||
void Receive_{message.name}_CH0_0x{message.frame_id:X}(void)
|
||||
{{
|
||||
struct {{
|
||||
"""
|
||||
|
||||
# Add temporary variables to the struct
|
||||
for signal in message.signals:
|
||||
if signal.is_signed:
|
||||
signal_type = "signed int"
|
||||
else:
|
||||
signal_type = "unsigned int"
|
||||
c_file_content += f" {signal_type} {signal.name}_temp : {signal.length};\n"
|
||||
|
||||
c_file_content += f" }} {temp_struct_name};\n\n"
|
||||
|
||||
# Add temp assignments
|
||||
for signal in message.signals:
|
||||
start_byte = signal.start // 8
|
||||
start_bit = signal.start % 8
|
||||
signal_length = signal.length
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = i * 8
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
c_file_content += f" {temp_struct_name}.{signal.name}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
||||
|
||||
c_file_content += "\n"
|
||||
|
||||
# Assign to final ECU variables
|
||||
for signal in message.signals:
|
||||
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal.offset):.0f}"
|
||||
factor = f" * {factor_name}" if signal.scale != 1 else ""
|
||||
offset = f" + {offset_name}" if signal.offset != 0 else ""
|
||||
temp_var = f"{temp_struct_name}.{signal.name}_temp"
|
||||
if signal.is_signed:
|
||||
c_file_content += f" ECU3.RX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name} = ({temp_var}{factor}){offset};\n"
|
||||
else:
|
||||
c_file_content += f" ECU3.RX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name} = {temp_var}{factor}{offset};\n"
|
||||
|
||||
c_file_content += "}\n"
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
||||
|
||||
|
||||
def generate_input_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate Input functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
function_name = f"void Input_Data_Set_{message['name']}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
f.write(f" GV_{signal['name']} = ECU3.RX.CH0_RX_{message['name']}_{hex_id}.{signal['name']};\n")
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] Input functions written to {output_file}")
|
||||
|
||||
|
||||
def generate_structs(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate structs for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
f.write(f"typedef struct\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
||||
else:
|
||||
f.write(f" float {signal['name']};\n")
|
||||
f.write(f"}} CH0_RX_{message['name']}_{hex_id};\n\n")
|
||||
f.write("#endif // GENERATED_STRUCTS_H\n")
|
||||
print(f"[INFO] Structs written to {output_file}")
|
||||
|
||||
|
||||
def generate_globals(signals, output_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#include <generated_globals.h>\n")
|
||||
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"extern float GV_{signal['name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['name']};\n")
|
||||
f.write("\n#endif // GENERATED_GLOBALS_H\n")
|
||||
print(f"[INFO] Globals and extern declarations written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
def generate_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate initialization for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void ECU3_Data_Init(void)\n{\n")
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
struct_prefix = f"ECU3.RX.CH0_RX_{message['name']}_{hex_id}"
|
||||
for signal in message["signals"]:
|
||||
if signal["offset"] != 0.0:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = {signal['offset']};\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = {signal['offset']}f;\n")
|
||||
else:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = 0;\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = 0.0f;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] Initialization function written to {output_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_c_file = f"{output_dir}/generated_receive.c"
|
||||
output_input_file = f"{output_dir}/generated_input.c"
|
||||
output_structs_file = f"{output_dir}/generated_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_globals.h"
|
||||
output_initialization_file = f"{output_dir}/generated_init.c"
|
||||
|
||||
signals = parse_dbc_file(dbc_file_path)
|
||||
generate_vcu_rx_function_with_factors(dbc_file_path, output_c_file)
|
||||
generate_input_functions(signals, output_input_file)
|
||||
generate_structs(signals, output_structs_file)
|
||||
generate_globals(signals, output_globals_file, output_globals_header)
|
||||
generate_initialization(signals, output_initialization_file)
|
||||
323
DBC_to_C_TX.py
323
DBC_to_C_TX.py
@ -1,323 +0,0 @@
|
||||
import re
|
||||
import sys
|
||||
import cantools
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
def parse_dbc_file(file_path):
|
||||
signals = []
|
||||
current_message = None
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
start_parsing = False
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if not start_parsing:
|
||||
if line.startswith("BO_"):
|
||||
start_parsing = True
|
||||
else:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
current_message = {
|
||||
"id": parts[1],
|
||||
"name": parts[2].replace(":", ""),
|
||||
"dlc": parts[3],
|
||||
"transmitter": parts[4],
|
||||
"signals": []
|
||||
}
|
||||
print(f"[INFO] Found message: {current_message['name']}")
|
||||
|
||||
elif line.startswith("SG_") and current_message:
|
||||
try:
|
||||
parts = line.split(":")
|
||||
if len(parts) < 2:
|
||||
print(f"[WARNING] Skipping malformed SG_ line: {line}")
|
||||
continue
|
||||
|
||||
signal_name = parts[0].split()[1].strip()
|
||||
bit_info = parts[1].split()[0]
|
||||
receiver = parts[1].split()[-1]
|
||||
|
||||
if current_message["transmitter"] != "VCU":
|
||||
continue
|
||||
|
||||
byte_offset, rest = bit_info.split("|")
|
||||
bit_offset = int(rest.split("@")[0])
|
||||
size = int(re.search(r'\d+', rest.split("@")[0]).group())
|
||||
factor_offset_match = re.search(r'\(([^)]+)\)', line)
|
||||
if not factor_offset_match:
|
||||
print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
|
||||
continue
|
||||
|
||||
factor_offset = factor_offset_match.group(1).split(",")
|
||||
factor = float(factor_offset[0])
|
||||
offset = float(factor_offset[1])
|
||||
|
||||
current_message["signals"].append({
|
||||
"name": signal_name,
|
||||
"byte_offset": int(byte_offset) // 8,
|
||||
"bit_offset": bit_offset,
|
||||
"size": size,
|
||||
"factor": factor,
|
||||
"offset": offset,
|
||||
"signed": '-' in rest.split("@")[1]
|
||||
})
|
||||
print(f"[INFO] Added signal: {signal_name}")
|
||||
except (ValueError, IndexError, AttributeError) as e:
|
||||
print(f"[ERROR] Error parsing signal: {line}, {e}")
|
||||
continue
|
||||
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
return []
|
||||
|
||||
print(f"[INFO] Parsed {len(signals)} messages.")
|
||||
return signals
|
||||
|
||||
|
||||
def generate_tx_structs(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX structs for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_TX_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
f.write(f"typedef struct\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
||||
else:
|
||||
f.write(f" float {signal['name']};\n")
|
||||
f.write(f"}} CH0_{message['name']}_{hex_id};\n\n")
|
||||
f.write("#endif // GENERATED_TX_STRUCTS_H\n")
|
||||
print(f"[INFO] TX structs written to {output_file}")
|
||||
|
||||
|
||||
def generate_tx_globals(signals, output_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_TX_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"extern float GV_{signal['name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['name']};\n")
|
||||
f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
|
||||
print(f"[INFO] TX globals written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
def generate_tx_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
function_name = f"void Output_Data_Set_{message['name']}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
factor_str = f"/ {signal['factor']}" if signal["factor"] != 1.0 else ""
|
||||
offset_str = f"- {signal['offset']}" if signal["offset"] != 0.0 else ""
|
||||
mask = f"_{signal['size']}bit"
|
||||
f.write(
|
||||
f" VCU.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = "
|
||||
f"(int)((GV_{signal['name']} {offset_str}) {factor_str}) & {mask};\n"
|
||||
)
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] TX functions written to {output_file}")
|
||||
|
||||
def generate_tx_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No TX signals found for initialization generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void Initialize_TX_Signals(void)\n{\n")
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
f.write(f" // {message['name']} ({hex_id})\n")
|
||||
for signal in message["signals"]:
|
||||
f.write(f" VCU.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = 0;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] TX initialization function written to {output_file}")
|
||||
|
||||
def parse_dbc_for_tx_messages(file_path):
|
||||
tx_messages = []
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
message_id = int(parts[1]) # ID for sorting
|
||||
message_name = parts[2].replace(":", "")
|
||||
transmitter = parts[4]
|
||||
|
||||
# Only include messages transmitted by VCU
|
||||
if transmitter == "VCU":
|
||||
tx_messages.append((message_id, message_name))
|
||||
print(f"[INFO] Found TX message: {message_name} (ID: {message_id})")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
|
||||
# Sort messages by message name or ID
|
||||
tx_messages.sort(key=lambda x: x[0]) # Sort by message ID
|
||||
return [message[1] for message in tx_messages]
|
||||
|
||||
|
||||
def generate_tx_enum(tx_messages, output_file, cycle):
|
||||
if not tx_messages:
|
||||
print("[WARNING] No TX messages found for enum generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("typedef enum {\n")
|
||||
for idx, message in enumerate(tx_messages):
|
||||
enum_name = f"VCU_CH0_TX_{message}_{cycle}"
|
||||
f.write(f" {enum_name} = {idx},\n")
|
||||
f.write(" NUMBER_OF_VCU_CH0_TX_MESSAGE,\n")
|
||||
f.write("} VCU_CH0_TX;\n")
|
||||
print(f"[INFO] TX enum written to {output_file}")
|
||||
|
||||
def generate_vcu_can_transmit_single_c_file(dbc_path, output_file):
|
||||
# Load the DBC file
|
||||
db = cantools.database.load_file(dbc_path)
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
"""
|
||||
|
||||
# Iterate through all messages in the DBC file
|
||||
for message in db.messages:
|
||||
# Check if the message's sending node is "VCU"
|
||||
if "VCU" in message.senders:
|
||||
# Add the function definition for the message
|
||||
c_file_content += f"""
|
||||
void Transmit_{message.name}_CH0_0x{message.frame_id:X}(void)
|
||||
{{
|
||||
"""
|
||||
|
||||
# Iterate through signals and generate bit-packing logic
|
||||
buffer_assignments = [""] * message.length
|
||||
for signal in message.signals:
|
||||
start_byte = signal.start // 8
|
||||
start_bit = signal.start % 8
|
||||
signal_length = signal.length
|
||||
signal_name = f"VCU.TX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name}"
|
||||
|
||||
# Handle 8-bit chunks
|
||||
while signal_length > 0:
|
||||
bits_in_byte = min(8 - start_bit, signal_length)
|
||||
shift_amount = (signal.length - signal_length)
|
||||
shift_expr = f"({signal_name} >> shift{shift_amount}) << shift{start_bit}" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
|
||||
buffer_index = start_byte
|
||||
|
||||
# Ensure buffer_index is within the range of buffer_assignments
|
||||
if buffer_index >= len(buffer_assignments):
|
||||
print(f"[ERROR] Buffer index {buffer_index} out of range for message {message.name}")
|
||||
break
|
||||
|
||||
# Add to the buffer assignments
|
||||
if buffer_assignments[buffer_index]:
|
||||
buffer_assignments[buffer_index] += f"\n | {shift_expr}"
|
||||
else:
|
||||
buffer_assignments[buffer_index] = f"{shift_expr}"
|
||||
|
||||
# Update pointers
|
||||
signal_length -= bits_in_byte
|
||||
start_byte += 1
|
||||
start_bit = 0
|
||||
|
||||
# Write buffer assignments to the function
|
||||
for i, assignment in enumerate(buffer_assignments):
|
||||
if assignment:
|
||||
c_file_content += f" CAN_ch[0].tx.buf[{i}] = ({assignment}) & _8bit;\n"
|
||||
|
||||
# Add the send function call
|
||||
c_file_content += f"""
|
||||
can_send_config(CAN_INST_0, g_messageObjectConf_VCU_0ch_TX[VCU_CH0_TX_{message.name}_10ms]);
|
||||
}}
|
||||
"""
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated C file with all VCU messages: {output_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_tx_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_tx_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_tx_globals.h"
|
||||
output_tx_functions_file = f"{output_dir}/generated_tx_functions.c"
|
||||
output_tx_initialization = f"{output_dir}/generated_tx_initialization.c"
|
||||
output_enum_file = f"{output_dir}/generated_tx_enum.h"
|
||||
cycle_time = "10ms"
|
||||
output_c_file = f"{output_dir}/Transmit_All_VCU_Messages.c" # Replace with your desired output file name
|
||||
|
||||
generate_vcu_can_transmit_single_c_file(dbc_file_path, output_c_file)
|
||||
|
||||
|
||||
signals = parse_dbc_file(dbc_file_path)
|
||||
generate_tx_structs(signals, output_structs_file)
|
||||
generate_tx_globals(signals, output_globals_file, output_globals_header)
|
||||
generate_tx_functions(signals, output_tx_functions_file)
|
||||
generate_tx_initialization(signals, output_tx_initialization)
|
||||
tx_messages = parse_dbc_for_tx_messages(dbc_file_path)
|
||||
generate_tx_enum(tx_messages, output_enum_file, cycle_time)
|
||||
BIN
__pycache__/DBC_Converter_Data_Parsing.cpython-313.pyc
Normal file
BIN
__pycache__/DBC_Converter_Data_Parsing.cpython-313.pyc
Normal file
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue
Block a user