|
- from Base.Arxml import Arxml
- from lxml import etree
- import logging, os, copy
- from pandas import DataFrame,ExcelWriter, merge, concat
- from typing import Type
- import threading
- import queue
- import time
- class Pd_Utils:
- @staticmethod
- def write_dataframe_to_xls(dataframe, xls_name, sheet_name='Sheet1', mode='w'):
- # print(f'write sheet{sheet_name}')
- if mode == 'a' and not(os.path.exists(xls_name)):
- mode = 'w'
- with ExcelWriter(xls_name, engine='openpyxl', mode=mode) as writer: # doctest: +SKIP
- # df = DataFrame(data, columns=columns_headers)
- # print(f'write excel {sheet_name}')
- dataframe.to_excel(writer, sheet_name=sheet_name)
- @staticmethod
- def write_list_to_csv(data_list, filename):
- '''
- :param data_list: two dimensions list
- :param filename: csv file name
- :return:
- '''
- with open(filename, 'w') as f:
- for line in data_list:
- for v in line:
- f.write(v)
- f.write(',')
- f.write('\n')
- class ContainerAttrItemBase:
- def __init__(self, container):
- # self._container_name = ''
- self._container = container
- self._attrs_dir_list = []
- # self.set_container_name()
- self.get_container_attrs_dir_list(container)
- # def set_container_name(self):
- # raise NotImplementedError
- #
- # def get_container_name(self):
- # return self._container_name
- def get_container_attrs_dir_list(self, container):
- raise NotImplementedError
- def print_attr(self):
- # all_attrs ==> [{key1:value1},{key2:value2},...] length not less than 1
- # each dir's values represent of one line in table, and dir's keys represent of table header
- all_attrs = []
- all_attrs_name = []
- for attr_dir in self._attrs_dir_list:
- # print(attr_dir)
- all_attrs.append(list(attr_dir.values()))
- if len(all_attrs_name) == 0:
- all_attrs_name = list(attr_dir.keys())
- if len(all_attrs_name) == 0:
- logging.info('ERROR: Container{}: {} get empty attribute dir !'.format(self._container, Arxml.get_container_text(Arxml.get_first_match_element(self._container, './/SHORT-NAME'))))
- # print(all_attrs)
- return (all_attrs, all_attrs_name)
- class PduBase(ContainerAttrItemBase):
- def __init__(self, container):
- self._pdu_name = ''
- self._pdu_type = ''
- self._pdu_length = ''
- self.get_pdu_base_attr(container)
- super(PduBase, self).__init__(container)
- # print('PduBase init')
- def get_pdu_base_attr(self, container):
- self.get_pdu_name(container)
- self.get_pdu_type(container)
- self.get_pdu_length(container)
- def get_pdu_type(self, container):
- self._pdu_type = Arxml.get_element_tag(container)
- def get_pdu_name(self, container):
- self._pdu_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def get_pdu_length(self, container):
- self._pdu_length = Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH'))
- def get_container_attrs_dir_list(self, container):
- self.get_pdu_attrs_dir_list(container)
- def get_pdu_attrs_dir_list(self, container):
- raise NotImplementedError
- def get_pdu_signal_mapping_attr(self, container):
- isigmal_mapping_attrs = []
- isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING')
- if isignal_to_ipdu_mappings != None:
- for isignal_mapping in isignal_to_ipdu_mappings:
- isignal_ref = Arxml.get_split_end_value(
- Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF'))
- )
- if isignal_ref == '': # IS not signal ref, but is signal group ref
- isignal_ref = Arxml.get_split_end_value(
- Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-GROUP-REF'))
- )
- isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION'))
- isigmal_mapping_attrs.append([isignal_ref, isignal_start_position])
- return isigmal_mapping_attrs
- class NMPdu(PduBase):
- def get_pdu_attrs_dir_list(self, container):
- isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container)
- for isignal_attr in isignal_mapping_attrs:
- attr_dir = dict(
- {
- 'PDU Name':self._pdu_name,
- 'PDU Type':self._pdu_type,
- 'PDU Length':self._pdu_length,
- 'ISignal Name':isignal_attr[0],
- 'ISignal Start Position':isignal_attr[1],
- }
- )
- self._attrs_dir_list.append(attr_dir)
- # def get_nmpdu_signal_mapping_attr(self, container):
- # isigmal_mapping_attrs = []
- # isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING')
- #
- # for isignal_mapping in isignal_to_ipdu_mappings:
- # isignal_ref = Arxml.get_split_end_value(
- # Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF'))
- # )
- # isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION'))
- #
- # isigmal_mapping_attrs.append([isignal_ref, isignal_start_position])
- #
- # return isigmal_mapping_attrs
- # class NPdu
- class NPdu(PduBase):
- def get_pdu_attrs_dir_list(self, container):
- attr_dir = dict(
- {
- 'PDU Name': self._pdu_name,
- 'PDU Type': self._pdu_type,
- 'PDU Length': self._pdu_length,
- }
- )
- self._attrs_dir_list.append(attr_dir)
- class Dcm_IPdu(PduBase):
- def get_pdu_attrs_dir_list(self, container):
- attr_dir = dict(
- {
- 'PDU Name': self._pdu_name,
- 'PDU Type': self._pdu_type,
- 'PDU Length': self._pdu_length,
- }
- )
- self._attrs_dir_list.append(attr_dir)
- class IPdu(PduBase):
- def __init__(self, container):
- super(IPdu, self).__init__(container)
- def get_pdu_attrs_dir_list(self, container):
- isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container)
- container_header_id_con = Arxml.get_first_match_element(container, './/HEADER-ID-SHORT-HEADER')
- container_header_id = Arxml.get_container_text(container_header_id_con) if container_header_id_con != None else 'NA'
- if len(isignal_mapping_attrs) == 0:
- attr_dir = dict(
- {
- 'PDU Name': self._pdu_name,
- 'PDU Type': self._pdu_type,
- 'PDU Length': self._pdu_length,
- 'Contained PDU ID': container_header_id,
- 'ISignal Name':'NA',
- 'ISignal Start Position':'NA',
- }
- )
- self._attrs_dir_list.append(attr_dir)
- else:
- for isignal_attr in isignal_mapping_attrs:
- attr_dir = dict(
- {
- 'PDU Name': self._pdu_name,
- 'PDU Type': self._pdu_type,
- 'PDU Length': self._pdu_length,
- 'Contained PDU ID': container_header_id,
- 'ISignal Name': isignal_attr[0],
- 'ISignal Start Position': isignal_attr[1],
- }
- )
- self._attrs_dir_list.append(attr_dir)
- class ContainerPdu(PduBase):
- def get_pdu_attrs_dir_list(self, container):
- contained_pdu_refs = self.get_contained_pdu_ref(container)
- for contained_pdu in contained_pdu_refs:
- attr_dir = dict(
- {
- 'PDU Name': self._pdu_name,
- 'PDU Type': self._pdu_type,
- 'PDU Length': self._pdu_length,
- 'Contained PDU': contained_pdu,
- }
- )
- self._attrs_dir_list.append(attr_dir)
- def get_contained_pdu_ref(self, container):
- contained_pdu_refs = Arxml.get_all_match_element(container, './/CONTAINED-PDU-TRIGGERING-REF')
- contained_pdu_refs = [Arxml.get_split_end_value(Arxml.get_container_text(con)) for con in contained_pdu_refs]
- return contained_pdu_refs
- class Port(ContainerAttrItemBase):
- def __init__(self, container):
- super(Port, self).__init__(container)
- def get_container_attrs_dir_list(self, container):
- port_name = self.__get_port_name(container)
- port_type = self.__get_port_type(container)
- port_attrs = self.__get_all_signal_attrs(container)
- for attr in port_attrs:
- attr_dir = {}
- attr_dir['Port Name'] = port_name
- attr_dir['Port Direction'] = port_type
- attr_dir['Mapping Signal Type'] = attr[0]
- attr_dir['Signal Name'] = attr[1] if attr[1]!='' else port_name
- attr_dir['Signal Init Value'] = attr[2]
- self._attrs_dir_list.append(attr_dir)
- # print(self._attrs_dir_list)
- def __get_port_type(self, port_container:etree.Element):
- port_type = Arxml.get_element_tag(port_container)
- return port_type
- def __get_port_name(self, port_container):
- port_name = Arxml.get_container_text(Arxml.get_first_match_element(port_container, './SHORT-NAME'))
- return port_name
- def __get_init_value(self, container):
- init_value = Arxml.get_container_text(Arxml.get_first_match_element(container, './VALUE'))
- signalgroup_signal_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-LABEL'))
- return (init_value, signalgroup_signal_name)
- def __get_all_signal_attrs(self, port_container):
- numberical_value_containers = Arxml.get_all_match_element(port_container, './/NUMERICAL-VALUE-SPECIFICATION')
- number_size = len(numberical_value_containers)
- signal_attr = []
- if 1 == number_size:
- port_signal_type = 'SIGNAL'
- init_value, _ = self.__get_init_value(numberical_value_containers[0])
- signal_attr.append([port_signal_type, '', init_value])
- elif number_size>1:# one container has more than 1 attr dict
- port_signal_type = 'SIGNAL_GROUP'
- for number in numberical_value_containers:
- init_value, signal_name = self.__get_init_value(number)
- if signal_name != '': # valid number container whith signal attribure
- if signal_name.endswith('_re'):
- signal_name = signal_name[:-3]
- signal_attr.append([port_signal_type, signal_name, init_value])
- else:
- logging.error('Parse Signal Container ERROR!')
- return signal_attr
- class CompuMethod(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- cm_name = self.__get_cm_name(container)
- cm_attrs = self.__get_cm_attrs(container)
- for cm_attr in cm_attrs:# LINEAR compu method type, self._attrs_dir_list will return []
- attr_dir = dict(
- {
- 'Compu Method Name':cm_name,
- # 'Type':cm_attr[0],
- 'Signal Codding Name':cm_attr[1],
- 'Signal Codding Value':cm_attr[2]
- }
- )
- self._attrs_dir_list.append(attr_dir)
- def __get_cm_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_cm_type(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './CATEGORY'))
- def __get_cm_attrs(self, container):
- cm_type = self.__get_cm_type(container)
- cm_attrs = []
- # if cm_type == 'TEXTTABLE' or cm_type == 'SCALE_LINEAR_AND_TEXTTABLE':
- if cm_type == 'TEXTTABLE':
- compu_scales = Arxml.get_all_match_element(container, './/COMPU-SCALE')
- for compu_scale in compu_scales:
- cm_micro_codding_name = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './/COMPU-CONST/VT'))
- cm_micro_codding_value = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './LOWER-LIMIT'))
- cm_attrs.append([cm_type, cm_micro_codding_name, cm_micro_codding_value])
- return cm_attrs
- class SystemSignal(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- ss_name = self.__get_ss_name(container)
- attr_dir = dict({'Signal Name':ss_name})
- self._attrs_dir_list.append(attr_dir)
- def __get_ss_name(self, container):
- ss_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- return ss_name
- class SystemSignalGroup(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- ssg_name = self.__get_ssg_name(container)
- ssg_attrs = self.__get_ssg_attrs(container)
- for ssg_attr in ssg_attrs:
- attr_dir = dict({
- 'Signal Name':ssg_attr[0],
- 'System Siganl Group':ssg_name
- })
- self._attrs_dir_list.append(attr_dir)
- def __get_ssg_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_ssg_attrs(self, container):
- ssg_attrs = []
- ss_refs = Arxml.get_all_match_element(container, './/SYSTEM-SIGNAL-REF')
- for ss in ss_refs:
- ssg_attrs.append([Arxml.get_split_end_value(Arxml.get_container_text(ss))])
- return ssg_attrs
- class GatewayMapping(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- raise NotImplementedError
- def get_source_attr(self, container, source_tag_pattern):
- source_signal = ''
- source_controller = ''
- source_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, source_tag_pattern))
- ref_split_value = source_signal_ref.split('/')
- if len(ref_split_value) >= 4:
- source_signal = ref_split_value[-1]
- source_controller = ref_split_value[2]
- return (source_signal, source_controller)
- def get_target_attr(self, container, target_tag_pattern):
- target_signal = ''
- target_controller = ''
- target_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, target_tag_pattern))
- ref_split_value = target_signal_ref.split('/')
- if len(ref_split_value) >= 4:
- target_signal = ref_split_value[-1]
- target_controller = ref_split_value[2]
- return (target_signal, target_controller)
- class SignalMapping(GatewayMapping):
- def get_container_attrs_dir_list(self, container):
- source_signal, source_controller = self.get_source_attr(container, './SOURCE-SIGNAL-REF')
- target_signal, target_controller = self.get_target_attr(container, './TARGET-SIGNAL-REF')
- att_dir = dict(
- {
- 'Source Signal Trigger':source_signal,
- 'Source Controller':source_controller,
- 'Target Signal Trigger':target_signal,
- 'Target Controller':target_controller
- }
- )
- self._attrs_dir_list.append(att_dir)
- class PduMapping(GatewayMapping):
- def get_container_attrs_dir_list(self, container):
- source_pdu, source_controller = self.get_source_attr(container, './SOURCE-I-PDU-REF')
- target_pdu, target_controller = self.get_target_attr(container, './/TARGET-I-PDU-REF')
- att_dir = dict(
- {
- 'Source Pdu Trigger':source_pdu,
- 'Source Controller':source_controller,
- 'Target Pdu Trigger':target_pdu,
- 'Target Controller':target_controller
- }
- )
- self._attrs_dir_list.append(att_dir)
- class ISignal(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- isignal_name = self.__get_isignal_name(container)
- isignal_system_signal = self.__get_isignal_system_siganl_ref(container)
- isignal_bit_length = self.__get_isignal_bit_length(container)
- isignal_type = self.__get_isignal_type(container)
- isignal_init_value = self.__get_isignal_init_value(container)
- attr_dir = dict(
- {
- 'ISignal Name': isignal_name,
- 'Signal Name': isignal_system_signal,
- 'ISignal Bit Length': isignal_bit_length,
- 'ISignal Type': isignal_type,
- 'ISignal Init Value': isignal_init_value,
- }
- )
- self._attrs_dir_list.append(attr_dir)
- def __get_isignal_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_isignal_system_siganl_ref(self, container):
- system_signal = Arxml.get_container_text(Arxml.get_first_match_element(container, './SYSTEM-SIGNAL-REF'))
- system_signal = Arxml.get_split_end_value(system_signal)
- return system_signal
- def __get_isignal_bit_length(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH'))
- def __get_isignal_type(self, container):
- base_type_container = Arxml.get_first_match_element(container, './/BASE-TYPE-REF')
- return Arxml.get_split_end_value(Arxml.get_container_text(base_type_container))
- def __get_isignal_init_value(self, container):
- init_value = Arxml.get_first_match_element(container, './INIT-VALUE//VALUE')
- return Arxml.get_container_text(init_value)
- class ISignalTriggering(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- isignal_triggering_name = self.__get_isignal_triggering_name(container)
- controller , isignal_port_direction = self.__get_isignal_port_controller_and_direction(container)
- isignal_ref = self.__get_isignal_ref(container)
- attr_dir = dict(
- {
- 'Signal Trigger': isignal_triggering_name,
- 'Controller':controller,
- 'Port Direction':isignal_port_direction,
- 'ISignal Name':isignal_ref
- }
- )
- if attr_dir['ISignal Name'] != '': # exclude PDU ISignal trigger with isignal name is empty, for example ISTrigCCP_020ms_PDU02_2_0
- self._attrs_dir_list.append(attr_dir)
- def __get_isignal_port_controller_and_direction(self, container):
- port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-PORT-REF'))
- port_dir_ref_split = port_dir_ref.split('/')
- return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
- def __get_isignal_triggering_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_isignal_ref(self, container):
- isignal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-REF'))
- return Arxml.get_split_end_value(isignal_ref)
- class FrameTriggering(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- frame_triggering_name = self.__get_frame_triggering_name(container)
- controller, frame_port_direction = self.__get_frame_port_controller_and_direction(container)
- frame_ref = self.__get_frame_ref(container)
- pdu_ref = self.__get_pdu_ref(container)
- frame_id = self.__get_frame_id(container)
- attr_dir = dict(
- {
- 'Frame Trigger': frame_triggering_name,
- 'Controller': controller,
- 'Port Direction': frame_port_direction,
- 'Frame Name': frame_ref,
- 'Frame ID': frame_id,
- 'Pdu Trigger': pdu_ref
- }
- )
- self._attrs_dir_list.append(attr_dir)
- def __get_frame_port_controller_and_direction(self, container):
- port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-PORT-REF'))
- port_dir_ref_split = port_dir_ref.split('/')
- return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
- def __get_frame_triggering_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_frame_ref(self, container):
- frame_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-REF'))
- return Arxml.get_split_end_value(frame_ref)
- def __get_pdu_ref(self, container):
- pdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/PDU-TRIGGERING-REF'))
- # print(f"find pdu size is {len(Arxml.get_all_match_element(container, './/PDU-TRIGGERING-REF'))}") # all pdu size in a frame is 1
- return Arxml.get_split_end_value(pdu_ref)
- def __get_frame_id(self, container):
- frame_id = Arxml.get_container_text(Arxml.get_first_match_element(container, './/IDENTIFIER'))
- frame_id = hex(int(frame_id))
- return frame_id
- class PduTriggering(ContainerAttrItemBase):
- def get_container_attrs_dir_list(self, container):
- pdu_triggering_name = self.__get_pdu_triggering_name(container)
- controller, pdu_port_direction = self.__get_pdu_port_controller_and_direction(container)
- pdu_ref = self.__get_ipdu_ref(container)
- isignal_triggerings_ref = self.__get_isignal_triggerings_ref(container)
- for isg in isignal_triggerings_ref:
- attr_dir = dict(
- {
- 'Pdu Trigger': pdu_triggering_name,
- 'Controller': controller,
- 'Port Direction': pdu_port_direction,
- 'Pdu Name': pdu_ref,
- 'Signal Trigger': isg
- }
- )
- self._attrs_dir_list.append(attr_dir)
- def __get_pdu_port_controller_and_direction(self, container):
- port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-PORT-REF'))
- port_dir_ref_split = port_dir_ref.split('/')
- return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
- def __get_pdu_triggering_name(self, container):
- return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
- def __get_ipdu_ref(self, container):
- ipdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-REF'))
- return Arxml.get_split_end_value(ipdu_ref)
- def __get_isignal_triggerings_ref(self, container):
- isg_ref_ret = []
- isignal_triggerings = Arxml.get_all_match_element(container, './/I-SIGNAL-TRIGGERING-REF')
- if isignal_triggerings is None:
- isg_ref_ret.append('NA')
- else:
- for isg in isignal_triggerings:
- isg_ref_ret.append(Arxml.get_split_end_value(Arxml.get_container_text(isg)))
- return isg_ref_ret
- class ContainerFactory:
- def __init__(self, container_item_base: Type[ContainerAttrItemBase]) -> None:
- self.container_item_factory = container_item_base
- def get_containers_mapping_dataframe(self, containers:list) -> DataFrame:
- df_data = []
- all_attrs_name = None
- if len(containers) == 1:
- containers = containers[0] # special process for port container get <Arxml.get_first_match_element>
- for container in containers:
- container_item = self.container_item_factory(container)
- all_attrs, all_attrs_name_tmp = container_item.print_attr()
- df_data.extend(all_attrs)
- if all_attrs_name is None and len(all_attrs_name_tmp) != 0: #TO DO why?
- all_attrs_name = copy.deepcopy(all_attrs_name_tmp)
- # print(df_data)
- # print(len(df_data))
- # print(len(df_data[0]))
- # print(all_attrs_name)
- df = DataFrame(df_data, columns=all_attrs_name)
- return df
- class ISignalGroup:
- pass
- class MatrixParser:
- def __init__(self, arxml_file):
- self.__arxml = Arxml(arxml_file_name=arxml_file)
- self.__output_port_xls = f"./output/OutputFile_Attributes_{os.path.basename(arxml_file).split('.')[0]}.xlsx"
- self.container_parsers = dict()
- self.all_dataframe = dict()
- self.do_container_register()
- self.init_thread_func_list()
- self.start_write_thread()
- self.start_gen_threads()
- self.join_gen_threads()
- self.do_post_transform_dataframe()
- # ana_df1 = self.analyse_dataFrame(self.all_dataframe['Port'], self.all_dataframe['SystemSignal'], self.all_dataframe['SystemSignalGroup'])
- ana_df2 = self.analyse_dataFrame2(self.all_dataframe['ISignal'], self.all_dataframe['ISignalTriggering'])
- ana_df3 = self.analyse_dataFrame3(self.all_dataframe['NMPdu'], self.all_dataframe['NPdu'], self.all_dataframe['IPdu'], self.all_dataframe['ContainerPdu'], self.all_dataframe['Dcm_IPdu'])
- ana_df4 = self.analyse_dataFrame4(ana_df2, ana_df3)
- # ana_df1.to_csv(f"./output/OutputFile_SignalPort_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True)
- self.all_dataframe['CompuMethod'].to_csv(f"./output/OutputFile_CompuMethod_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True)
- # self.write_queue.put(('Analyse1', ana_df1))
- self.write_queue.put(('Analyse4', ana_df4))
- self.join_write_thread()
- def init_thread_func_list(self):
- self.write_queue = queue.Queue()
- def write_thread_func(self, sheet_queue):
- # while True:
- #
- # try:
- # sheet_name, sheet_data = sheet_queue.get()
- # print(f'sheet_name is {sheet_name}')
- # print(f'df type is {type(sheet_data)}')
- #
- # except ValueError:
- # exit(0)
- #
- # Pd_Utils.write_dataframe_to_xls(sheet_data, self.__output_port_xls, sheet_name, mode='a')
- writer = ExcelWriter(self.__output_port_xls, engine='xlsxwriter')
- while True:
- try:
- sheet_name, sheet_data = sheet_queue.get()
- except ValueError:
- writer.save()
- exit(0)
- sheet_data.to_excel(writer, sheet_name=sheet_name)
- def gen_data_thread_func(self, container_type, sheet_queue):
- sheet_name = container_type.__name__
- df = self.get_containers_attributes_dataframe(container_type)
- self.all_dataframe[sheet_name] = df
- logging.info(f'Analyse Matrix to get {sheet_name}')
- sheet_queue.put((sheet_name, df))
- def start_gen_threads(self):
- self.threads = []
- for container_type, containers_acquire_pattern in self.container_parsers.items():
- thread = threading.Thread(target=self.gen_data_thread_func,
- name=f'thread_{container_type.__name__}',
- args=(container_type, self.write_queue))
- self.threads.append(thread)
- for th in self.threads:
- th.start()
- time.sleep(0.1)
- def start_write_thread(self):
- self.write_thread = threading.Thread(target=self.write_thread_func, args=(self.write_queue, ))
- self.write_thread.start()
- time.sleep(0.5)
- def join_gen_threads(self):
- for th in self.threads:
- th.join()
- def join_write_thread(self):
- self.write_queue.put('STOP')
- self.write_thread.join()
- def analyse_dataFrame(self, port_attr_df, system_signal_attr_df, system_signal_group_attr_df):
- port_attr_df['Used By SWC'] = True
- df_merge1 = merge(system_signal_attr_df, port_attr_df.loc[:,['Signal Name', 'Used By SWC']], how='left')
- df_merge2 = merge(df_merge1, system_signal_group_attr_df, how='left')
- df_merge2.loc[df_merge2['Used By SWC'] != True,['Used By SWC']] = False
- df_merge2.fillna('NA', inplace=True)
- df_merge2.sort_values('Signal Name', inplace=True)
- # Pd_Utils.write_dataframe_to_xls(df_merge2, self.__output_port_xls, 'Analyse1', mode='a')
- # df_merge2.to_csv('Analyse1_SignalPort.txt', sep='\t', index=False,header=False)
- return df_merge2
- def analyse_dataFrame2(self, isignal_attr_df, isignal_triggering_df):
- '''
- merge isignal_attr_df to isignal_triggering_df, add Signal Name column
- :param isignal_attr_df:
- :param isignal_triggering_df:
- :return:
- '''
- df_merge = merge(isignal_triggering_df, isignal_attr_df.loc[:,['ISignal Name', 'Signal Name']], on='ISignal Name', how='left')
- # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse2', mode='a')
- return df_merge
- def analyse_dataFrame3(self, nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df):
- # 竖向拼接
- df_merge1 = concat([nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df], sort=False)
- df_merge1 = df_merge1.fillna('NA')
- # Pd_Utils.write_dataframe_to_xls(df_merge1, self.__output_port_xls, 'Analyse3', mode='a')
- return df_merge1
- def analyse_dataFrame4(self, ana_df2, ana_df3):
- '''
- only pdu has isignal will keep in Analys dataframe4
- :param ana_df2:
- :param ana_df3:
- :return:
- '''
- # print(ana_df3.columns)
- # print((ana_df3['ISignal Name'] != 'NA'))
- ana_df3 = ana_df3[(ana_df3['ISignal Name'] != 'NA')] #
- df_merge = merge(ana_df3, ana_df2.loc[:,['ISignal Name', 'Signal Name', 'Controller', 'Port Direction']], on='ISignal Name', how='left')
- # df_merge = merge(ana_df2, ana_df3, on='ISignal Name', how='outer')
- # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse4', mode='a')
- return df_merge
- def do_post_transform_dataframe(self):
- cm_df = self.all_dataframe['CompuMethod']
- cm_df.sort_values(['Signal Codding Name', 'Compu Method Name'], inplace=True)
- def register_container_parser(self, container_type: Type[ContainerAttrItemBase],
- containers_acquire_pattern: str):
- self.container_parsers[container_type] = containers_acquire_pattern
- def get_containers_attributes_dataframe(self, container_type: Type[ContainerAttrItemBase]):
- containers = Arxml.get_all_match_element(self.__arxml.root, self.container_parsers[container_type])
- container_factory = ContainerFactory(container_type)
- attribute_dataframe = container_factory.get_containers_mapping_dataframe(containers)
- return attribute_dataframe
- def do_container_register(self):
- container_parser = [
- # (Port, './/COMPOSITION-SW-COMPONENT-TYPE/PORTS'),
- # (Port, './/APPLICATION-SW-COMPONENT-TYPE/PORTS'),
- (SystemSignal, './/SYSTEM-SIGNAL'),
- (SystemSignalGroup, './/SYSTEM-SIGNAL-GROUP'),
- (SignalMapping, './/SIGNAL-MAPPINGS/I-SIGNAL-MAPPING'),
- (PduMapping, './/I-PDU-MAPPINGS/I-PDU-MAPPING'),
- (ISignal, './/I-SIGNAL'),
- (ISignalTriggering, './/I-SIGNAL-TRIGGERING'),
- (FrameTriggering, './/CAN-FRAME-TRIGGERING'),
- (PduTriggering, './/PDU-TRIGGERING'),
- (CompuMethod, './/COMPU-METHOD'),
- (NMPdu, './/NM-PDU'),
- (NPdu, './/N-PDU'),
- (IPdu, './/I-SIGNAL-I-PDU'),
- (ContainerPdu, './/CONTAINER-I-PDU'),
- (Dcm_IPdu, './/DCM-I-PDU')
- ]
- for parser_container_type, parser_containers_acquire_pattern in container_parser:
- self.register_container_parser(parser_container_type, parser_containers_acquire_pattern)
- if __name__ == '__main__':
- logging.basicConfig(level=logging.DEBUG)
- mp = MatrixParser(r'D:\01_Work\02_WP\EP39_EREV\branch\src\InputFiles\MA_20220921-qy1_EP39 EREV_SIMU+_GW-arxml-V01.arxml')
|