uds.transport_interface.can_transport_interface.python_can

Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.

Classes

PyCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Module Contents

class uds.transport_interface.can_transport_interface.python_can.PyCanTransportInterface(can_bus_manager, addressing_information, **kwargs)[source]

Bases: uds.transport_interface.can_transport_interface.common.AbstractCanTransportInterface

Inheritance diagram of uds.transport_interface.can_transport_interface.python_can.PyCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Note

Documentation for python-can package: https://python-can.readthedocs.io/

Create python-can Transport Interface.

Parameters:
  • can_bus_manager (can.BusABC) –

    Python-can bus object for handling CAN.

    Warning

    Bus must have capability of receiving transmitted frames (receive_own_messages=True set).

  • addressing_information (uds.can.AbstractCanAddressingInformation) – Addressing Information of CAN Transport Interface.

  • kwargs (Any) –

    Optional arguments that are specific for CAN bus.

    • parameter n_as_timeout:

      Timeout value for N_As time parameter.

    • parameter n_ar_timeout:

      Timeout value for N_Ar time parameter.

    • parameter n_bs_timeout:

      Timeout value for N_Bs time parameter.

    • parameter n_br:

      Value of N_Br time parameter to use in communication.

    • parameter n_cs:

      Value of N_Cs time parameter to use in communication.

    • parameter n_cr_timeout:

      Timeout value for N_Cr time parameter.

    • parameter dlc:

      Base CAN DLC value to use for CAN packets.

    • parameter use_data_optimization:

      Information whether to use CAN Frame Data Optimization.

    • parameter filler_byte:

      Filler byte value to use for CAN Frame Data Padding.

    • parameter flow_control_parameters_generator:

      Generator with Flow Control parameters to use.

_MAX_LISTENER_TIMEOUT: float = 4280000.0

Maximal timeout value accepted by python-can listeners.

_MIN_NOTIFIER_TIMEOUT: float = 0.001

Minimal timeout for notifiers that does not cause malfunctioning of listeners.

__n_as_measured: uds.utilities.TimeMillisecondsAlias | None = None
__n_ar_measured: uds.utilities.TimeMillisecondsAlias | None = None
__frames_buffer
__notifier: can.Notifier | None = None
__async_frames_buffer
__async_notifier: can.Notifier | None = None
__del__()[source]

Safely close all threads open by this object.

property n_as_measured: uds.utilities.TimeMillisecondsAlias | None

Get the last measured value of N_As time parameter.

Note

The last measurement comes from the last transmission of Single Frame or First Fame CAN Packet using either send_packet() or async_send_packet() method.

Returns:

Time in milliseconds or None if the value was never measured.

Return type:

Optional[uds.utilities.TimeMillisecondsAlias]

property n_ar_measured: uds.utilities.TimeMillisecondsAlias | None

Get the last measured value of N_Ar time parameter.

Note

The last measurement comes from the last transmission of Flow Control CAN Packet using either send_packet() or async_send_packet() method.

Returns:

Time in milliseconds or None if the value was never measured.

Return type:

Optional[uds.utilities.TimeMillisecondsAlias]

__teardown_notifier(suppress_warning=False)

Stop and remove CAN frame notifier for synchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

__teardown_async_notifier(suppress_warning=False)

Stop and remove CAN frame notifier for asynchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

__setup_notifier()

Configure CAN frame notifier for synchronous communication.

Return type:

None

__setup_async_notifier(loop)

Configure CAN frame notifier for asynchronous communication.

Parameters:

loop (asyncio.AbstractEventLoop) – An asyncio event loop to use.

Return type:

None

_send_cf_packets_block(cf_packets_block, delay)[source]

Send block of Consecutive Frame CAN packets.

