from Base.Arxml import Arxml from lxml import etree import logging, os, copy from pandas import DataFrame,ExcelWriter, merge, concat from typing import Type import threading import queue import time class Pd_Utils: @staticmethod def write_dataframe_to_xls(dataframe, xls_name, sheet_name='Sheet1', mode='w'): # print(f'write sheet{sheet_name}') if mode == 'a' and not(os.path.exists(xls_name)): mode = 'w' with ExcelWriter(xls_name, engine='openpyxl', mode=mode) as writer: # doctest: +SKIP # df = DataFrame(data, columns=columns_headers) # print(f'write excel {sheet_name}') dataframe.to_excel(writer, sheet_name=sheet_name) @staticmethod def write_list_to_csv(data_list, filename): ''' :param data_list: two dimensions list :param filename: csv file name :return: ''' with open(filename, 'w') as f: for line in data_list: for v in line: f.write(v) f.write(',') f.write('\n') class ContainerAttrItemBase: def __init__(self, container): # self._container_name = '' self._container = container self._attrs_dir_list = [] # self.set_container_name() self.get_container_attrs_dir_list(container) # def set_container_name(self): # raise NotImplementedError # # def get_container_name(self): # return self._container_name def get_container_attrs_dir_list(self, container): raise NotImplementedError def print_attr(self): # all_attrs ==> [{key1:value1},{key2:value2},...] length not less than 1 # each dir's values represent of one line in table, and dir's keys represent of table header all_attrs = [] all_attrs_name = [] for attr_dir in self._attrs_dir_list: # print(attr_dir) all_attrs.append(list(attr_dir.values())) if len(all_attrs_name) == 0: all_attrs_name = list(attr_dir.keys()) if len(all_attrs_name) == 0: logging.info('ERROR: Container{}: {} get empty attribute dir !'.format(self._container, Arxml.get_container_text(Arxml.get_first_match_element(self._container, './/SHORT-NAME')))) # print(all_attrs) return (all_attrs, all_attrs_name) class PduBase(ContainerAttrItemBase): def __init__(self, container): self._pdu_name = '' self._pdu_type = '' self._pdu_length = '' self.get_pdu_base_attr(container) super(PduBase, self).__init__(container) # print('PduBase init') def get_pdu_base_attr(self, container): self.get_pdu_name(container) self.get_pdu_type(container) self.get_pdu_length(container) def get_pdu_type(self, container): self._pdu_type = Arxml.get_element_tag(container) def get_pdu_name(self, container): self._pdu_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def get_pdu_length(self, container): self._pdu_length = Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH')) def get_container_attrs_dir_list(self, container): self.get_pdu_attrs_dir_list(container) def get_pdu_attrs_dir_list(self, container): raise NotImplementedError def get_pdu_signal_mapping_attr(self, container): isigmal_mapping_attrs = [] isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING') if isignal_to_ipdu_mappings != None: for isignal_mapping in isignal_to_ipdu_mappings: isignal_ref = Arxml.get_split_end_value( Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF')) ) if isignal_ref == '': # IS not signal ref, but is signal group ref isignal_ref = Arxml.get_split_end_value( Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-GROUP-REF')) ) isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION')) isigmal_mapping_attrs.append([isignal_ref, isignal_start_position]) return isigmal_mapping_attrs class NMPdu(PduBase): def get_pdu_attrs_dir_list(self, container): isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container) for isignal_attr in isignal_mapping_attrs: attr_dir = dict( { 'PDU Name':self._pdu_name, 'PDU Type':self._pdu_type, 'PDU Length':self._pdu_length, 'ISignal Name':isignal_attr[0], 'ISignal Start Position':isignal_attr[1], } ) self._attrs_dir_list.append(attr_dir) # def get_nmpdu_signal_mapping_attr(self, container): # isigmal_mapping_attrs = [] # isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING') # # for isignal_mapping in isignal_to_ipdu_mappings: # isignal_ref = Arxml.get_split_end_value( # Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF')) # ) # isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION')) # # isigmal_mapping_attrs.append([isignal_ref, isignal_start_position]) # # return isigmal_mapping_attrs # class NPdu class NPdu(PduBase): def get_pdu_attrs_dir_list(self, container): attr_dir = dict( { 'PDU Name': self._pdu_name, 'PDU Type': self._pdu_type, 'PDU Length': self._pdu_length, } ) self._attrs_dir_list.append(attr_dir) class Dcm_IPdu(PduBase): def get_pdu_attrs_dir_list(self, container): attr_dir = dict( { 'PDU Name': self._pdu_name, 'PDU Type': self._pdu_type, 'PDU Length': self._pdu_length, } ) self._attrs_dir_list.append(attr_dir) class IPdu(PduBase): def __init__(self, container): super(IPdu, self).__init__(container) def get_pdu_attrs_dir_list(self, container): isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container) container_header_id_con = Arxml.get_first_match_element(container, './/HEADER-ID-SHORT-HEADER') container_header_id = hex(int(Arxml.get_container_text(container_header_id_con))) if container_header_id_con != None else 'NA' if len(isignal_mapping_attrs) == 0: attr_dir = dict( { 'PDU Name': self._pdu_name, 'PDU Type': self._pdu_type, 'PDU Length': self._pdu_length, 'Contained PDU ID': container_header_id, 'ISignal Name':'NA', 'ISignal Start Position':'NA', } ) self._attrs_dir_list.append(attr_dir) else: for isignal_attr in isignal_mapping_attrs: attr_dir = dict( { 'PDU Name': self._pdu_name, 'PDU Type': self._pdu_type, 'PDU Length': self._pdu_length, 'Contained PDU ID': container_header_id, 'ISignal Name': isignal_attr[0], 'ISignal Start Position': isignal_attr[1], } ) self._attrs_dir_list.append(attr_dir) class ContainerPdu(PduBase): def get_pdu_attrs_dir_list(self, container): contained_pdu_refs = self.get_contained_pdu_ref(container) for contained_pdu in contained_pdu_refs: attr_dir = dict( { 'PDU Name': self._pdu_name, 'PDU Type': self._pdu_type, 'PDU Length': self._pdu_length, 'Contained PDU': contained_pdu, } ) self._attrs_dir_list.append(attr_dir) def get_contained_pdu_ref(self, container): contained_pdu_refs = Arxml.get_all_match_element(container, './/CONTAINED-PDU-TRIGGERING-REF') contained_pdu_refs = [Arxml.get_split_end_value(Arxml.get_container_text(con)) for con in contained_pdu_refs] return contained_pdu_refs class Port(ContainerAttrItemBase): def __init__(self, container): super(Port, self).__init__(container) def get_container_attrs_dir_list(self, container): port_name = self.__get_port_name(container) port_type = self.__get_port_type(container) port_attrs = self.__get_all_signal_attrs(container) for attr in port_attrs: attr_dir = {} attr_dir['Port Name'] = port_name attr_dir['Port Direction'] = port_type attr_dir['Mapping Signal Type'] = attr[0] attr_dir['Signal Name'] = attr[1] if attr[1]!='' else port_name attr_dir['Signal Init Value'] = attr[2] self._attrs_dir_list.append(attr_dir) # print(self._attrs_dir_list) def __get_port_type(self, port_container:etree.Element): port_type = Arxml.get_element_tag(port_container) return port_type def __get_port_name(self, port_container): port_name = Arxml.get_container_text(Arxml.get_first_match_element(port_container, './SHORT-NAME')) return port_name def __get_init_value(self, container): init_value = Arxml.get_container_text(Arxml.get_first_match_element(container, './VALUE')) signalgroup_signal_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-LABEL')) return (init_value, signalgroup_signal_name) def __get_all_signal_attrs(self, port_container): numberical_value_containers = Arxml.get_all_match_element(port_container, './/NUMERICAL-VALUE-SPECIFICATION') number_size = len(numberical_value_containers) signal_attr = [] if 1 == number_size: port_signal_type = 'SIGNAL' init_value, _ = self.__get_init_value(numberical_value_containers[0]) signal_attr.append([port_signal_type, '', init_value]) elif number_size>1:# one container has more than 1 attr dict port_signal_type = 'SIGNAL_GROUP' for number in numberical_value_containers: init_value, signal_name = self.__get_init_value(number) if signal_name != '': # valid number container whith signal attribure if signal_name.endswith('_re'): signal_name = signal_name[:-3] signal_attr.append([port_signal_type, signal_name, init_value]) else: logging.error('Parse Signal Container ERROR!') return signal_attr class CompuMethod(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): cm_name = self.__get_cm_name(container) cm_attrs = self.__get_cm_attrs(container) for cm_attr in cm_attrs:# LINEAR compu method type, self._attrs_dir_list will return [] attr_dir = dict( { 'Compu Method Name':cm_name, # 'Type':cm_attr[0], 'Signal Codding Name':cm_attr[1], 'Signal Codding Value':cm_attr[2] } ) self._attrs_dir_list.append(attr_dir) def __get_cm_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_cm_type(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './CATEGORY')) def __get_cm_attrs(self, container): cm_type = self.__get_cm_type(container) cm_attrs = [] # if cm_type == 'TEXTTABLE' or cm_type == 'SCALE_LINEAR_AND_TEXTTABLE': if cm_type == 'TEXTTABLE': compu_scales = Arxml.get_all_match_element(container, './/COMPU-SCALE') for compu_scale in compu_scales: cm_micro_codding_name = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './/COMPU-CONST/VT')) cm_micro_codding_value = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './LOWER-LIMIT')) cm_attrs.append([cm_type, cm_micro_codding_name, cm_micro_codding_value]) return cm_attrs class SystemSignal(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): ss_name = self.__get_ss_name(container) attr_dir = dict({'Signal Name':ss_name}) self._attrs_dir_list.append(attr_dir) def __get_ss_name(self, container): ss_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) return ss_name class SystemSignalGroup(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): ssg_name = self.__get_ssg_name(container) ssg_attrs = self.__get_ssg_attrs(container) for ssg_attr in ssg_attrs: attr_dir = dict({ 'Signal Name':ssg_attr[0], 'System Siganl Group':ssg_name }) self._attrs_dir_list.append(attr_dir) def __get_ssg_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_ssg_attrs(self, container): ssg_attrs = [] ss_refs = Arxml.get_all_match_element(container, './/SYSTEM-SIGNAL-REF') for ss in ss_refs: ssg_attrs.append([Arxml.get_split_end_value(Arxml.get_container_text(ss))]) return ssg_attrs class GatewayMapping(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): raise NotImplementedError def get_source_attr(self, container, source_tag_pattern): source_signal = '' source_controller = '' source_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, source_tag_pattern)) ref_split_value = source_signal_ref.split('/') if len(ref_split_value) >= 4: source_signal = ref_split_value[-1] source_controller = ref_split_value[2] return (source_signal, source_controller) def get_target_attr(self, container, target_tag_pattern): target_signal = '' target_controller = '' target_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, target_tag_pattern)) ref_split_value = target_signal_ref.split('/') if len(ref_split_value) >= 4: target_signal = ref_split_value[-1] target_controller = ref_split_value[2] return (target_signal, target_controller) class SignalMapping(GatewayMapping): def get_container_attrs_dir_list(self, container): source_signal, source_controller = self.get_source_attr(container, './SOURCE-SIGNAL-REF') target_signal, target_controller = self.get_target_attr(container, './TARGET-SIGNAL-REF') att_dir = dict( { 'Source Signal Trigger':source_signal, 'Source Controller':source_controller, 'Target Signal Trigger':target_signal, 'Target Controller':target_controller } ) self._attrs_dir_list.append(att_dir) class PduMapping(GatewayMapping): def get_container_attrs_dir_list(self, container): source_pdu, source_controller = self.get_source_attr(container, './SOURCE-I-PDU-REF') target_pdu, target_controller = self.get_target_attr(container, './/TARGET-I-PDU-REF') att_dir = dict( { 'Source Pdu Trigger':source_pdu, 'Source Controller':source_controller, 'Target Pdu Trigger':target_pdu, 'Target Controller':target_controller } ) self._attrs_dir_list.append(att_dir) class ISignal(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): isignal_name = self.__get_isignal_name(container) isignal_system_signal = self.__get_isignal_system_siganl_ref(container) isignal_bit_length = self.__get_isignal_bit_length(container) isignal_type = self.__get_isignal_type(container) isignal_init_value = self.__get_isignal_init_value(container) attr_dir = dict( { 'ISignal Name': isignal_name, 'Signal Name': isignal_system_signal, 'ISignal Bit Length': isignal_bit_length, 'ISignal Type': isignal_type, 'ISignal Init Value': isignal_init_value, } ) self._attrs_dir_list.append(attr_dir) def __get_isignal_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_isignal_system_siganl_ref(self, container): system_signal = Arxml.get_container_text(Arxml.get_first_match_element(container, './SYSTEM-SIGNAL-REF')) system_signal = Arxml.get_split_end_value(system_signal) return system_signal def __get_isignal_bit_length(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH')) def __get_isignal_type(self, container): base_type_container = Arxml.get_first_match_element(container, './/BASE-TYPE-REF') return Arxml.get_split_end_value(Arxml.get_container_text(base_type_container)) def __get_isignal_init_value(self, container): init_value = Arxml.get_first_match_element(container, './INIT-VALUE//VALUE') return Arxml.get_container_text(init_value) class ISignalTriggering(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): isignal_triggering_name = self.__get_isignal_triggering_name(container) controller , isignal_port_direction = self.__get_isignal_port_controller_and_direction(container) isignal_ref = self.__get_isignal_ref(container) attr_dir = dict( { 'Signal Trigger': isignal_triggering_name, 'Controller':controller, 'Port Direction':isignal_port_direction, 'ISignal Name':isignal_ref } ) if attr_dir['ISignal Name'] != '': # exclude PDU ISignal trigger with isignal name is empty, for example ISTrigCCP_020ms_PDU02_2_0 self._attrs_dir_list.append(attr_dir) def __get_isignal_port_controller_and_direction(self, container): port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-PORT-REF')) port_dir_ref_split = port_dir_ref.split('/') return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction) def __get_isignal_triggering_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_isignal_ref(self, container): isignal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-REF')) return Arxml.get_split_end_value(isignal_ref) class FrameTriggering(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): frame_triggering_name = self.__get_frame_triggering_name(container) controller, frame_port_direction = self.__get_frame_port_controller_and_direction(container) frame_ref = self.__get_frame_ref(container) pdu_ref = self.__get_pdu_ref(container) frame_id = self.__get_frame_id(container) attr_dir = dict( { 'Frame Trigger': frame_triggering_name, 'Controller': controller, 'Port Direction': frame_port_direction, 'Frame Name': frame_ref, 'Frame ID': frame_id, 'Pdu Trigger': pdu_ref } ) self._attrs_dir_list.append(attr_dir) def __get_frame_port_controller_and_direction(self, container): port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-PORT-REF')) port_dir_ref_split = port_dir_ref.split('/') return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction) def __get_frame_triggering_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_frame_ref(self, container): frame_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-REF')) return Arxml.get_split_end_value(frame_ref) def __get_pdu_ref(self, container): pdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/PDU-TRIGGERING-REF')) # print(f"find pdu size is {len(Arxml.get_all_match_element(container, './/PDU-TRIGGERING-REF'))}") # all pdu size in a frame is 1 return Arxml.get_split_end_value(pdu_ref) def __get_frame_id(self, container): frame_id = Arxml.get_container_text(Arxml.get_first_match_element(container, './/IDENTIFIER')) frame_id = hex(int(frame_id)) return frame_id class PduTriggering(ContainerAttrItemBase): def get_container_attrs_dir_list(self, container): pdu_triggering_name = self.__get_pdu_triggering_name(container) controller, pdu_port_direction = self.__get_pdu_port_controller_and_direction(container) pdu_ref = self.__get_ipdu_ref(container) isignal_triggerings_ref = self.__get_isignal_triggerings_ref(container) for isg in isignal_triggerings_ref: attr_dir = dict( { 'Pdu Trigger': pdu_triggering_name, 'Controller': controller, 'Port Direction': pdu_port_direction, 'Pdu Name': pdu_ref, 'Signal Trigger': isg } ) self._attrs_dir_list.append(attr_dir) def __get_pdu_port_controller_and_direction(self, container): port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-PORT-REF')) port_dir_ref_split = port_dir_ref.split('/') return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction) def __get_pdu_triggering_name(self, container): return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME')) def __get_ipdu_ref(self, container): ipdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-REF')) return Arxml.get_split_end_value(ipdu_ref) def __get_isignal_triggerings_ref(self, container): isg_ref_ret = [] isignal_triggerings = Arxml.get_all_match_element(container, './/I-SIGNAL-TRIGGERING-REF') if isignal_triggerings is None: isg_ref_ret.append('NA') else: for isg in isignal_triggerings: isg_ref_ret.append(Arxml.get_split_end_value(Arxml.get_container_text(isg))) return isg_ref_ret class ContainerFactory: def __init__(self, container_item_base: Type[ContainerAttrItemBase]) -> None: self.container_item_factory = container_item_base def get_containers_mapping_dataframe(self, containers:list) -> DataFrame: df_data = [] all_attrs_name = None if len(containers) == 1: containers = containers[0] # special process for port container get for container in containers: container_item = self.container_item_factory(container) all_attrs, all_attrs_name_tmp = container_item.print_attr() df_data.extend(all_attrs) if all_attrs_name is None and len(all_attrs_name_tmp) != 0: #TO DO why? all_attrs_name = copy.deepcopy(all_attrs_name_tmp) # print(df_data) # print(len(df_data)) # print(len(df_data[0])) # print(all_attrs_name) df = DataFrame(df_data, columns=all_attrs_name) return df class ISignalGroup: pass class MatrixParser: def __init__(self, arxml_file): self.__arxml = Arxml(arxml_file_name=arxml_file) self.__output_port_xls = f"./output/OutputFile_Attributes_{os.path.basename(arxml_file).split('.')[0]}.xlsx" self.container_parsers = dict() self.all_dataframe = dict() self.do_container_register() self.init_thread_func_list() self.start_write_thread() self.start_gen_threads() self.join_gen_threads() self.do_post_transform_dataframe() # ana_df1 = self.analyse_dataFrame(self.all_dataframe['Port'], self.all_dataframe['SystemSignal'], self.all_dataframe['SystemSignalGroup']) ana_df2 = self.analyse_dataFrame2(self.all_dataframe['ISignal'], self.all_dataframe['ISignalTriggering']) ana_df3 = self.analyse_dataFrame3(self.all_dataframe['NMPdu'], self.all_dataframe['NPdu'], self.all_dataframe['IPdu'], self.all_dataframe['ContainerPdu'], self.all_dataframe['Dcm_IPdu']) ana_df4 = self.analyse_dataFrame4(ana_df2, ana_df3) # ana_df1.to_csv(f"./output/OutputFile_SignalPort_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True) self.all_dataframe['CompuMethod'].to_csv(f"./output/OutputFile_CompuMethod_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True) # self.write_queue.put(('Analyse1', ana_df1)) self.write_queue.put(('Analyse4', ana_df4)) self.join_write_thread() def init_thread_func_list(self): self.write_queue = queue.Queue() def write_thread_func(self, sheet_queue): # while True: # # try: # sheet_name, sheet_data = sheet_queue.get() # print(f'sheet_name is {sheet_name}') # print(f'df type is {type(sheet_data)}') # # except ValueError: # exit(0) # # Pd_Utils.write_dataframe_to_xls(sheet_data, self.__output_port_xls, sheet_name, mode='a') writer = ExcelWriter(self.__output_port_xls, engine='xlsxwriter') while True: try: sheet_name, sheet_data = sheet_queue.get() except ValueError: writer.save() exit(0) sheet_data.to_excel(writer, sheet_name=sheet_name) def gen_data_thread_func(self, container_type, sheet_queue): sheet_name = container_type.__name__ df = self.get_containers_attributes_dataframe(container_type) self.all_dataframe[sheet_name] = df logging.info(f'Analyse Matrix to get {sheet_name}') sheet_queue.put((sheet_name, df)) def start_gen_threads(self): self.threads = [] for container_type, containers_acquire_pattern in self.container_parsers.items(): thread = threading.Thread(target=self.gen_data_thread_func, name=f'thread_{container_type.__name__}', args=(container_type, self.write_queue)) self.threads.append(thread) for th in self.threads: th.start() time.sleep(0.1) def start_write_thread(self): self.write_thread = threading.Thread(target=self.write_thread_func, args=(self.write_queue, )) self.write_thread.start() time.sleep(0.5) def join_gen_threads(self): for th in self.threads: th.join() def join_write_thread(self): self.write_queue.put('STOP') self.write_thread.join() def analyse_dataFrame(self, port_attr_df, system_signal_attr_df, system_signal_group_attr_df): port_attr_df['Used By SWC'] = True df_merge1 = merge(system_signal_attr_df, port_attr_df.loc[:,['Signal Name', 'Used By SWC']], how='left') df_merge2 = merge(df_merge1, system_signal_group_attr_df, how='left') df_merge2.loc[df_merge2['Used By SWC'] != True,['Used By SWC']] = False df_merge2.fillna('NA', inplace=True) df_merge2.sort_values('Signal Name', inplace=True) # Pd_Utils.write_dataframe_to_xls(df_merge2, self.__output_port_xls, 'Analyse1', mode='a') # df_merge2.to_csv('Analyse1_SignalPort.txt', sep='\t', index=False,header=False) return df_merge2 def analyse_dataFrame2(self, isignal_attr_df, isignal_triggering_df): ''' merge isignal_attr_df to isignal_triggering_df, add Signal Name column :param isignal_attr_df: :param isignal_triggering_df: :return: ''' df_merge = merge(isignal_triggering_df, isignal_attr_df.loc[:,['ISignal Name', 'Signal Name']], on='ISignal Name', how='left') # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse2', mode='a') return df_merge def analyse_dataFrame3(self, nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df): # 竖向拼接 df_merge1 = concat([nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df], sort=False) df_merge1 = df_merge1.fillna('NA') # Pd_Utils.write_dataframe_to_xls(df_merge1, self.__output_port_xls, 'Analyse3', mode='a') return df_merge1 def analyse_dataFrame4(self, ana_df2, ana_df3): ''' only pdu has isignal will keep in Analys dataframe4 :param ana_df2: :param ana_df3: :return: ''' # print(ana_df3.columns) # print((ana_df3['ISignal Name'] != 'NA')) ana_df3 = ana_df3[(ana_df3['ISignal Name'] != 'NA')] # df_merge = merge(ana_df3, ana_df2.loc[:,['ISignal Name', 'Signal Name', 'Controller', 'Port Direction']], on='ISignal Name', how='left') # df_merge = merge(ana_df2, ana_df3, on='ISignal Name', how='outer') # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse4', mode='a') return df_merge def do_post_transform_dataframe(self): cm_df = self.all_dataframe['CompuMethod'] cm_df.sort_values(['Signal Codding Name', 'Compu Method Name'], inplace=True) def register_container_parser(self, container_type: Type[ContainerAttrItemBase], containers_acquire_pattern: str): self.container_parsers[container_type] = containers_acquire_pattern def get_containers_attributes_dataframe(self, container_type: Type[ContainerAttrItemBase]): containers = Arxml.get_all_match_element(self.__arxml.root, self.container_parsers[container_type]) container_factory = ContainerFactory(container_type) attribute_dataframe = container_factory.get_containers_mapping_dataframe(containers) return attribute_dataframe def do_container_register(self): container_parser = [ # (Port, './/COMPOSITION-SW-COMPONENT-TYPE/PORTS'), # (Port, './/APPLICATION-SW-COMPONENT-TYPE/PORTS'), (SystemSignal, './/SYSTEM-SIGNAL'), (SystemSignalGroup, './/SYSTEM-SIGNAL-GROUP'), (SignalMapping, './/SIGNAL-MAPPINGS/I-SIGNAL-MAPPING'), (PduMapping, './/I-PDU-MAPPINGS/I-PDU-MAPPING'), (ISignal, './/I-SIGNAL'), (ISignalTriggering, './/I-SIGNAL-TRIGGERING'), (FrameTriggering, './/CAN-FRAME-TRIGGERING'), (PduTriggering, './/PDU-TRIGGERING'), (CompuMethod, './/COMPU-METHOD'), (NMPdu, './/NM-PDU'), (NPdu, './/N-PDU'), (IPdu, './/I-SIGNAL-I-PDU'), (ContainerPdu, './/CONTAINER-I-PDU'), (Dcm_IPdu, './/DCM-I-PDU') ] for parser_container_type, parser_containers_acquire_pattern in container_parser: self.register_container_parser(parser_container_type, parser_containers_acquire_pattern) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) mp = MatrixParser(r'D:\01_Work\02_WP\EP39_EREV\branch\src\InputFiles\MA_20220921-qy1_EP39 EREV_SIMU+_GW-arxml-V01.arxml')