Source code for uds.can.segmenter

"""Segmentation specific for CAN bus."""

# Public API of this module: only the CAN segmenter class is exported.
__all__ = ["CanSegmenter"]

from typing import Optional, Tuple, Type, Union
from warnings import warn

from uds.addressing import AbstractAddressingInformation, AddressingType
from uds.message import UdsMessage, UdsMessageRecord
from uds.packet import AbstractPacket, AbstractPacketRecord
from uds.segmentation import AbstractSegmenter, SegmentationError
from uds.utilities import ValueWarning, validate_raw_byte

from .addressing import AbstractCanAddressingInformation, CanAddressingFormat
from .frame import DEFAULT_FILLER_BYTE, CanDlcHandler
from .packet import (
    MAX_LONG_FF_DL_VALUE,
    MAX_SHORT_FF_DL_VALUE,
    CanFlowStatus,
    CanPacket,
    CanPacketRecord,
    CanPacketsContainersSequence,
    CanPacketType,
    get_consecutive_frame_max_payload_size,
    get_consecutive_frame_min_dlc,
    get_first_frame_payload_size,
    get_single_frame_min_dlc,
)


class CanSegmenter(AbstractSegmenter):
    """Segmenter that divides diagnostic messages into CAN packets and merges CAN packets back into messages."""

    def __init__(self, *,
                 addressing_information: AbstractCanAddressingInformation,
                 dlc: int = CanDlcHandler.MIN_BASE_UDS_DLC,
                 min_dlc: Optional[int] = None,
                 use_data_optimization: bool = False,
                 filler_byte: int = DEFAULT_FILLER_BYTE) -> None:
        """
        Create CAN Segmenter and store its configuration.

        :param addressing_information: Addressing Information of the node for which packets are created.
        :param dlc: Base CAN DLC value used for created CAN Packets.
        :param min_dlc: Lower limit for CAN DLC when Data Optimization is used. None means no limit.
        :param use_data_optimization: Whether created CAN Packets shall use CAN Frame Data Optimization.
        :param filler_byte: Byte value used for CAN Frame Data Padding in created CAN Packets.
        """
        super().__init__(addressing_information=addressing_information)
        self.addressing_information: AbstractCanAddressingInformation
        # Base DLC must be assigned before min DLC, because the min DLC setter validates against it.
        self.dlc = dlc
        self.min_dlc = min_dlc
        self.use_data_optimization = use_data_optimization
        self.filler_byte = filler_byte

    @property
    def supported_addressing_information_class(self) -> Type[AbstractAddressingInformation]:
        """Get Addressing Information class that this segmenter works with."""
        return AbstractCanAddressingInformation

    @property
    def supported_packet_class(self) -> Type[AbstractPacket]:
        """Get Packet class that this segmenter works with."""
        return CanPacket

    @property
    def supported_packet_record_class(self) -> Type[AbstractPacketRecord]:
        """Get Packet Record class that this segmenter works with."""
        return CanPacketRecord

    @property
    def addressing_format(self) -> CanAddressingFormat:
        """Get CAN Addressing Format of the configured Addressing Information."""
        return self.addressing_information.ADDRESSING_FORMAT

    @property
    def dlc(self) -> int:
        """
        Get base CAN DLC value used for CAN Packets.

        .. note:: CAN Packets created by :meth:`~uds.can.segmenter.CanSegmenter.segmentation` carry this DLC value
            unless :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` is used.
        """
        return self.__dlc

    @dlc.setter
    def dlc(self, value: int) -> None:
        """
        Set base CAN DLC value used for CAN Packets.

        If the currently stored min DLC is greater than the new base DLC, a warning is issued and
        min DLC is lowered to the new base DLC.

        :param value: Value to set.

        :raise ValueError: Provided value is too small.
        """
        CanDlcHandler.validate_dlc(value)
        lowest_base_dlc = CanDlcHandler.MIN_BASE_UDS_DLC
        if value < lowest_base_dlc:
            raise ValueError(f"Provided value is too small. Expected: DLC >= {lowest_base_dlc}. "
                             f"Actual value: {value}")
        self.__dlc: int = value
        # During __init__ the min DLC attribute does not exist yet - nothing to reconcile then.
        if not hasattr(self, "_CanSegmenter__min_dlc"):
            return
        current_min_dlc = self.min_dlc
        if current_min_dlc is None or current_min_dlc <= value:
            return
        warn(message="Min DLC value had to be decreased, cause it was greater than base DLC.",
             category=ValueWarning)
        self.min_dlc = value

    @property
    def min_dlc(self) -> Optional[int]:
        """
        Get minimal CAN DLC value used for CAN Packets during Data Optimization.

        .. note:: CAN Packets created by :meth:`~uds.can.segmenter.CanSegmenter.segmentation` never get DLC smaller
            than this value, even when
            :ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` is used.
        """
        return self.__min_dlc

    @min_dlc.setter
    def min_dlc(self, value: Optional[int]) -> None:
        """
        Set minimal CAN DLC value used for CAN Packets during Data Optimization.

        :param value: Value to set. None is stored without any validation.

        :raise ValueError: Provided value is greater than base CAN DLC.
        """
        if value is not None:
            CanDlcHandler.validate_dlc(value)
            base_dlc = self.dlc
            if value > base_dlc:
                raise ValueError(f"Min DLC must be less or equal than base DLC. DLC = {base_dlc}. "
                                 f"Actual value: {value}")
        self.__min_dlc: Optional[int] = value

    @property
    def use_data_optimization(self) -> bool:
        """Get flag informing whether CAN Frame Data Optimization is used during CAN Packets creation."""
        return self.__use_data_optimization

    @use_data_optimization.setter
    def use_data_optimization(self, value: bool) -> None:
        """
        Set flag informing whether CAN Frame Data Optimization is used during CAN Packets creation.

        :param value: Value to set. It is converted to bool before it is stored.
        """
        self.__use_data_optimization: bool = bool(value)

    @property
    def filler_byte(self) -> int:
        """Get filler byte value used for CAN Frame Data Padding during segmentation."""
        return self.__filler_byte

    @filler_byte.setter
    def filler_byte(self, value: int) -> None:
        """
        Set filler byte value used for CAN Frame Data Padding.

        :param value: Value to set. It is checked by `validate_raw_byte` before it is stored.
        """
        validate_raw_byte(value)
        self.__filler_byte: int = value

    def __physical_segmentation(self, message: UdsMessage) -> Tuple[CanPacket, ...]:
        """
        Divide physically addressed diagnostic message into CAN packets.

        A single Single Frame is produced when the payload fits into one frame with the base DLC.
        Otherwise a First Frame followed by Consecutive Frames is produced.

        :param message: UDS message to divide into packets.

        :raise SegmentationError: Provided diagnostic message cannot be segmented.

        :return: CAN packets that are an outcome of UDS message segmentation.
        """
        payload = message.payload
        payload_size = len(payload)
        if payload_size > MAX_LONG_FF_DL_VALUE:
            raise SegmentationError("Provided diagnostic message cannot be segmented to CAN Packet as it is too big "
                                    "to transmit it over CAN bus. "
                                    f"Maximal diagnostic message length: {MAX_LONG_FF_DL_VALUE}")
        try:
            sf_dlc_required = get_single_frame_min_dlc(addressing_format=self.addressing_format,
                                                       payload_length=payload_size)
        except ValueError:
            # No Single Frame can carry this payload - use a DLC value that can never be accepted.
            sf_dlc_required = CanDlcHandler.MAX_DLC_VALUE + 1
        if sf_dlc_required <= self.dlc:
            if not self.use_data_optimization:
                sf_dlc = self.dlc
            elif self.min_dlc is None:
                sf_dlc = None
            else:
                sf_dlc = max(sf_dlc_required, self.min_dlc)
            return (CanPacket(packet_type=CanPacketType.SINGLE_FRAME,
                              payload=payload,
                              filler_byte=self.filler_byte,
                              dlc=sf_dlc,
                              **self.addressing_information.tx_physical_params),)
        # Multi-packet transmission: First Frame + Consecutive Frames
        ff_size = get_first_frame_payload_size(addressing_format=self.addressing_format,
                                               dlc=self.dlc,
                                               long_ff_dl_format=payload_size > MAX_SHORT_FF_DL_VALUE)
        packets = [CanPacket(packet_type=CanPacketType.FIRST_FRAME,
                             payload=payload[:ff_size],
                             dlc=self.dlc,
                             data_length=payload_size,
                             **self.addressing_information.tx_physical_params)]
        cf_size = get_consecutive_frame_max_payload_size(addressing_format=self.addressing_format,
                                                         dlc=self.dlc)
        # ceiling division of the bytes remaining after First Frame by Consecutive Frame capacity
        full_cfs, leftover_bytes = divmod(payload_size - ff_size, cf_size)
        cfs_count = full_cfs + 1 if leftover_bytes else full_cfs
        for cf_number in range(1, cfs_count + 1):
            start = ff_size + (cf_number - 1) * cf_size
            # only the last Consecutive Frame may be shorter, so only it is subject to data optimization
            if cf_number == cfs_count and self.use_data_optimization:
                if self.min_dlc is None:
                    cf_dlc = None
                else:
                    cf_dlc = max(self.min_dlc,
                                 get_consecutive_frame_min_dlc(addressing_format=self.addressing_format,
                                                               payload_length=payload_size - start))
            else:
                cf_dlc = self.dlc
            packets.append(CanPacket(packet_type=CanPacketType.CONSECUTIVE_FRAME,
                                     payload=payload[start:start + cf_size],
                                     dlc=cf_dlc,
                                     sequence_number=cf_number % 0x10,  # Sequence Number is a 4-bit counter
                                     filler_byte=self.filler_byte,
                                     **self.addressing_information.tx_physical_params))
        return tuple(packets)

    def __functional_segmentation(self, message: UdsMessage) -> Tuple[CanPacket, ...]:
        """
        Divide functionally addressed diagnostic message into CAN packets.

        Only a Single Frame is ever produced by this method.

        :param message: UDS message to divide into packets.

        :raise SegmentationError: Provided diagnostic message cannot be segmented.

        :return: CAN packets that are an outcome of UDS message segmentation.
        """
        payload = message.payload
        try:
            sf_dlc_required = get_single_frame_min_dlc(addressing_format=self.addressing_format,
                                                       payload_length=len(payload))
        except ValueError:
            # No Single Frame can carry this payload - use a DLC value that can never be accepted.
            sf_dlc_required = CanDlcHandler.MAX_DLC_VALUE + 1
        if sf_dlc_required > self.dlc:
            raise SegmentationError("Provided diagnostic message cannot be segmented using functional addressing "
                                    "as it will not fit into a Single Frame.")
        if not self.use_data_optimization:
            sf_dlc = self.dlc
        elif self.min_dlc is None:
            sf_dlc = None
        else:
            sf_dlc = max(sf_dlc_required, self.min_dlc)
        return (CanPacket(packet_type=CanPacketType.SINGLE_FRAME,
                          payload=payload,
                          filler_byte=self.filler_byte,
                          dlc=sf_dlc,
                          **self.addressing_information.tx_functional_params),)

    def is_desegmented_message(self, packets: CanPacketsContainersSequence) -> bool:
        """
        Check whether provided packets form exactly one complete diagnostic message.

        :param packets: Packets sequence to check.

        :raise ValueError: Provided value is not CAN packets sequence.
        :raise NotImplementedError: There is missing implementation for the provided initial packet type.

        :return: True if the packets form exactly one diagnostic message.
            False if there are missing, additional or inconsistent (e.g. two packets that initiate a message) packets.
        """
        if not self.is_supported_packets_sequence_type(packets):
            raise ValueError("Provided packets are not consistent CAN Packets sequence.")
        initial_packet = packets[0]
        initial_type = initial_packet.packet_type
        if not CanPacketType.is_initial_packet_type(initial_type):
            return False
        if initial_type == CanPacketType.SINGLE_FRAME:
            return len(packets) == 1
        if initial_type != CanPacketType.FIRST_FRAME:
            raise NotImplementedError("There is missing implementation for the CAN Packet of provided type: "
                                      f"{initial_type}.")
        expected_size: int = initial_packet.data_length  # type: ignore
        collected_size = len(initial_packet.payload)  # type: ignore
        expected_sequence_number = 1
        for packet in packets[1:]:
            # any packet after the whole payload was collected is an additional one
            if collected_size >= expected_size:
                return False
            packet_type = packet.packet_type
            if packet_type == CanPacketType.FLOW_CONTROL:
                continue
            if packet_type != CanPacketType.CONSECUTIVE_FRAME:
                return False
            if packet.sequence_number != expected_sequence_number:
                return False
            collected_size += len(packet.payload)  # type: ignore
            expected_sequence_number = (expected_sequence_number + 1) % 0x10
        return collected_size >= expected_size

    def get_flow_control_packet(self,
                                flow_status: CanFlowStatus,
                                block_size: Optional[int] = None,
                                st_min: Optional[int] = None) -> CanPacket:
        """
        Create Flow Control CAN packet.

        The packet uses the configured filler byte and physical transmission addressing parameters.
        No DLC is forced (None is passed) when Data Optimization is used, otherwise the base DLC is used.

        :param flow_status: Value of Flow Status parameter.
        :param block_size: Value of Block Size parameter.
            This parameter is only required with ContinueToSend Flow Status, leave None otherwise.
        :param st_min: Value of Separation Time minimum (STmin) parameter.
            This parameter is only required with ContinueToSend Flow Status, leave None otherwise.

        :return: Flow Control CAN packet with provided parameters.
        """
        if self.use_data_optimization:
            fc_dlc = None
        else:
            fc_dlc = self.dlc
        return CanPacket(packet_type=CanPacketType.FLOW_CONTROL,
                         flow_status=flow_status,
                         block_size=block_size,
                         st_min=st_min,
                         filler_byte=self.filler_byte,
                         dlc=fc_dlc,
                         **self.addressing_information.tx_physical_params)

    def desegmentation(self, packets: CanPacketsContainersSequence) -> Union[UdsMessage, UdsMessageRecord]:
        """
        Merge CAN packets into a diagnostic message.

        :param packets: CAN packets to desegment into UDS message.

        :raise SegmentationError: Provided packets are not a complete packets sequence that form
            a diagnostic message.
        :raise NotImplementedError: There is missing implementation for the provided CAN Packets type.

        :return: A diagnostic message that is an outcome of CAN packets desegmentation.
            UDS Message Record is returned for CAN Packet Records, UDS Message for CAN Packets.
        """
        if not self.is_desegmented_message(packets):
            raise SegmentationError("Provided packets are not a complete packets sequence.")
        initial_packet = packets[0]
        if isinstance(initial_packet, CanPacketRecord):
            return UdsMessageRecord(packets)  # type: ignore
        if not isinstance(initial_packet, CanPacket):
            raise NotImplementedError("Missing implementation for the provided CAN Packet type.")
        if initial_packet.packet_type == CanPacketType.SINGLE_FRAME and len(packets) == 1:
            return UdsMessage(payload=initial_packet.payload,  # type: ignore
                              addressing_type=initial_packet.addressing_type)
        if initial_packet.packet_type == CanPacketType.FIRST_FRAME:
            # packets without payload are skipped; filler bytes at the end are cut off using data length
            merged_payload = bytearray().join(bytearray(packet.payload)
                                              for packet in packets
                                              if packet.payload is not None)
            return UdsMessage(payload=merged_payload[:initial_packet.data_length],
                              addressing_type=initial_packet.addressing_type)
        raise SegmentationError("Unexpectedly, something went wrong...")

    def segmentation(self, message: UdsMessage) -> Tuple[CanPacket, ...]:
        """
        Divide a diagnostic message into CAN packets.

        :param message: UDS message to divide into packets.

        :raise TypeError: Provided value is not instance of UdsMessage class.
        :raise NotImplementedError: There is missing implementation for the Addressing Type used
            by the provided message.

        :return: CAN packets that are an outcome of UDS message segmentation.
        """
        if not isinstance(message, UdsMessage):
            raise TypeError(f"Provided value is not instance of UdsMessage class. Actual type: {type(message)}.")
        addressing_type = message.addressing_type
        if addressing_type == AddressingType.PHYSICAL:
            return self.__physical_segmentation(message)
        if addressing_type == AddressingType.FUNCTIONAL:
            return self.__functional_segmentation(message)
        raise NotImplementedError("Unhandled addressing type.")