"""Segmentation specific for CAN bus."""
__all__ = ["CanSegmenter"]
from typing import Optional, Union, Tuple, Dict, Type
from copy import copy
from uds.utilities import RawByte, RawBytesList, validate_raw_byte, AmbiguityError
from uds.transmission_attributes import AddressingType, AddressingTypeAlias
from uds.can import CanAddressingInformationHandler, CanAddressingFormat, CanAddressingFormatAlias, \
CanDlcHandler, CanSingleFrameHandler, CanFirstFrameHandler, CanConsecutiveFrameHandler, DEFAULT_FILLER_BYTE
from uds.packet import CanPacket, CanPacketRecord, CanPacketType, PacketAlias, PacketsSequence, PacketsDefinitionTuple
from uds.message import UdsMessage, UdsMessageRecord
from .abstract_segmenter import AbstractSegmenter, SegmentationError
AIArgsAlias = Dict[str, Optional[Union[int, RawByte]]]
"""Alias of Addressing Information arguments to configure CAN Segmenter communication model."""
AIParamsAlias = Dict[str, Optional[Union[int, AddressingTypeAlias]]]
"""Alias of Addressing Information parameters used by CAN Segmenter for each communication model."""
[docs]class CanSegmenter(AbstractSegmenter):
"""Segmenter class that provides utilities for segmentation and desegmentation on CAN bus."""
def __init__(self, *,
addressing_format: CanAddressingFormatAlias,
physical_ai: Optional[AIArgsAlias] = None,
functional_ai: Optional[AIArgsAlias] = None,
dlc: int = CanDlcHandler.MIN_BASE_UDS_DLC,
use_data_optimization: bool = False,
filler_byte: RawByte = DEFAULT_FILLER_BYTE) -> None:
"""
Configure CAN Segmenter.
:param addressing_format: CAN Addressing format used.
:param physical_ai: CAN Addressing Information parameters to use for physically addressed communication.
Leave None if the segmenter will not be used for segmenting physically addressed messages.
:param functional_ai: CAN Addressing Information parameters to use for functionally addressed communication.
Leave None if the segmenter will not be used for segmenting functionally addressed messages.
:param dlc: Base CAN DLC value to use for CAN Packets.
:param use_data_optimization: Information whether to use CAN Frame Data Optimization during segmentation.
:param filler_byte: Filler byte value to use for CAN Frame Data Padding during segmentation.
"""
CanAddressingFormat.validate_member(addressing_format)
self.__addressing_format: CanAddressingFormatAlias = CanAddressingFormat(addressing_format)
self.physical_ai = physical_ai # type: ignore
self.functional_ai = functional_ai # type: ignore
self.dlc = dlc
self.use_data_optimization = use_data_optimization
self.filler_byte = filler_byte
@property
def supported_packet_classes(self) -> Tuple[Type[PacketAlias], ...]:
"""Classes that define packet objects supported by this segmenter."""
return CanPacket, CanPacketRecord
@property
def addressing_format(self) -> CanAddressingFormatAlias:
"""CAN Addressing format used."""
return self.__addressing_format
@property
def physical_ai(self) -> Optional[AIParamsAlias]:
"""
CAN Addressing Information parameters used for physically addressed communication.
None if physically addressed communication parameters are not configured.
"""
return copy(self.__physical_ai)
@physical_ai.setter
def physical_ai(self, value: Optional[AIArgsAlias]):
"""
Set value of CAN Addressing Information parameters to use for physically addressed communication.
:param value: Value to set.
"""
if value is None:
self.__physical_ai: Optional[AIParamsAlias] = None
else:
CanAddressingInformationHandler.validate_ai(
addressing_format=self.addressing_format,
addressing_type=AddressingType.PHYSICAL,
**value)
physical_ai = copy(value)
physical_ai.update({
"addressing_format": self.addressing_format, # type: ignore
CanAddressingInformationHandler.ADDRESSING_TYPE_NAME: AddressingType.PHYSICAL # type: ignore
})
self.__physical_ai = physical_ai # type: ignore
@property
def functional_ai(self) -> AIParamsAlias:
"""
CAN Addressing Information parameters used for functionally addressed communication.
None if functionally addressed communication parameters are not configured.
"""
return copy(self.__functional_ai) # type: ignore
@functional_ai.setter
def functional_ai(self, value: Optional[AIArgsAlias]):
"""
Set value of CAN Addressing Information parameters to use for functionally addressed communication.
:param value: Value to set.
"""
if value is None:
self.__functional_ai: Optional[AIParamsAlias] = None
else:
CanAddressingInformationHandler.validate_ai(
addressing_format=self.addressing_format,
addressing_type=AddressingType.FUNCTIONAL,
**value)
functional_ai = copy(value)
functional_ai.update({
"addressing_format": self.addressing_format, # type: ignore
CanAddressingInformationHandler.ADDRESSING_TYPE_NAME: AddressingType.FUNCTIONAL # type: ignore
})
self.__functional_ai = functional_ai # type: ignore
@property
def dlc(self) -> int:
"""
Value of base CAN DLC to use for CAN Packets.
.. note:: All output CAN Packets (created by :meth:`~uds.segmentation.can_segmenter.CanSegmenter.segmentation`)
will have this DLC value set unless
:ref:`CAN Frame Data Optimization <knowledge-base-can-data-optimization>` is used.
"""
return self.__dlc
@dlc.setter
def dlc(self, value: int):
"""
Set value of base CAN DLC to use for CAN Packets.
:param value: Value to set.
:raise ValueError: Provided value is too small.
"""
CanDlcHandler.validate_dlc(value)
if value < CanDlcHandler.MIN_BASE_UDS_DLC:
raise ValueError(f"Provided value is too small. Expected: DLC >= {CanDlcHandler.MIN_BASE_UDS_DLC}. "
f"Actual value: {value}")
self.__dlc: int = value
@property
def use_data_optimization(self) -> bool:
"""Information whether to use CAN Frame Data Optimization for CAN Packet created during segmentation."""
return self.__use_data_optimization
@use_data_optimization.setter
def use_data_optimization(self, value: bool):
"""
Set whether to use CAN Frame Data Optimization for CAN Packet created during segmentation.
:param value: Value to set.
"""
self.__use_data_optimization: bool = bool(value)
@property
def filler_byte(self) -> RawByte:
"""Filler byte value to use for CAN Frame Data Padding during segmentation."""
return self.__filler_byte
@filler_byte.setter
def filler_byte(self, value: RawByte):
"""
Set value of filler byte to use for CAN Frame Data Padding.
:param value: Value to set.
"""
validate_raw_byte(value)
self.__filler_byte: RawByte = value
[docs] def desegmentation(self, packets: PacketsSequence) -> Union[UdsMessage, UdsMessageRecord]:
"""
Perform desegmentation of CAN packets.
:param packets: CAN packets to desegment into UDS message.
:raise SegmentationError: Provided packets are not a complete packets sequence that form a diagnostic message.
:raise NotImplementedError: There is missing implementation for the provided CAN Packets.
Please create an issue in our `Issues Tracking System <https://github.com/mdabrowski1990/uds/issues>`_
with detailed description if you face this error.
:return: A diagnostic message that is an outcome of CAN packets desegmentation.
"""
if not self.is_complete_packets_sequence(packets):
raise SegmentationError("Provided packets are not a complete packets sequence")
if isinstance(packets[0], CanPacketRecord):
return UdsMessageRecord(packets) # type: ignore
if isinstance(packets[0], CanPacket):
if packets[0].packet_type == CanPacketType.SINGLE_FRAME and len(packets) == 1:
return UdsMessage(payload=packets[0].payload, # type: ignore
addressing_type=packets[0].addressing_type)
if packets[0].packet_type == CanPacketType.FIRST_FRAME:
payload_bytes: RawBytesList = []
for packet in packets:
if packet.payload is not None:
payload_bytes.extend(packet.payload)
return UdsMessage(payload=payload_bytes[:packets[0].data_length],
addressing_type=packets[0].addressing_type)
raise SegmentationError("Unexpectedly, something went wrong...")
raise NotImplementedError(f"Missing implementation for provided CAN Packet: {type(packets[0])}")
[docs] def segmentation(self, message: UdsMessage) -> PacketsDefinitionTuple:
"""
Perform segmentation of a diagnostic message.
:param message: UDS message to divide into UDS packets.
:raise TypeError: Provided value is not instance of UdsMessage class.
:raise AmbiguityError: Segmentation cannot be completed because CAN Segmenter is not properly configured.
:raise NotImplementedError: There is missing implementation for the Addressing Type used by provided message.
Please create an issue in our `Issues Tracking System <https://github.com/mdabrowski1990/uds/issues>`_
with detailed description if you face this error.
:return: CAN packets that are an outcome of UDS message segmentation.
"""
if not isinstance(message, UdsMessage):
raise TypeError(f"Provided value is not instance of UdsMessage class. Actual type: {type(message)}")
if message.addressing_type == AddressingType.PHYSICAL:
if self.physical_ai is None:
raise AmbiguityError("Provided diagnostic message cannot be segmented as physical addressing "
"information are not configured.")
return self.__physical_segmentation(message)
if message.addressing_type == AddressingType.FUNCTIONAL:
if self.functional_ai is None:
raise AmbiguityError("Provided diagnostic message cannot be segmented as functional addressing "
"information are not configured.")
return self.__functional_segmentation(message)
raise NotImplementedError(f"Unknown addressing type received: {message.addressing_type}")
[docs] def is_complete_packets_sequence(self, packets: PacketsSequence) -> bool:
"""
Check whether provided packets are full sequence of packets that form exactly one diagnostic message.
:param packets: Packets sequence to check.
:raise ValueError: Provided value is not CAN packets sequence.
:raise NotImplementedError: There is missing implementation for the provided initial packet type.
Please create an issue in our `Issues Tracking System <https://github.com/mdabrowski1990/uds/issues>`_
with detailed description if you face this error.
:return: True if the packets form exactly one diagnostic message.
False if there are missing, additional or inconsistent (e.g. two packets that initiate a message) packets.
"""
if not self.is_supported_packets_sequence(packets):
raise ValueError("Provided packets are not consistent CAN Packets sequence.")
if not CanPacketType.is_initial_packet_type(packets[0].packet_type):
return False
if packets[0].packet_type == CanPacketType.SINGLE_FRAME:
return len(packets) == 1
if packets[0].packet_type == CanPacketType.FIRST_FRAME:
total_payload_size = packets[0].data_length
payload_bytes_found = len(packets[0].payload) # type: ignore
for following_packet in packets[1:]:
if CanPacketType.is_initial_packet_type(following_packet.packet_type):
return False
if payload_bytes_found >= total_payload_size: # type: ignore
return False
if following_packet.payload is not None:
payload_bytes_found += len(following_packet.payload)
return payload_bytes_found >= total_payload_size # type: ignore
raise NotImplementedError(f"Unknown packet type received: {packets[0].packet_type}")
def __physical_segmentation(self, message: UdsMessage) -> PacketsDefinitionTuple:
"""
Segment physically addressed diagnostic message.
:param message: UDS message to divide into UDS packets.
:raise SegmentationError: Provided diagnostic message cannot be segmented.
:return: CAN packets that are an outcome of UDS message segmentation.
"""
message_payload_size = len(message.payload)
if message_payload_size > CanFirstFrameHandler.MAX_LONG_FF_DL_VALUE:
raise SegmentationError("Provided diagnostic message cannot be segmented to CAN Packet as it is too big "
"to transmit it over CAN bus.")
if message_payload_size <= CanSingleFrameHandler.get_max_payload_size(addressing_format=self.addressing_format,
dlc=self.dlc):
single_frame = CanPacket(packet_type=CanPacketType.SINGLE_FRAME, # type: ignore
payload=message.payload,
filler_byte=self.filler_byte,
dlc=None if self.use_data_optimization else self.dlc,
**self.physical_ai)
return (single_frame,)
ff_payload_size = CanFirstFrameHandler.get_payload_size(
addressing_format=self.addressing_format,
dlc=self.dlc,
long_ff_dl_format=message_payload_size > CanFirstFrameHandler.MAX_SHORT_FF_DL_VALUE)
first_frame = CanPacket(packet_type=CanPacketType.FIRST_FRAME, # type: ignore
payload=message.payload[:ff_payload_size],
dlc=self.dlc,
data_length=message_payload_size,
**self.physical_ai)
cf_payload_size = CanConsecutiveFrameHandler.get_max_payload_size(addressing_format=self.addressing_format,
dlc=self.dlc)
total_cfs_number = (message_payload_size - ff_payload_size + cf_payload_size - 1) // cf_payload_size
consecutive_frames = []
for cf_index in range(total_cfs_number):
sequence_number = (cf_index + 1) % 0x10
payload_i_start = ff_payload_size + cf_index * cf_payload_size
payload_i_stop = payload_i_start + cf_payload_size
consecutive_frame = CanPacket(packet_type=CanPacketType.CONSECUTIVE_FRAME, # type: ignore
payload=message.payload[payload_i_start: payload_i_stop],
dlc=None if self.use_data_optimization and cf_index == total_cfs_number - 1
else self.dlc,
sequence_number=sequence_number,
filler_byte=self.filler_byte,
**self.physical_ai)
consecutive_frames.append(consecutive_frame)
return (first_frame, *consecutive_frames)
def __functional_segmentation(self, message: UdsMessage) -> PacketsDefinitionTuple:
"""
Segment functionally addressed diagnostic message.
:param message: UDS message to divide into UDS packets.
:raise SegmentationError: Provided diagnostic message cannot be segmented.
:return: CAN packets that are an outcome of UDS message segmentation.
"""
max_payload_size = CanSingleFrameHandler.get_max_payload_size(addressing_format=self.addressing_format,
dlc=self.dlc)
message_payload_size = len(message.payload)
if message_payload_size > max_payload_size:
raise SegmentationError("Provided diagnostic message cannot be segmented using functional addressing "
"as it will not fit into a Single Frame.")
single_frame = CanPacket(packet_type=CanPacketType.SINGLE_FRAME,
payload=message.payload,
filler_byte=self.filler_byte,
dlc=None if self.use_data_optimization else self.dlc,
**self.functional_ai) # type: ignore
return (single_frame,)