# Source code for uds.packet.abstract_packet

"""Abstract definition of packets that is common for all bus/network types."""

# Public API of this module (names exported by ``from ... import *``).
__all__ = [
    "AbstractPacketContainer",
    "AbstractPacket",
    "AbstractPacketRecord",
    "PacketsContainersSequence",
    "PacketsTuple",
    "PacketsRecordsTuple",
    "PacketsRecordsSequence",
]

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple

from uds.addressing import AddressingType, TransmissionDirection
from uds.utilities import ReassignmentError

from .abstract_packet_type import AbstractPacketType


class AbstractPacketContainer(ABC):
    """Abstract definition of a container with packet information."""

    def __str__(self) -> str:
        """
        Present object in string format.

        Payload and raw frame data are rendered as lists of two-digit uppercase hex bytes
        (e.g. ``[0x02, 0x10]``). A payload equal to None is rendered as ``None``.

        :return: String representation of this object.
        """
        payload = self.payload
        if payload is None:
            payload_str = "None"
        else:
            payload_str = f"[{', '.join(f'0x{byte:02X}' for byte in payload)}]"
        raw_frame_data_str = ", ".join(f"0x{byte:02X}" for byte in self.raw_frame_data)
        return (f"{self.__class__.__name__}("
                f"payload={payload_str}, "
                f"addressing_type={self.addressing_type}, "
                f"packet_type={self.packet_type}, "
                f"raw_frame_data=[{raw_frame_data_str}])")

    @property
    @abstractmethod
    def raw_frame_data(self) -> bytes:
        """Raw data bytes of a frame that carries this packet."""

    @property
    @abstractmethod
    def packet_type(self) -> AbstractPacketType:
        """Type (N_PCI value) of this packet."""

    @property
    @abstractmethod
    def data_length(self) -> Optional[int]:
        """Payload bytes number of a diagnostic message."""

    @property
    @abstractmethod
    def addressing_type(self) -> AddressingType:
        """Addressing for which this packet is relevant."""

    @property
    @abstractmethod
    def payload(self) -> Optional[bytes]:
        """Diagnostic message payload carried by this packet."""
class AbstractPacket(AbstractPacketContainer, ABC):
    """Abstract definition of a packet (Network Protocol Data Unit - N_PDU)."""
class AbstractPacketRecord(AbstractPacketContainer, ABC):
    """Abstract definition of a storage for historic information about transmitted or received packet."""

    @abstractmethod
    def __init__(self, frame: Any, direction: TransmissionDirection, transmission_time: datetime) -> None:
        """
        Create a record of historic information about a packet.

        :param frame: Frame that carried this packet.
        :param direction: Information whether this packet was transmitted or received.
        :param transmission_time: Time stamp when this packet was fully transmitted on a bus/network.
        """
        # Each assignment goes through the property setter below (validation + write-once guard).
        self.frame = frame
        self.direction = direction
        self.transmission_time = transmission_time
        self._validate_attributes()

    def __str__(self) -> str:
        """
        Present object in string format.

        Payload and raw frame data are rendered as lists of two-digit uppercase hex bytes.
        A payload equal to None is rendered as ``None``.

        :return: String representation of this object.
        """
        payload = self.payload
        if payload is None:
            payload_str = "None"
        else:
            payload_str = f"[{', '.join(f'0x{byte:02X}' for byte in payload)}]"
        raw_frame_data_str = ", ".join(f"0x{byte:02X}" for byte in self.raw_frame_data)
        return (f"{self.__class__.__name__}("
                f"payload={payload_str}, "
                f"addressing_type={self.addressing_type}, "
                f"packet_type={self.packet_type}, "
                f"raw_frame_data=[{raw_frame_data_str}], "
                f"direction={self.direction}, "
                f"transmission_time={self.transmission_time})")

    @property
    def frame(self) -> Any:
        """Frame that carried this packet."""
        return self.__frame

    @frame.setter
    def frame(self, value: Any) -> None:
        """
        Set value of frame attribute.

        :param value: Frame value to set.

        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        # The private attribute is name-mangled, hence the explicit mangled name in the lookup.
        if hasattr(self, "_AbstractPacketRecord__frame"):
            raise ReassignmentError("You cannot change value of 'frame' attribute once it is assigned.")
        self._validate_frame(value)
        self.__frame = value

    @property
    def direction(self) -> TransmissionDirection:
        """Information whether this packet was transmitted or received."""
        return self.__direction

    @direction.setter
    def direction(self, value: TransmissionDirection) -> None:
        """
        Set value of direction attribute.

        :param value: Direction value to set.

        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        if hasattr(self, "_AbstractPacketRecord__direction"):
            raise ReassignmentError("You cannot change value of 'direction' attribute once it is assigned.")
        self.__direction = TransmissionDirection.validate_member(value)

    @property
    def transmission_time(self) -> datetime:
        """Time when this packet was fully transmitted on a bus/network."""
        return self.__transmission_time

    @transmission_time.setter
    def transmission_time(self, value: datetime) -> None:
        """
        Set value when this packet was transmitted on a bus/network.

        :param value: Value of transmission time to set.

        :raise TypeError: Provided value has unexpected type.
        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        # The type check runs before the write-once guard, so a value of a wrong type
        # raises TypeError even if the attribute has already been assigned.
        if not isinstance(value, datetime):
            raise TypeError("Provided value is not datetime type")
        if hasattr(self, "_AbstractPacketRecord__transmission_time"):
            raise ReassignmentError("You cannot change value of 'transmission_time' attribute once it is assigned.")
        self.__transmission_time = value

    @staticmethod
    @abstractmethod
    def _validate_frame(value: Any) -> None:
        """
        Validate a frame argument.

        :param value: Value to validate.
        """

    @abstractmethod
    def _validate_attributes(self) -> None:
        """Validate whether attributes that were set are valid for a packet record."""
PacketsContainersSequence = Sequence[AbstractPacketContainer]
"""Alias for a sequence filled with packet or packet record objects."""
PacketsTuple = Tuple[AbstractPacket, ...]
"""Alias for a packet objects tuple."""
PacketsRecordsTuple = Tuple[AbstractPacketRecord, ...]
"""Alias for a packet record objects tuple."""
PacketsRecordsSequence = Sequence[AbstractPacketRecord]
"""Alias for a packet record objects sequence."""