uds.can.transport_interface.python_can
Implementation of UDS Transport Interface for CAN bus using python-can as bus manager.
Classes
PythonCanTransportInterface: Transport Interface for managing UDS on CAN with python-can package as bus handler.
Module Contents
- class uds.can.transport_interface.python_can.PythonCanTransportInterface(network_manager, addressing_information, notifier=None, async_notifier=None, **configuration_params)
Bases:
uds.can.transport_interface.common.AbstractCanTransportInterface
Transport Interface for managing UDS on CAN with python-can package as bus handler.
Note
Documentation for python-can package: https://python-can.readthedocs.io/
Create Transport Interface that uses python-can package to control CAN bus.
- Parameters:
network_manager (can.BusABC) – Python-can bus object for handling CAN network.
addressing_information (uds.can.addressing.AbstractCanAddressingInformation) – Addressing Information configuration of a simulated node that is taking part in DoCAN communication.
notifier (Optional[can.Notifier]) –
Python-can notifier object for reporting received and sent CAN Frames to listeners. Leave None to create new notifier when needed.
Warning
Only one notifier object shall be active at any time.
async_notifier (Optional[can.Notifier]) –
Python-can notifier object for reporting received and sent CAN Frames to async listeners. Leave None to create new notifier when needed.
Warning
Only one notifier object shall be active at any time.
configuration_params (Any) –
Additional configuration parameters.
- parameter n_as_timeout:
Timeout value for N_As time parameter.
- parameter n_ar_timeout:
Timeout value for N_Ar time parameter.
- parameter n_bs_timeout:
Timeout value for N_Bs time parameter.
- parameter n_br:
Value of N_Br time parameter to use in communication.
- parameter n_cs:
Value of N_Cs time parameter to use in communication.
- parameter n_cr_timeout:
Timeout value for N_Cr time parameter.
- parameter dlc:
Base CAN DLC value to use for CAN packets.
- parameter min_dlc:
Minimal CAN DLC to use for CAN Packets during Data Optimization.
- parameter use_data_optimization:
Information whether to use CAN Frame Data Optimization.
- parameter filler_byte:
Filler byte value to use for CAN Frame Data Padding.
- parameter flow_control_parameters_generator:
Generator with Flow Control parameters to use.
- parameter can_version:
Version of CAN protocol to be used for packets sending.
- parameter bitrate_switch:
Whether bitrate switch (BRS) shall be set in sent packets.
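The configuration parameters listed above are passed to the constructor as keyword arguments. The following is a minimal sketch of how such a set of keyword arguments might be prepared. The helper `check_configuration` is hypothetical (it is not part of the package), and all values are illustrative assumptions rather than package defaults.

```python
# Names of the keyword arguments documented above.
DOCUMENTED_PARAMS = frozenset({
    "n_as_timeout", "n_ar_timeout", "n_bs_timeout", "n_br", "n_cs",
    "n_cr_timeout", "dlc", "min_dlc", "use_data_optimization",
    "filler_byte", "flow_control_parameters_generator", "can_version",
    "bitrate_switch",
})


def check_configuration(**configuration_params):
    """Hypothetical helper: reject keyword names that are not documented."""
    unknown = set(configuration_params) - DOCUMENTED_PARAMS
    if unknown:
        raise TypeError(f"Unknown configuration parameters: {sorted(unknown)}")
    return configuration_params


# Illustrative values only (assumptions, not package defaults).
example_configuration = check_configuration(
    n_as_timeout=1000,            # milliseconds
    n_bs_timeout=1000,            # milliseconds
    n_cr_timeout=1000,            # milliseconds
    dlc=8,                        # DLC 8 carries 8 data bytes
    use_data_optimization=False,
    filler_byte=0xCC,
)

# The dictionary could then be unpacked into the constructor, e.g.:
# PythonCanTransportInterface(network_manager=bus,
#                             addressing_information=addressing,
#                             **example_configuration)
```

Validating the names before unpacking is a design choice of this sketch; the class itself may report unknown parameters differently.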
- _MIN_NOTIFIER_TIMEOUT: float = 0.001
Minimal timeout for notifiers that does not cause malfunctioning of listeners.
- network_manager: can.BusABC
Get network manager used by this Transport Interface.
Network manager handles Physical and Data layers (OSI Model) of the bus/network.
- property notifier: can.Notifier | None
Notifier used by python-can for reporting received and sent CAN Frames to listeners.
- Return type:
Optional[can.Notifier]
- property async_notifier: can.Notifier | None
Notifier used by python-can for reporting received and sent CAN Frames to async listeners.
- Return type:
Optional[can.Notifier]
- __rx_frames_buffer
- __tx_frames_buffer
- __fc_frames_buffer
- __async_rx_frames_buffer
- __async_tx_frames_buffer
- __async_fc_frames_buffer
- __setup_sync_listening()
Configure CAN frame notifier for synchronous communication.
- Return type:
None
- __setup_async_listening(loop)
Configure CAN frame notifier for asynchronous communication.
- Parameters:
loop (asyncio.AbstractEventLoop) – An asyncio event loop to use.
- Return type:
None
- __teardown_sync_listening(suppress_warning=False)
Stop and remove CAN frame notifier for synchronous communication.
- Parameters:
suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.
- Return type:
None
- __teardown_async_listening(suppress_warning=False)
Stop and remove CAN frame notifier for asynchronous communication.
- Parameters:
suppress_warning (bool) – Do not warn about mixing Synchronous and Asynchronous implementation.
- Return type:
None
- static __validate_timeout(value)
Validate value of a timeout.
- Parameters:
value (Optional[uds.utilities.TimeMillisecondsAlias]) – Value of a timeout to check.
- Raises:
TypeError – Provided value is not int or float type.
ValueError – Provided value is a negative number.
- Return type:
None
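The checks described above can be sketched as a standalone function. This is not the package's private implementation, only an illustration of the documented behavior. The name `validate_timeout` is hypothetical, and accepting zero is an assumption, since the description only mentions negative numbers.

```python
from typing import Optional, Union


def validate_timeout(value: Optional[Union[int, float]]) -> None:
    """Sketch of the documented checks; None means 'no timeout'."""
    if value is None:
        return
    if not isinstance(value, (int, float)):
        raise TypeError(
            f"Timeout must be int or float, got {type(value).__name__}")
    if value < 0:
        raise ValueError(f"Timeout must not be negative, got {value}")
```

Note that `bool` is a subclass of `int` in Python, so this sketch would accept `True` and `False`; a stricter check would have to exclude them explicitly.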
- _send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp)
Send block of Consecutive Frame CAN packets.
- Parameters:
cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.
delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].
fc_transmission_timestamp (float) – Transmission timestamp of the preceding Flow Control packet.
- Returns:
Records with historic information about transmitted Consecutive Frame CAN packets.
- Return type:
Tuple[uds.can.packet.CanPacketRecord, Ellipsis]
- async _async_send_cf_packets_block(cf_packets_block, delay, fc_transmission_timestamp, loop)
Send block of Consecutive Frame CAN packets asynchronously.
- Parameters:
cf_packets_block (List[uds.can.packet.CanPacket]) – Consecutive Frame CAN packets to send.
delay (uds.utilities.TimeMillisecondsAlias) – Minimal delay between sending following Consecutive Frames [ms].
fc_transmission_timestamp (float) – Transmission timestamp of the preceding Flow Control packet.
loop (asyncio.AbstractEventLoop) – An asyncio event loop to use for scheduling this task.
- Returns:
Records with historic information about transmitted Consecutive Frame CAN packets.
- Return type:
Tuple[uds.can.packet.CanPacketRecord, Ellipsis]
- _wait_for_flow_control(last_packet_transmission_timestamp)
Wait till Flow Control CAN Packet is received.
- Parameters:
last_packet_transmission_timestamp (float) – Timestamp when the last CAN Packet was transmitted.
- Returns:
Record with historic information about received Flow Control CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- async _async_wait_for_flow_control(last_packet_transmission_timestamp)
Wait till Flow Control CAN Packet is received.
- Parameters:
last_packet_transmission_timestamp (float) – Timestamp when the last CAN Packet was transmitted.
- Returns:
Record with historic information about received Flow Control CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- _wait_for_rx_packet(buffer, timeout=None)
Wait till CAN Packet is received.
- Parameters:
buffer (can.BufferedReader) – Listener to which CAN Packet would be delivered.
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.
- Raises:
TimeoutError – Timeout was reached before a CAN packet arrived.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
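The waiting pattern described above (a timeout in milliseconds, None to wait forever, TimeoutError on expiry) can be illustrated with the standard library alone. In this sketch a `queue.Queue` stands in for the can.BufferedReader listener, and the function name `wait_for_item` is hypothetical. It is not the package's implementation.

```python
import queue
from typing import Any, Optional


def wait_for_item(buffer: "queue.Queue[Any]",
                  timeout_ms: Optional[float] = None) -> Any:
    """Block until an item arrives or the millisecond timeout expires."""
    # queue.Queue expects seconds; None means "wait forever".
    timeout_s = None if timeout_ms is None else timeout_ms / 1000.0
    try:
        return buffer.get(timeout=timeout_s)
    except queue.Empty:
        raise TimeoutError(
            f"Nothing received within {timeout_ms} ms") from None


# Usage: a producer (here the same thread) puts an item, the consumer waits.
rx_buffer: "queue.Queue[Any]" = queue.Queue()
rx_buffer.put("frame")
received = wait_for_item(rx_buffer, timeout_ms=100)
```

Because the call blocks the calling thread, a pattern like this is unsuitable inside a coroutine, where it would stall the event loop.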
- async _async_wait_for_rx_packet(buffer, timeout=None)
Wait till CAN Packet is received.
- Parameters:
buffer (can.AsyncBufferedReader) – Listener to which CAN Packet would be delivered.
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.
- Raises:
TimeoutError – Timeout was reached before a CAN packet arrived.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
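The asynchronous variant of the same waiting pattern can be sketched with `asyncio.wait_for`. Here an `asyncio.Queue` stands in for the can.AsyncBufferedReader listener, and the names `async_wait_for_item` and `_demo` are hypothetical. This is an illustration under those assumptions, not the package's code.

```python
import asyncio
from typing import Any, Optional


async def async_wait_for_item(buffer: asyncio.Queue,
                              timeout_ms: Optional[float] = None) -> Any:
    """Await an item without blocking the event loop."""
    # asyncio.wait_for expects seconds; None disables the timeout.
    timeout_s = None if timeout_ms is None else timeout_ms / 1000.0
    try:
        return await asyncio.wait_for(buffer.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Normalize to the built-in TimeoutError on all Python versions.
        raise TimeoutError(
            f"Nothing received within {timeout_ms} ms") from None


async def _demo() -> Any:
    buffer: asyncio.Queue = asyncio.Queue()
    await buffer.put("frame")
    return await async_wait_for_item(buffer, timeout_ms=50)


demo_result = asyncio.run(_demo())
```

While the coroutine is suspended in `wait_for`, other tasks on the same event loop keep running, which is the main difference from the synchronous variant.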
- _wait_for_tx_frame(buffer, frame, timestamp)
Wait for record of sent CAN frame.
- Parameters:
buffer (can.BufferedReader) – Listener to which CAN Frame would be delivered.
frame (can.Message) – Object of CAN frame that was scheduled for sending.
timestamp (float) – Timestamp when CAN frame transmission was started.
- Raises:
TimeoutError – Timeout was reached before a CAN frame was observed.
- Returns:
Record with historic information about sent CAN frame or None if not observed.
- Return type:
can.Message
- async _async_wait_for_tx_frame(buffer, frame, timestamp)
Wait for record of sent CAN frame.
- Parameters:
buffer (can.AsyncBufferedReader) – Listener to which CAN Frame would be delivered.
frame (can.Message) – Object of CAN frame that was scheduled for sending.
timestamp (float) – Timestamp when CAN frame transmission was started.
- Raises:
TimeoutError – Timeout was reached before a CAN frame was observed.
- Returns:
Record with historic information about sent CAN frame or None if not observed.
- Return type:
can.Message
- _receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end)
Receive block of Consecutive Frames.
- Parameters:
sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).
block_size (int) – Block Size value sent in the last Flow Control CAN packet.
remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
- Raises:
TimeoutError – Timeout was reached. Either: - Consecutive Frame did not arrive before reaching N_Cr timeout - Diagnostic message reception
- Returns:
Either a record of UDS message (if reception was interrupted by a new UDS message transmission) or a tuple with records of received Consecutive Frames.
- Return type:
Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]
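How the three numeric parameters above relate to each other can be sketched as follows. The function `expected_sequence_numbers` is hypothetical and not part of the package. It assumes 7 payload bytes per Consecutive Frame (an 8-byte CAN frame with one byte of protocol control information), which depends on the DLC and addressing format actually in use.

```python
import math
from typing import List


def expected_sequence_numbers(sequence_number: int,
                              block_size: int,
                              remaining_data_length: int,
                              cf_payload_size: int = 7) -> List[int]:
    """Sequence Numbers of the Consecutive Frames expected in the next block."""
    # Number of Consecutive Frames still needed to complete the message.
    frames_left = math.ceil(remaining_data_length / cf_payload_size)
    # Block Size 0 means no further Flow Control follows, so all remaining
    # Consecutive Frames belong to this block.
    if block_size == 0:
        frames_in_block = frames_left
    else:
        frames_in_block = min(block_size, frames_left)
    # Sequence Number is a 4-bit counter that wraps from 0xF to 0x0.
    return [(sequence_number + i) & 0xF for i in range(frames_in_block)]


# A block of 4 frames starting at Sequence Number 14 wraps around to 0.
wrapped = expected_sequence_numbers(14, block_size=4, remaining_data_length=100)
# Block Size 0 with 20 bytes left needs 3 frames of up to 7 bytes each.
unlimited = expected_sequence_numbers(1, block_size=0, remaining_data_length=20)
```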
- async _async_receive_cf_packets_block(sequence_number, block_size, remaining_data_length, timestamp_end, loop)
Asynchronously receive a block of Consecutive Frames.
- Parameters:
sequence_number (int) – Current Sequence Number (next Consecutive Frame shall have this value set).
block_size (int) – Block Size value sent in the last Flow Control CAN packet.
remaining_data_length (int) – Number of remaining data bytes to receive in UDS message.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.
- Returns:
Either a record of UDS message (if reception was interrupted by a new UDS message transmission) or a tuple with records of received Consecutive Frames.
- Return type:
Union[uds.message.UdsMessageRecord, Tuple[uds.can.packet.CanPacketRecord, Ellipsis]]
- _receive_consecutive_frames(first_frame, timestamp_end)
Receive Consecutive Frames after reception of First Frame.
- Parameters:
first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
- Raises:
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.
- Returns:
Record of the UDS message formed from the provided First Frame and the received Consecutive Frames.
- Return type:
uds.message.UdsMessageRecord
- async _async_receive_consecutive_frames(first_frame, timestamp_end, loop)
Asynchronously receive Consecutive Frames after reception of First Frame.
- Parameters:
first_frame (uds.can.packet.CanPacketRecord) – First Frame that was received.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.
- Raises:
TimeoutError – N_Cr timeout was reached.
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was sent.
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of the UDS message formed from the provided First Frame and the received Consecutive Frames.
- Return type:
uds.message.UdsMessageRecord
- _message_receive_start(initial_packet, timestamp_end)
Continue to receive message after receiving initial packet.
- Parameters:
initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
- Raises:
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of UDS message received.
- Return type:
uds.message.UdsMessageRecord
- async _async_message_receive_start(initial_packet, timestamp_end, loop)
Continue to receive message asynchronously after receiving initial packet.
- Parameters:
initial_packet (uds.can.packet.CanPacketRecord) – Record of a packet initiating UDS message reception.
timestamp_end (Optional[uds.utilities.TimestampAlias]) – The final timestamp till when the reception must be completed.
loop (asyncio.AbstractEventLoop) – An asyncio event loop used for observing messages.
- Raises:
NotImplementedError – Unhandled CAN packet starting a new CAN message transmission was received.
- Returns:
Record of UDS message received.
- Return type:
uds.message.UdsMessageRecord
- clear_rx_frames_buffers()
Clear buffers used for storing received CAN frames.
Warning
After this call, all CAN packets received in the past are no longer accessible.
- Return type:
None
- clear_tx_frames_buffers()
Clear buffers used for storing transmitted CAN frames with CAN packets.
- Return type:
None
- clear_fc_frames_buffers()
Clear buffers used for storing received CAN frames with Flow Control CAN packets.
- Return type:
None
- static is_supported_network_manager(bus_manager)
Check whether provided value is a bus manager that is supported by this Transport Interface.
- Parameters:
bus_manager (Any) – Value to check.
- Returns:
True if provided bus object is compatible with this Transport Interface, False otherwise.
- Return type:
bool
- send_packet(packet)
Transmit CAN packet.
Warning
Must not be called within an asynchronous function.
- Parameters:
packet (uds.can.packet.CanPacket) – CAN packet to send.
- Raises:
TypeError – Provided packet is not CAN packet.
- Returns:
Record with historic information about transmitted CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- async async_send_packet(packet, loop=None)
Transmit CAN packet asynchronously.
- Parameters:
packet (uds.can.packet.CanPacket) – CAN packet to send.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Returns:
Record with historic information about transmitted CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- receive_packet(timeout=None)
Receive CAN packet.
Warning
Must not be called within an asynchronous function.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- async async_receive_packet(timeout=None, loop=None)
Receive CAN packet asynchronously.
- Parameters:
timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait. Leave None to wait forever.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop used for observing messages.
- Returns:
Record with historic information about received CAN packet.
- Return type:
uds.can.packet.CanPacketRecord
- send_message(message)
Transmit UDS message over CAN.
Warning
Must not be called within an asynchronous function.
- Parameters:
message (uds.message.UdsMessage) – A message to send.
- Raises:
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.
- Returns:
Record with historic information about transmitted UDS message.
- Return type:
uds.message.UdsMessageRecord
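Two of the exceptions listed above depend on the Flow Status carried by a received Flow Control packet. A minimal sketch of that decision is shown below. The `FlowStatus` enum and `react_to_flow_status` function are stand-ins written for this example (the package has its own types), while the numeric values follow ISO 15765-2.

```python
from enum import IntEnum


class FlowStatus(IntEnum):
    """Flow Status values defined by ISO 15765-2 (stand-in enum)."""
    CONTINUE_TO_SEND = 0x0
    WAIT = 0x1
    OVERFLOW = 0x2


def react_to_flow_status(raw_flow_status: int) -> str:
    """Decide what a sender does after receiving a Flow Control packet."""
    try:
        flow_status = FlowStatus(raw_flow_status)
    except ValueError:
        # Reserved or unknown value: mirrors the documented NotImplementedError.
        raise NotImplementedError(
            f"Unknown Flow Status: {raw_flow_status:#x}") from None
    if flow_status is FlowStatus.OVERFLOW:
        # The receiver cannot store the message: mirrors OverflowError.
        raise OverflowError("Receiver reported Flow Status OVERFLOW")
    if flow_status is FlowStatus.WAIT:
        return "wait for next Flow Control"
    return "send next block"
```

Callers of send_message would typically catch OverflowError to retry with a shorter message or report the failure; what to do is application specific.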
- async async_send_message(message, loop=None)
Transmit UDS message over CAN asynchronously.
- Parameters:
message (uds.message.UdsMessage) – A message to send.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.
- Raises:
OverflowError – Flow Control packet with Flow Status equal to OVERFLOW was received.
TransmissionInterruptionError – A new UDS message transmission was started while sending this message.
NotImplementedError – Flow Control CAN packet with unknown Flow Status was received.
- Returns:
Record with historic information about transmitted UDS message.
- Return type:
uds.message.UdsMessageRecord
- receive_message(start_timeout=None, end_timeout=None)
Receive UDS message over CAN.
- Parameters:
start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.
end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.
- Raises:
MessageTransmissionNotStartedError – Timeout was exceeded before message reception started.
TimeoutError – Timeout was exceeded during message receiving (before all packets received).
- Returns:
Record with historic information about received UDS message.
- Return type:
uds.message.UdsMessageRecord
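The private methods in this module take a `timestamp_end` value, while the public method above takes timeouts in milliseconds. One way such a conversion might work is sketched below. The helpers `compute_deadline` and `remaining_ms` are hypothetical, and two points are assumptions: that timestamps are expressed in seconds, and that the deadline is counted from the moment of the call.

```python
import time
from typing import Optional


def compute_deadline(timeout_ms: Optional[float],
                     now: Optional[float] = None) -> Optional[float]:
    """Turn a millisecond timeout into an absolute deadline in seconds."""
    if timeout_ms is None:
        return None  # wait forever
    current = time.perf_counter() if now is None else now
    return current + timeout_ms / 1000.0


def remaining_ms(timestamp_end: Optional[float],
                 now: Optional[float] = None) -> Optional[float]:
    """Milliseconds left until the deadline; TimeoutError once it has passed."""
    if timestamp_end is None:
        return None
    current = time.perf_counter() if now is None else now
    remaining = (timestamp_end - current) * 1000.0
    if remaining <= 0:
        raise TimeoutError("Deadline for message reception has passed")
    return remaining


# With a fixed clock: a 500 ms timeout started at t=10.0 s ends at t=10.5 s.
deadline = compute_deadline(500, now=10.0)
left = remaining_ms(deadline, now=10.25)
```

Keeping one absolute deadline and deriving each per-packet wait from it prevents the total reception time from growing with the number of packets.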
- async async_receive_message(start_timeout=None, end_timeout=None, loop=None)
Receive UDS message over CAN asynchronously.
- Parameters:
start_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for the start of a message transmission. Leave None to wait forever.
end_timeout (Optional[uds.utilities.TimeMillisecondsAlias]) – Maximal time (in milliseconds) to wait for a message transmission to finish. Leave None to wait forever.
loop (Optional[asyncio.AbstractEventLoop]) – An asyncio event loop to use for scheduling this task.
- Raises:
MessageTransmissionNotStartedError – Timeout was exceeded before message reception started.
TimeoutError – Timeout was exceeded during message receiving (before all packets received).
- Returns:
Record with historic information about received UDS message.
- Return type:
uds.message.UdsMessageRecord