mirror of
https://github.com/3minbe/DBC_Converter.git
synced 2026-05-17 01:23:58 +09:00
301 lines
12 KiB
Python
301 lines
12 KiB
Python
import re
|
|
import sys
|
|
import cantools
|
|
from datetime import datetime
|
|
import json
|
|
import os
|
|
|
|
def _parse_signal_line(line, receiver):
    """Parse one ``SG_`` line into a signal dict.

    Args:
        line: A stripped DBC line whose first token is ``SG_``.
        receiver: Node name that must appear in the signal's receiver list.

    Returns:
        A dict with the keys ``name``, ``byte_offset``, ``bit_offset``,
        ``size``, ``factor``, ``offset`` and ``signed``, or ``None`` when the
        line is skipped (no ``:`` separator, not received by *receiver*, or
        no ``(factor,offset)`` group).

    Raises:
        ValueError: A numeric field or the ``start|length@order`` layout is
            malformed.
        IndexError: An expected field is missing.
    """
    # Split on the first colon only, so a colon inside the unit text does not
    # truncate the remainder of the line.
    head, colon, body = line.partition(":")
    if not colon:
        print(f"[WARNING] Skipping malformed SG_ line: {line}")
        return None

    signal_name = head.split()[1]
    fields = body.split()
    bit_info = fields[0]

    # The receiver list is the comma-separated text after the closing quote
    # of the unit; fall back to the last field when the line has no quotes.
    _, quote, tail = body.rpartition('"')
    receiver_text = tail if quote else fields[-1]
    receivers = [node.strip() for node in receiver_text.split(",")]
    if receiver not in receivers:
        return None

    # Layout looks like "<start_bit>|<length>@<byte_order><sign>".
    start_text, rest = bit_info.split("|")
    size_text, order_and_sign = rest.split("@")
    start_bit = int(start_text)
    size = int(size_text)

    factor_offset_match = re.search(r'\(([^)]+)\)', body)
    if not factor_offset_match:
        print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
        return None
    factor_offset = factor_offset_match.group(1).split(",")

    return {
        "name": signal_name,
        "byte_offset": start_bit // 8,
        # Bit position inside the start byte (previously this held the
        # signal length by mistake).
        "bit_offset": start_bit % 8,
        "size": size,
        "factor": float(factor_offset[0]),
        "offset": float(factor_offset[1]),
        "signed": '-' in order_and_sign,
    }


def parse_dbc_file(file_path, receiver="VCU"):
    """Collect the messages of a DBC file that carry signals for *receiver*.

    The file is read as cp1252 text. Each ``BO_`` line opens a message and
    each following ``SG_`` line whose receiver list contains *receiver* is
    attached to it. Messages that end up without any such signal are dropped.
    Progress, warnings and errors are reported with ``print``.

    Args:
        file_path: Path of the DBC file.
        receiver: Node name used to filter signals (defaults to ``"VCU"``).

    Returns:
        A list of message dicts with the keys ``id``, ``name``, ``dlc``,
        ``transmitter`` (all strings taken verbatim from the ``BO_`` line) and
        ``signals`` (list of signal dicts). An empty list is returned when the
        file cannot be read or an unexpected error occurs.
    """
    signals = []
    current_message = None

    def flush_current():
        # Store the message being built, but only if it has matching signals.
        if current_message and current_message["signals"]:
            signals.append(current_message)
            return True
        return False

    try:
        with open(file_path, 'r', encoding='cp1252') as f:
            lines = f.readlines()
        print(f"[INFO] Read {len(lines)} lines from DBC file.")

        for raw_line in lines:
            line = raw_line.strip()
            tokens = line.split()
            if not tokens:
                continue
            # Compare the whole first token so that BO_TX_BU_ or SG_MUL_VAL_
            # lines are not mistaken for message or signal definitions.
            keyword = tokens[0]

            if keyword == "BO_":
                if flush_current():
                    print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
                # Reset so a malformed BO_ line cannot cause the previous
                # message to be appended a second time later on.
                current_message = None

                if len(tokens) < 5:
                    print(f"[WARNING] Skipping malformed BO_ line: {line}")
                    continue

                current_message = {
                    "id": tokens[1],
                    "name": tokens[2].replace(":", ""),
                    "dlc": tokens[3],
                    "transmitter": tokens[4],
                    "signals": [],
                }
                print(f"[INFO] Found message: {current_message['name']}")

            elif keyword == "SG_" and current_message is not None:
                try:
                    signal = _parse_signal_line(line, receiver)
                except (ValueError, IndexError) as e:
                    print(f"[ERROR] Error parsing signal: {line}, {e}")
                    continue
                if signal is not None:
                    current_message["signals"].append(signal)
                    print(f"[INFO] Added signal: {signal['name']}")

        if flush_current():
            print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")

    except Exception as e:
        # Best effort by design: callers treat an empty list as "nothing to
        # generate" instead of handling exceptions.
        print(f"[ERROR] Error while parsing DBC file: {e}")
        return []

    print(f"[INFO] Parsed {len(signals)} messages.")
    return signals