Parameters:
  • cf_packets_block (List[uds.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

Raises:

TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.packet.CanPacketRecord, Ellipsis]

async _async_send_cf_packets_block(cf_packets_block, delay, loop=None)[source]

Send block of Consecutive Frame CAN packets asynchronously.

Parameters:
  • cf_packets_block (List[uds.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:

TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.packet.CanPacketRecord, Ellipsis]

_message_receive_start(initial_packet)[source]

Continue to receive message after receiving initial packet.

Parameters:

initial_packet (uds.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

async _async_message_receive_start(initial_packet, loop=None)[source]

Continue to receive message asynchronously after receiving initial packet.

Parameters:
  • initial_packet (uds.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

_receive_cf_packets_block(sequence_number, block_size, remaining_data_length)[source]

Receive block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.packet.CanPacketRecord, Ellipsis]]

async _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, loop=None)[source]

Receive asynchronously block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.packet.CanPacketRecord, Ellipsis]]

_receive_consecutive_frames(first_frame)[source]

Receive Consecutive Frames after reception of First Frame.

Parameters:

first_frame (uds.packet.CanPacketRecord) – First Frame that was received.

Raises:
  • TimeoutErrorN_Cr timeout was reached.

  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.

Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

async _async_receive_consecutive_frames(first_frame, loop=None)[source]

Receive asynchronously Consecutive Frames after reception of First Frame.

Parameters:
  • first_frame (uds.packet.CanPacketRecord) – First Frame that was received.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Raises:
  • TimeoutErrorN_Cr timeout was reached.

  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.

  • NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

clear_frames_buffers()[source]

Clear buffers with transmitted and received frames.

Warning

This will cause that all CAN packets received in a past are no longer accessible.

Return type:

None

static is_supported_bus_manager(bus_manager)[source]

Check whether provided value is a bus manager that is supported by this Transport Interface.

Parameters:

bus_manager (Any) – Value to check.

Returns:

True if provided bus object is compatible with this Transport Interface, False otherwise.

Return type:

bool

send_packet(packet)[source]

Transmit CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

packet (uds.packet.CanPacket) – CAN packet to send.

Raises:
  • TypeError – Provided packet is not CAN packet.

  • TimeoutError – Timeout was reached before a CAN packet could be transmitted.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.packet.CanPacketRecord

async async_send_packet(packet, loop=None)[source]

Transmit asynchronously CAN packet.

Parameters:
  • packet (uds.packet.CanPacket) – CAN packet to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Raises:
  • TypeError – Provided packet is not CAN packet.

  • TimeoutError – Timeout was reached before a CAN packet could be transmitted.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.packet.CanPacketRecord

receive_packet(timeout=None)[source]

Receive CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait.

Raises:
  • TypeError – Provided timeout value is not None neither int nor float type.

  • ValueError – Provided timeout value is less or equal 0.

  • TimeoutError – Timeout was reached before a CAN packet was received.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.packet.CanPacketRecord

async async_receive_packet(timeout=None, loop=None)[source]

Receive asynchronously CAN packet.

Parameters:
  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Raises:
  • TypeError – Provided timeout value is not None neither int nor float type.

  • ValueError – Provided timeout value is less or equal 0.

  • TimeoutError – Timeout was reached before a CAN packet was received.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.packet.CanPacketRecord

send_message(message)[source]

Transmit UDS message over CAN.

Warning

Must not be called within an asynchronous function.

Parameters:

message (uds.message.UdsMessage) – A message to send.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

async async_send_message(message, loop=None)[source]

Transmit asynchronously UDS message over CAN.

Parameters:
  • message (uds.message.UdsMessage) – A message to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

receive_message(timeout=None)[source]

Receive UDS message over CAN.

Warning

Must not be called within an asynchronous function.

Parameters:

timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time.

Raises:
  • TypeError – Provided timeout value is not None neither int nor float type.

  • ValueError – Provided timeout value is less or equal 0.

  • TimeoutError – Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached.

Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord

async async_receive_message(timeout=None, loop=None)[source]

Receive asynchronously UDS message over CAN.

Parameters:
  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for UDS message transmission to start. This means that receiving might last longer if First Frame was received within provided time.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:
  • TypeError – Provided timeout value is not None neither int nor float type.

  • ValueError – Provided timeout value is less or equal 0.

  • TimeoutError – Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached.

Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord