MatrixParser.py 32 KB


  1. from Base.Arxml import Arxml
  2. from lxml import etree
  3. import logging, os, copy
  4. from pandas import DataFrame,ExcelWriter, merge, concat
  5. from typing import Type
  6. import threading
  7. import queue
  8. import time
  9. class Pd_Utils:
  10. @staticmethod
  11. def write_dataframe_to_xls(dataframe, xls_name, sheet_name='Sheet1', mode='w'):
  12. # print(f'write sheet{sheet_name}')
  13. if mode == 'a' and not(os.path.exists(xls_name)):
  14. mode = 'w'
  15. with ExcelWriter(xls_name, engine='openpyxl', mode=mode) as writer: # doctest: +SKIP
  16. # df = DataFrame(data, columns=columns_headers)
  17. # print(f'write excel {sheet_name}')
  18. dataframe.to_excel(writer, sheet_name=sheet_name)
  19. @staticmethod
  20. def write_list_to_csv(data_list, filename):
  21. '''
  22. :param data_list: two dimensions list
  23. :param filename: csv file name
  24. :return:
  25. '''
  26. with open(filename, 'w') as f:
  27. for line in data_list:
  28. for v in line:
  29. f.write(v)
  30. f.write(',')
  31. f.write('\n')
  32. class ContainerAttrItemBase:
  33. def __init__(self, container):
  34. # self._container_name = ''
  35. self._container = container
  36. self._attrs_dir_list = []
  37. # self.set_container_name()
  38. self.get_container_attrs_dir_list(container)
  39. # def set_container_name(self):
  40. # raise NotImplementedError
  41. #
  42. # def get_container_name(self):
  43. # return self._container_name
  44. def get_container_attrs_dir_list(self, container):
  45. raise NotImplementedError
  46. def print_attr(self):
  47. # all_attrs ==> [{key1:value1},{key2:value2},...] length not less than 1
  48. # each dir's values represent of one line in table, and dir's keys represent of table header
  49. all_attrs = []
  50. all_attrs_name = []
  51. for attr_dir in self._attrs_dir_list:
  52. # print(attr_dir)
  53. all_attrs.append(list(attr_dir.values()))
  54. if len(all_attrs_name) == 0:
  55. all_attrs_name = list(attr_dir.keys())
  56. if len(all_attrs_name) == 0:
  57. logging.info('ERROR: Container{}: {} get empty attribute dir !'.format(self._container, Arxml.get_container_text(Arxml.get_first_match_element(self._container, './/SHORT-NAME'))))
  58. # print(all_attrs)
  59. return (all_attrs, all_attrs_name)
  60. class PduBase(ContainerAttrItemBase):
  61. def __init__(self, container):
  62. self._pdu_name = ''
  63. self._pdu_type = ''
  64. self._pdu_length = ''
  65. self.get_pdu_base_attr(container)
  66. super(PduBase, self).__init__(container)
  67. # print('PduBase init')
  68. def get_pdu_base_attr(self, container):
  69. self.get_pdu_name(container)
  70. self.get_pdu_type(container)
  71. self.get_pdu_length(container)
  72. def get_pdu_type(self, container):
  73. self._pdu_type = Arxml.get_element_tag(container)
  74. def get_pdu_name(self, container):
  75. self._pdu_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  76. def get_pdu_length(self, container):
  77. self._pdu_length = Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH'))
  78. def get_container_attrs_dir_list(self, container):
  79. self.get_pdu_attrs_dir_list(container)
  80. def get_pdu_attrs_dir_list(self, container):
  81. raise NotImplementedError
  82. def get_pdu_signal_mapping_attr(self, container):
  83. isigmal_mapping_attrs = []
  84. isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING')
  85. if isignal_to_ipdu_mappings != None:
  86. for isignal_mapping in isignal_to_ipdu_mappings:
  87. isignal_ref = Arxml.get_split_end_value(
  88. Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF'))
  89. )
  90. if isignal_ref == '': # IS not signal ref, but is signal group ref
  91. isignal_ref = Arxml.get_split_end_value(
  92. Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-GROUP-REF'))
  93. )
  94. isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION'))
  95. isigmal_mapping_attrs.append([isignal_ref, isignal_start_position])
  96. return isigmal_mapping_attrs
  97. class NMPdu(PduBase):
  98. def get_pdu_attrs_dir_list(self, container):
  99. isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container)
  100. for isignal_attr in isignal_mapping_attrs:
  101. attr_dir = dict(
  102. {
  103. 'PDU Name':self._pdu_name,
  104. 'PDU Type':self._pdu_type,
  105. 'PDU Length':self._pdu_length,
  106. 'ISignal Name':isignal_attr[0],
  107. 'ISignal Start Position':isignal_attr[1],
  108. }
  109. )
  110. self._attrs_dir_list.append(attr_dir)
  111. # def get_nmpdu_signal_mapping_attr(self, container):
  112. # isigmal_mapping_attrs = []
  113. # isignal_to_ipdu_mappings = Arxml.get_all_match_element(container, './/I-SIGNAL-TO-I-PDU-MAPPING')
  114. #
  115. # for isignal_mapping in isignal_to_ipdu_mappings:
  116. # isignal_ref = Arxml.get_split_end_value(
  117. # Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './I-SIGNAL-REF'))
  118. # )
  119. # isignal_start_position = Arxml.get_container_text(Arxml.get_first_match_element(isignal_mapping, './START-POSITION'))
  120. #
  121. # isigmal_mapping_attrs.append([isignal_ref, isignal_start_position])
  122. #
  123. # return isigmal_mapping_attrs
  124. # class NPdu
  125. class NPdu(PduBase):
  126. def get_pdu_attrs_dir_list(self, container):
  127. attr_dir = dict(
  128. {
  129. 'PDU Name': self._pdu_name,
  130. 'PDU Type': self._pdu_type,
  131. 'PDU Length': self._pdu_length,
  132. }
  133. )
  134. self._attrs_dir_list.append(attr_dir)
  135. class Dcm_IPdu(PduBase):
  136. def get_pdu_attrs_dir_list(self, container):
  137. attr_dir = dict(
  138. {
  139. 'PDU Name': self._pdu_name,
  140. 'PDU Type': self._pdu_type,
  141. 'PDU Length': self._pdu_length,
  142. }
  143. )
  144. self._attrs_dir_list.append(attr_dir)
  145. class IPdu(PduBase):
  146. def __init__(self, container):
  147. super(IPdu, self).__init__(container)
  148. def get_pdu_attrs_dir_list(self, container):
  149. isignal_mapping_attrs = self.get_pdu_signal_mapping_attr(container)
  150. container_header_id_con = Arxml.get_first_match_element(container, './/HEADER-ID-SHORT-HEADER')
  151. container_header_id = Arxml.get_container_text(container_header_id_con) if container_header_id_con != None else 'NA'
  152. if len(isignal_mapping_attrs) == 0:
  153. attr_dir = dict(
  154. {
  155. 'PDU Name': self._pdu_name,
  156. 'PDU Type': self._pdu_type,
  157. 'PDU Length': self._pdu_length,
  158. 'Contained PDU ID': container_header_id,
  159. 'ISignal Name':'NA',
  160. 'ISignal Start Position':'NA',
  161. }
  162. )
  163. self._attrs_dir_list.append(attr_dir)
  164. else:
  165. for isignal_attr in isignal_mapping_attrs:
  166. attr_dir = dict(
  167. {
  168. 'PDU Name': self._pdu_name,
  169. 'PDU Type': self._pdu_type,
  170. 'PDU Length': self._pdu_length,
  171. 'Contained PDU ID': container_header_id,
  172. 'ISignal Name': isignal_attr[0],
  173. 'ISignal Start Position': isignal_attr[1],
  174. }
  175. )
  176. self._attrs_dir_list.append(attr_dir)
  177. class ContainerPdu(PduBase):
  178. def get_pdu_attrs_dir_list(self, container):
  179. contained_pdu_refs = self.get_contained_pdu_ref(container)
  180. for contained_pdu in contained_pdu_refs:
  181. attr_dir = dict(
  182. {
  183. 'PDU Name': self._pdu_name,
  184. 'PDU Type': self._pdu_type,
  185. 'PDU Length': self._pdu_length,
  186. 'Contained PDU': contained_pdu,
  187. }
  188. )
  189. self._attrs_dir_list.append(attr_dir)
  190. def get_contained_pdu_ref(self, container):
  191. contained_pdu_refs = Arxml.get_all_match_element(container, './/CONTAINED-PDU-TRIGGERING-REF')
  192. contained_pdu_refs = [Arxml.get_split_end_value(Arxml.get_container_text(con)) for con in contained_pdu_refs]
  193. return contained_pdu_refs
  194. class Port(ContainerAttrItemBase):
  195. def __init__(self, container):
  196. super(Port, self).__init__(container)
  197. def get_container_attrs_dir_list(self, container):
  198. port_name = self.__get_port_name(container)
  199. port_type = self.__get_port_type(container)
  200. port_attrs = self.__get_all_signal_attrs(container)
  201. for attr in port_attrs:
  202. attr_dir = {}
  203. attr_dir['Port Name'] = port_name
  204. attr_dir['Port Direction'] = port_type
  205. attr_dir['Mapping Signal Type'] = attr[0]
  206. attr_dir['Signal Name'] = attr[1] if attr[1]!='' else port_name
  207. attr_dir['Signal Init Value'] = attr[2]
  208. self._attrs_dir_list.append(attr_dir)
  209. # print(self._attrs_dir_list)
  210. def __get_port_type(self, port_container:etree.Element):
  211. port_type = Arxml.get_element_tag(port_container)
  212. return port_type
  213. def __get_port_name(self, port_container):
  214. port_name = Arxml.get_container_text(Arxml.get_first_match_element(port_container, './SHORT-NAME'))
  215. return port_name
  216. def __get_init_value(self, container):
  217. init_value = Arxml.get_container_text(Arxml.get_first_match_element(container, './VALUE'))
  218. signalgroup_signal_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-LABEL'))
  219. return (init_value, signalgroup_signal_name)
  220. def __get_all_signal_attrs(self, port_container):
  221. numberical_value_containers = Arxml.get_all_match_element(port_container, './/NUMERICAL-VALUE-SPECIFICATION')
  222. number_size = len(numberical_value_containers)
  223. signal_attr = []
  224. if 1 == number_size:
  225. port_signal_type = 'SIGNAL'
  226. init_value, _ = self.__get_init_value(numberical_value_containers[0])
  227. signal_attr.append([port_signal_type, '', init_value])
  228. elif number_size>1:# one container has more than 1 attr dict
  229. port_signal_type = 'SIGNAL_GROUP'
  230. for number in numberical_value_containers:
  231. init_value, signal_name = self.__get_init_value(number)
  232. if signal_name != '': # valid number container whith signal attribure
  233. if signal_name.endswith('_re'):
  234. signal_name = signal_name[:-3]
  235. signal_attr.append([port_signal_type, signal_name, init_value])
  236. else:
  237. logging.error('Parse Signal Container ERROR!')
  238. return signal_attr
  239. class CompuMethod(ContainerAttrItemBase):
  240. def get_container_attrs_dir_list(self, container):
  241. cm_name = self.__get_cm_name(container)
  242. cm_attrs = self.__get_cm_attrs(container)
  243. for cm_attr in cm_attrs:# LINEAR compu method type, self._attrs_dir_list will return []
  244. attr_dir = dict(
  245. {
  246. 'Compu Method Name':cm_name,
  247. # 'Type':cm_attr[0],
  248. 'Signal Codding Name':cm_attr[1],
  249. 'Signal Codding Value':cm_attr[2]
  250. }
  251. )
  252. self._attrs_dir_list.append(attr_dir)
  253. def __get_cm_name(self, container):
  254. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  255. def __get_cm_type(self, container):
  256. return Arxml.get_container_text(Arxml.get_first_match_element(container, './CATEGORY'))
  257. def __get_cm_attrs(self, container):
  258. cm_type = self.__get_cm_type(container)
  259. cm_attrs = []
  260. # if cm_type == 'TEXTTABLE' or cm_type == 'SCALE_LINEAR_AND_TEXTTABLE':
  261. if cm_type == 'TEXTTABLE':
  262. compu_scales = Arxml.get_all_match_element(container, './/COMPU-SCALE')
  263. for compu_scale in compu_scales:
  264. cm_micro_codding_name = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './/COMPU-CONST/VT'))
  265. cm_micro_codding_value = Arxml.get_container_text(Arxml.get_first_match_element(compu_scale, './LOWER-LIMIT'))
  266. cm_attrs.append([cm_type, cm_micro_codding_name, cm_micro_codding_value])
  267. return cm_attrs
  268. class SystemSignal(ContainerAttrItemBase):
  269. def get_container_attrs_dir_list(self, container):
  270. ss_name = self.__get_ss_name(container)
  271. attr_dir = dict({'Signal Name':ss_name})
  272. self._attrs_dir_list.append(attr_dir)
  273. def __get_ss_name(self, container):
  274. ss_name = Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  275. return ss_name
  276. class SystemSignalGroup(ContainerAttrItemBase):
  277. def get_container_attrs_dir_list(self, container):
  278. ssg_name = self.__get_ssg_name(container)
  279. ssg_attrs = self.__get_ssg_attrs(container)
  280. for ssg_attr in ssg_attrs:
  281. attr_dir = dict({
  282. 'Signal Name':ssg_attr[0],
  283. 'System Siganl Group':ssg_name
  284. })
  285. self._attrs_dir_list.append(attr_dir)
  286. def __get_ssg_name(self, container):
  287. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  288. def __get_ssg_attrs(self, container):
  289. ssg_attrs = []
  290. ss_refs = Arxml.get_all_match_element(container, './/SYSTEM-SIGNAL-REF')
  291. for ss in ss_refs:
  292. ssg_attrs.append([Arxml.get_split_end_value(Arxml.get_container_text(ss))])
  293. return ssg_attrs
  294. class GatewayMapping(ContainerAttrItemBase):
  295. def get_container_attrs_dir_list(self, container):
  296. raise NotImplementedError
  297. def get_source_attr(self, container, source_tag_pattern):
  298. source_signal = ''
  299. source_controller = ''
  300. source_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, source_tag_pattern))
  301. ref_split_value = source_signal_ref.split('/')
  302. if len(ref_split_value) >= 4:
  303. source_signal = ref_split_value[-1]
  304. source_controller = ref_split_value[2]
  305. return (source_signal, source_controller)
  306. def get_target_attr(self, container, target_tag_pattern):
  307. target_signal = ''
  308. target_controller = ''
  309. target_signal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, target_tag_pattern))
  310. ref_split_value = target_signal_ref.split('/')
  311. if len(ref_split_value) >= 4:
  312. target_signal = ref_split_value[-1]
  313. target_controller = ref_split_value[2]
  314. return (target_signal, target_controller)
  315. class SignalMapping(GatewayMapping):
  316. def get_container_attrs_dir_list(self, container):
  317. source_signal, source_controller = self.get_source_attr(container, './SOURCE-SIGNAL-REF')
  318. target_signal, target_controller = self.get_target_attr(container, './TARGET-SIGNAL-REF')
  319. att_dir = dict(
  320. {
  321. 'Source Signal Trigger':source_signal,
  322. 'Source Controller':source_controller,
  323. 'Target Signal Trigger':target_signal,
  324. 'Target Controller':target_controller
  325. }
  326. )
  327. self._attrs_dir_list.append(att_dir)
  328. class PduMapping(GatewayMapping):
  329. def get_container_attrs_dir_list(self, container):
  330. source_pdu, source_controller = self.get_source_attr(container, './SOURCE-I-PDU-REF')
  331. target_pdu, target_controller = self.get_target_attr(container, './/TARGET-I-PDU-REF')
  332. att_dir = dict(
  333. {
  334. 'Source Pdu Trigger':source_pdu,
  335. 'Source Controller':source_controller,
  336. 'Target Pdu Trigger':target_pdu,
  337. 'Target Controller':target_controller
  338. }
  339. )
  340. self._attrs_dir_list.append(att_dir)
  341. class ISignal(ContainerAttrItemBase):
  342. def get_container_attrs_dir_list(self, container):
  343. isignal_name = self.__get_isignal_name(container)
  344. isignal_system_signal = self.__get_isignal_system_siganl_ref(container)
  345. isignal_bit_length = self.__get_isignal_bit_length(container)
  346. isignal_type = self.__get_isignal_type(container)
  347. isignal_init_value = self.__get_isignal_init_value(container)
  348. attr_dir = dict(
  349. {
  350. 'ISignal Name': isignal_name,
  351. 'Signal Name': isignal_system_signal,
  352. 'ISignal Bit Length': isignal_bit_length,
  353. 'ISignal Type': isignal_type,
  354. 'ISignal Init Value': isignal_init_value,
  355. }
  356. )
  357. self._attrs_dir_list.append(attr_dir)
  358. def __get_isignal_name(self, container):
  359. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  360. def __get_isignal_system_siganl_ref(self, container):
  361. system_signal = Arxml.get_container_text(Arxml.get_first_match_element(container, './SYSTEM-SIGNAL-REF'))
  362. system_signal = Arxml.get_split_end_value(system_signal)
  363. return system_signal
  364. def __get_isignal_bit_length(self, container):
  365. return Arxml.get_container_text(Arxml.get_first_match_element(container, './LENGTH'))
  366. def __get_isignal_type(self, container):
  367. base_type_container = Arxml.get_first_match_element(container, './/BASE-TYPE-REF')
  368. return Arxml.get_split_end_value(Arxml.get_container_text(base_type_container))
  369. def __get_isignal_init_value(self, container):
  370. init_value = Arxml.get_first_match_element(container, './INIT-VALUE//VALUE')
  371. return Arxml.get_container_text(init_value)
  372. class ISignalTriggering(ContainerAttrItemBase):
  373. def get_container_attrs_dir_list(self, container):
  374. isignal_triggering_name = self.__get_isignal_triggering_name(container)
  375. controller , isignal_port_direction = self.__get_isignal_port_controller_and_direction(container)
  376. isignal_ref = self.__get_isignal_ref(container)
  377. attr_dir = dict(
  378. {
  379. 'Signal Trigger': isignal_triggering_name,
  380. 'Controller':controller,
  381. 'Port Direction':isignal_port_direction,
  382. 'ISignal Name':isignal_ref
  383. }
  384. )
  385. if attr_dir['ISignal Name'] != '': # exclude PDU ISignal trigger with isignal name is empty, for example ISTrigCCP_020ms_PDU02_2_0
  386. self._attrs_dir_list.append(attr_dir)
  387. def __get_isignal_port_controller_and_direction(self, container):
  388. port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-PORT-REF'))
  389. port_dir_ref_split = port_dir_ref.split('/')
  390. return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
  391. def __get_isignal_triggering_name(self, container):
  392. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  393. def __get_isignal_ref(self, container):
  394. isignal_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-SIGNAL-REF'))
  395. return Arxml.get_split_end_value(isignal_ref)
  396. class FrameTriggering(ContainerAttrItemBase):
  397. def get_container_attrs_dir_list(self, container):
  398. frame_triggering_name = self.__get_frame_triggering_name(container)
  399. controller, frame_port_direction = self.__get_frame_port_controller_and_direction(container)
  400. frame_ref = self.__get_frame_ref(container)
  401. pdu_ref = self.__get_pdu_ref(container)
  402. frame_id = self.__get_frame_id(container)
  403. attr_dir = dict(
  404. {
  405. 'Frame Trigger': frame_triggering_name,
  406. 'Controller': controller,
  407. 'Port Direction': frame_port_direction,
  408. 'Frame Name': frame_ref,
  409. 'Frame ID': frame_id,
  410. 'Pdu Trigger': pdu_ref
  411. }
  412. )
  413. self._attrs_dir_list.append(attr_dir)
  414. def __get_frame_port_controller_and_direction(self, container):
  415. port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-PORT-REF'))
  416. port_dir_ref_split = port_dir_ref.split('/')
  417. return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
  418. def __get_frame_triggering_name(self, container):
  419. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  420. def __get_frame_ref(self, container):
  421. frame_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/FRAME-REF'))
  422. return Arxml.get_split_end_value(frame_ref)
  423. def __get_pdu_ref(self, container):
  424. pdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/PDU-TRIGGERING-REF'))
  425. # print(f"find pdu size is {len(Arxml.get_all_match_element(container, './/PDU-TRIGGERING-REF'))}") # all pdu size in a frame is 1
  426. return Arxml.get_split_end_value(pdu_ref)
  427. def __get_frame_id(self, container):
  428. frame_id = Arxml.get_container_text(Arxml.get_first_match_element(container, './/IDENTIFIER'))
  429. frame_id = hex(int(frame_id))
  430. return frame_id
  431. class PduTriggering(ContainerAttrItemBase):
  432. def get_container_attrs_dir_list(self, container):
  433. pdu_triggering_name = self.__get_pdu_triggering_name(container)
  434. controller, pdu_port_direction = self.__get_pdu_port_controller_and_direction(container)
  435. pdu_ref = self.__get_ipdu_ref(container)
  436. isignal_triggerings_ref = self.__get_isignal_triggerings_ref(container)
  437. for isg in isignal_triggerings_ref:
  438. attr_dir = dict(
  439. {
  440. 'Pdu Trigger': pdu_triggering_name,
  441. 'Controller': controller,
  442. 'Port Direction': pdu_port_direction,
  443. 'Pdu Name': pdu_ref,
  444. 'Signal Trigger': isg
  445. }
  446. )
  447. self._attrs_dir_list.append(attr_dir)
  448. def __get_pdu_port_controller_and_direction(self, container):
  449. port_dir_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-PORT-REF'))
  450. port_dir_ref_split = port_dir_ref.split('/')
  451. return (port_dir_ref_split[-2], port_dir_ref_split[-1]) # (controller, direction)
  452. def __get_pdu_triggering_name(self, container):
  453. return Arxml.get_container_text(Arxml.get_first_match_element(container, './SHORT-NAME'))
  454. def __get_ipdu_ref(self, container):
  455. ipdu_ref = Arxml.get_container_text(Arxml.get_first_match_element(container, './/I-PDU-REF'))
  456. return Arxml.get_split_end_value(ipdu_ref)
  457. def __get_isignal_triggerings_ref(self, container):
  458. isg_ref_ret = []
  459. isignal_triggerings = Arxml.get_all_match_element(container, './/I-SIGNAL-TRIGGERING-REF')
  460. if isignal_triggerings is None:
  461. isg_ref_ret.append('NA')
  462. else:
  463. for isg in isignal_triggerings:
  464. isg_ref_ret.append(Arxml.get_split_end_value(Arxml.get_container_text(isg)))
  465. return isg_ref_ret
  466. class ContainerFactory:
  467. def __init__(self, container_item_base: Type[ContainerAttrItemBase]) -> None:
  468. self.container_item_factory = container_item_base
  469. def get_containers_mapping_dataframe(self, containers:list) -> DataFrame:
  470. df_data = []
  471. all_attrs_name = None
  472. if len(containers) == 1:
  473. containers = containers[0] # special process for port container get <Arxml.get_first_match_element>
  474. for container in containers:
  475. container_item = self.container_item_factory(container)
  476. all_attrs, all_attrs_name_tmp = container_item.print_attr()
  477. df_data.extend(all_attrs)
  478. if all_attrs_name is None and len(all_attrs_name_tmp) != 0: #TO DO why?
  479. all_attrs_name = copy.deepcopy(all_attrs_name_tmp)
  480. # print(df_data)
  481. # print(len(df_data))
  482. # print(len(df_data[0]))
  483. # print(all_attrs_name)
  484. df = DataFrame(df_data, columns=all_attrs_name)
  485. return df
  486. class ISignalGroup:
  487. pass
  488. class MatrixParser:
  489. def __init__(self, arxml_file):
  490. self.__arxml = Arxml(arxml_file_name=arxml_file)
  491. self.__output_port_xls = f"./output/OutputFile_Attributes_{os.path.basename(arxml_file).split('.')[0]}.xlsx"
  492. self.container_parsers = dict()
  493. self.all_dataframe = dict()
  494. self.do_container_register()
  495. self.init_thread_func_list()
  496. self.start_write_thread()
  497. self.start_gen_threads()
  498. self.join_gen_threads()
  499. self.do_post_transform_dataframe()
  500. # ana_df1 = self.analyse_dataFrame(self.all_dataframe['Port'], self.all_dataframe['SystemSignal'], self.all_dataframe['SystemSignalGroup'])
  501. ana_df2 = self.analyse_dataFrame2(self.all_dataframe['ISignal'], self.all_dataframe['ISignalTriggering'])
  502. ana_df3 = self.analyse_dataFrame3(self.all_dataframe['NMPdu'], self.all_dataframe['NPdu'], self.all_dataframe['IPdu'], self.all_dataframe['ContainerPdu'], self.all_dataframe['Dcm_IPdu'])
  503. ana_df4 = self.analyse_dataFrame4(ana_df2, ana_df3)
  504. # ana_df1.to_csv(f"./output/OutputFile_SignalPort_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True)
  505. self.all_dataframe['CompuMethod'].to_csv(f"./output/OutputFile_CompuMethod_{os.path.basename(arxml_file).split('.')[0]}.txt", sep='\t', index=False,header=True)
  506. # self.write_queue.put(('Analyse1', ana_df1))
  507. self.write_queue.put(('Analyse4', ana_df4))
  508. self.join_write_thread()
  509. def init_thread_func_list(self):
  510. self.write_queue = queue.Queue()
  511. def write_thread_func(self, sheet_queue):
  512. # while True:
  513. #
  514. # try:
  515. # sheet_name, sheet_data = sheet_queue.get()
  516. # print(f'sheet_name is {sheet_name}')
  517. # print(f'df type is {type(sheet_data)}')
  518. #
  519. # except ValueError:
  520. # exit(0)
  521. #
  522. # Pd_Utils.write_dataframe_to_xls(sheet_data, self.__output_port_xls, sheet_name, mode='a')
  523. writer = ExcelWriter(self.__output_port_xls, engine='xlsxwriter')
  524. while True:
  525. try:
  526. sheet_name, sheet_data = sheet_queue.get()
  527. except ValueError:
  528. writer.save()
  529. exit(0)
  530. sheet_data.to_excel(writer, sheet_name=sheet_name)
  531. def gen_data_thread_func(self, container_type, sheet_queue):
  532. sheet_name = container_type.__name__
  533. df = self.get_containers_attributes_dataframe(container_type)
  534. self.all_dataframe[sheet_name] = df
  535. logging.info(f'Analyse Matrix to get {sheet_name}')
  536. sheet_queue.put((sheet_name, df))
  537. def start_gen_threads(self):
  538. self.threads = []
  539. for container_type, containers_acquire_pattern in self.container_parsers.items():
  540. thread = threading.Thread(target=self.gen_data_thread_func,
  541. name=f'thread_{container_type.__name__}',
  542. args=(container_type, self.write_queue))
  543. self.threads.append(thread)
  544. for th in self.threads:
  545. th.start()
  546. time.sleep(0.1)
  547. def start_write_thread(self):
  548. self.write_thread = threading.Thread(target=self.write_thread_func, args=(self.write_queue, ))
  549. self.write_thread.start()
  550. time.sleep(0.5)
  551. def join_gen_threads(self):
  552. for th in self.threads:
  553. th.join()
  554. def join_write_thread(self):
  555. self.write_queue.put('STOP')
  556. self.write_thread.join()
  557. def analyse_dataFrame(self, port_attr_df, system_signal_attr_df, system_signal_group_attr_df):
  558. port_attr_df['Used By SWC'] = True
  559. df_merge1 = merge(system_signal_attr_df, port_attr_df.loc[:,['Signal Name', 'Used By SWC']], how='left')
  560. df_merge2 = merge(df_merge1, system_signal_group_attr_df, how='left')
  561. df_merge2.loc[df_merge2['Used By SWC'] != True,['Used By SWC']] = False
  562. df_merge2.fillna('NA', inplace=True)
  563. df_merge2.sort_values('Signal Name', inplace=True)
  564. # Pd_Utils.write_dataframe_to_xls(df_merge2, self.__output_port_xls, 'Analyse1', mode='a')
  565. # df_merge2.to_csv('Analyse1_SignalPort.txt', sep='\t', index=False,header=False)
  566. return df_merge2
  567. def analyse_dataFrame2(self, isignal_attr_df, isignal_triggering_df):
  568. '''
  569. merge isignal_attr_df to isignal_triggering_df, add Signal Name column
  570. :param isignal_attr_df:
  571. :param isignal_triggering_df:
  572. :return:
  573. '''
  574. df_merge = merge(isignal_triggering_df, isignal_attr_df.loc[:,['ISignal Name', 'Signal Name']], on='ISignal Name', how='left')
  575. # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse2', mode='a')
  576. return df_merge
  577. def analyse_dataFrame3(self, nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df):
  578. # 竖向拼接
  579. df_merge1 = concat([nmpdu_df, npdu_df, ipdu_df, containerPdu_df, dcm_iPdu_df], sort=False)
  580. df_merge1 = df_merge1.fillna('NA')
  581. # Pd_Utils.write_dataframe_to_xls(df_merge1, self.__output_port_xls, 'Analyse3', mode='a')
  582. return df_merge1
  583. def analyse_dataFrame4(self, ana_df2, ana_df3):
  584. '''
  585. only pdu has isignal will keep in Analys dataframe4
  586. :param ana_df2:
  587. :param ana_df3:
  588. :return:
  589. '''
  590. # print(ana_df3.columns)
  591. # print((ana_df3['ISignal Name'] != 'NA'))
  592. ana_df3 = ana_df3[(ana_df3['ISignal Name'] != 'NA')] #
  593. df_merge = merge(ana_df3, ana_df2.loc[:,['ISignal Name', 'Signal Name', 'Controller', 'Port Direction']], on='ISignal Name', how='left')
  594. # df_merge = merge(ana_df2, ana_df3, on='ISignal Name', how='outer')
  595. # Pd_Utils.write_dataframe_to_xls(df_merge, self.__output_port_xls, 'Analyse4', mode='a')
  596. return df_merge
  597. def do_post_transform_dataframe(self):
  598. cm_df = self.all_dataframe['CompuMethod']
  599. cm_df.sort_values(['Signal Codding Name', 'Compu Method Name'], inplace=True)
  600. def register_container_parser(self, container_type: Type[ContainerAttrItemBase],
  601. containers_acquire_pattern: str):
  602. self.container_parsers[container_type] = containers_acquire_pattern
  603. def get_containers_attributes_dataframe(self, container_type: Type[ContainerAttrItemBase]):
  604. containers = Arxml.get_all_match_element(self.__arxml.root, self.container_parsers[container_type])
  605. container_factory = ContainerFactory(container_type)
  606. attribute_dataframe = container_factory.get_containers_mapping_dataframe(containers)
  607. return attribute_dataframe
  608. def do_container_register(self):
  609. container_parser = [
  610. # (Port, './/COMPOSITION-SW-COMPONENT-TYPE/PORTS'),
  611. # (Port, './/APPLICATION-SW-COMPONENT-TYPE/PORTS'),
  612. (SystemSignal, './/SYSTEM-SIGNAL'),
  613. (SystemSignalGroup, './/SYSTEM-SIGNAL-GROUP'),
  614. (SignalMapping, './/SIGNAL-MAPPINGS/I-SIGNAL-MAPPING'),
  615. (PduMapping, './/I-PDU-MAPPINGS/I-PDU-MAPPING'),
  616. (ISignal, './/I-SIGNAL'),
  617. (ISignalTriggering, './/I-SIGNAL-TRIGGERING'),
  618. (FrameTriggering, './/CAN-FRAME-TRIGGERING'),
  619. (PduTriggering, './/PDU-TRIGGERING'),
  620. (CompuMethod, './/COMPU-METHOD'),
  621. (NMPdu, './/NM-PDU'),
  622. (NPdu, './/N-PDU'),
  623. (IPdu, './/I-SIGNAL-I-PDU'),
  624. (ContainerPdu, './/CONTAINER-I-PDU'),
  625. (Dcm_IPdu, './/DCM-I-PDU')
  626. ]
  627. for parser_container_type, parser_containers_acquire_pattern in container_parser:
  628. self.register_container_parser(parser_container_type, parser_containers_acquire_pattern)
  629. if __name__ == '__main__':
  630. logging.basicConfig(level=logging.DEBUG)
  631. mp = MatrixParser(r'D:\01_Work\02_WP\EP39_EREV\branch\src\InputFiles\MA_20220921-qy1_EP39 EREV_SIMU+_GW-arxml-V01.arxml')