|
|
|
|
|
|
def _c_number_token(value):
    """Render *value* as text that is legal inside a C identifier."""
    # str() of a float may contain '.', '-' (sign or exponent) or '+'.
    return str(value).replace("-", "m").replace("+", "").replace(".", "_")


def _factor_constant_name(scale):
    """Return the name of the C constant that holds a signal scale."""
    return f"Factor_{_c_number_token(scale)}"


def _offset_constant_name(offset):
    """Return the name of the C constant that holds a signal offset.

    The name encodes the sign (``m`` for negative, ``p`` for positive) and the
    full magnitude, so that e.g. +40, -40 and -40.4 map to distinct constants
    instead of overwriting each other.
    """
    sign = "m" if offset < 0 else "p"
    magnitude = abs(offset)
    if float(magnitude).is_integer():
        digits = f"{magnitude:.0f}"
    else:
        digits = _c_number_token(magnitude)
    return f"Offset_{sign}_{digits}"


def _raw_extract_expr(signal):
    """Build the C expression that gathers the raw bytes of *signal*.

    NOTE(review): bytes after the first are shifted left by ``i * 8`` without
    compensating for the start bit, the byte count ignores a non-zero start
    bit, and byte order is not considered. This looks correct only for
    byte-aligned little-endian signals; confirm against the ``shiftN`` macros
    in can.h before changing it.
    """
    start_byte, start_bit = divmod(signal.start, 8)
    terms = [f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"]
    if signal.length > 8:
        byte_count = (signal.length + 7) // 8
        terms.extend(
            f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{i * 8})"
            for i in range(1, byte_count)
        )
    return " | ".join(terms)


def generate_vcu_rx_function_with_factors(dbc_path, output_file):
    """Generate the C receive functions for every message received by "VCU".

    The DBC file is loaded with cantools. The output file contains a
    ``const float`` declaration for every scale other than 1 and every
    non-zero offset, followed by one ``Receive_<message>_CH0_0x<id>`` function
    per message whose receiver set contains ``"VCU"``. Each function unpacks
    the raw bits into a temporary bit-field struct and then stores the scaled
    value into ``ECU3.RX``.

    NOTE(review): the struct member written here is
    ``CH0_<message>_0x<id>``, while the other generators in this file use
    ``CH0_RX_<message>_0x<id>`` — confirm which one the C side declares.

    Args:
        dbc_path: Path of the DBC file to load.
        output_file: Path of the C file to write.
    """
    db = cantools.database.load_file(dbc_path)
    vcu_messages = [message for message in db.messages if "VCU" in message.receivers]

    chunks = ['\n#include "can.h"\n\n// Declare Factors and Offsets for signals\n']

    # Unique constants, in first-seen order (factor before offset per signal).
    factors_offsets = {}
    for message in vcu_messages:
        for signal in message.signals:
            if signal.scale != 1:
                factors_offsets[_factor_constant_name(signal.scale)] = signal.scale
            if signal.offset != 0:
                factors_offsets[_offset_constant_name(signal.offset)] = signal.offset

    chunks.extend(
        f"const float {name} = {value};\n" for name, value in factors_offsets.items()
    )
    chunks.append("\n")

    for message in vcu_messages:
        temp_struct_name = f"CH0_MV1_0x{message.frame_id:X}_temp"
        target_prefix = f"ECU3.RX.CH0_{message.name}_0x{message.frame_id:X}"

        chunks.append(
            f"\nvoid Receive_{message.name}_CH0_0x{message.frame_id:X}(void)\n{{\nstruct {{\n"
        )

        # Temporary bit-field members, one per signal.
        for signal in message.signals:
            signal_type = "signed int" if signal.is_signed else "unsigned int"
            chunks.append(f" {signal_type} {signal.name}_temp : {signal.length};\n")
        chunks.append(f" }} {temp_struct_name};\n\n")

        # Raw extraction from the receive buffer, masked to the signal width.
        for signal in message.signals:
            shift_expr = _raw_extract_expr(signal)
            chunks.append(
                f" {temp_struct_name}.{signal.name}_temp = ({shift_expr}) & _{signal.length}bit;\n"
            )
        chunks.append("\n")

        # Physical value = raw * factor + offset, omitting neutral terms.
        for signal in message.signals:
            factor = f" * {_factor_constant_name(signal.scale)}" if signal.scale != 1 else ""
            offset = f" + {_offset_constant_name(signal.offset)}" if signal.offset != 0 else ""
            temp_var = f"{temp_struct_name}.{signal.name}_temp"
            if signal.is_signed:
                chunks.append(f" {target_prefix}.{signal.name} = ({temp_var}{factor}){offset};\n")
            else:
                chunks.append(f" {target_prefix}.{signal.name} = {temp_var}{factor}{offset};\n")

        chunks.append("}\n")

    with open(output_file, "w") as c_file:
        c_file.write("".join(chunks))
    print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
