uds.can.transport_interface.python_can

Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.

Classes

PythonCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Module Contents

class uds.can.transport_interface.python_can.PythonCanTransportInterface(network_manager, addressing_information, notifier=None, async_notifier=None, **configuration_params)[source]

Bases: uds.can.transport_interface.common.AbstractCanTransportInterface

Inheritance diagram of uds.can.transport_interface.python_can.PythonCanTransportInterface

Transport Interface for managing UDS on CAN with python-can package as bus handler.

Note

Documentation for python-can package: https://python-can.readthedocs.io/

Create Transport Interface that uses python-can package to control CAN bus.

Parameters:
  • network_manager (can.BusABC) – Python-can bus object for handling CAN network.

  • addressing_information (uds.can.addressing.AbstractCanAddressingInformation) – Addressing Information configuration of a simulated node that is taking part in DoCAN communication.

  • notifier (Optional[can.Notifier]) –

    Python-can notifier object for reporting received and sent CAN Frames to listeners. Leave None to create new notifier when needed.

    Warning

    Only one notifier object shall be active at any time.

  • async_notifier (Optional[can.Notifier]) –

    Python-can notifier object for reporting received and sent CAN Frames to async listeners. Leave None to create new notifier when needed.

    Warning

    Only one notifier object shall be active at any time.

  • configuration_params (Any) –

    Additional configuration parameters.

    • parameter n_as_timeout:

      Timeout value for N_As time parameter.

    • parameter n_ar_timeout:

      Timeout value for N_Ar time parameter.

    • parameter n_bs_timeout:

      Timeout value for N_Bs time parameter.

    • parameter n_br:

      Value of N_Br time parameter to use in communication.

    • parameter n_cs:

      Value of N_Cs time parameter to use in communication.

    • parameter n_cr_timeout:

      Timeout value for N_Cr time parameter.

    • parameter dlc:

      Base CAN DLC value to use for CAN packets.

    • parameter min_dlc:

      Minimal CAN DLC to use for CAN Packets during Data Optimization.

    • parameter use_data_optimization:

      Information whether to use CAN Frame Data Optimization.

    • parameter filler_byte:

      Filler byte value to use for CAN Frame Data Padding.

    • parameter flow_control_parameters_generator:

      Generator with Flow Control parameters to use.

    • parameter can_version:

      Version of CAN protocol to be used for packets sending.

    • parameter bitrate_switch:

      Whether bitrate switch (BRS) shall be set in sent packets.

_TX_TOLERANCE: float = 0.01

Tolerance of CAN frames transmission.

_MAX_TX_WAIT: float = 0.005

Tolerance of CAN frames transmission.

_MAX_LISTENER_TIMEOUT: float = 4280.0

Maximal timeout value accepted by python-can listeners.

_MIN_NOTIFIER_TIMEOUT: float = 0.001

Minimal timeout for notifiers that does not cause malfunctioning of listeners.

network_manager: can.BusABC

Get network manager used by this Transport Interface.

Network manager handles Physical and Data layers (OSI Model) of the bus/network.

property notifier: can.Notifier | None

Notifier used by python-can for reporting received and sent CAN Frames to listeners.

Return type:

Optional[can.Notifier]

property async_notifier: can.Notifier | None

Notifier used by python-can for reporting received and sent CAN Frames to async listeners.

Return type:

Optional[can.Notifier]

__rx_frames_buffer
__tx_frames_buffer
__fc_frames_buffer
__async_rx_frames_buffer
__async_tx_frames_buffer
__async_fc_frames_buffer
__del__()[source]

Safely close all threads opened by this object.

Return type:

None

__setup_sync_listening()

Configure CAN frame notifier for synchronous communication.

Return type:

None

__setup_async_listening(loop)

Configure CAN frame notifier for asynchronous communication.

Parameters:

loop (asyncio.AbstractEventLoop) – An asyncio event loop to use.

Return type:

None

__teardown_sync_listening(suppress_warning=False)

Stop and remove CAN frame notifier for synchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

__teardown_async_listening(suppress_warning=False)

Stop and remove CAN frame notifier for asynchronous communication.

Parameters:

suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.

Return type:

None

static __validate_timeout(value)

Validate value of a timeout.

Parameters:

value (Optional[uds.utilities.TimeMillisecondsAlias]) – Value of a timeout to check.

Raises:
  • TypeError – Provided value is not int or float type.

  • ValueError – Provided value is a negative number.

Return type:

None

_send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp)[source]

Send block of Consecutive Frame CAN packets.

Parameters:
  • cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

  • fc_transmission_timestamp (float) – Transmission timestamp of the proceeding Flow Control packet.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.can.packet.CanPacketRecord, Ellipsis]

async _async_send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp, loop)[source]

Send block of Consecutive Frame CAN packets asynchronously.

Parameters:
  • cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.

  • delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].

  • fc_transmission_timestamp (float) – Transmission timestamp of the proceeding Flow Control packet.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop to use for scheduling this task.

Returns:

Records with historic information about transmitted Consecutive Frame CAN packets.

Return type:

Tuple[uds.can.packet.CanPacketRecord, Ellipsis]

_wait_for_flow_control(last_packet_transmission_timestamp)[source]

Wait till Flow Control CAN Packet is received.

Parameters:

last_packet_transmission_timestamp (float) – Timestamp when the last CAN Packet was transmitted.

Returns:

Record with historic information about received Flow Control CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async _async_wait_for_flow_control(last_packet_transmission_timestamp)[source]

Wait till Flow Control CAN Packet is received.

Parameters:

last_packet_transmission_timestamp (float) – Timestamp when the last CAN Packet was transmitted.

Returns:

Record with historic information about received Flow Control CAN packet.

Return type:

uds.can.packet.CanPacketRecord

_wait_for_rx_packet(buffer, timeout=None)[source]

Wait till CAN Packet is received.

Parameters:
  • buffer (can.BufferedReader) – Listener to which CAN Packet would be delivered.

  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Raises:

TimeoutError – Timeout was reached before a CAN packet arrived.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async _async_wait_for_rx_packet(buffer, timeout=None)[source]

Wait till CAN Packet is received.

Parameters:
  • buffer (can.AsyncBufferedReader) – Listener to which CAN Packet would be delivered.

  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Raises:

TimeoutError – Timeout was reached before a CAN packet arrived.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

_wait_for_tx_frame(buffer, frame, timestamp)[source]

Wait for record of sent CAN frame.

Parameters:
  • buffer (can.BufferedReader) – Listener to which CAN Frame would be delivered.

  • frame (can.Message) – Object of CAN frame that was scheduled for sending.

  • timestamp (float) – Timestamp when CAN frame transmission was started.

Raises:

TimeoutError – Timeout was reached before a CAN frame was observed.

Returns:

Record with historic information about sent CAN frame or None if not observed.

Return type:

can.Message

async _async_wait_for_tx_frame(buffer, frame, timestamp)[source]

Wait for record of sent CAN frame.

Parameters:
  • buffer (can.AsyncBufferedReader) – Listener to which CAN Frame would be delivered.

  • frame (can.Message) – Object of CAN frame that was scheduled for sending.

  • timestamp (float) – Timestamp when CAN frame transmission was started.

Raises:

TimeoutError – Timeout was reached before a CAN frame was observed.

Returns:

Record with historic information about sent CAN frame or None if not observed.

Return type:

can.Message

_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end)[source]

Receive block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

TimeoutError – Timeout was reached. Either: - Consecutive Frame did not arrive before reaching N_Cr timeout - Diagnostic message reception

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]

async _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end, loop)[source]

Receive asynchronously block of Consecutive Frames.

Parameters:
  • sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).

  • block_size (int) – Block Size value sent in the last Flow Control CAN packet.

  • remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Returns:

Either: - Record of UDS message if reception was interrupted by a new UDS message transmission. - Tuple with records of received Consecutive Frames.

Return type:

Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]

_receive_consecutive_frames(first_frame, timestamp_end)[source]

Receive Consecutive Frames after reception of First Frame.

Parameters:
  • first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.

Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

async _async_receive_consecutive_frames(first_frame, timestamp_end, loop)[source]

Receive asynchronously Consecutive Frames after reception of First Frame.

Parameters:
  • first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Raises:
Returns:

Record of UDS message that was formed provided First Frame and received Consecutive Frames.

Return type:

uds.message.UdsMessageRecord

_message_receive_start(initial_packet, timestamp_end)[source]

Continue to receive message after receiving initial packet.

Parameters:
  • initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

async _async_message_receive_start(initial_packet, timestamp_end, loop)[source]

Continue to receive message asynchronously after receiving initial packet.

Parameters:
  • initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.

  • timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.

  • loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.

Raises:

NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.

Returns:

Record of UDS message received.

Return type:

uds.message.UdsMessageRecord

clear_rx_frames_buffers()[source]

Clear buffers used for storing received CAN frames.

Warning

This will cause that all CAN packets received in a past are no longer accessible.

Return type:

None

clear_tx_frames_buffers()[source]

Clear buffers used for storing transmitted CAN frames with CAN packets.

Return type:

None

clear_fc_frames_buffers()[source]

Clear buffers used for storing received CAN frames with Flow Control CAN packets.

Return type:

None

static is_supported_network_manager(bus_manager)[source]

Check whether provided value is a bus manager that is supported by this Transport Interface.

Parameters:

bus_manager (Any) – Value to check.

Returns:

True if provided bus object is compatible with this Transport Interface, False otherwise.

Return type:

bool

send_packet(packet)[source]

Transmit CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

packet (uds.can.packet.CanPacket) – CAN packet to send.

Raises:

TypeError – Provided packet is not CAN packet.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async async_send_packet(packet, loop=None)[source]

Transmit asynchronously CAN packet.

Parameters:
  • packet (uds.can.packet.CanPacket) – CAN packet to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Returns:

Record with historic information about transmitted CAN packet.

Return type:

uds.can.packet.CanPacketRecord

receive_packet(timeout=None)[source]

Receive CAN packet.

Warning

Must not be called within an asynchronous function.

Parameters:

timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

async async_receive_packet(timeout=None, loop=None)[source]

Receive asynchronously CAN packet.

Parameters:
  • timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.

Returns:

Record with historic information about received CAN packet.

Return type:

uds.can.packet.CanPacketRecord

send_message(message)[source]

Transmit UDS message over CAN.

Warning

Must not be called within an asynchronous function.

Parameters:

message (uds.message.UdsMessage) – A message to send.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

async async_send_message(message, loop=None)[source]

Transmit asynchronously UDS message over CAN.

Parameters:
  • message (uds.message.UdsMessage) – A message to send.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:
  • OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.

  • TransmissionInterruptionError – A new UDS message transmission was started while sending this message.

  • NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.

Returns:

Record with historic information about transmitted UDS message.

Return type:

uds.message.UdsMessageRecord

receive_message(start_timeout=None, end_timeout=None)[source]

Receive UDS message over CAN.

Parameters:
  • start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.

  • end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.

Raises:
Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord

async async_receive_message(start_timeout=None, end_timeout=None, loop=None)[source]

Receive asynchronously UDS message over CAN.

Parameters:
  • start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.

  • end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.

  • loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.

Raises:
Returns:

Record with historic information about received UDS message.

Return type:

uds.message.UdsMessageRecord