getUsefulInfo.py 4.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import pandas as pd
  2. from pandas import DataFrame
  3. import numpy as np
  4. # import dtale
  5. def get_agg_pdu_info_by_controller(pdu_name: str, pdu_triggering_df: DataFrame):
  6. # select pdu_name releated pdus in Pdu Triggering
  7. pdu_triggering_df.fillna('', inplace=True)
  8. criterion_select_pdus = pdu_triggering_df['Pdu Name'].str.contains(pdu_name)
  9. releated_pdus = pdu_triggering_df[criterion_select_pdus]
  10. pdu_info_agg = releated_pdus \
  11. .groupby('Controller') \
  12. .agg(
  13. PduName = ('Pdu Name', lambda x: ','.join(x.unique())),
  14. PortDirection = ('Port Direction', lambda x: ','.join(x.unique())),
  15. signal=('Signal Trigger', lambda x: ','.join(x))
  16. ) \
  17. .reset_index()
  18. return pdu_info_agg
  19. def get_normal_or_contained_pdu(pdu_name: str, aggregated_pdu_info: DataFrame, container_df: DataFrame):
  20. unique_pdu_list = aggregated_pdu_info['PduName'].unique()
  21. # get contained pdu's name, and add pdu name as normal can pdu name.
  22. cri_is_contained = container_df['Contained PDU'].apply(lambda x: x[5:]).isin(unique_pdu_list) # trip head 'PduTr'
  23. normal_contained_pdus = np.append(container_df[cri_is_contained]['PDU Name'].unique(), pdu_name)
  24. return normal_contained_pdus
  25. def get_frame_info_by_pdu_name_list(normal_contained_pdus, frame_df: DataFrame):
  26. cri_has_frame_info = frame_df['Pdu Trigger'].apply(lambda x: x[5:]).isin(normal_contained_pdus) # trip head 'PduTr'
  27. frame_info = frame_df[cri_has_frame_info]
  28. return frame_info
  29. def get_useful_info_by_pdu_name(pdu_name: str, frame_df, pdu_triggering_df, container_df, df_idpu):
  30. pdu_info_agg = get_agg_pdu_info_by_controller(pdu_name, pdu_triggering_df)
  31. # print(pdu_info_agg)
  32. normal_contained_pdus = get_normal_or_contained_pdu(pdu_name, pdu_info_agg, container_df)
  33. # print(normal_contained_pdus)
  34. frame_info = get_frame_info_by_pdu_name_list(normal_contained_pdus, frame_df)
  35. # print(frame_info.loc[:, ['Controller', 'Frame Name', 'Frame ID', 'Pdu Trigger']])
  36. # pdu_info_merged = pd.merge(pdu_info_agg, frame_info.loc[:, ['Frame Name', 'Frame ID', 'Pdu Trigger']], on='Controller')
  37. contained_info = get_contained_pdu_info(pdu_name, df_idpu)
  38. pdu_info_merged = pd.merge(pdu_info_agg, frame_info.loc[:, ['Controller', 'Frame Name', 'Frame ID', 'Pdu Trigger']], on='Controller')
  39. pdu_info_merged = pd.merge(pdu_info_merged, contained_info, on='PduName', how='outer')
  40. return pdu_info_merged
  41. def get_contained_pdu_info(pdu_name: str, ipdu_df):
  42. df_ipdu_contained = ipdu_df[ipdu_df['Contained PDU ID'].notna()]
  43. df_contained_info = df_ipdu_contained \
  44. .groupby('PDU Name') \
  45. .agg(
  46. ContainedPduID = ('Contained PDU ID', lambda x: ','.join(x.unique())),
  47. ) \
  48. .reset_index()
  49. df_contained_info.rename(columns={'PDU Name': 'PduName'}, inplace=True)
  50. # print(df_contained_info)
  51. criterion_select_pdus = df_contained_info['PduName'].str.contains(pdu_name)
  52. relate_contained_pdu = df_contained_info[criterion_select_pdus]
  53. return relate_contained_pdu
  54. def get_pdu_info_by_signal_name(signal_name: str, pdu_triggering_df):
  55. criterion_select_pdus = pdu_triggering_df['Signal Trigger'].str.contains(signal_name)
  56. releated_pdus = pdu_triggering_df[criterion_select_pdus]
  57. return releated_pdus
  58. def test():
  59. xlsx = pd.ExcelFile('OutputFile_Attributes_MA_20220921-qy1_EP39 EREV_SIMU+_GW-arxml-V01.xlsx')
  60. # print(xlsx.sheet_names)
  61. df_idpu = pd.read_excel(xlsx, sheet_name='IPdu', index_col=0)
  62. df_frame = pd.read_excel(xlsx, sheet_name='FrameTriggering', index_col=0)
  63. df_pdu_trig = pd.read_excel(xlsx, sheet_name='PduTriggering', index_col=0)
  64. df_container = pd.read_excel(xlsx, sheet_name='ContainerPdu', index_col=0)
  65. ipdu_dtyps = {'Contained PDU ID':str}
  66. df_ipdu = pd.read_excel(xlsx, sheet_name='IPdu', index_col=0, dtype=ipdu_dtyps)
  67. pdu_name = 'CCP_020ms_PDU02'
  68. #pdu_name = 'ECM_010ms_PDU00'
  69. # pdu_name = 'FVCM_ADCANFD_Sporadic_Container12'
  70. # pdu_name = 'SCS_020ms_PDU04'
  71. search_info = get_useful_info_by_pdu_name(pdu_name, df_frame, df_pdu_trig, df_container, df_ipdu)
  72. # dtale.show(search_info,open_browser=True)#
  73. print(search_info)
  74. signal_name = 'FICMACD'
  75. search_pdu_info = get_pdu_info_by_signal_name(signal_name, df_pdu_trig)
  76. print(search_pdu_info)