|
|
|
|
|
def generate_input_functions(signals, output_file):
    """Write one ``Input_Data_Set_<message>_CH0_0x<id>`` C function per message.

    Each generated function copies every signal of the message from
    ``ECU3.RX.CH0_RX_<message>_0x<id>`` into the matching ``GV_<signal>``
    global. Nothing is written when *signals* is empty.

    Args:
        signals: Message dicts with the keys ``id``, ``name`` and ``signals``.
        output_file: Path of the C file to write.
    """
    if signals:
        with open(output_file, 'w') as out:
            for msg in signals:
                can_id = "0x{:X}".format(int(msg['id']))
                out.write(f"void Input_Data_Set_{msg['name']}_CH0_{can_id}(void)\n{{\n")
                out.writelines(
                    f" GV_{sig['name']} = ECU3.RX.CH0_RX_{msg['name']}_{can_id}.{sig['name']};\n"
                    for sig in msg["signals"]
                )
                out.write("}\n\n")
        print(f"[INFO] Input functions written to {output_file}")
    else:
        print("[WARNING] No signals to generate Input functions for.")
|
|
|
|
|
|
def generate_structs(signals, output_file):
    """Write a guarded C header with one typedef'd struct per message.

    Signals of at most 32 bits become ``uint32_t`` bit-fields of their size;
    wider signals become plain ``float`` members. Each struct is named
    ``CH0_RX_<message>_0x<id>``. Nothing is written when *signals* is empty.

    Args:
        signals: Message dicts with the keys ``id``, ``name`` and ``signals``;
            every signal dict provides ``name`` and ``size``.
        output_file: Path of the header file to write.
    """
    if not signals:
        print("[WARNING] No signals to generate structs for.")
        return

    with open(output_file, 'w') as header:
        header.write(
            "#ifndef GENERATED_STRUCTS_H\n"
            "#define GENERATED_STRUCTS_H\n\n"
        )
        header.write("#include <stdint.h>\n\n")

        for msg in signals:
            can_id = "0x{:X}".format(int(msg['id']))
            header.write("typedef struct\n{\n")
            for sig in msg["signals"]:
                if sig["size"] > 32:
                    member = f" float {sig['name']};\n"
                else:
                    member = f" uint32_t {sig['name']} : {sig['size']};\n"
                header.write(member)
            header.write(f"}} CH0_RX_{msg['name']}_{can_id};\n\n")

        header.write("#endif // GENERATED_STRUCTS_H\n")

    print(f"[INFO] Structs written to {output_file}")
