Source code for uds.can.packet.can_packet_record

"""CAN bus specific implementation of packets records."""

__all__ = ["CanPacketRecord", "CanFrameAlias"]

from datetime import datetime
from typing import Any, Union

from can import Message as PythonCanMessage
from uds.addressing import AddressingType, TransmissionDirection
from uds.packet import AbstractPacketRecord
from uds.utilities import ReassignmentError

from ..addressing import CanAddressingFormat, CanAddressingInformation
from .abstract_container import AbstractCanPacketContainer
from .can_packet_type import CanPacketType

CanFrameAlias = Union[PythonCanMessage]
"""Alias of supported CAN frames objects."""


[docs] class CanPacketRecord(AbstractCanPacketContainer, AbstractPacketRecord): """ Definition of a CAN packet record. Objects of this class act as a storage for historic information about transmitted or received :ref:`CAN packet <knowledge-base-can-packet>`. """ def __init__(self, *, frame: CanFrameAlias, addressing_format: CanAddressingFormat, addressing_type: AddressingType, direction: TransmissionDirection, transmission_time: datetime) -> None: """ Create a record of historic information about a CAN packet that was either received or transmitted. :param frame: Either received or transmitted CAN frame that carried this CAN Packet. :param addressing_format: CAN Addressing Format used. :param addressing_type: Addressing type for which this CAN packet is relevant. :param direction: Information whether this packet was transmitted or received. :param transmission_time: Time stamp when this packet was fully transmitted on a CAN bus. """ self.addressing_format = addressing_format self.addressing_type = addressing_type super().__init__(frame=frame, direction=direction, transmission_time=transmission_time)
[docs] def __str__(self) -> str: """Present object in string format.""" payload_str = "None" if self.payload is None else f"[{', '.join(f'0x{byte:02X}' for byte in self.payload)}]" return (f"{self.__class__.__name__}(" f"payload={payload_str}," f"addressing_type={self.addressing_type}, " f"addressing_format={self.addressing_format}, " f"packet_type={self.packet_type}, " f"raw_frame_data=[{', '.join(f'0x{byte:02X}' for byte in self.raw_frame_data)}], " f"can_id={self.can_id}, " f"direction={self.direction}, " f"transmission_time={self.transmission_time})")
@property def can_id(self) -> int: """ CAN Identifier (CAN ID) of a CAN Frame that carries this CAN packet. :raise NotImplementedError: There is missing implementation for the stored CAN Frame object type. """ if isinstance(self.frame, PythonCanMessage): return self.frame.arbitration_id raise NotImplementedError(f"Missing implementation for: {self.frame}") @property def raw_frame_data(self) -> bytes: """ Raw data bytes of a CAN frame that carried this CAN packet. :raise NotImplementedError: There is missing implementation for the stored CAN Frame object type. """ if isinstance(self.frame, PythonCanMessage): return bytes(self.frame.data) raise NotImplementedError(f"Missing implementation for: {self.frame}") @property def addressing_format(self) -> CanAddressingFormat: """CAN Addressing Format used by this CAN packet record.""" return self.__addressing_format @addressing_format.setter def addressing_format(self, value: CanAddressingFormat) -> None: """ Set CAN Addressing Format used by this CAN packet record. :param value: Value of CAN Addressing Format. :raise ReassignmentError: An attempt to change the value after object creation. """ try: getattr(self, "_CanPacketRecord__addressing_format") except AttributeError: self.__addressing_format = CanAddressingFormat.validate_member(value) else: raise ReassignmentError("You cannot change value of 'addressing_format' attribute once it is assigned. " "Create a new object instead.") @property def addressing_type(self) -> AddressingType: """Addressing type over which this CAN packet was transmitted.""" return self.__addressing_type @addressing_type.setter def addressing_type(self, value: AddressingType) -> None: """ Set addressing type over which this CAN packet was transmitted. :param value: Value of addressing type. """ try: getattr(self, "_CanPacketRecord__addressing_type") except AttributeError: self.__addressing_type = AddressingType.validate_member(value) else: raise ReassignmentError("You cannot change value of 'addressing_type' attribute once it is assigned. " "Create a new object instead.")
[docs] @staticmethod def _validate_frame(value: Any) -> None: """ Validate a CAN frame argument. :param value: Value to validate. :raise TypeError: Provided frame object has unsupported type. """ if isinstance(value, PythonCanMessage): return None raise TypeError(f"Unsupported CAN Frame type was provided. Actual type: {type(value)}")
[docs] def _validate_attributes(self) -> None: """Validate whether attributes that were set are a valid for a CAN Packet record.""" CanAddressingInformation.validate_addressing_params(addressing_format=self.addressing_format, addressing_type=self.addressing_type, can_id=self.can_id, target_address=self.target_address, source_address=self.source_address, address_extension=self.address_extension) CanPacketType.validate_member(self.packet_type)