"""Generate C sources and headers for the CAN messages one node transmits.

The script reads a DBC file and writes, for every message sent by the
selected node (``VCU`` by default):

* bit-field structs            (``generated_tx_structs.h``)
* global variables             (``generated_tx_globals.c`` / ``.h``)
* scaling ("Output_Data_Set")  (``generated_tx_functions.c``)
* an initialisation function   (``generated_tx_initialization.c``)
* a message-index enum         (``generated_tx_enum.h``)
* byte-packing transmit code   (``Transmit_All_VCU_Messages.c``, needs cantools)

Usage::

    python <this_script> <file.dbc> [output_dir]
"""
import re
import sys
from pathlib import Path

try:
    import cantools
except ImportError:
    # Only generate_vcu_can_transmit_single_c_file() needs cantools; keeping
    # the import optional lets the text-based generators run without it.
    cantools = None

DEFAULT_TX_NODE = "VCU"
DEFAULT_CYCLE = "10ms"
DEFAULT_OUTPUT_DIR = "C:/Users/MSI/Desktop/DB"

# In a DBC file bit 31 of the message ID marks an extended (29-bit) frame; it
# is not part of the CAN identifier itself.
_FRAME_ID_MASK = 0x7FFFFFFF

# BO_ <id> <name>: <dlc> <transmitter>
_MESSAGE_RE = re.compile(r'^BO_\s+(\d+)\s+(\w+)\s*:\s*(\d+)\s+(\S+)')
# SG_ <name> [<multiplexer>] : <start>|<length>@<byte order><sign>
_SIGNAL_RE = re.compile(
    r'^SG_\s+(\w+)(?:\s+\w+)?\s*:\s*(\d+)\|(\d+)@([01])([+-])'
)
# (<factor>,<offset>) directly after the signal's bit description
_FACTOR_OFFSET_RE = re.compile(r'\s*\(([^,()]+),([^,()]+)\)')


def _keyword(line):
    """Return the first whitespace-separated token of a non-empty line."""
    return line.split(None, 1)[0]


def _hex_id(raw_id):
    """Return the hex suffix used in C identifiers, e.g. ``0x1A0``.

    The extended-frame flag (bit 31) is masked off so the names match the
    ``frame_id`` that cantools reports for the same message.
    """
    return f"0x{int(raw_id) & _FRAME_ID_MASK:X}"


def _read_dbc_lines(file_path):
    """Read the DBC file as cp1252 text and return its lines."""
    with open(file_path, 'r', encoding='cp1252') as f:
        lines = f.readlines()
    print(f"[INFO] Read {len(lines)} lines from DBC file.")
    return lines


def _parse_message_line(line):
    """Parse a ``BO_`` line into a message dict, or return None if malformed."""
    match = _MESSAGE_RE.match(line)
    if match is None:
        return None
    message_id, name, dlc, transmitter = match.groups()
    return {
        "id": message_id,
        "name": name,
        "dlc": dlc,
        "transmitter": transmitter,
        "signals": [],
    }


def _parse_signal_line(line):
    """Parse an ``SG_`` line into a signal dict.

    Raises:
        ValueError: if the line does not follow the ``SG_`` syntax or the
            factor/offset pair is missing or not numeric.
    """
    header = _SIGNAL_RE.match(line)
    if header is None:
        raise ValueError("malformed SG_ line")
    name, start_text, length_text, _byte_order, sign = header.groups()

    scaling = _FACTOR_OFFSET_RE.match(line, header.end())
    if scaling is None:
        raise ValueError("missing factor/offset")

    # The number before '|' is the start bit, the one after it the length.
    byte_offset, bit_offset = divmod(int(start_text), 8)
    return {
        "name": name,
        "byte_offset": byte_offset,
        "bit_offset": bit_offset,
        "size": int(length_text),
        "factor": float(scaling.group(1)),
        "offset": float(scaling.group(2)),
        "signed": sign == '-',
    }


def _store_message(messages, message):
    """Append ``message`` to ``messages`` if it collected at least one signal."""
    if message and message["signals"]:
        messages.append(message)
        print(
            f"[INFO] Added message: {message['name']} "
            f"with {len(message['signals'])} signals."
        )