|
|
|
|
|
|
def generate_globals(signals, output_file, header_file):
    """Write the ``GV_<signal>`` global definitions and their extern header.

    Signals of at most 32 bits become ``uint32_t`` globals initialised to 0;
    wider signals become ``float`` globals initialised to ``0.0f``. A signal
    name that occurs in several messages is emitted once (the first
    occurrence decides the type), because a second initialised definition
    would not compile. Nothing is written when *signals* is empty.

    Args:
        signals: Message dicts with a ``signals`` list; every signal dict
            provides ``name`` and ``size``.
        output_file: Path of the C file that receives the definitions.
        header_file: Path of the header that receives the extern
            declarations. Its base name is used in the ``#include`` line of
            *output_file*.
    """
    if not signals:
        print("[WARNING] No signals to generate globals for.")
        return

    # Global name -> C type, in first-seen order.
    global_types = {}
    for message in signals:
        for signal in message["signals"]:
            c_type = "float" if signal["size"] > 32 else "uint32_t"
            global_types.setdefault(f"GV_{signal['name']}", c_type)

    # Include the header that is actually generated rather than a fixed name.
    header_name = os.path.basename(header_file)

    with open(output_file, 'w') as f:
        f.write(f"#include <{header_name}>\n")
        for name, c_type in global_types.items():
            initial_value = "0.0f" if c_type == "float" else "0"
            f.write(f"{c_type} {name} = {initial_value};\n")

    with open(header_file, 'w') as f:
        f.write("#ifndef GENERATED_GLOBALS_H\n")
        f.write("#define GENERATED_GLOBALS_H\n\n")
        f.write("#include <stdint.h>\n\n")
        for name, c_type in global_types.items():
            f.write(f"extern {c_type} {name};\n")
        f.write("\n#endif // GENERATED_GLOBALS_H\n")

    print(f"[INFO] Globals and extern declarations written to {output_file} and {header_file}")
|
|
|
|
|
|
def generate_initialization(signals, output_file):
    """Write the C function ``ECU3_Data_Init`` that presets every RX signal.

    Each signal member of ``ECU3.RX.CH0_RX_<message>_0x<id>`` is assigned its
    offset when the offset is non-zero and zero otherwise. Signals wider than
    32 bits get a float literal (``f`` suffix); the others get the bare
    value. A blank line follows the assignments of each message. Nothing is
    written when *signals* is empty.

    Args:
        signals: Message dicts with the keys ``id``, ``name`` and ``signals``;
            every signal dict provides ``name``, ``size`` and ``offset``.
        output_file: Path of the C file to write.
    """
    if not signals:
        print("[WARNING] No signals to generate initialization for.")
        return

    with open(output_file, 'w') as out:
        out.write("void ECU3_Data_Init(void)\n{\n")
        for msg in signals:
            can_id = "0x{:X}".format(int(msg['id']))
            target = f"ECU3.RX.CH0_RX_{msg['name']}_{can_id}"
            for sig in msg["signals"]:
                has_offset = sig["offset"] != 0.0
                is_wide = sig["size"] > 32
                if has_offset:
                    literal = f"{sig['offset']}f" if is_wide else f"{sig['offset']}"
                else:
                    literal = "0.0f" if is_wide else "0"
                out.write(f" {target}.{sig['name']} = {literal};\n")
            out.write("\n")
        out.write("}\n")

    print(f"[INFO] Initialization function written to {output_file}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line: <dbc_file> <output_dir>. Extra arguments are ignored.
    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <dbc_file> <output_dir>")

    dbc_file_path = sys.argv[1]
    output_dir = sys.argv[2]

    # The generators open their output files directly, so the directory must
    # exist before the first one runs.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    output_c_file = f"{output_dir}/generated_receive.c"
    output_input_file = f"{output_dir}/generated_input.c"
    output_structs_file = f"{output_dir}/generated_structs.h"
    output_globals_file = f"{output_dir}/generated_globals.c"
    output_globals_header = f"{output_dir}/generated_globals.h"
    output_initialization_file = f"{output_dir}/generated_init.c"

    # The receive functions are generated from the cantools database; all
    # other files are generated from the hand-parsed signal list.
    signals = parse_dbc_file(dbc_file_path)
    generate_vcu_rx_function_with_factors(dbc_file_path, output_c_file)
    generate_input_functions(signals, output_input_file)
    generate_structs(signals, output_structs_file)
    generate_globals(signals, output_globals_file, output_globals_header)
    generate_initialization(signals, output_initialization_file)