DBC_Converter/DBC_to_C_TX.py

315 lines
12 KiB
Python

import re
import sys
import cantools
def parse_dbc_file(file_path):
signals = []
current_message = None
try:
with open(file_path, 'r', encoding='cp1252') as f:
lines = f.readlines()
print(f"[INFO] Read {len(lines)} lines from DBC file.")
start_parsing = False
for line in lines:
line = line.strip()
if not line:
continue
if not start_parsing:
if line.startswith("BO_"):
start_parsing = True
else:
continue
if line.startswith("BO_"):
if current_message and current_message["signals"]:
signals.append(current_message)
print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
parts = line.split()
if len(parts) < 5:
print(f"[WARNING] Skipping malformed BO_ line: {line}")
continue
current_message = {
"id": parts[1],
"name": parts[2].replace(":", ""),
"dlc": parts[3],
"transmitter": parts[4],
"signals": []
}
print(f"[INFO] Found message: {current_message['name']}")
elif line.startswith("SG_") and current_message:
try:
parts = line.split(":")
if len(parts) < 2:
print(f"[WARNING] Skipping malformed SG_ line: {line}")
continue
signal_name = parts[0].split()[1].strip()
bit_info = parts[1].split()[0]
receiver = parts[1].split()[-1]
if current_message["transmitter"] != "VCU":
continue
byte_offset, rest = bit_info.split("|")
bit_offset = int(rest.split("@")[0])
size = int(re.search(r'\d+', rest.split("@")[0]).group())
factor_offset_match = re.search(r'\(([^)]+)\)', line)
if not factor_offset_match:
print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
continue
factor_offset = factor_offset_match.group(1).split(",")
factor = float(factor_offset[0])
offset = float(factor_offset[1])
current_message["signals"].append({
"name": signal_name,
"byte_offset": int(byte_offset) // 8,
"bit_offset": bit_offset,
"size": size,
"factor": factor,
"offset": offset,
"signed": '-' in rest.split("@")[1]
})
print(f"[INFO] Added signal: {signal_name}")
except (ValueError, IndexError, AttributeError) as e:
print(f"[ERROR] Error parsing signal: {line}, {e}")
continue
if current_message and current_message["signals"]:
signals.append(current_message)
print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")
except Exception as e:
print(f"[ERROR] Error while parsing DBC file: {e}")
return []
print(f"[INFO] Parsed {len(signals)} messages.")
return signals
def generate_tx_structs(signals, output_file):
if not signals:
print("[WARNING] No signals to generate TX structs for.")
return
with open(output_file, 'w') as f:
f.write("#ifndef GENERATED_TX_STRUCTS_H\n")
f.write("#define GENERATED_TX_STRUCTS_H\n\n")
f.write("#include <stdint.h>\n\n")
for message in signals:
hex_id = f"0x{int(message['id']):X}"
f.write(f"typedef struct\n{{\n")
for signal in message["signals"]:
if signal["size"] <= 32:
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
else:
f.write(f" float {signal['name']};\n")
f.write(f"}} CH0_{message['name']}_{hex_id};\n\n")
f.write("#endif // GENERATED_TX_STRUCTS_H\n")
print(f"[INFO] TX structs written to {output_file}")
def generate_tx_globals(signals, output_file, header_file):
if not signals:
print("[WARNING] No signals to generate TX globals for.")
return
with open(output_file, 'w') as f:
for message in signals:
for signal in message["signals"]:
if signal["size"] > 32:
f.write(f"float GV_{signal['name']} = 0.0f;\n")
else:
f.write(f"uint32_t GV_{signal['name']} = 0;\n")
with open(header_file, 'w') as f:
f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
f.write("#define GENERATED_TX_GLOBALS_H\n\n")
f.write("#include <stdint.h>\n\n")
for message in signals:
for signal in message["signals"]:
if signal["size"] > 32:
f.write(f"extern float GV_{signal['name']};\n")
else:
f.write(f"extern uint32_t GV_{signal['name']};\n")
f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
print(f"[INFO] TX globals written to {output_file} and {header_file}")
def generate_tx_functions(signals, output_file):
if not signals:
print("[WARNING] No signals to generate TX functions for.")
return
with open(output_file, 'w') as f:
for message in signals:
hex_id = f"0x{int(message['id']):X}"
function_name = f"void Output_Data_Set_{message['name']}_CH0_{hex_id}(void)"
f.write(f"{function_name}\n{{\n")
for signal in message["signals"]:
factor_str = f"/ {signal['factor']}" if signal["factor"] != 1.0 else ""
offset_str = f"- {signal['offset']}" if signal["offset"] != 0.0 else ""
mask = f"_{signal['size']}bit"
f.write(
f" ECU3.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = "
f"(int)((GV_{signal['name']} {offset_str}) {factor_str}) & {mask};\n"
)
f.write("}\n\n")
print(f"[INFO] TX functions written to {output_file}")
def generate_tx_initialization(signals, output_file):
if not signals:
print("[WARNING] No TX signals found for initialization generation.")
return
with open(output_file, 'w') as f:
f.write("void Initialize_TX_Signals(void)\n{\n")
for message in signals:
hex_id = f"0x{int(message['id']):X}"
f.write(f" // {message['name']} ({hex_id})\n")
for signal in message["signals"]:
f.write(f" ECU3.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = 0;\n")
f.write("\n")
f.write("}\n")
print(f"[INFO] TX initialization function written to {output_file}")
def parse_dbc_for_tx_messages(file_path):
tx_messages = []
try:
with open(file_path, 'r', encoding='cp1252') as f:
lines = f.readlines()
print(f"[INFO] Read {len(lines)} lines from DBC file.")
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith("BO_"):
parts = line.split()
if len(parts) < 5:
print(f"[WARNING] Skipping malformed BO_ line: {line}")
continue
message_id = int(parts[1]) # ID for sorting
message_name = parts[2].replace(":", "")
transmitter = parts[4]
# Only include messages transmitted by VCU
if transmitter == "VCU":
tx_messages.append((message_id, message_name))
print(f"[INFO] Found TX message: {message_name} (ID: {message_id})")
except Exception as e:
print(f"[ERROR] Error while parsing DBC file: {e}")
# Sort messages by message name or ID
tx_messages.sort(key=lambda x: x[0]) # Sort by message ID
return [message[1] for message in tx_messages]
def generate_tx_enum(tx_messages, output_file, cycle):
if not tx_messages:
print("[WARNING] No TX messages found for enum generation.")
return
with open(output_file, 'w') as f:
f.write("typedef enum {\n")
for idx, message in enumerate(tx_messages):
enum_name = f"ECU3_CH0_TX_{message}_{cycle}"
f.write(f" {enum_name} = {idx},\n")
f.write(" NUMBER_OF_ECU3_CH0_TX_MESSAGE,\n")
f.write("} ECU3_CH0_TX;\n")
print(f"[INFO] TX enum written to {output_file}")
def generate_vcu_can_transmit_single_c_file(dbc_path, output_file):
# Load the DBC file
db = cantools.database.load_file(dbc_path)
# Initialize the header of the C file
c_file_content = """
#include "can.h"
"""
# Iterate through all messages in the DBC file
for message in db.messages:
# Check if the message's sending node is "VCU"
if "VCU" in message.senders:
# Add the function definition for the message
c_file_content += f"""
void Transmit_{message.name}_CH0_0x{message.frame_id:X}(void)
{{
"""
# Iterate through signals and generate bit-packing logic
buffer_assignments = [""] * message.length
for signal in message.signals:
start_byte = signal.start // 8
start_bit = signal.start % 8
signal_length = signal.length
signal_name = f"ECU3.TX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name}"
# Handle 8-bit chunks
while signal_length > 0:
bits_in_byte = min(8 - start_bit, signal_length)
shift_amount = (signal.length - signal_length)
shift_expr = f"({signal_name} >> shift{shift_amount}) << shift{start_bit}" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
buffer_index = start_byte
# Add to the buffer assignments
if buffer_assignments[buffer_index]:
buffer_assignments[buffer_index] += f"\n | {shift_expr}"
else:
buffer_assignments[buffer_index] = f"{shift_expr}"
# Update pointers
signal_length -= bits_in_byte
start_byte += 1
start_bit = 0
# Write buffer assignments to the function
for i, assignment in enumerate(buffer_assignments):
if assignment:
c_file_content += f" CAN_ch[0].tx.buf[{i}] = ({assignment}) & _8bit;\n"
# Add the send function call
c_file_content += f"""
can_send_config(CAN_INST_0, g_messageObjectConf_ECU3_0ch_TX[ECU3_CH0_TX_{message.name}_10ms]);
}}
"""
# Write the generated code to a single C file
with open(output_file, "w") as c_file:
c_file.write(c_file_content)
print(f"Generated C file with all VCU messages: {output_file}")
if __name__ == "__main__":
dbc_file_path = sys.argv[1]
output_structs_file = "C:/Users/MSI/Desktop/DB/generated_tx_structs.h"
output_globals_file = "C:/Users/MSI/Desktop/DB/generated_tx_globals.c"
output_globals_header = "C:/Users/MSI/Desktop/DB/generated_tx_globals.h"
output_tx_functions_file = "C:/Users/MSI/Desktop/DB/generated_tx_functions.c"
output_tx_initialization = "C:/Users/MSI/Desktop/DB/generated_tx_initialization.c"
output_enum_file = "C:/Users/MSI/Desktop/DB/generated_tx_enum.h"
cycle_time = "10ms"
output_c_file = "C:/Users/MSI/Desktop/DB/Transmit_All_VCU_Messages.c" # Replace with your desired output file name
generate_vcu_can_transmit_single_c_file(dbc_file_path, output_c_file)
signals = parse_dbc_file(dbc_file_path)
generate_tx_structs(signals, output_structs_file)
generate_tx_globals(signals, output_globals_file, output_globals_header)
generate_tx_functions(signals, output_tx_functions_file)
generate_tx_initialization(signals, output_tx_initialization)
tx_messages = parse_dbc_for_tx_messages(dbc_file_path)
generate_tx_enum(tx_messages, output_enum_file, cycle_time)