uds.can.transport_interface.python_can
======================================

.. py:module:: uds.can.transport_interface.python_can

.. autoapi-nested-parse::

   Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.


Classes
-------

.. autoapisummary::

   uds.can.transport_interface.python_can.PyCanTransportInterface


Module Contents
---------------

.. py:class:: PyCanTransportInterface(network_manager, addressing_information, notifier = None, async_notifier = None, **configuration_params)

   Bases: :py:obj:`uds.can.transport_interface.common.AbstractCanTransportInterface`

   .. autoapi-inheritance-diagram:: uds.can.transport_interface.python_can.PyCanTransportInterface
      :parts: 1
      :private-bases:


   Transport Interface for managing UDS on CAN with python-can package as bus handler.

   .. note:: Documentation for python-can package: https://python-can.readthedocs.io/

   Create Transport Interface that uses python-can package to control CAN bus.

   :param network_manager: Python-can bus object for handling CAN network.
   :param addressing_information: Addressing Information configuration of a simulated node that is taking part in
       DoCAN communication.
   :param notifier: Python-can notifier object for reporting received and sent CAN Frames to listeners.
       Leave None to create new notifier when needed.

       .. warning:: Only one notifier object shall be active at any time.

   :param async_notifier: Python-can notifier object for reporting received and sent CAN Frames to async listeners.
       Leave None to create new notifier when needed.

       .. warning:: Only one notifier object shall be active at any time.

   :param configuration_params: Additional configuration parameters.

       - :parameter n_as_timeout: Timeout value for :ref:`N_As ` time parameter.
       - :parameter n_ar_timeout: Timeout value for :ref:`N_Ar ` time parameter.
       - :parameter n_bs_timeout: Timeout value for :ref:`N_Bs ` time parameter.
       - :parameter n_br: Value of :ref:`N_Br ` time parameter to use in communication.
       - :parameter n_cs: Value of :ref:`N_Cs ` time parameter to use in communication.
       - :parameter n_cr_timeout: Timeout value for :ref:`N_Cr ` time parameter.
       - :parameter dlc: Base CAN DLC value to use for CAN packets.
       - :parameter min_dlc: Minimal CAN DLC to use for CAN Packets during Data Optimization.
       - :parameter use_data_optimization: Whether to use :ref:`CAN Frame Data Optimization `.
       - :parameter filler_byte: Filler byte value to use for :ref:`CAN Frame Data Padding `.
       - :parameter flow_control_parameters_generator: Generator with Flow Control parameters to use.
       - :parameter can_version: Version of CAN protocol to be used for sending packets.
       - :parameter bitrate_switch: Whether bitrate switch (BRS) shall be set in sent packets.


   .. py:attribute:: _MAX_LISTENER_TIMEOUT
      :type: float
      :value: 4280.0

      Maximal timeout value accepted by python-can listeners.


   .. py:attribute:: _MIN_NOTIFIER_TIMEOUT
      :type: float
      :value: 0.001

      Minimal notifier timeout that does not cause listeners to malfunction.


   .. py:attribute:: network_manager
      :type: can.BusABC

      Get network manager used by this Transport Interface.

      Network manager handles Physical and Data layers (OSI Model) of the bus/network.


   .. py:property:: notifier
      :type: Optional[can.Notifier]

      Notifier used by python-can for reporting received and sent CAN Frames to listeners.


   .. py:property:: async_notifier
      :type: Optional[can.Notifier]

      Notifier used by python-can for reporting received and sent CAN Frames to async listeners.


   .. py:attribute:: __rx_frames_buffer


   .. py:attribute:: __tx_frames_buffer


   .. py:attribute:: __async_rx_frames_buffer


   .. py:attribute:: __async_tx_frames_buffer


   .. py:method:: __del__()

      Safely close all threads opened by this object.


   .. py:method:: __setup_sync_listening()

      Configure CAN frame notifier for synchronous communication.


   .. py:method:: __setup_async_listening(loop)

      Configure CAN frame notifier for asynchronous communication.

      :param loop: An :mod:`asyncio` event loop to use.


   .. py:method:: __teardown_sync_listening(suppress_warning = False)

      Stop and remove CAN frame notifier for synchronous communication.

      :param suppress_warning: Do not warn about mixing Synchronous and Asynchronous implementation.


   .. py:method:: __teardown_async_listening(suppress_warning = False)

      Stop and remove CAN frame notifier for asynchronous communication.

      :param suppress_warning: Do not warn about mixing Synchronous and Asynchronous implementation.


   .. py:method:: __validate_timeout(value)
      :staticmethod:

      Validate value of a timeout.

      :param value: Value of a timeout to check.

      :raise TypeError: Provided value is not int or float type.
      :raise ValueError: Provided value is a negative number.


   .. py:method:: _send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp)

      Send block of Consecutive Frame CAN packets.

      :param cf_packets_block: Consecutive Frame CAN packets to send.
      :param delay: Minimal delay between sending successive Consecutive Frames [ms].
      :param fc_transmission_timestamp: Transmission timestamp of the preceding Flow Control packet.

      :return: Records with historic information about transmitted Consecutive Frame CAN packets.


   .. py:method:: _async_send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp, loop)
      :async:

      Send block of Consecutive Frame CAN packets asynchronously.

      :param cf_packets_block: Consecutive Frame CAN packets to send.
      :param delay: Minimal delay between sending successive Consecutive Frames [ms].
      :param fc_transmission_timestamp: Transmission timestamp of the preceding Flow Control packet.
      :param loop: An asyncio event loop to use for scheduling this task.

      :return: Records with historic information about transmitted Consecutive Frame CAN packets.


   .. py:method:: _wait_for_flow_control(last_packet_transmission_timestamp)

      Wait till Flow Control CAN Packet is received.

      :param last_packet_transmission_timestamp: Timestamp when the last CAN Packet was transmitted.
      :return: Record with historic information about received Flow Control CAN packet.


   .. py:method:: _async_wait_for_flow_control(last_packet_transmission_timestamp)
      :async:

      Wait till Flow Control CAN Packet is received.

      :param last_packet_transmission_timestamp: Timestamp when the last CAN Packet was transmitted.

      :return: Record with historic information about received Flow Control CAN packet.


   .. py:method:: _wait_for_packet(buffer, timeout = None)

      Wait till CAN Packet is received.

      :param buffer: Listener to which CAN Packet would be delivered.
      :param timeout: Maximal time (in milliseconds) to wait.
          Leave None to wait forever.

      :raise TimeoutError: Timeout was reached before a CAN packet arrived.

      :return: Record with historic information about received CAN packet.


   .. py:method:: _async_wait_for_packet(buffer, timeout = None)
      :async:

      Wait till CAN Packet is received.

      :param buffer: Listener to which CAN Packet would be delivered.
      :param timeout: Maximal time (in milliseconds) to wait.
          Leave None to wait forever.

      :raise TimeoutError: Timeout was reached before a CAN packet arrived.

      :return: Record with historic information about received CAN packet.


   .. py:method:: _receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end)

      Receive block of :ref:`Consecutive Frames `.

      :param sequence_number: Current :ref:`Sequence Number `
          (next Consecutive Frame shall have this value set).
      :param block_size: :ref:`Block Size ` value sent in the last
          :ref:`Flow Control CAN packet `.
      :param remaining_data_length: Number of remaining data bytes to receive in UDS message.
      :param timestamp_end: The final timestamp by which the reception must be completed.

      :raise TimeoutError: Timeout was reached.
          Either:

          - Consecutive Frame did not arrive before reaching N_Cr timeout
          - Diagnostic message reception was not completed before ``timestamp_end``

      :return: Either:

          - Record of UDS message if reception was interrupted by a new UDS message transmission.
          - Tuple with records of received Consecutive Frames.


   .. py:method:: _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end, loop)
      :async:

      Receive block of :ref:`Consecutive Frames ` asynchronously.

      :param sequence_number: Current :ref:`Sequence Number `
          (next Consecutive Frame shall have this value set).
      :param block_size: :ref:`Block Size ` value sent in the last
          :ref:`Flow Control CAN packet `.
      :param remaining_data_length: Number of remaining data bytes to receive in UDS message.
      :param timestamp_end: The final timestamp by which the reception must be completed.
      :param loop: An asyncio event loop used for observing messages.

      :return: Either:

          - Record of UDS message if reception was interrupted by a new UDS message transmission.
          - Tuple with records of received Consecutive Frames.


   .. py:method:: _receive_consecutive_frames(first_frame, timestamp_end)

      Receive Consecutive Frames after reception of First Frame.

      :param first_frame: :ref:`First Frame ` that was received.
      :param timestamp_end: The final timestamp by which the reception must be completed.

      :raise OverflowError: Flow Control packet with :ref:`Flow Status ` equal to OVERFLOW was sent.

      :return: Record of UDS message formed from the provided First Frame and the received Consecutive Frames.


   .. py:method:: _async_receive_consecutive_frames(first_frame, timestamp_end, loop)
      :async:

      Receive Consecutive Frames asynchronously after reception of First Frame.

      :param first_frame: :ref:`First Frame ` that was received.
      :param timestamp_end: The final timestamp by which the reception must be completed.
      :param loop: An asyncio event loop used for observing messages.

      :raise TimeoutError: :ref:`N_Cr ` timeout was reached.
      :raise OverflowError: Flow Control packet with :ref:`Flow Status ` equal to OVERFLOW was sent.
      :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received.

      :return: Record of UDS message formed from the provided First Frame and the received Consecutive Frames.


   .. py:method:: _message_receive_start(initial_packet, timestamp_end)

      Continue to receive message after receiving initial packet.

      :param initial_packet: Record of a packet initiating UDS message reception.
      :param timestamp_end: The final timestamp by which the reception must be completed.

      :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received.

      :return: Record of UDS message received.


   .. py:method:: _async_message_receive_start(initial_packet, timestamp_end, loop)
      :async:

      Continue to receive message asynchronously after receiving initial packet.

      :param initial_packet: Record of a packet initiating UDS message reception.
      :param timestamp_end: The final timestamp by which the reception must be completed.
      :param loop: An asyncio event loop used for observing messages.

      :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received.

      :return: Record of UDS message received.


   .. py:method:: clear_rx_frames_buffers()

      Clear buffers used for storing received CAN frames.

      .. warning:: After this call, CAN packets received in the past are no longer accessible.


   .. py:method:: clear_tx_frames_buffers()

      Clear buffers used for storing transmitted CAN frames.


   .. py:method:: is_supported_network_manager(bus_manager)
      :staticmethod:

      Check whether provided value is a bus manager that is supported by this Transport Interface.

      :param bus_manager: Value to check.

      :return: True if provided bus object is compatible with this Transport Interface, False otherwise.


   .. py:method:: send_packet(packet)

      Transmit CAN packet.

      .. warning:: Must not be called within an asynchronous function.

      :param packet: CAN packet to send.

      :raise TypeError: Provided packet is not CAN packet.

      :return: Record with historic information about transmitted CAN packet.


   .. py:method:: async_send_packet(packet, loop = None)
      :async:

      Transmit CAN packet asynchronously.

      :param packet: CAN packet to send.
      :param loop: An asyncio event loop used for observing messages.

      :return: Record with historic information about transmitted CAN packet.


   .. py:method:: receive_packet(timeout = None)

      Receive CAN packet.

      .. warning:: Must not be called within an asynchronous function.

      :param timeout: Maximal time (in milliseconds) to wait.
          Leave None to wait forever.

      :return: Record with historic information about received CAN packet.


   .. py:method:: async_receive_packet(timeout = None, loop = None)
      :async:

      Receive CAN packet asynchronously.

      :param timeout: Maximal time (in milliseconds) to wait.
          Leave None to wait forever.
      :param loop: An asyncio event loop used for observing messages.

      :return: Record with historic information about received CAN packet.


   .. py:method:: send_message(message)

      Transmit UDS message over CAN.

      .. warning:: Must not be called within an asynchronous function.

      :param message: A message to send.

      :raise OverflowError: Flow Control packet with Flow Status equal to OVERFLOW was received.
      :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message.
      :raise NotImplementedError: Flow Control CAN packet with unknown Flow Status was received.

      :return: Record with historic information about transmitted UDS message.


   .. py:method:: async_send_message(message, loop = None)
      :async:

      Transmit UDS message over CAN asynchronously.

      :param message: A message to send.
      :param loop: An asyncio event loop to use for scheduling this task.

      :raise OverflowError: Flow Control packet with Flow Status equal to OVERFLOW was received.
      :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message.
      :raise NotImplementedError: Flow Control CAN packet with unknown Flow Status was received.

      :return: Record with historic information about transmitted UDS message.


   .. py:method:: receive_message(start_timeout = None, end_timeout = None)

      Receive UDS message over CAN.

      :param start_timeout: Maximal time (in milliseconds) to wait for the start of a message transmission.
          Leave None to wait forever.
      :param end_timeout: Maximal time (in milliseconds) to wait for a message transmission to finish.
          Leave None to wait forever.

      :raise MessageTransmissionNotStartedError: Timeout was exceeded before message reception started.
      :raise TimeoutError: Timeout was exceeded during message reception (before all packets were received).

      :return: Record with historic information about received UDS message.


   .. py:method:: async_receive_message(start_timeout = None, end_timeout = None, loop = None)
      :async:

      Receive UDS message over CAN asynchronously.

      :param start_timeout: Maximal time (in milliseconds) to wait for the start of a message transmission.
          Leave None to wait forever.
      :param end_timeout: Maximal time (in milliseconds) to wait for a message transmission to finish.
          Leave None to wait forever.
      :param loop: An asyncio event loop to use for scheduling this task.

      :raise MessageTransmissionNotStartedError: Timeout was exceeded before message reception started.
      :raise TimeoutError: Timeout was exceeded during message reception (before all packets were received).

      :return: Record with historic information about received UDS message.
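
The timeout arguments documented above are given in milliseconds, and ``__validate_timeout`` is documented to raise ``TypeError`` for values that are not int or float and ``ValueError`` for negative values. A minimal sketch of such a check is shown here for illustration only; it is not the library's implementation. The function name ``validate_timeout_ms``, the acceptance of ``None`` (meaning "wait forever") and the rejection of ``bool`` are assumptions of this sketch.

.. code-block:: python

   from typing import Optional, Union


   def validate_timeout_ms(value: Optional[Union[int, float]]) -> None:
       """Raise an exception if ``value`` is not a usable timeout in milliseconds.

       Hypothetical helper that mirrors the documented checks only.
       """
       if value is None:
           # Assumption: None means "wait forever", as in the public methods.
           return
       # bool is a subclass of int; rejecting it is a choice made in this sketch.
       if isinstance(value, bool) or not isinstance(value, (int, float)):
           raise TypeError(f"Timeout must be int or float, got {type(value).__name__}.")
       if value < 0:
           raise ValueError(f"Timeout must not be negative, got {value}.")


   # Accepted values pass silently.
   validate_timeout_ms(None)
   validate_timeout_ms(0)
   validate_timeout_ms(250.5)

Calling a check like this before passing ``timeout``, ``start_timeout`` or ``end_timeout`` to the Transport Interface may surface unit or type mistakes (for example, a string read from a configuration file) earlier than the call itself would.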