uds.can.transport_interface.python_can ====================================== .. py:module:: uds.can.transport_interface.python_can .. autoapi-nested-parse:: Implementation of UDS Transport Interface for CAN bus using python-can as bus manager. Classes ------- .. autoapisummary:: uds.can.transport_interface.python_can.PyCanTransportInterface Module Contents --------------- .. py:class:: PyCanTransportInterface(network_manager, addressing_information, **configuration_params) Bases: :py:obj:`uds.can.transport_interface.common.AbstractCanTransportInterface` .. autoapi-inheritance-diagram:: uds.can.transport_interface.python_can.PyCanTransportInterface :parts: 1 :private-bases: Transport Interface for managing UDS on CAN with python-can package as bus handler. .. note:: Documentation for python-can package: https://python-can.readthedocs.io/ Create Transport Interface that uses python-can package to control CAN bus. :param network_manager: Python-can bus object for handling CAN network. .. warning:: Bus must have capability of observing transmitted frames. :param addressing_information: Addressing Information of UDS entity simulated by this CAN Transport Interface. :param configuration_params: Additional configuration parameters. - :parameter n_as_timeout: Timeout value for :ref:`N_As ` time parameter. - :parameter n_ar_timeout: Timeout value for :ref:`N_Ar ` time parameter. - :parameter n_bs_timeout: Timeout value for :ref:`N_Bs ` time parameter. - :parameter n_br: Value of :ref:`N_Br ` time parameter to use in communication. - :parameter n_cs: Value of :ref:`N_Cs ` time parameter to use in communication. - :parameter n_cr_timeout: Timeout value for :ref:`N_Cr ` time parameter. - :parameter dlc: Base CAN DLC value to use for CAN packets. - :parameter use_data_optimization: Information whether to use :ref:`CAN Frame Data Optimization `. - :parameter filler_byte: Filler byte value to use for :ref:`CAN Frame Data Padding `. - :parameter flow_control_parameters_generator: Generator with Flow Control parameters to use. .. py:attribute:: _MAX_LISTENER_TIMEOUT :type: float :value: 4280000.0 Maximal timeout value accepted by python-can listeners. .. py:attribute:: _MIN_NOTIFIER_TIMEOUT :type: float :value: 0.001 Minimal timeout for notifiers that does not cause malfunctioning of listeners. .. py:attribute:: __frames_buffer .. py:attribute:: __async_frames_buffer .. py:attribute:: __notifier :type: Optional[can.Notifier] :value: None .. py:attribute:: __async_notifier :type: Optional[can.Notifier] :value: None .. py:method:: __del__() Safely close all threads opened by this object. .. py:method:: __teardown_notifier(suppress_warning = False) Stop and remove CAN frame notifier for synchronous communication. :param suppress_warning: Do not warn about mixing Synchronous and Asynchronous implementation. .. py:method:: __teardown_async_notifier(suppress_warning = False) Stop and remove CAN frame notifier for asynchronous communication. :param suppress_warning: Do not warn about mixing Synchronous and Asynchronous implementation. .. py:method:: __setup_notifier() Configure CAN frame notifier for synchronous communication. .. py:method:: __setup_async_notifier(loop) Configure CAN frame notifier for asynchronous communication. :param loop: An :mod:`asyncio` event loop to use. .. py:method:: _send_cf_packets_block(cf_packets_block, delay) Send block of Consecutive Frame CAN packets. :param cf_packets_block: Consecutive Frame CAN packets to send. :param delay: Minimal delay between sending following Consecutive Frames [ms]. :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message. :return: Records with historic information about transmitted Consecutive Frame CAN packets. .. py:method:: _async_send_cf_packets_block(cf_packets_block, delay, loop = None) :async: Send block of Consecutive Frame CAN packets asynchronously. :param cf_packets_block: Consecutive Frame CAN packets to send. :param delay: Minimal delay between sending following Consecutive Frames [ms]. :param loop: An asyncio event loop to use for scheduling this task. :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message. :return: Records with historic information about transmitted Consecutive Frame CAN packets. .. py:method:: _message_receive_start(initial_packet) Continue to receive message after receiving initial packet. :param initial_packet: Record of a packet initiating UDS message reception. :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received. :return: Record of UDS message received. .. py:method:: _async_message_receive_start(initial_packet, loop = None) :async: Continue to receive message asynchronously after receiving initial packet. :param initial_packet: Record of a packet initiating UDS message reception. :param loop: An asyncio event loop used for observing messages. :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received. :return: Record of UDS message received. .. py:method:: _receive_cf_packets_block(sequence_number, block_size, remaining_data_length) Receive block of :ref:`Consecutive Frames `. :param sequence_number: Current :ref:`Sequence Number ` (next Consecutive Frame shall have this value set). :param block_size: :ref:`Block Size ` value sent in the last :ref:`Flow Control CAN packet `. :param remaining_data_length: Number of remaining data bytes to receive in UDS message. :return: Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames. .. py:method:: _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, loop = None) :async: Receive asynchronously block of :ref:`Consecutive Frames `. :param sequence_number: Current :ref:`Sequence Number ` (next Consecutive Frame shall have this value set). :param block_size: :ref:`Block Size ` value sent in the last :ref:`Flow Control CAN packet `. :param remaining_data_length: Number of remaining data bytes to receive in UDS message. :param loop: An asyncio event loop used for observing messages. :return: Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames. .. py:method:: _receive_consecutive_frames(first_frame) Receive Consecutive Frames after reception of First Frame. :param first_frame: :ref:`First Frame ` that was received. :raise TimeoutError: :ref:`N_Cr ` timeout was reached. :raise OverflowError: Flow Control packet with :ref:`Flow Status ` equal to OVERFLOW was sent. :return: Record of UDS message that was formed provided First Frame and received Consecutive Frames. .. py:method:: _async_receive_consecutive_frames(first_frame, loop = None) :async: Receive asynchronously Consecutive Frames after reception of First Frame. :param first_frame: :ref:`First Frame ` that was received. :param loop: An asyncio event loop used for observing messages. :raise TimeoutError: :ref:`N_Cr ` timeout was reached. :raise OverflowError: Flow Control packet with :ref:`Flow Status ` equal to OVERFLOW was sent. :raise NotImplementedError: Unhandled CAN packet starting a new CAN message transmission was received. :return: Record of UDS message that was formed provided First Frame and received Consecutive Frames. .. py:method:: clear_frames_buffers() Clear buffers with transmitted and received frames. .. warning:: This will cause that all CAN packets received in a past are no longer accessible. .. py:method:: is_supported_network_manager(bus_manager) :staticmethod: Check whether provided value is a bus manager that is supported by this Transport Interface. :param bus_manager: Value to check. :return: True if provided bus object is compatible with this Transport Interface, False otherwise. .. py:method:: send_packet(packet) Transmit CAN packet. .. warning:: Must not be called within an asynchronous function. :param packet: CAN packet to send. :raise TypeError: Provided packet is not CAN packet. :raise TimeoutError: Timeout was reached before a CAN packet could be transmitted. :return: Record with historic information about transmitted CAN packet. .. py:method:: async_send_packet(packet, loop = None) :async: Transmit asynchronously CAN packet. :param packet: CAN packet to send. :param loop: An asyncio event loop used for observing messages. :raise TypeError: Provided packet is not CAN packet. :raise TimeoutError: Timeout was reached before a CAN packet could be transmitted. :return: Record with historic information about transmitted CAN packet. .. py:method:: receive_packet(timeout = None) Receive CAN packet. .. warning:: Must not be called within an asynchronous function. :param timeout: Maximal time (in milliseconds) to wait. :raise TypeError: Provided timeout value is not None neither int nor float type. :raise ValueError: Provided timeout value is less or equal 0. :raise TimeoutError: Timeout was reached before a CAN packet was received. :return: Record with historic information about received CAN packet. .. py:method:: async_receive_packet(timeout = None, loop = None) :async: Receive asynchronously CAN packet. :param timeout: Maximal time (in milliseconds) to wait. :param loop: An asyncio event loop used for observing messages. :raise TypeError: Provided timeout value is not None neither int nor float type. :raise ValueError: Provided timeout value is less or equal 0. :raise TimeoutError: Timeout was reached before a CAN packet was received. :return: Record with historic information about received CAN packet. .. py:method:: send_message(message) Transmit UDS message over CAN. .. warning:: Must not be called within an asynchronous function. :param message: A message to send. :raise OverflowError: Flow Control packet with Flow Status equal to OVERFLOW was received. :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message. :raise NotImplementedError: Flow Control CAN packet with unknown Flow Status was received. :return: Record with historic information about transmitted UDS message. .. py:method:: async_send_message(message, loop = None) :async: Transmit asynchronously UDS message over CAN. :param message: A message to send. :param loop: An asyncio event loop to use for scheduling this task. :raise OverflowError: Flow Control packet with Flow Status equal to OVERFLOW was received. :raise TransmissionInterruptionError: A new UDS message transmission was started while sending this message. :raise NotImplementedError: Flow Control CAN packet with unknown Flow Status was received. :return: Record with historic information about transmitted UDS message. .. py:method:: receive_message(timeout = None) Receive UDS message over CAN. .. warning:: Must not be called within an asynchronous function. :param timeout: Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time. :raise TypeError: Provided timeout value is not None neither int nor float type. :raise ValueError: Provided timeout value is less or equal 0. :raise TimeoutError: Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached. :return: Record with historic information about received UDS message. .. py:method:: async_receive_message(timeout = None, loop = None) :async: Receive asynchronously UDS message over CAN. :param timeout: Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time. :param loop: An asyncio event loop to use for scheduling this task. :raise TypeError: Provided timeout value is not None neither int nor float type. :raise ValueError: Provided timeout value is less or equal 0. :raise TimeoutError: Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached. :return: Record with historic information about received UDS message.