uds.can.transport_interface.python_can

Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.

Classes

PyCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Module Contents

class uds.can.transport_interface.python_can.PyCanTransportInterface(network_manager, addressing_information, **configuration_params)[source]

Bases: uds.can.transport_interface.common.AbstractCanTransportInterface

Inheritance diagram of uds.can.transport_interface.python_can.PyCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Note

Documentation for python-can package: https://python-can.readthedocs.io/

Create Transport Interface that uses python-can package to control CAN bus.

Parameters:
  • network_manager (can.BusABC) – Python-can bus object for handling CAN network.

  • addressing_information (uds.can.addressing.AbstractCanAddressingInformation) – Addressing Information configuration of a simulated node that is taking part in DoCAN communication.

  • configuration_params (Any) –

    Additional configuration parameters.

    • parameter n_as_timeout:

      Timeout value for N_As time parameter.

    • parameter n_ar_timeout:

      Timeout value for N_Ar time parameter.

    • parameter n_bs_timeout:

      Timeout value for N_Bs time parameter.

    • parameter n_br:

      Value of N_Br time parameter to use in communication.

    • parameter n_cs:

      Value of N_Cs time parameter to use in communication.

    • parameter n_cr_timeout:

      Timeout value for N_Cr time parameter.

    • parameter dlc:

      Base CAN DLC value to use for CAN packets.

    • parameter min_dlc:

      min_dlc: Minimal CAN DLC to use for CAN Packets during Data Optimization.

    • parameter use_data_optimization:

      Information whether to use CAN Frame Data Optimization.

    • parameter filler_byte:

      Filler byte value to use for CAN Frame Data Padding.

    • parameter flow_control_parameters_generator:

      Generator with Flow Control parameters to use.

    • parameter can_version:

      Version of CAN protocol to be used for packets sending.

_MAX_LISTENER_TIMEOUT: float = 4280.0

Maximal timeout value accepted by python-can listeners.

_MIN_NOTIFIER_TIMEOUT: float = 0.001

Minimal timeout for notifiers that does not cause malfunctioning of listeners.

network_manager: can.BusABC

Get network manager used by this Transport Interface.

Network manager handles Physical and Data layers (OSI Model) of the bus/network.

__notifier: can.Notifier | None = None
__async_notifier: can.Notifier | None = None
__rx_frames_buffer
__async_rx_frames_buffer
__tx_frames_buffer
__async_tx_frames_buffer
__del__()[source]

Safely close all threads opened by this object.

Return type:

None

__teardown_notifier(suppress_warning=False)

Stop and remove CAN frame notifier for synchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

__teardown_async_notifier(suppress_warning=False)

Stop and remove CAN frame notifier for asynchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

__setup_notifier()

Configure CAN frame notifier for synchronous communication.

Return type:

None

__setup_async_notifier(loop)

Configure CAN frame notifier for asynchronous communication.

Parameters:

loop (asyncio.AbstractEventLoop) – An asyncio event loop to use.

Return type:

None

static __validate_timeout(value)

Validate value of a timeout.

Parameters:

value (Optional[uds.utilities.TimeMillisecondsAlias]) – Value of a timeout to check.

Raises:
  • TypeError – Provided value is not int or float type.

  • ValueError – Provided value is a negative number.

Return type:

None

_send_cf_packets_block(cf_packets_block, delay, fc_transmission_time)[source]

Send block of Consecutive Frame CAN packets.

Parameters:
  • cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

  • fc_transmission_time (datetime.datetime) – Transmission time of the proceeding Flow Control packet.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.can.packet.CanPacketRecord, Ellipsis]

async _async_send_cf_packets_block(cf_packets_block, delay, fc_transmission_time, loop)[source]

Send block of Consecutive Frame CAN packets asynchronously.

Parameters:
  • cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

  • fc_transmission_time (datetime.datetime) – Transmission time of the proceeding Flow Control packet.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop to use for scheduling this task.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.can.packet.CanPacketRecord, Ellipsis]

_wait_for_flow_control(last_packet_transmission_time)[source]

