Source code for uds.can.transport_interface.common

"""Definition of UDS Transport Interface for CAN bus."""

__all__ = ["AbstractCanTransportInterface"]

from abc import ABC
from typing import Any, Optional, Tuple
from warnings import warn

from uds.addressing import TransmissionDirection
from uds.message import UdsMessageRecord
from uds.transport_interface import AbstractTransportInterface
from uds.utilities import TimeMillisecondsAlias, ValueWarning

from ..addressing import AbstractCanAddressingInformation
from ..frame import CanVersion
from ..packet import AbstractFlowControlParametersGenerator, CanPacketType, DefaultFlowControlParametersGenerator
from ..segmenter import CanSegmenter


[docs] class AbstractCanTransportInterface(AbstractTransportInterface, ABC): """ Abstract definition of Transport Interface for managing Diagnostics on CAN. CAN Transport Interfaces are meant to handle UDS middle layers (Transport and Network) on CAN bus. """ N_AS_TIMEOUT: TimeMillisecondsAlias = 1000 """Timeout value of :ref:`N_As <knowledge-base-can-n-as>` time parameter according to ISO 15765-2.""" N_AR_TIMEOUT: TimeMillisecondsAlias = 1000 """Timeout value of :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter according to ISO 15765-2.""" N_BS_TIMEOUT: TimeMillisecondsAlias = 1000 """Timeout value of :ref:`N_Bs <knowledge-base-can-n-bs>` time parameter according to ISO 15765-2.""" N_CR_TIMEOUT: TimeMillisecondsAlias = 1000 """Timeout value of :ref:`N_Cr <knowledge-base-can-n-cr>` time parameter according to ISO 15765-2.""" DEFAULT_N_BR: TimeMillisecondsAlias = 0 """Default value for :ref:`N_Br <knowledge-base-can-n-br>` time parameter.""" DEFAULT_N_CS: Optional[TimeMillisecondsAlias] = None """Default value for :ref:`N_Cs <knowledge-base-can-n-cs>` time parameter.""" DEFAULT_FLOW_CONTROL_PARAMETERS = DefaultFlowControlParametersGenerator() """Default values generator for :ref:`Flow Control <knowledge-base-can-flow-control>` parameters (:ref:`Flow Status <knowledge-base-can-flow-status>`, :ref:`Block Size <knowledge-base-can-block-size>`, :ref:`Separation Time minimum <knowledge-base-can-st-min>`).""" def __init__(self, network_manager: Any, addressing_information: AbstractCanAddressingInformation, n_as_timeout: TimeMillisecondsAlias = N_AS_TIMEOUT, n_ar_timeout: TimeMillisecondsAlias = N_AR_TIMEOUT, n_bs_timeout: TimeMillisecondsAlias = N_BS_TIMEOUT, n_cr_timeout: TimeMillisecondsAlias = N_CR_TIMEOUT, n_br: TimeMillisecondsAlias = DEFAULT_N_BR, n_cs: Optional[TimeMillisecondsAlias] = DEFAULT_N_CS, flow_control_parameters_generator: AbstractFlowControlParametersGenerator = DEFAULT_FLOW_CONTROL_PARAMETERS, can_version: CanVersion = CanVersion.CLASSIC_CAN, bitrate_switch: bool = False, **segmenter_configuration: Any) -> None: """ Create Transport Interface (an object for handling UDS Transport and Network layers). :param network_manager: An object that handles CAN bus (Physical and Data layers of OSI Model). :param addressing_information: Addressing Information configuration of a simulated node that is taking part in DoCAN communication. :param n_as_timeout: Timeout value for :ref:`N_As <knowledge-base-can-n-as>` time parameter. :param n_ar_timeout: Timeout value for :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter. :param n_bs_timeout: Timeout value for :ref:`N_Bs <knowledge-base-can-n-bs>` time parameter. :param n_cr_timeout: Timeout value for :ref:`N_Cr <knowledge-base-can-n-cr>` time parameter. :param n_br: Value of :ref:`N_Br <knowledge-base-can-n-br>` time parameter to use in communication. :param n_cs: Value of :ref:`N_Cs <knowledge-base-can-n-cs>` time parameter to use in communication. :param flow_control_parameters_generator: Generator with Flow Control parameters to use. :param can_version: Version of CAN protocol to be used for packets sending. :param bitrate_switch: Whether bitrate switch (BRS) shall be set in sent packets. :param segmenter_configuration: Configuration parameters for CAN Segmenter. - :parameter dlc: Base CAN DLC value to use for CAN packets. - :parameter min_dlc: Minimal CAN DLC to use for CAN Packets during Data Optimization. - :parameter use_data_optimization: Information whether to use :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>`. - :parameter filler_byte: Filler byte value to use for :ref:`CAN Frame Data Padding <knowledge-base-can-frame-data-padding>`. """ super().__init__(network_manager=network_manager) self.__n_ar_measured: Optional[TimeMillisecondsAlias] = None self.__n_as_measured: Optional[TimeMillisecondsAlias] = None self.__n_bs_measured: Optional[Tuple[TimeMillisecondsAlias, ...]] = None self.__n_cr_measured: Optional[Tuple[TimeMillisecondsAlias, ...]] = None self.n_as_timeout = n_as_timeout self.n_ar_timeout = n_ar_timeout self.n_bs_timeout = n_bs_timeout self.n_cr_timeout = n_cr_timeout self.n_br = n_br self.n_cs = n_cs self.flow_control_parameters_generator = flow_control_parameters_generator self.segmenter = CanSegmenter(addressing_information=addressing_information, **segmenter_configuration) self.can_version = can_version self.bitrate_switch = bitrate_switch # General @property def segmenter(self) -> CanSegmenter: """Get the segmenter used by this CAN Transport Interface.""" return self.__segmenter @segmenter.setter def segmenter(self, value: CanSegmenter) -> None: """ Set segmenter value for this Transport Interface. :param value: CAN Segmenter value to set. :raise TypeError: Provided value is not CAN Segmenter. """ if not isinstance(value, CanSegmenter): raise TypeError(f"Provided value is not CAN Segmenter type. Actual type: {type(value)}.") self.__segmenter = value # Communication parameters @property def can_version(self) -> CanVersion: """Get version of CAN protocol to be used for packets sending.""" return self.__can_version @can_version.setter def can_version(self, value: CanVersion) -> None: """ Set version of CAN protocol to be used for packets sending. .. warning:: Value cross-check with other attributes (e.g. DLC) is not performed. .. note:: Frames with DLC > 8 will always be sent as CAN FD, regardless of this value, as Classic CAN cannot support DLC values greater than 8. :param value: Value to set. """ self.__can_version = CanVersion.validate_member(value) @property def bitrate_switch(self) -> bool: """Get value of bitrate switch (BRS) to be used for packets sending.""" return self.__bitrate_switch @bitrate_switch.setter def bitrate_switch(self, value: bool) -> None: """ Set value of bitrate switch (BRS) to be used for packets sending. .. note:: This value will be ignored if CLASSICAL CAN is used. :param value: Value to set. """ self.__bitrate_switch = bool(value) @property def dlc(self) -> int: """ Value of base CAN DLC to use for output CAN packets. .. note:: All output CAN packets will have this DLC value set unless :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` is used. """ return self.segmenter.dlc @dlc.setter def dlc(self, value: int) -> None: """ Set value of base CAN DLC to use for output CAN packets. :param value: Value to set. """ self.segmenter.dlc = value @property def min_dlc(self) -> Optional[int]: """ Value of minimal CAN DLC to use for CAN Packets during Data Optimization. .. note:: Output CAN Packets (created by :meth:`~uds.segmentation.can_segmenter.CanSegmenter.segmentation`) will never have DLC smaller than this value even if :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` is used. """ return self.segmenter.min_dlc @min_dlc.setter def min_dlc(self, value: Optional[int]) -> None: """ Set value of minimal CAN DLC to use for CAN Packets during Data Optimization. :param value: Value to set. """ self.segmenter.min_dlc = value @property def use_data_optimization(self) -> bool: """ Information whether to use CAN Frame Data Optimization during CAN packets creation. .. seealso:: :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` """ return self.segmenter.use_data_optimization @use_data_optimization.setter def use_data_optimization(self, value: bool) -> None: """ Set whether to use CAN Frame Data Optimization during CAN packets creation. .. seealso:: :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` :param value: Value to set. """ self.segmenter.use_data_optimization = value @property def filler_byte(self) -> int: """ Filler byte value to use for output CAN Frame Data Padding during segmentation. .. seealso:: :ref:`CAN Frame Data Padding <knowledge-base-can-frame-data-padding>` """ return self.segmenter.filler_byte @filler_byte.setter def filler_byte(self, value: int) -> None: """ Set value of filler byte to use for output CAN Frame Data Padding. .. seealso:: :ref:`CAN Frame Data Padding <knowledge-base-can-frame-data-padding>` :param value: Value to set. """ self.segmenter.filler_byte = value @property def flow_control_parameters_generator(self) -> AbstractFlowControlParametersGenerator: """Get generator of Flow Control parameters (Flow Status, Block Size, Separation Time minimum).""" return self.__flow_control_parameters_generator @flow_control_parameters_generator.setter def flow_control_parameters_generator(self, value: AbstractFlowControlParametersGenerator) -> None: """ Set value of Flow Control parameters (Flow Status, Block Size, Separation Time minimum) generator. :param value: Value to set. """ if not isinstance(value, AbstractFlowControlParametersGenerator): raise TypeError("Provided Flow Control parameters generator value has incorrect type. " f"Actual type: {type(value)}.") self.__flow_control_parameters_generator = value # Time parameter - CAN Network Layer @property def n_as_timeout(self) -> TimeMillisecondsAlias: """Timeout value for :ref:`N_As <knowledge-base-can-n-as>` time parameter.""" return self.__n_as_timeout @n_as_timeout.setter def n_as_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for :ref:`N_As <knowledge-base-can-n-as>` time parameter. :param value: Value of timeout to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is less or equal to 0. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if value <= 0: raise ValueError(f"Provided time parameter value must be greater than 0. Actual value: {value}") if value != self.N_AS_TIMEOUT: warn(message="Non-default value of N_As timeout was set.", category=ValueWarning) self.__n_as_timeout = value @property def n_as_measured(self) -> Optional[TimeMillisecondsAlias]: """ Get the last measured value of :ref:`N_As <knowledge-base-can-n-as>` time parameter. .. note:: The last measurement comes from the last transmission of Single Frame or First Fame CAN Packet using either :meth:`~uds.can.transport_interface.common.AbstractCanTransportInterface.send_packet` or :meth:`~uds.can.transport_interface.common.AbstractCanTransportInterface.async_send_packet` method. :return: Time in milliseconds or None if the value was never measured. """ return self.__n_as_measured @property def n_ar_timeout(self) -> TimeMillisecondsAlias: """Timeout value for :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter.""" return self.__n_ar_timeout @n_ar_timeout.setter def n_ar_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter. :param value: Value of timeout to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is less or equal to 0. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if value <= 0: raise ValueError(f"Provided time parameter value must be greater than 0. Actual value: {value}") if value != self.N_AR_TIMEOUT: warn(message="Non-default value of N_Ar timeout was set.", category=ValueWarning) self.__n_ar_timeout = value @property def n_ar_measured(self) -> Optional[TimeMillisecondsAlias]: """ Get the last measured value of :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter. .. note:: The last measurement comes from the last transmission of Flow Control CAN Packet using either :meth:`~uds.can.transport_interface.common.AbstractCanTransportInterface.receive_packet` or :meth:`~uds.can.transport_interface.common.AbstractCanTransportInterface.async_receive_packet` method. :return: Time in milliseconds or None if the value was never measured. """ return self.__n_ar_measured @property def n_bs_timeout(self) -> TimeMillisecondsAlias: """Timeout value for :ref:`N_Bs <knowledge-base-can-n-bs>` time parameter.""" return self.__n_bs_timeout @n_bs_timeout.setter def n_bs_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for :ref:`N_Bs <knowledge-base-can-n-bs>` time parameter. :param value: Value of timeout to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is less or equal to 0. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if value <= 0: raise ValueError(f"Provided time parameter value must be greater than 0. Actual value: {value}") if value != self.N_BS_TIMEOUT: warn(message="Non-default value of N_Bs timeout was set.", category=ValueWarning) self.__n_bs_timeout = value @property def n_bs_measured(self) -> Optional[Tuple[TimeMillisecondsAlias, ...]]: """ Get the last measured values of :ref:`N_Bs <knowledge-base-can-n-bs>` time parameter. .. note:: The last measurement comes from the last transmission of UDS message using either :meth:`~uds.transport_interface.addressing.AbstractCanTransportInterface.send_message` or :meth:`~uds.transport_interface.addressing.AbstractCanTransportInterface.async_send_message` method. :return: Tuple with times in milliseconds or None if the values could not be measured. """ return self.__n_bs_measured @property def n_br(self) -> TimeMillisecondsAlias: """ Get the value of :ref:`N_Br <knowledge-base-can-n-br>` time parameter which is currently set. .. note:: The actual (observed on the bus) value will be slightly longer as it also includes computation and CAN Interface delays. """ return self.__n_br @n_br.setter def n_br(self, value: TimeMillisecondsAlias) -> None: """ Set the value of :ref:`N_Br <knowledge-base-can-n-br>` time parameter to use. :param value: The value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is out of range (0 <= value < MAX N_Br). """ if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if not 0 <= value < self.n_br_max: raise ValueError("Provided time parameter value is out of range. " f"Expected: 0 <= value < {self.n_br_max}. Actual value: {value}.") self.__n_br = value @property def n_br_max(self) -> TimeMillisecondsAlias: """ Get the maximum valid value of :ref:`N_Br <knowledge-base-can-n-br>` time parameter. .. warning:: To assess maximal value of :ref:`N_Br <knowledge-base-can-n-br>`, the actual value of :ref:`N_Ar <knowledge-base-can-n-ar>` time parameter is required. Either the latest measured value of :ref:`N_Ar <knowledge-base-can-n-ar>` would be used, or 0ms would be assumed (if there are no measurement result). """ n_ar_measured = 0 if self.n_ar_measured is None else self.n_ar_measured return 0.9 * self.n_bs_timeout - n_ar_measured @property def n_cs(self) -> Optional[TimeMillisecondsAlias]: """ Get the value of :ref:`N_Cs <knowledge-base-can-n-cs>` time parameter which is currently set. .. note:: The actual (observed on the bus) value will be slightly longer as it also includes computation and CAN Interface delays. """ return self.__n_cs @n_cs.setter def n_cs(self, value: Optional[TimeMillisecondsAlias]) -> None: """ Set the value of :ref:`N_Cs <knowledge-base-can-n-cs>` time parameter to use. :param value: The value to set. - None - use timing compatible with :ref:`STmin <knowledge-base-can-st-min>` value received in a preceding :ref:`Flow Control CAN packet <knowledge-base-can-flow-control>` - int/float type - timing value to be used regardless of a received :ref:`STmin <knowledge-base-can-st-min>` value :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is out of range (0 <= value < MAX N_Cs). """ if value is not None: if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if not 0 <= value < self.n_cs_max: raise ValueError("Provided time parameter value is out of range. " f"Expected: 0 <= value < {self.n_cs_max}. Actual value: {value}.") self.__n_cs = value @property def n_cs_max(self) -> TimeMillisecondsAlias: """ Get the maximum valid value of :ref:`N_Cs <knowledge-base-can-n-cs>` time parameter. .. warning:: To assess maximal value of :ref:`N_Cs <knowledge-base-can-n-cs>`, the actual value of :ref:`N_As <knowledge-base-can-n-as>` time parameter is required. Either the latest measured value of :ref:`N_As <knowledge-base-can-n-as>` would be used, or 0ms would be assumed (if there are no measurement result). """ n_as_measured = 0 if self.n_as_measured is None else self.n_as_measured return 0.9 * self.n_cr_timeout - n_as_measured @property def n_cr_timeout(self) -> TimeMillisecondsAlias: """Timeout value for :ref:`N_Cr <knowledge-base-can-n-cr>` time parameter.""" return self.__n_cr_timeout @n_cr_timeout.setter def n_cr_timeout(self, value: TimeMillisecondsAlias) -> None: """ Set timeout value for :ref:`N_Cr <knowledge-base-can-n-cr>` time parameter. :param value: Value of timeout to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is less or equal to 0. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided time parameter value must be int or float type. Actual type: {type(value)}.") if value <= 0: raise ValueError(f"Provided time parameter value must be greater than 0. Actual value: {value}") if value != self.N_CR_TIMEOUT: warn(message="Non-default value of N_Cr timeout was set.", category=ValueWarning) self.__n_cr_timeout = value @property def n_cr_measured(self) -> Optional[Tuple[TimeMillisecondsAlias, ...]]: """ Get the last measured values of :ref:`N_Cr <knowledge-base-can-n-cr>` time parameter. .. note:: The last measurement comes from the last reception of UDS message using either :meth:`~uds.transport_interface.addressing.AbstractCanTransportInterface.receive_message` or :meth:`~uds.transport_interface.addressing.AbstractCanTransportInterface.async_receive_message` method. :return: Tuple with times in milliseconds or None if the values could not be measured. """ return self.__n_cr_measured
[docs] def _update_n_ar_measured(self, value: TimeMillisecondsAlias) -> None: """ Update measured values of :ref:`N_Ar <knowledge-base-can-n-ar>`. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is out of range. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided value is not int or float type. Actual type: {type(value)}.") if value < 0: raise ValueError(f"Provided time parameter cannot be a negative number. Actual value: {value}") if value > self.n_ar_timeout: warn("Measured value of N_Ar was greater than N_Ar timeout.", category=ValueWarning) self.__n_ar_measured = value
[docs] def _update_n_as_measured(self, value: TimeMillisecondsAlias) -> None: """ Update measured values of :ref:`N_As <knowledge-base-can-n-as>`. :param value: Value to set. :raise TypeError: Provided value is not int or float type. :raise ValueError: Provided value is out of range. """ if not isinstance(value, (int, float)): raise TypeError(f"Provided value is not int or float type. Actual type: {type(value)}.") if value < 0: raise ValueError(f"Provided time parameter cannot be a negative number. Actual value: {value}") if value > self.n_as_timeout: warn("Measured value of N_As was greater than N_As timeout.", category=ValueWarning) self.__n_as_measured = value
[docs] def _update_n_bs_measured(self, message_record: UdsMessageRecord) -> None: """ Update measured values of :ref:`N_Bs <knowledge-base-can-n-bs>` according to timestamps of CAN packet records. :param message_record: Record of UDS message transmitted over CAN. :raise TypeError: Provided value is not UDS message record. :raise ValueError: Provided UDS message record was not transmitted. """ if not isinstance(message_record, UdsMessageRecord): raise TypeError(f"Provided value is not UDS Message Record type. Actual type: {type(message_record)}.") if message_record.direction != TransmissionDirection.TRANSMITTED: raise ValueError("Provided UDS Message Record was not transmitted.") if len(message_record.packets_records) == 1: self.__n_bs_measured = None else: n_bs_measured = [] for i, packet_record in enumerate(message_record.packets_records[1:]): if packet_record.packet_type == CanPacketType.FLOW_CONTROL: n_bs = (packet_record.transmission_timestamp - message_record.packets_records[i].transmission_timestamp) n_bs_measured.append(round(n_bs * 1000, 3)) self.__n_bs_measured = tuple(n_bs_measured)
[docs] def _update_n_cr_measured(self, message_record: UdsMessageRecord) -> None: """ Update measured values of :ref:`N_Cr <knowledge-base-can-n-cr>` according to timestamps of CAN packet records. :param message_record: Record of UDS message received over CAN. :raise TypeError: Provided value is not UDS message record. :raise ValueError: Provided UDS message record was not received. """ if not isinstance(message_record, UdsMessageRecord): raise TypeError(f"Provided value is not UDS Message Record type. Actual type: {type(message_record)}.") if message_record.direction != TransmissionDirection.RECEIVED: raise ValueError("Provided UDS Message Record was not received.") if len(message_record.packets_records) == 1: self.__n_cr_measured = None else: n_cr_measured = [] for i, packet_record in enumerate(message_record.packets_records[1:]): if packet_record.packet_type == CanPacketType.CONSECUTIVE_FRAME: n_cr = (packet_record.transmission_timestamp - message_record.packets_records[i].transmission_timestamp) n_cr_measured.append(round(n_cr * 1000, 3)) self.__n_cr_measured = tuple(n_cr_measured)
[docs] def clear_measurements(self) -> None: """Clear measured values of CAN communication parameters.""" self.__n_ar_measured = None self.__n_as_measured = None self.__n_bs_measured = None self.__n_cr_measured = None