def parse_dbc_file(file_path, node=DEFAULT_TX_NODE):
    """Parse the messages transmitted by ``node`` together with their signals.

    Args:
        file_path: Path of the DBC file (read as cp1252).
        node: Name of the transmitting node whose signals are collected.

    Returns:
        A list of message dicts with the keys ``id``, ``name``, ``dlc``,
        ``transmitter`` (all strings) and ``signals``. Each signal is a dict
        with ``name``, ``byte_offset``, ``bit_offset``, ``size``, ``factor``,
        ``offset`` and ``signed``. Messages without parsed signals are
        omitted. An empty list is returned if the file cannot be read.
    """
    try:
        lines = _read_dbc_lines(file_path)
    except (OSError, UnicodeError) as e:
        print(f"[ERROR] Error while parsing DBC file: {e}")
        return []

    messages = []
    current_message = None

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue
        keyword = _keyword(line)

        if keyword == "BO_":
            _store_message(messages, current_message)
            # Reset before parsing: after a malformed BO_ line the following
            # SG_ lines must not be attached to the previous message.
            current_message = _parse_message_line(line)
            if current_message is None:
                print(f"[WARNING] Skipping malformed BO_ line: {line}")
            else:
                print(f"[INFO] Found message: {current_message['name']}")
        elif keyword == "SG_" and current_message is not None:
            if current_message["transmitter"] != node:
                continue
            try:
                signal = _parse_signal_line(line)
            except ValueError as e:
                print(f"[WARNING] Skipping SG_ line ({e}): {line}")
                continue
            current_message["signals"].append(signal)
            print(f"[INFO] Added signal: {signal['name']}")

    _store_message(messages, current_message)
    print(f"[INFO] Parsed {len(messages)} messages.")
    return messages


def _c_type(signal):
    """Return the C type used for a signal's global variable."""
    return "float" if signal["size"] > 32 else "uint32_t"


def _unique_signals(signals):
    """Yield every signal once, keeping the first occurrence of each name."""
    seen = set()
    for message in signals:
        for signal in message["signals"]:
            if signal["name"] in seen:
                print(
                    f"[WARNING] Signal {signal['name']} occurs in several "
                    f"messages; emitting one global only."
                )
                continue
            seen.add(signal["name"])
            yield signal


def generate_tx_structs(signals, output_file):
    """Write one bit-field struct typedef per message to ``output_file``.

    Args:
        signals: Message dicts as returned by :func:`parse_dbc_file`.
        output_file: Path of the header to write.
    """
    if not signals:
        print("[WARNING] No signals to generate TX structs for.")
        return

    out = [
        "#ifndef GENERATED_TX_STRUCTS_H\n",
        "#define GENERATED_TX_STRUCTS_H\n\n",
        "#include <stdint.h>\n\n",  # uint32_t
    ]
    for message in signals:
        out.append("typedef struct\n{\n")
        for signal in message["signals"]:
            if signal["size"] <= 32:
                out.append(
                    f"    uint32_t {signal['name']} : {signal['size']};\n"
                )
            else:
                # Wider than a uint32_t bit-field can hold.
                out.append(f"    float {signal['name']};\n")
        out.append(f"}} CH0_{message['name']}_{_hex_id(message['id'])};\n\n")
    out.append("#endif // GENERATED_TX_STRUCTS_H\n")

    with open(output_file, 'w') as f:
        f.writelines(out)
    print(f"[INFO] TX structs written to {output_file}")


def generate_tx_globals(signals, output_file, header_file):
    """Write the ``GV_<signal>`` global definitions and their extern header.

    Args:
        signals: Message dicts as returned by :func:`parse_dbc_file`.
        output_file: Path of the ``.c`` file receiving the definitions.
        header_file: Path of the ``.h`` file receiving the extern declarations.
    """
    if not signals:
        print("[WARNING] No signals to generate TX globals for.")
        return

    # A name shared by several messages must be defined only once in C.
    unique = list(_unique_signals(signals))

    with open(output_file, 'w') as f:
        for signal in unique:
            c_type = _c_type(signal)
            initial = "0.0f" if c_type == "float" else "0"
            f.write(f"{c_type} GV_{signal['name']} = {initial};\n")

    with open(header_file, 'w') as f:
        f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
        f.write("#define GENERATED_TX_GLOBALS_H\n\n")
        f.write("#include <stdint.h>\n\n")  # uint32_t
        for signal in unique:
            f.write(f"extern {_c_type(signal)} GV_{signal['name']};\n")
        f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
    print(f"[INFO] TX globals written to {output_file} and {header_file}")


def generate_tx_functions(signals, output_file):
    """Write one ``Output_Data_Set_*`` function per message.

    Each generated statement converts a ``GV_<signal>`` global to its raw
    value (subtract offset, divide by factor) and masks it with
    ``_<size>bit``.

    Args:
        signals: Message dicts as returned by :func:`parse_dbc_file`.
        output_file: Path of the ``.c`` file to write.
    """
    if not signals:
        print("[WARNING] No signals to generate TX functions for.")
        return

    with open(output_file, 'w') as f:
        for message in signals:
            hex_id = _hex_id(message['id'])
            function_name = (
                f"void Output_Data_Set_{message['name']}_CH0_{hex_id}(void)"
            )
            f.write(f"{function_name}\n{{\n")
            for signal in message["signals"]:
                # Identity scaling is left out of the generated expression.
                factor_str = (
                    f"/ {signal['factor']}" if signal["factor"] != 1.0 else ""
                )
                offset_str = (
                    f"- {signal['offset']}" if signal["offset"] != 0.0 else ""
                )
                mask = f"_{signal['size']}bit"
                f.write(
                    f"    ECU3.TX.CH0_{message['name']}_{hex_id}"
                    f".{signal['name']} = "
                    f"(int)((GV_{signal['name']} {offset_str}) {factor_str})"
                    f" & {mask};\n"
                )
            f.write("}\n\n")
    print(f"[INFO] TX functions written to {output_file}")


def generate_tx_initialization(signals, output_file):
    """Write ``Initialize_TX_Signals``, which zeroes every TX struct field.

    Args:
        signals: Message dicts as returned by :func:`parse_dbc_file`.
        output_file: Path of the ``.c`` file to write.
    """
    if not signals:
        print("[WARNING] No TX signals found for initialization generation.")
        return

    with open(output_file, 'w') as f:
        f.write("void Initialize_TX_Signals(void)\n{\n")
        for message in signals:
            hex_id = _hex_id(message['id'])
            f.write(f"    // {message['name']} ({hex_id})\n")
            for signal in message["signals"]:
                f.write(
                    f"    ECU3.TX.CH0_{message['name']}_{hex_id}"
                    f".{signal['name']} = 0;\n"
                )
            f.write("\n")
        f.write("}\n")
    print(f"[INFO] TX initialization function written to {output_file}")


def parse_dbc_for_tx_messages(file_path, node=DEFAULT_TX_NODE):
    """Return the names of the messages sent by ``node``, sorted by DBC ID.

    Args:
        file_path: Path of the DBC file (read as cp1252).
        node: Name of the transmitting node to filter on.

    Returns:
        Message names ordered by ascending message ID as written in the DBC
        file. The list is empty if the file cannot be read.
    """
    try:
        lines = _read_dbc_lines(file_path)
    except (OSError, UnicodeError) as e:
        print(f"[ERROR] Error while parsing DBC file: {e}")
        return []

    tx_messages = []
    for raw_line in lines:
        line = raw_line.strip()
        if not line or _keyword(line) != "BO_":
            continue
        message = _parse_message_line(line)
        if message is None:
            # Skip just this line instead of aborting the whole scan.
            print(f"[WARNING] Skipping malformed BO_ line: {line}")
            continue
        if message["transmitter"] == node:
            message_id = int(message["id"])
            tx_messages.append((message_id, message["name"]))
            print(
                f"[INFO] Found TX message: {message['name']} "
                f"(ID: {message_id})"
            )

    tx_messages.sort(key=lambda entry: entry[0])  # stable sort by message ID
    return [name for _, name in tx_messages]


def generate_tx_enum(tx_messages, output_file, cycle):
    """Write the ``ECU3_CH0_TX`` enum that numbers the TX messages.

    Args:
        tx_messages: Message names in the order they should be numbered.
        output_file: Path of the header to write.
        cycle: Suffix appended to every enumerator, e.g. ``"10ms"``.
    """
    if not tx_messages:
        print("[WARNING] No TX messages found for enum generation.")
        return

    with open(output_file, 'w') as f:
        f.write("typedef enum {\n")
        for idx, message in enumerate(tx_messages):
            f.write(f"    ECU3_CH0_TX_{message}_{cycle} = {idx},\n")
        f.write("    NUMBER_OF_ECU3_CH0_TX_MESSAGE,\n")
        f.write("} ECU3_CH0_TX;\n")
    print(f"[INFO] TX enum written to {output_file}")


def _pack_byte_expressions(message_length, fields):
    """Build the C expression that fills each payload byte.

    Args:
        message_length: Number of payload bytes in the message.
        fields: Iterable of ``(c_expression, start_bit, bit_length)`` tuples.

    Returns:
        A list with one string per payload byte; bytes that no field touches
        get an empty string. A field is split into chunks of at most eight
        bits, walking upwards from its start bit.
    """
    per_byte = [[] for _ in range(message_length)]
    for c_expression, start, length in fields:
        byte_index, bit_in_byte = divmod(start, 8)
        consumed = 0  # bits of this field already placed in earlier bytes
        while consumed < length:
            # NOTE(review): shiftN is emitted as an identifier, presumably a
            # macro provided by the target project - confirm.
            term = f"({c_expression} >> shift{consumed})"
            if bit_in_byte > 0:
                term = f"{term} << shift{bit_in_byte}"
            per_byte[byte_index].append(term)
            consumed += min(8 - bit_in_byte, length - consumed)
            byte_index += 1
            bit_in_byte = 0  # every following chunk starts at bit 0
    return ["\n        | ".join(terms) for terms in per_byte]


def generate_vcu_can_transmit_single_c_file(
    dbc_path, output_file, node=DEFAULT_TX_NODE, cycle=DEFAULT_CYCLE
):
    """Write one ``Transmit_*`` C function per message sent by ``node``.

    Args:
        dbc_path: Path of the DBC file, loaded with cantools.
        output_file: Path of the ``.c`` file to write.
        node: Name of the sending node whose messages are generated.
        cycle: Suffix of the ``ECU3_CH0_TX_*`` enumerator used in the
            ``can_send_config`` call; it should match the value passed to
            :func:`generate_tx_enum`.

    Raises:
        ImportError: if cantools is not installed.
    """
    if cantools is None:
        raise ImportError(
            "cantools is required to generate the transmit file "
            "(pip install cantools)"
        )

    db = cantools.database.load_file(dbc_path)
    chunks = ['#include "can.h"\n']

    for message in db.messages:
        if node not in message.senders:
            continue

        struct_name = f"ECU3.TX.CH0_{message.name}_0x{message.frame_id:X}"
        fields = []
        for signal in message.signals:
            if getattr(signal, "byte_order", None) == "big_endian":
                # The packing walks upwards from signal.start, i.e. it assumes
                # a little-endian layout.
                print(
                    f"[WARNING] {message.name}.{signal.name} is big-endian; "
                    f"generated packing assumes little-endian layout."
                )
            fields.append(
                (f"{struct_name}.{signal.name}", signal.start, signal.length)
            )

        chunks.append(
            f"\nvoid Transmit_{message.name}_CH0_0x{message.frame_id:X}(void)"
            f"\n{{\n"
        )
        expressions = _pack_byte_expressions(message.length, fields)
        for index, expression in enumerate(expressions):
            if expression:
                chunks.append(
                    f"    CAN_ch[0].tx.buf[{index}] = ({expression}) & _8bit;\n"
                )
        chunks.append(
            f"\n    can_send_config(CAN_INST_0, "
            f"g_messageObjectConf_ECU3_0ch_TX"
            f"[ECU3_CH0_TX_{message.name}_{cycle}]);\n}}\n"
        )

    with open(output_file, "w") as c_file:
        c_file.write("".join(chunks))
    print(f"Generated C file with all VCU messages: {output_file}")


def main(argv=None):
    """Run every generator for the DBC file named on the command line.

    Args:
        argv: Argument list without the program name; defaults to
            ``sys.argv[1:]``. ``argv[0]`` is the DBC path and the optional
            ``argv[1]`` the output directory.

    Returns:
        Process exit code: 0 on success, 2 when the DBC path is missing.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        print("Usage: <script> <file.dbc> [output_dir]", file=sys.stderr)
        return 2

    dbc_file_path = args[0]
    out_dir = Path(args[1]) if len(args) > 1 else Path(DEFAULT_OUTPUT_DIR)
    out_dir.mkdir(parents=True, exist_ok=True)

    generate_vcu_can_transmit_single_c_file(
        dbc_file_path,
        out_dir / "Transmit_All_VCU_Messages.c",
        cycle=DEFAULT_CYCLE,
    )

    signals = parse_dbc_file(dbc_file_path)
    generate_tx_structs(signals, out_dir / "generated_tx_structs.h")
    generate_tx_globals(
        signals,
        out_dir / "generated_tx_globals.c",
        out_dir / "generated_tx_globals.h",
    )
    generate_tx_functions(signals, out_dir / "generated_tx_functions.c")
    generate_tx_initialization(
        signals, out_dir / "generated_tx_initialization.c"
    )

    tx_messages = parse_dbc_for_tx_messages(dbc_file_path)
    generate_tx_enum(
        tx_messages, out_dir / "generated_tx_enum.h", DEFAULT_CYCLE
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())