Wait till Flow Control CAN Packet is received.

Parameters:

last_packet_transmission_time (datetime.datetime) – Moment of time when the last CAN Packet was transmitted.

Returns:

Record with historic information about received Flow Control CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async _async_wait_for_flow_control(last_packet_transmission_time)[source]

Wait till Flow Control CAN Packet is received.

Parameters:

last_packet_transmission_time (datetime.datetime) – Moment of time when the last CAN Packet was transmitted.

Returns:

Record with historic information about received Flow Control CAN packet.

Return type:

uds.can.packet.CanPacketRecord

_wait_for_packet(buffer, timeout=None)[source]

Wait till CAN Packet is received.

Parameters:
  • buffer (can.BufferedReader) – Listener to which CAN Packet would be delivered.

  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Raises:

TimeoutError – Timeout was reached before a CAN packet arrived.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async _async_wait_for_packet(buffer, timeout=None)[source]

Wait till CAN Packet is received.

Parameters:
  • buffer (can.AsyncBufferedReader) – Listener to which CAN Packet would be delivered.

  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Raises:

TimeoutError – Timeout was reached before a CAN packet arrived.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end)[source]

Receive block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

TimeoutError – Timeout was reached. Either: - Consecutive Frame did not arrive before reaching N_Cr timeout - Diagnostic message reception

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]

async _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end, loop)[source]

Receive asynchronously block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]

_receive_consecutive_frames(first_frame, timestamp_end)[source]

Receive Consecutive Frames after reception of First Frame.

Parameters:
  • first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.

Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

async _async_receive_consecutive_frames(first_frame, timestamp_end, loop)[source]

Receive asynchronously Consecutive Frames after reception of First Frame.

Parameters:
  • first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Raises:
Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

_message_receive_start(initial_packet, timestamp_end)[source]

Continue to receive message after receiving initial packet.

Parameters:
  • initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

async _async_message_receive_start(initial_packet, timestamp_end, loop)[source]

Continue to receive message asynchronously after receiving initial packet.

Parameters:
  • initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

clear_rx_frames_buffers()[source]

Clear buffers used for storing received CAN frames.

Warning

This will cause that all CAN packets received in a past are no longer accessible.

Return type:

None

clear_tx_frames_buffers()[source]

Clear buffers used for storing transmitted CAN frames.

Return type:

None

static is_supported_network_manager(bus_manager)[source]

Check whether provided value is a bus manager that is supported by this Transport Interface.

Parameters:

bus_manager (Any) – Value to check.

Returns:

True if provided bus object is compatible with this Transport Interface, False otherwise.

Return type:

bool

send_packet(packet)[source]

Transmit CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

packet (uds.can.packet.CanPacket) – CAN packet to send.

Raises:

TypeError – Provided packet is not CAN packet.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async async_send_packet(packet, loop=None)[source]

Transmit asynchronously CAN packet.

Parameters:
  • packet (uds.can.packet.CanPacket) – CAN packet to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.can.packet.CanPacketRecord

receive_packet(timeout=None)[source]

Receive CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async async_receive_packet(timeout=None, loop=None)[source]

Receive asynchronously CAN packet.

Parameters:
  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

send_message(message)[source]

Transmit UDS message over CAN.

Warning

Must not be called within an asynchronous function.

Parameters:

message (uds.message.UdsMessage) – A message to send.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

async async_send_message(message, loop=None)[source]

Transmit asynchronously UDS message over CAN.

Parameters:
  • message (uds.message.UdsMessage) – A message to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

receive_message(start_timeout=None, end_timeout=None)[source]

Receive UDS message over CAN.

Parameters:
  • start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.

  • end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.

Raises:

TimeoutError – Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached.

Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord

async async_receive_message(start_timeout=None, end_timeout=None, loop=None)[source]

Receive asynchronously UDS message over CAN.

Parameters:
  • start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.

  • end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:

TimeoutError – Timeout was reached. Either Single Frame / First Frame not received within [timeout] ms or N_As, N_Ar, N_Bs, N_Cr timeout reached.

Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord