mirror of
https://github.com/3minbe/DBC_Converter.git
synced 2026-05-17 01:23:58 +09:00
Merge pull request #5 from 3minbe/smbaeWS_LapTop
Smbae ws lap top → main
This commit is contained in:
commit
c396328073
@ -89,12 +89,13 @@ class MainView(QtWidgets.QMainWindow):
|
||||
self.default_save_path = os.path.join(os.path.expanduser("~"), "Desktop") # 바탕화면 경로 설정
|
||||
self.file_paths = [] # 파일 경로 저장 리스트
|
||||
self.channel_info = {} # 채널 정보 초기화
|
||||
self.channel_options = ["CH0", "CH1", "CH2", "CH3", "CH4", "CH5"] # 채널 옵션 설정
|
||||
self.loadSettings() # 설정 로드
|
||||
self.setupUI(base_path) # UI 설정
|
||||
self.centerWindow() # 창을 화면 중앙에 위치
|
||||
self.sortTreeView(0, True) # 기본 파일명 오름차순 정렬
|
||||
self.channel_options = ["CH0", "CH1", "CH2", "CH3", "CH4", "CH5"] # 채널 옵션 설정
|
||||
self.setupCloseEvent() # 프로그램 종료 이벤트 설정
|
||||
self.tree.itemSelectionChanged.connect(self.onFileSelectionChanged)
|
||||
|
||||
def loadSettings(self):
|
||||
self.settings_file = "settings.json"
|
||||
@ -160,10 +161,10 @@ class MainView(QtWidgets.QMainWindow):
|
||||
|
||||
# 보기 메뉴 항목 추가
|
||||
size_menu = view_menu.addMenu("창크기")
|
||||
size_menu.addAction("기본", lambda: self.setWindowSize("default")).setShortcut('Ctrl+0')
|
||||
size_menu.addAction("작게", lambda: self.setWindowSize("small")).setShortcut('Ctrl+1')
|
||||
size_menu.addAction("보통", lambda: self.setWindowSize("medium")).setShortcut('Ctrl+2')
|
||||
size_menu.addAction("크게", lambda: self.setWindowSize("large")).setShortcut('Ctrl+3')
|
||||
size_menu.addAction("기본", lambda: self.setWindowSize("default")).setShortcut('Ctrl+0')
|
||||
size_menu.addAction("자동", lambda: self.setWindowSize("auto")).setShortcut('Ctrl+4')
|
||||
|
||||
sort_menu = view_menu.addMenu("정렬")
|
||||
@ -288,10 +289,46 @@ class MainView(QtWidgets.QMainWindow):
|
||||
|
||||
settings_label = QtWidgets.QLabel("설정")
|
||||
settings_label.setAlignment(QtCore.Qt.AlignCenter)
|
||||
settings_label.setStyleSheet("font-weight: bold; font-size: 14px;") # 글자를 굵게 하고 크기를 16px로 설정
|
||||
settings_layout.addWidget(settings_label)
|
||||
|
||||
# 설정 라벨과 채널 선택 라벨 사이에 작은 간격 추가
|
||||
settings_layout.addSpacing(20) # 10 픽셀 간격 추가
|
||||
|
||||
# 채널 선택 드롭다운 메뉴 추가
|
||||
channel_label = QtWidgets.QLabel("# 채널 선택")
|
||||
channel_label.setAlignment(QtCore.Qt.AlignLeft)
|
||||
settings_layout.addWidget(channel_label)
|
||||
|
||||
self.channel_combo = QtWidgets.QComboBox()
|
||||
self.channel_combo.addItems(self.channel_options)
|
||||
self.channel_combo.setEnabled(False) # 초기에는 비활성화
|
||||
self.channel_combo.currentIndexChanged.connect(self.onChannelChanged)
|
||||
settings_layout.addWidget(self.channel_combo)
|
||||
|
||||
# 상단 정렬을 위해 빈 공간 추가
|
||||
settings_layout.addStretch()
|
||||
|
||||
return settings_panel
|
||||
|
||||
def onFileSelectionChanged(self):
|
||||
selected_items = self.tree.selectedItems()
|
||||
if selected_items:
|
||||
self.channel_combo.setEnabled(True)
|
||||
current_channel = selected_items[0].text(2)
|
||||
self.channel_combo.setCurrentText(current_channel)
|
||||
else:
|
||||
self.channel_combo.setEnabled(False)
|
||||
|
||||
def onChannelChanged(self):
|
||||
selected_items = self.tree.selectedItems()
|
||||
if selected_items:
|
||||
new_channel = self.channel_combo.currentText()
|
||||
for item in selected_items:
|
||||
item.setText(2, new_channel)
|
||||
self.updateChannelInfo(item.text(1), item.text(0), new_channel)
|
||||
self.updateAlertText(f"채널 변경", [f"{item.text(0)}의 채널이 {new_channel}(으)로 변경되었습니다." for item in selected_items])
|
||||
|
||||
def selectSavePath(self):
|
||||
selected_path = QtWidgets.QFileDialog.getExistingDirectory(self, "저장 경로 선택", self.default_save_path)
|
||||
if selected_path:
|
||||
@ -421,7 +458,7 @@ class MainView(QtWidgets.QMainWindow):
|
||||
elif size == "large":
|
||||
self.resize(1200, 1000)
|
||||
elif size == "default":
|
||||
self.resize(1000, 600)
|
||||
self.resize(1250, 600)
|
||||
elif size == "auto":
|
||||
self.adjustSize()
|
||||
|
||||
@ -489,15 +526,18 @@ class MainView(QtWidgets.QMainWindow):
|
||||
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
rx_output_dir = os.path.join(base_output_dir, file_name, "RX")
|
||||
tx_output_dir = os.path.join(base_output_dir, file_name, "TX")
|
||||
common_output_dir = os.path.join(base_output_dir, file_name, "Common")
|
||||
dbc_output_dir = os.path.join(base_output_dir, "#DBC") # DBC 파일 저장 경로 설정
|
||||
os.makedirs(rx_output_dir, exist_ok=True)
|
||||
os.makedirs(tx_output_dir, exist_ok=True)
|
||||
os.makedirs(common_output_dir, exist_ok=True)
|
||||
os.makedirs(dbc_output_dir, exist_ok=True) # DBC 파일 저장 경로 생성
|
||||
channel_info = self.settings.get("channel_info", {}).get(os.path.basename(file_path), "CH0")
|
||||
try:
|
||||
shutil.copy(file_path, dbc_output_dir) # DBC 파일 복사
|
||||
subprocess.run(["python", "DBC_to_C_RX.py", file_path, rx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_to_C_TX.py", file_path, tx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_RX.py", file_path, rx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_TX.py", file_path, tx_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
subprocess.run(["python", "DBC_Converter_Common.py", file_path, common_output_dir, json.dumps({os.path.basename(file_path): channel_info})], check=True)
|
||||
self.updateAlertText(f"변환 성공", [file_path])
|
||||
except subprocess.CalledProcessError as e:
|
||||
self.updateAlertText(f"변환 실패: {e}", [file_path])
|
||||
|
||||
77
DBC_Converter_Common.py
Normal file
77
DBC_Converter_Common.py
Normal file
@ -0,0 +1,77 @@
|
||||
import os
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
def generate_structs(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate structs for.")
|
||||
return
|
||||
|
||||
# Ensure the output directory exists
|
||||
output_dir = os.path.dirname(output_file)
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
tx_structs = {}
|
||||
rx_structs = {}
|
||||
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
|
||||
if message_info["TX ECU name"] == "VCU":
|
||||
struct_name = f"{message_name}_{hex_id}"
|
||||
tx_structs[struct_name] = []
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] <= 32:
|
||||
tx_structs[struct_name].append(f" uint32_t {signal['Signal name']} : {signal['Length']};")
|
||||
else:
|
||||
tx_structs[struct_name].append(f" float {signal['Signal name']};")
|
||||
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] == "VCU":
|
||||
struct_name = f"{message_name}_{hex_id}"
|
||||
if struct_name not in rx_structs:
|
||||
rx_structs[struct_name] = []
|
||||
if signal["Length"] <= 32:
|
||||
rx_structs[struct_name].append(f" uint32_t {signal['Signal name']} : {signal['Length']};")
|
||||
else:
|
||||
rx_structs[struct_name].append(f" float {signal['Signal name']};")
|
||||
|
||||
f.write("typedef struct {\n")
|
||||
f.write(" typedef struct {\n")
|
||||
for struct_name, fields in tx_structs.items():
|
||||
f.write(f" typedef struct {{\n")
|
||||
for field in fields:
|
||||
f.write(f" {field}\n")
|
||||
f.write(f" }} {struct_name};\n\n")
|
||||
f.write(" } TX;\n\n")
|
||||
|
||||
f.write(" typedef struct {\n")
|
||||
for struct_name, fields in rx_structs.items():
|
||||
f.write(f" typedef struct {{\n")
|
||||
for field in fields:
|
||||
f.write(f" {field}\n")
|
||||
f.write(f" }} {struct_name};\n\n")
|
||||
f.write(" } RX;\n")
|
||||
f.write("} VCU;\n\n")
|
||||
|
||||
f.write("#endif // GENERATED_STRUCTS_H\n")
|
||||
print(f"[INFO] Structs written to {output_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_structs.h"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
if signals is None:
|
||||
print(f"[ERROR] Failed to load DBC file: {dbc_file_path}")
|
||||
sys.exit(1)
|
||||
|
||||
generate_structs(signals, output_structs_file)
|
||||
@ -1,4 +1,4 @@
|
||||
import re
|
||||
import re, sys
|
||||
|
||||
def load_dbc_file(file_path):
|
||||
try:
|
||||
@ -8,7 +8,7 @@ def load_dbc_file(file_path):
|
||||
|
||||
# 메시지와 시그널을 추출하는 정규 표현식
|
||||
message_pattern = re.compile(r'BO_\s+(\d+)\s+(\w+)\s*:\s*(\d+)\s+(\w+)')
|
||||
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)\+\s*\(([^,]+),\s*([^)]+)\)')
|
||||
signal_pattern = re.compile(r'SG_\s+(\w+)\s*:\s*(\d+)\|(\d+)@(\d+)([+-])\s*\(([^,]+),\s*([^)]+)\)\s*\[[^\]]+\]\s*\"[^\"]*\"\s+(\w+)')
|
||||
|
||||
# 메시지와 시그널 매칭
|
||||
messages = {}
|
||||
@ -41,15 +41,19 @@ def load_dbc_file(file_path):
|
||||
msb = int(signal_match.group(2))
|
||||
length = int(signal_match.group(3))
|
||||
byte_order = int(signal_match.group(4))
|
||||
factor = float(signal_match.group(5))
|
||||
offset = float(signal_match.group(6))
|
||||
sign = signal_match.group(5)
|
||||
factor = float(signal_match.group(6))
|
||||
offset = float(signal_match.group(7))
|
||||
rx_ecu_name = signal_match.group(8)
|
||||
messages[current_message]["Signals"].append({
|
||||
"Signal name": signal_name,
|
||||
"msb": msb,
|
||||
"Length": length,
|
||||
"Byte order": byte_order,
|
||||
"Sign": sign,
|
||||
"Factor": factor,
|
||||
"Offset": offset
|
||||
"Offset": offset,
|
||||
"RX ECU name": rx_ecu_name
|
||||
})
|
||||
|
||||
return messages
|
||||
@ -59,13 +63,11 @@ def load_dbc_file(file_path):
|
||||
return None
|
||||
|
||||
if __name__ == "__main__":
|
||||
file_path = 'C:/Users/MSI/Desktop/python/motorola_tx/MOTOROLA_V2.dbc'
|
||||
file_path = sys.argv[1]
|
||||
messages = load_dbc_file(file_path)
|
||||
print(messages)
|
||||
print("\n")
|
||||
|
||||
# if messages:
|
||||
# for message_name, message_info in messages.items():
|
||||
# print(f"[INFO] Message: {message_name}, Info: {message_info['id']}, {message_info['dlc']}, {message_info['tx_ecu_name']}")
|
||||
# for signal in message_info['signals']:
|
||||
# print(f" Signal: {signal}")
|
||||
|
||||
if messages:
|
||||
for message_name, message_info in messages.items():
|
||||
print(f"[INFO] Message: {message_name}, Info: {message_info['ID']}, {message_info['DLC']}, {message_info['TX ECU name']}")
|
||||
for signal in message_info['Signals']:
|
||||
print(f" └ Signal: {signal}")
|
||||
207
DBC_Converter_RX.py
Normal file
207
DBC_Converter_RX.py
Normal file
@ -0,0 +1,207 @@
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
#============================== Generate Globals ==============================#
|
||||
def generate_globals(signals, C_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate globals for.")
|
||||
return
|
||||
|
||||
with open(C_file, 'w') as f:
|
||||
f.write("#include <generated_globals.h>\n")
|
||||
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"float GV_{signal['Signal name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['Signal name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"extern float GV_{signal['Signal name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['Signal name']};\n")
|
||||
f.write("\n#endif // GENERATED_GLOBALS_H\n")
|
||||
print(f"[INFO] Globals and extern declarations written to {C_file} and {header_file}")
|
||||
|
||||
#============================== Generate VCU RX Function ==============================#
|
||||
def generate_vcu_rx_function_with_factors(signals, output_file):
|
||||
if not signals:
|
||||
print("[ERROR] No signals to generate VCU RX functions for.")
|
||||
return
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
|
||||
// Declare Factors and Offsets for signals
|
||||
"""
|
||||
|
||||
# Collect unique Factors and Offsets
|
||||
factors_offsets = {}
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
factor_name = f"Factor_{str(signal['Factor']).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal['Offset']):.0f}"
|
||||
if signal['Factor'] != 1:
|
||||
factors_offsets[factor_name] = signal['Factor']
|
||||
if signal['Offset'] != 0:
|
||||
factors_offsets[offset_name] = signal['Offset']
|
||||
|
||||
# Add Factor and Offset variable declarations
|
||||
for name, value in factors_offsets.items():
|
||||
c_file_content += f"const float {name} = {value};\n"
|
||||
|
||||
# Add a newline for separation
|
||||
c_file_content += "\n"
|
||||
|
||||
# Iterate through all messages in the signals
|
||||
for message_name, message_info in signals.items():
|
||||
# Define the temporary struct
|
||||
temp_struct_name = f"{message_name}_temp"
|
||||
|
||||
# Check if any signal in the message has RX ECU name as VCU
|
||||
has_vcu_signal = any(signal["RX ECU name"] == "VCU" for signal in message_info["Signals"])
|
||||
if not has_vcu_signal:
|
||||
continue # Skip this message if no signal has RX ECU name as VCU
|
||||
|
||||
c_file_content += f"""
|
||||
void Receive_{message_name}_{message_info['ID']}(void)
|
||||
{{
|
||||
struct {{
|
||||
"""
|
||||
|
||||
# Add temporary variables to the struct
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
if signal["Sign"] == "-":
|
||||
signal_type = "signed int"
|
||||
elif signal["Sign"] == "+":
|
||||
signal_type = "unsigned int"
|
||||
|
||||
c_file_content += f" {signal_type} {signal['Signal name']}_temp : {signal['Length']};\n"
|
||||
|
||||
c_file_content += f" }} {temp_struct_name};\n\n"
|
||||
|
||||
# Add temp assignments
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
start_byte = signal["msb"] // 8
|
||||
start_bit = signal["msb"] % 8
|
||||
signal_length = signal["Length"]
|
||||
if signal["Byte order"] == 0: # Motorola (Big Endian)
|
||||
lsb = signal["msb"] + 8*(signal_length // 8) - (signal_length % 8 - 1)
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] << shift{7 - start_bit})"
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = (7 - start_bit) + (i * 8)
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{7 - start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
else :
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{lsb % 8})"
|
||||
else: # Intel (Little Endian)
|
||||
shift_expr = f"(CAN_ch[0].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = i * 8
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[0].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
c_file_content += f" {temp_struct_name}.{signal['Signal name']}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
||||
|
||||
c_file_content += "\n"
|
||||
|
||||
# Assign to final ECU variables
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["RX ECU name"] != "VCU":
|
||||
continue # RX ECU name이 VCU가 아닌 경우 건너뜁니다.
|
||||
factor_name = f"Factor_{str(signal['Factor']).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal['Offset']):.0f}"
|
||||
factor = f" * {factor_name}" if signal['Factor'] != 1 else ""
|
||||
offset = f" + {offset_name}" if signal['Offset'] != 0 else ""
|
||||
temp_var = f"{temp_struct_name}.{signal['Signal name']}_temp"
|
||||
c_file_content += f" VCU.RX.{message_name}_{message_info['ID']}.{signal['Signal name']} = ({temp_var}{factor}){offset};\n"
|
||||
|
||||
c_file_content += "}\n"
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
||||
|
||||
#============================== Generate Input Function ==============================#
|
||||
def generate_input_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate Input functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
function_name = f"void Input_Data_Set_{message_name}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message_info["Signals"]:
|
||||
f.write(f" GV_{signal['Signal name']} = VCU.RX.CH0_RX_{message_name}_{hex_id}.{signal['Signal name']};\n")
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] Input functions written to {output_file}")
|
||||
|
||||
#============================== Generate Initialization ==============================#
|
||||
def generate_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate initialization for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void VCU_Data_Init(void)\n{\n")
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
struct_prefix = f"VCU.RX.CH0_RX_{message_name}_{hex_id}"
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Offset"] != 0.0:
|
||||
if signal["Length"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = {signal['Offset']};\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = {signal['Offset']}f;\n")
|
||||
else:
|
||||
if signal["Length"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = 0;\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['Signal name']} = 0.0f;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] Initialization function written to {output_file}")
|
||||
|
||||
#============================== Main ==============================#
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_globals_C_file = f"{output_dir}/generated_globals.c"
|
||||
output_globals_header_file = f"{output_dir}/generated_globals.h"
|
||||
output_c_file = f"{output_dir}/generated_receive.c"
|
||||
output_input_file = f"{output_dir}/generated_input.c"
|
||||
output_initialization_file = f"{output_dir}/generated_init.c"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
|
||||
generate_globals(signals, output_globals_C_file, output_globals_header_file)
|
||||
generate_vcu_rx_function_with_factors(signals, output_c_file)
|
||||
generate_input_functions(signals, output_input_file)
|
||||
generate_initialization(signals, output_initialization_file)
|
||||
192
DBC_Converter_TX.py
Normal file
192
DBC_Converter_TX.py
Normal file
@ -0,0 +1,192 @@
|
||||
import sys
|
||||
from DBC_Converter_Data_Parsing import load_dbc_file
|
||||
|
||||
|
||||
|
||||
#============================== Generate TX Globals ==============================#
|
||||
def generate_tx_globals(signals, output_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"float GV_{signal['Signal name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['Signal name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_TX_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message_name, message_info in signals.items():
|
||||
for signal in message_info["Signals"]:
|
||||
if signal["Length"] > 32:
|
||||
f.write(f"extern float GV_{signal['Signal name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['Signal name']};\n")
|
||||
f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
|
||||
print(f"[INFO] TX globals written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Functions ==============================#
|
||||
def generate_tx_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
function_name = f"void Output_Data_Set_{message_name}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message_info["Signals"]:
|
||||
factor_str = f"/ {signal['Factor']}" if signal["Factor"] != 1.0 else ""
|
||||
offset_str = f"- {signal['Offset']}" if signal["Offset"] != 0.0 else ""
|
||||
mask = f"_{signal['Length']}bit"
|
||||
f.write(
|
||||
f" VCU.TX.CH0_{message_name}_{hex_id}.{signal['Signal name']} = "
|
||||
f"(int)((GV_{signal['Signal name']} {offset_str}) {factor_str}) & {mask};\n"
|
||||
)
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] TX functions written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Initialization ==============================#
|
||||
def generate_tx_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No TX signals found for initialization generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void Initialize_TX_Signals(void)\n{\n")
|
||||
for message_name, message_info in signals.items():
|
||||
hex_id = message_info["ID"]
|
||||
f.write(f" // {message_name} ({hex_id})\n")
|
||||
for signal in message_info["Signals"]:
|
||||
f.write(f" VCU.TX.CH0_{message_name}_{hex_id}.{signal['Signal name']} = 0;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] TX initialization function written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX Enum ==============================#
|
||||
def generate_tx_enum(signals, output_file, cycle):
|
||||
if not signals:
|
||||
print("[WARNING] No TX messages found for enum generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("typedef enum {\n")
|
||||
for idx, message_name in enumerate(signals.keys()):
|
||||
enum_name = f"VCU_CH0_TX_{message_name}_{cycle}"
|
||||
f.write(f" {enum_name} = {idx},\n")
|
||||
f.write(" NUMBER_OF_VCU_CH0_TX_MESSAGE,\n")
|
||||
f.write("} VCU_CH0_TX;\n")
|
||||
print(f"[INFO] TX enum written to {output_file}")
|
||||
|
||||
|
||||
#============================== Generate TX C File ==============================#
|
||||
def generate_vcu_can_transmit_single_c_file(signals, output_file):
|
||||
if not signals:
|
||||
print("[ERROR] No signals to generate VCU CAN transmit functions for.")
|
||||
return
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
"""
|
||||
|
||||
# Iterate through all messages in the signals
|
||||
for message_name, message_info in signals.items():
|
||||
# Check if any signal in the message has RX ECU name as VCU
|
||||
has_vcu_signal = any(signal["RX ECU name"] != "VCU" for signal in message_info["Signals"])
|
||||
if not has_vcu_signal:
|
||||
continue # Skip this message if no signal has RX ECU name as VCU
|
||||
|
||||
# Add the function definition for the message
|
||||
c_file_content += f"""
|
||||
void Transmit_{message_name}_CH0_{message_info['ID']}(void)
|
||||
{{
|
||||
"""
|
||||
|
||||
# Iterate through signals and generate bit-packing logic
|
||||
buffer_assignments = [""] * message_info["DLC"]
|
||||
for signal in message_info["Signals"]:
|
||||
start_byte = signal["msb"] // 8
|
||||
start_bit = signal["msb"] % 8
|
||||
signal_length = signal["Length"]
|
||||
signal_name = f"VCU.TX.CH0_{message_name}_{message_info['ID']}.{signal['Signal name']}"
|
||||
|
||||
# Handle 8-bit chunks
|
||||
while signal_length > 0:
|
||||
bits_in_byte = min(8 - start_bit, signal_length)
|
||||
shift_amount = (signal["Length"] - signal_length)
|
||||
if signal["Byte order"] == 0: # Motorola (Big Endian)
|
||||
if bits_in_byte == 8:
|
||||
shift_expr = f"({signal_name} >> shift{shift_amount})"
|
||||
else:
|
||||
shift_expr = f"({signal_name} << shift{start_bit})" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
|
||||
else: # Intel (Little Endian)
|
||||
shift_expr = f"({signal_name} >> shift{shift_amount}) << shift{start_bit}" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
|
||||
buffer_index = start_byte
|
||||
|
||||
# Ensure buffer_index is within the range of buffer_assignments
|
||||
if buffer_index >= len(buffer_assignments):
|
||||
print(f"[ERROR] Buffer index {buffer_index} out of range for message {message_name}")
|
||||
break
|
||||
|
||||
# Add to the buffer assignments
|
||||
if buffer_assignments[buffer_index]:
|
||||
buffer_assignments[buffer_index] += f"\n | {shift_expr}"
|
||||
else:
|
||||
buffer_assignments[buffer_index] = f"{shift_expr}"
|
||||
|
||||
# Update pointers
|
||||
signal_length -= bits_in_byte
|
||||
start_byte += 1
|
||||
start_bit = 0
|
||||
|
||||
# Write buffer assignments to the function
|
||||
for i, assignment in enumerate(buffer_assignments):
|
||||
if assignment:
|
||||
c_file_content += f" CAN_ch[0].tx.buf[{i}] = ({assignment}) & _8bit;\n"
|
||||
|
||||
# Add the send function call
|
||||
c_file_content += f"""
|
||||
can_send_config(CAN_INST_0, g_messageObjectConf_VCU_0ch_TX[VCU_CH0_TX_{message_name}_10ms]);
|
||||
}}
|
||||
"""
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated C file with all VCU messages: {output_file}")
|
||||
|
||||
|
||||
#============================== Main ==============================#
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_tx_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_tx_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_tx_globals.h"
|
||||
output_tx_functions_file = f"{output_dir}/generated_tx_functions.c"
|
||||
output_tx_initialization = f"{output_dir}/generated_tx_initialization.c"
|
||||
output_enum_file = f"{output_dir}/generated_tx_enum.h"
|
||||
output_c_file = f"{output_dir}/Transmit_All_VCU_Messages.c" # Replace with your desired output file name
|
||||
cycle_time = "10ms"
|
||||
|
||||
signals = load_dbc_file(dbc_file_path)
|
||||
if signals is None:
|
||||
print(f"[ERROR] Failed to load DBC file: {dbc_file_path}")
|
||||
sys.exit(1)
|
||||
|
||||
generate_tx_globals(signals, output_globals_file, output_globals_header)
|
||||
generate_tx_functions(signals, output_tx_functions_file)
|
||||
generate_tx_initialization(signals, output_tx_initialization)
|
||||
generate_tx_enum(signals, output_enum_file, cycle_time)
|
||||
generate_vcu_can_transmit_single_c_file(signals, output_c_file)
|
||||
320
DBC_to_C_RX.py
320
DBC_to_C_RX.py
@ -1,320 +0,0 @@
|
||||
import re
|
||||
import sys
|
||||
import cantools
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
def parse_dbc_file(file_path):
|
||||
signals = []
|
||||
current_message = None
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
start_parsing = False
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if not start_parsing:
|
||||
if line.startswith("BO_"):
|
||||
start_parsing = True
|
||||
else:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
current_message = {
|
||||
"id": parts[1],
|
||||
"name": parts[2].replace(":", ""),
|
||||
"dlc": parts[3],
|
||||
"transmitter": parts[4],
|
||||
"signals": []
|
||||
}
|
||||
print(f"[INFO] Found message: {current_message['name']}")
|
||||
|
||||
elif line.startswith("SG_") and current_message:
|
||||
try:
|
||||
parts = line.split(":")
|
||||
if len(parts) < 2:
|
||||
print(f"[WARNING] Skipping malformed SG_ line: {line}")
|
||||
continue
|
||||
|
||||
signal_name = parts[0].split()[1].strip()
|
||||
bit_info = parts[1].split()[0]
|
||||
receiver = parts[1].split()[-1]
|
||||
|
||||
if receiver != "VCU":
|
||||
continue
|
||||
|
||||
byte_offset, rest = bit_info.split("|")
|
||||
bit_offset = int(rest.split("@")[0])
|
||||
size = int(re.search(r'\d+', rest.split("@")[0]).group())
|
||||
factor_offset_match = re.search(r'\(([^)]+)\)', line)
|
||||
if not factor_offset_match:
|
||||
print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
|
||||
continue
|
||||
|
||||
factor_offset = factor_offset_match.group(1).split(",")
|
||||
factor = float(factor_offset[0])
|
||||
offset = float(factor_offset[1])
|
||||
|
||||
current_message["signals"].append({
|
||||
"name": signal_name,
|
||||
"byte_offset": int(byte_offset) // 8,
|
||||
"bit_offset": bit_offset,
|
||||
"size": size,
|
||||
"factor": factor,
|
||||
"offset": offset,
|
||||
"signed": '-' in rest.split("@")[1]
|
||||
})
|
||||
print(f"[INFO] Added signal: {signal_name}")
|
||||
except (ValueError, IndexError, AttributeError) as e:
|
||||
print(f"[ERROR] Error parsing signal: {line}, {e}")
|
||||
continue
|
||||
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
return []
|
||||
|
||||
print(f"[INFO] Parsed {len(signals)} messages.")
|
||||
return signals
|
||||
|
||||
|
||||
def generate_vcu_rx_function_with_factors(dbc_path, output_file, channel_info):
|
||||
# Load the DBC file
|
||||
db = cantools.database.load_file(dbc_path)
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = f"""
|
||||
#include "can.h"
|
||||
|
||||
// Declare Factors and Offsets for signals
|
||||
"""
|
||||
|
||||
# Collect unique Factors and Offsets
|
||||
factors_offsets = {}
|
||||
for message in db.messages:
|
||||
if "VCU" in message.receivers:
|
||||
for signal in message.signals:
|
||||
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal.offset):.0f}"
|
||||
if signal.scale != 1:
|
||||
factors_offsets[factor_name] = signal.scale
|
||||
if signal.offset != 0:
|
||||
factors_offsets[offset_name] = signal.offset
|
||||
|
||||
# Add Factor and Offset variable declarations
|
||||
for name, value in factors_offsets.items():
|
||||
c_file_content += f"const float {name} = {value};\n"
|
||||
|
||||
# Add a newline for separation
|
||||
c_file_content += "\n"
|
||||
|
||||
# Iterate through all messages in the DBC file
|
||||
for message in db.messages:
|
||||
# Check if "VCU" is in the receivers list
|
||||
if "VCU" in message.receivers:
|
||||
# Define the temporary struct
|
||||
channel = channel_info.get(str(message.frame_id), "CH0") # 기본 채널 설정
|
||||
temp_struct_name = f"{channel}_MV1_0x{message.frame_id:X}_temp"
|
||||
c_file_content += f"""
|
||||
void Receive_{message.name}_{channel}_0x{message.frame_id:X}(void)
|
||||
{{
|
||||
struct {{
|
||||
"""
|
||||
|
||||
# Add temporary variables to the struct
|
||||
for signal in message.signals:
|
||||
if signal.is_signed:
|
||||
signal_type = "signed int"
|
||||
else:
|
||||
signal_type = "unsigned int"
|
||||
c_file_content += f" {signal_type} {signal.name}_temp : {signal.length};\n"
|
||||
|
||||
c_file_content += f" }} {temp_struct_name};\n\n"
|
||||
|
||||
# Add temp assignments
|
||||
for signal in message.signals:
|
||||
start_byte = signal.start // 8
|
||||
start_bit = signal.start % 8
|
||||
signal_length = signal.length
|
||||
shift_expr = f"(CAN_ch[{channel}].rx.buf[{start_byte}] >> shift{start_bit})"
|
||||
if signal_length > 8:
|
||||
# Handle multi-byte signals
|
||||
multi_byte_expr = []
|
||||
for i in range((signal_length + 7) // 8):
|
||||
byte_shift = i * 8
|
||||
if i == 0:
|
||||
multi_byte_expr.append(f"(CAN_ch[{channel}].rx.buf[{start_byte + i}] >> shift{start_bit})")
|
||||
else:
|
||||
multi_byte_expr.append(f"(CAN_ch[{channel}].rx.buf[{start_byte + i}] << shift{byte_shift})")
|
||||
shift_expr = " | ".join(multi_byte_expr)
|
||||
c_file_content += f" {temp_struct_name}.{signal.name}_temp = ({shift_expr}) & _{signal_length}bit;\n"
|
||||
|
||||
c_file_content += "\n"
|
||||
|
||||
# Assign to final ECU variables
|
||||
for signal in message.signals:
|
||||
factor_name = f"Factor_{str(signal.scale).replace('.', '_')}"
|
||||
offset_name = f"Offset_m_{abs(signal.offset)::.0f}"
|
||||
factor = f" * {factor_name}" if signal.scale != 1 else ""
|
||||
offset = f" + {offset_name}" if signal.offset != 0 else ""
|
||||
temp_var = f"{temp_struct_name}.{signal.name}_temp"
|
||||
if signal.is_signed:
|
||||
c_file_content += f" VCU.RX.{channel}_{message.name}_0x{message.frame_id:X}.{signal.name} = ({temp_var}{factor}){offset};\n"
|
||||
else:
|
||||
c_file_content += f" VCU.RX.{channel}_{message.name}_0x{message.frame_id:X}.{signal.name} = {temp_var}{factor}{offset};\n"
|
||||
|
||||
c_file_content += "}\n"
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated RX function C file with Factors and Offsets: {output_file}")
|
||||
|
||||
|
||||
def generate_input_functions(signals, output_file, channel_info):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate Input functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
channel = channel_info.get(str(message['id']), "CH0")
|
||||
function_name = f"void Input_Data_Set_{message['name']}_{channel}_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
f.write(f" GV_{signal['name']} = VCU.RX.{channel}_RX_{message['name']}_{hex_id}.{signal['name']};\n")
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] Input functions written to {output_file}")
|
||||
|
||||
|
||||
def generate_structs(signals, output_file, channel_info):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate structs for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
channel = channel_info.get(str(message['id']), "CH0")
|
||||
f.write(f"typedef struct\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
||||
else:
|
||||
f.write(f" float {signal['name']};\n")
|
||||
f.write(f"}} {channel}_RX_{message['name']}_{hex_id};\n\n")
|
||||
f.write("#endif // GENERATED_STRUCTS_H\n")
|
||||
print(f"[INFO] Structs written to {output_file}")
|
||||
|
||||
|
||||
def generate_globals(signals, output_file, header_file, channel_info):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#include <generated_globals.h>\n")
|
||||
|
||||
for message in signals:
|
||||
channel = channel_info.get(str(message['id']), "CH0")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message in signals:
|
||||
channel = channel_info.get(str(message['id']), "CH0")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"extern float GV_{signal['name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['name']};\n")
|
||||
f.write("\n#endif // GENERATED_GLOBALS_H\n")
|
||||
print(f"[INFO] Globals and extern declarations written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
def generate_initialization(signals, output_file, channel_info):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate initialization for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void VCU_Data_Init(void)\n{\n")
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
channel = channel_info.get(str(message['id']), "CH0")
|
||||
struct_prefix = f"VCU.RX.{channel}_RX_{message['name']}_{hex_id}"
|
||||
for signal in message["signals"]:
|
||||
if signal["offset"] != 0.0:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = {signal['offset']};\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = {signal['offset']}f;\n")
|
||||
else:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = 0;\n")
|
||||
else:
|
||||
f.write(f" {struct_prefix}.{signal['name']} = 0.0f;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] Initialization function written to {output_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
channel_info_path = sys.argv[3]
|
||||
|
||||
with open(channel_info_path, 'r', encoding='cp1252') as f:
|
||||
try:
|
||||
channel_info = json.load(f)
|
||||
# Convert channel info to int
|
||||
for key in channel_info:
|
||||
channel_info[key] = int(channel_info[key].replace("CH", ""))
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"[ERROR] Error decoding JSON: {e}")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error reading channel info file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
output_c_file = f"{output_dir}/generated_receive.c"
|
||||
output_input_file = f"{output_dir}/generated_input.c"
|
||||
output_structs_file = f"{output_dir}/generated_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_globals.h"
|
||||
output_initialization_file = f"{output_dir}/generated_init.c"
|
||||
|
||||
signals = parse_dbc_file(dbc_file_path)
|
||||
generate_vcu_rx_function_with_factors(dbc_file_path, output_c_file, channel_info)
|
||||
generate_input_functions(signals, output_input_file, channel_info)
|
||||
generate_structs(signals, output_structs_file, channel_info)
|
||||
generate_globals(signals, output_globals_file, output_globals_header, channel_info)
|
||||
generate_initialization(signals, output_initialization_file, channel_info)
|
||||
323
DBC_to_C_TX.py
323
DBC_to_C_TX.py
@ -1,323 +0,0 @@
|
||||
import re
|
||||
import sys
|
||||
import cantools
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
def parse_dbc_file(file_path):
|
||||
signals = []
|
||||
current_message = None
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
start_parsing = False
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if not start_parsing:
|
||||
if line.startswith("BO_"):
|
||||
start_parsing = True
|
||||
else:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
current_message = {
|
||||
"id": parts[1],
|
||||
"name": parts[2].replace(":", ""),
|
||||
"dlc": parts[3],
|
||||
"transmitter": parts[4],
|
||||
"signals": []
|
||||
}
|
||||
print(f"[INFO] Found message: {current_message['name']}")
|
||||
|
||||
elif line.startswith("SG_") and current_message:
|
||||
try:
|
||||
parts = line.split(":")
|
||||
if len(parts) < 2:
|
||||
print(f"[WARNING] Skipping malformed SG_ line: {line}")
|
||||
continue
|
||||
|
||||
signal_name = parts[0].split()[1].strip()
|
||||
bit_info = parts[1].split()[0]
|
||||
receiver = parts[1].split()[-1]
|
||||
|
||||
if current_message["transmitter"] != "VCU":
|
||||
continue
|
||||
|
||||
byte_offset, rest = bit_info.split("|")
|
||||
bit_offset = int(rest.split("@")[0])
|
||||
size = int(re.search(r'\d+', rest.split("@")[0]).group())
|
||||
factor_offset_match = re.search(r'\(([^)]+)\)', line)
|
||||
if not factor_offset_match:
|
||||
print(f"[WARNING] Skipping signal with missing factor/offset: {line}")
|
||||
continue
|
||||
|
||||
factor_offset = factor_offset_match.group(1).split(",")
|
||||
factor = float(factor_offset[0])
|
||||
offset = float(factor_offset[1])
|
||||
|
||||
current_message["signals"].append({
|
||||
"name": signal_name,
|
||||
"byte_offset": int(byte_offset) // 8,
|
||||
"bit_offset": bit_offset,
|
||||
"size": size,
|
||||
"factor": factor,
|
||||
"offset": offset,
|
||||
"signed": '-' in rest.split("@")[1]
|
||||
})
|
||||
print(f"[INFO] Added signal: {signal_name}")
|
||||
except (ValueError, IndexError, AttributeError) as e:
|
||||
print(f"[ERROR] Error parsing signal: {line}, {e}")
|
||||
continue
|
||||
|
||||
if current_message and current_message["signals"]:
|
||||
signals.append(current_message)
|
||||
print(f"[INFO] Added last message: {current_message['name']} with {len(current_message['signals'])} signals.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
return []
|
||||
|
||||
print(f"[INFO] Parsed {len(signals)} messages.")
|
||||
return signals
|
||||
|
||||
|
||||
def generate_tx_structs(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX structs for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_STRUCTS_H\n")
|
||||
f.write("#define GENERATED_TX_STRUCTS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
f.write(f"typedef struct\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] <= 32:
|
||||
f.write(f" uint32_t {signal['name']} : {signal['size']};\n")
|
||||
else:
|
||||
f.write(f" float {signal['name']};\n")
|
||||
f.write(f"}} CH0_{message['name']}_{hex_id};\n\n")
|
||||
f.write("#endif // GENERATED_TX_STRUCTS_H\n")
|
||||
print(f"[INFO] TX structs written to {output_file}")
|
||||
|
||||
|
||||
def generate_tx_globals(signals, output_file, header_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX globals for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"float GV_{signal['name']} = 0.0f;\n")
|
||||
else:
|
||||
f.write(f"uint32_t GV_{signal['name']} = 0;\n")
|
||||
|
||||
with open(header_file, 'w') as f:
|
||||
f.write("#ifndef GENERATED_TX_GLOBALS_H\n")
|
||||
f.write("#define GENERATED_TX_GLOBALS_H\n\n")
|
||||
f.write("#include <stdint.h>\n\n")
|
||||
for message in signals:
|
||||
for signal in message["signals"]:
|
||||
if signal["size"] > 32:
|
||||
f.write(f"extern float GV_{signal['name']};\n")
|
||||
else:
|
||||
f.write(f"extern uint32_t GV_{signal['name']};\n")
|
||||
f.write("\n#endif // GENERATED_TX_GLOBALS_H\n")
|
||||
print(f"[INFO] TX globals written to {output_file} and {header_file}")
|
||||
|
||||
|
||||
def generate_tx_functions(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No signals to generate TX functions for.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
function_name = f"void Output_Data_Set_{message['name']}_CH0_{hex_id}(void)"
|
||||
f.write(f"{function_name}\n{{\n")
|
||||
for signal in message["signals"]:
|
||||
factor_str = f"/ {signal['factor']}" if signal["factor"] != 1.0 else ""
|
||||
offset_str = f"- {signal['offset']}" if signal["offset"] != 0.0 else ""
|
||||
mask = f"_{signal['size']}bit"
|
||||
f.write(
|
||||
f" VCU.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = "
|
||||
f"(int)((GV_{signal['name']} {offset_str}) {factor_str}) & {mask};\n"
|
||||
)
|
||||
f.write("}\n\n")
|
||||
print(f"[INFO] TX functions written to {output_file}")
|
||||
|
||||
def generate_tx_initialization(signals, output_file):
|
||||
if not signals:
|
||||
print("[WARNING] No TX signals found for initialization generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("void Initialize_TX_Signals(void)\n{\n")
|
||||
for message in signals:
|
||||
hex_id = f"0x{int(message['id']):X}"
|
||||
f.write(f" // {message['name']} ({hex_id})\n")
|
||||
for signal in message["signals"]:
|
||||
f.write(f" VCU.TX.CH0_{message['name']}_{hex_id}.{signal['name']} = 0;\n")
|
||||
f.write("\n")
|
||||
f.write("}\n")
|
||||
print(f"[INFO] TX initialization function written to {output_file}")
|
||||
|
||||
def parse_dbc_for_tx_messages(file_path):
|
||||
tx_messages = []
|
||||
try:
|
||||
with open(file_path, 'r', encoding='cp1252') as f:
|
||||
lines = f.readlines()
|
||||
print(f"[INFO] Read {len(lines)} lines from DBC file.")
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.startswith("BO_"):
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
print(f"[WARNING] Skipping malformed BO_ line: {line}")
|
||||
continue
|
||||
|
||||
message_id = int(parts[1]) # ID for sorting
|
||||
message_name = parts[2].replace(":", "")
|
||||
transmitter = parts[4]
|
||||
|
||||
# Only include messages transmitted by VCU
|
||||
if transmitter == "VCU":
|
||||
tx_messages.append((message_id, message_name))
|
||||
print(f"[INFO] Found TX message: {message_name} (ID: {message_id})")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Error while parsing DBC file: {e}")
|
||||
|
||||
# Sort messages by message name or ID
|
||||
tx_messages.sort(key=lambda x: x[0]) # Sort by message ID
|
||||
return [message[1] for message in tx_messages]
|
||||
|
||||
|
||||
def generate_tx_enum(tx_messages, output_file, cycle):
|
||||
if not tx_messages:
|
||||
print("[WARNING] No TX messages found for enum generation.")
|
||||
return
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
f.write("typedef enum {\n")
|
||||
for idx, message in enumerate(tx_messages):
|
||||
enum_name = f"VCU_CH0_TX_{message}_{cycle}"
|
||||
f.write(f" {enum_name} = {idx},\n")
|
||||
f.write(" NUMBER_OF_VCU_CH0_TX_MESSAGE,\n")
|
||||
f.write("} VCU_CH0_TX;\n")
|
||||
print(f"[INFO] TX enum written to {output_file}")
|
||||
|
||||
def generate_vcu_can_transmit_single_c_file(dbc_path, output_file):
|
||||
# Load the DBC file
|
||||
db = cantools.database.load_file(dbc_path)
|
||||
|
||||
# Initialize the header of the C file
|
||||
c_file_content = """
|
||||
#include "can.h"
|
||||
"""
|
||||
|
||||
# Iterate through all messages in the DBC file
|
||||
for message in db.messages:
|
||||
# Check if the message's sending node is "VCU"
|
||||
if "VCU" in message.senders:
|
||||
# Add the function definition for the message
|
||||
c_file_content += f"""
|
||||
void Transmit_{message.name}_CH0_0x{message.frame_id:X}(void)
|
||||
{{
|
||||
"""
|
||||
|
||||
# Iterate through signals and generate bit-packing logic
|
||||
buffer_assignments = [""] * message.length
|
||||
for signal in message.signals:
|
||||
start_byte = signal.start // 8
|
||||
start_bit = signal.start % 8
|
||||
signal_length = signal.length
|
||||
signal_name = f"VCU.TX.CH0_{message.name}_0x{message.frame_id:X}.{signal.name}"
|
||||
|
||||
# Handle 8-bit chunks
|
||||
while signal_length > 0:
|
||||
bits_in_byte = min(8 - start_bit, signal_length)
|
||||
shift_amount = (signal.length - signal_length)
|
||||
shift_expr = f"({signal_name} >> shift{shift_amount}) << shift{start_bit}" if start_bit > 0 else f"({signal_name} >> shift{shift_amount})"
|
||||
buffer_index = start_byte
|
||||
|
||||
# Ensure buffer_index is within the range of buffer_assignments
|
||||
if buffer_index >= len(buffer_assignments):
|
||||
print(f"[ERROR] Buffer index {buffer_index} out of range for message {message.name}")
|
||||
break
|
||||
|
||||
# Add to the buffer assignments
|
||||
if buffer_assignments[buffer_index]:
|
||||
buffer_assignments[buffer_index] += f"\n | {shift_expr}"
|
||||
else:
|
||||
buffer_assignments[buffer_index] = f"{shift_expr}"
|
||||
|
||||
# Update pointers
|
||||
signal_length -= bits_in_byte
|
||||
start_byte += 1
|
||||
start_bit = 0
|
||||
|
||||
# Write buffer assignments to the function
|
||||
for i, assignment in enumerate(buffer_assignments):
|
||||
if assignment:
|
||||
c_file_content += f" CAN_ch[0].tx.buf[{i}] = ({assignment}) & _8bit;\n"
|
||||
|
||||
# Add the send function call
|
||||
c_file_content += f"""
|
||||
can_send_config(CAN_INST_0, g_messageObjectConf_VCU_0ch_TX[VCU_CH0_TX_{message.name}_10ms]);
|
||||
}}
|
||||
"""
|
||||
|
||||
# Write the generated code to a single C file
|
||||
with open(output_file, "w") as c_file:
|
||||
c_file.write(c_file_content)
|
||||
print(f"Generated C file with all VCU messages: {output_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbc_file_path = sys.argv[1]
|
||||
output_dir = sys.argv[2]
|
||||
|
||||
output_structs_file = f"{output_dir}/generated_tx_structs.h"
|
||||
output_globals_file = f"{output_dir}/generated_tx_globals.c"
|
||||
output_globals_header = f"{output_dir}/generated_tx_globals.h"
|
||||
output_tx_functions_file = f"{output_dir}/generated_tx_functions.c"
|
||||
output_tx_initialization = f"{output_dir}/generated_tx_initialization.c"
|
||||
output_enum_file = f"{output_dir}/generated_tx_enum.h"
|
||||
cycle_time = "10ms"
|
||||
output_c_file = f"{output_dir}/Transmit_All_VCU_Messages.c" # Replace with your desired output file name
|
||||
|
||||
generate_vcu_can_transmit_single_c_file(dbc_file_path, output_c_file)
|
||||
|
||||
|
||||
signals = parse_dbc_file(dbc_file_path)
|
||||
generate_tx_structs(signals, output_structs_file)
|
||||
generate_tx_globals(signals, output_globals_file, output_globals_header)
|
||||
generate_tx_functions(signals, output_tx_functions_file)
|
||||
generate_tx_initialization(signals, output_tx_initialization)
|
||||
tx_messages = parse_dbc_for_tx_messages(dbc_file_path)
|
||||
generate_tx_enum(tx_messages, output_enum_file, cycle_time)
|
||||
BIN
__pycache__/DBC_Converter_Data_Parsing.cpython-313.pyc
Normal file
BIN
__pycache__/DBC_Converter_Data_Parsing.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/Data_Parsing.cpython-313.pyc
Normal file
BIN
__pycache__/Data_Parsing.cpython-313.pyc
Normal file
Binary file not shown.
@ -2,6 +2,6 @@
|
||||
"theme": "light",
|
||||
"default_save_path": "C:/Users/MSI/Desktop",
|
||||
"file_paths": [],
|
||||
"last_opened_dir": "C:/Users/MSI/SynologyDrive/3min_be/한자연/!사업/초안전/#Debug/DBC",
|
||||
"last_opened_dir": "C:/Users/MSI/SynologyDrive/3min_be/한자연/!과제/초안전/#Debug/DBC",
|
||||
"channel_info": {}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user