"""Abstract definition of packets that is common for all bus/network types."""
# Public names of this module: the three abstract packet classes and the four type aliases defined below.
__all__ = ["AbstractPacketContainer", "AbstractPacket", "AbstractPacketRecord",
"PacketsContainersSequence", "PacketsTuple", "PacketsRecordsTuple", "PacketsRecordsSequence"]
from abc import ABC, abstractmethod
from datetime import datetime
from time import perf_counter
from typing import Any, Optional, Sequence, Tuple
from warnings import warn
from uds.addressing import AddressingType, TransmissionDirection
from uds.utilities import ReassignmentError, bytes_to_hex
from .abstract_packet_type import AbstractPacketType
class AbstractPacketContainer(ABC):
    """
    Abstract interface shared by all objects that carry packet information.

    Concrete subclasses have to provide every property declared here.
    """

    def __str__(self) -> str:
        """
        Describe this object as text.

        :return: Class name followed by raw frame data, payload, addressing type and packet type.
        """
        class_name = self.__class__.__name__
        frame_hex = bytes_to_hex(self.raw_frame_data)
        # payload is optional, so None is shown as-is instead of being converted to hex
        payload_hex = None if self.payload is None else bytes_to_hex(self.payload)
        described_fields = (
            f"raw_frame_data={frame_hex}",
            f"payload={payload_hex}",
            f"addressing_type={self.addressing_type}",
            f"packet_type={self.packet_type}",
        )
        return f"{class_name}({', '.join(described_fields)})"

    @property
    @abstractmethod
    def raw_frame_data(self) -> bytes:
        """Raw bytes of the frame in which this packet is carried."""

    @property
    @abstractmethod
    def packet_type(self) -> AbstractPacketType:
        """Packet type (N_PCI value) of this packet."""

    @property
    @abstractmethod
    def data_length(self) -> Optional[int]:
        """Number of payload bytes of a diagnostic message."""

    @property
    @abstractmethod
    def addressing_type(self) -> AddressingType:
        """Addressing type for which this packet is relevant."""

    @property
    @abstractmethod
    def payload(self) -> Optional[bytes]:
        """Payload of a diagnostic message that this packet carries."""
class AbstractPacket(AbstractPacketContainer, ABC):
    """
    Abstract definition of a packet (Network Protocol Data Unit - N_PDU).

    Adds no members of its own; the interface is inherited from AbstractPacketContainer.
    """
class AbstractPacketRecord(AbstractPacketContainer, ABC):
    """
    Abstract definition of a storage for historic information about transmitted or received packet.

    The attributes `frame`, `direction`, `transmission_time` and `transmission_timestamp` are write-once:
    any attempt to assign them a second time raises ReassignmentError.
    """

    @abstractmethod
    def __init__(self,
                 frame: Any,
                 direction: TransmissionDirection,
                 transmission_time: datetime,
                 transmission_timestamp: float) -> None:
        """
        Create a record of historic information about a packet.

        The method is abstract, so concrete subclasses must override it. The body is meant to be reused
        by them (e.g. via `super().__init__(...)`).

        :param frame: Frame that carried this packet.
        :param direction: Information whether this packet was transmitted or received.
        :param transmission_time: Time when this packet was transmitted on a bus/network.
        :param transmission_timestamp: Timestamp when this packet was transmitted on a bus/network.
        """
        self.frame = frame
        self.direction = direction
        self.transmission_time = transmission_time
        self.transmission_timestamp = transmission_timestamp
        # cross-attribute validation runs only after every attribute has been assigned
        self._validate_attributes()

    def __str__(self) -> str:
        """
        Present object in string format.

        :return: Class name followed by the values that describe this record.
        """
        return (f"{self.__class__.__name__}("
                f"raw_frame_data={bytes_to_hex(self.raw_frame_data)}, "
                f"addressing_type={self.addressing_type}, "
                f"direction={self.direction}, "
                f"payload={None if self.payload is None else bytes_to_hex(self.payload)}, "
                f"packet_type={self.packet_type}, "
                f"transmission_time={self.transmission_time}, "
                f"transmission_timestamp={self.transmission_timestamp})")

    @property
    def frame(self) -> Any:
        """Frame that carried this packet."""
        return self.__frame

    @frame.setter
    def frame(self, value: Any) -> None:
        """
        Set value of frame attribute.

        :param value: Frame value to set.

        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        # the name-mangled attribute exists only after the first successful assignment
        if hasattr(self, "_AbstractPacketRecord__frame"):
            raise ReassignmentError("Value of 'frame' attribute cannot be changed once assigned.")
        self._validate_frame(value)
        self.__frame = value

    @property
    def direction(self) -> TransmissionDirection:
        """Information whether this packet was transmitted or received."""
        return self.__direction

    @direction.setter
    def direction(self, value: TransmissionDirection) -> None:
        """
        Set value of direction attribute.

        The stored value is the result of `TransmissionDirection.validate_member(value)`.

        :param value: Direction value to set.

        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        if hasattr(self, "_AbstractPacketRecord__direction"):
            raise ReassignmentError("Value of 'direction' attribute cannot be changed once assigned.")
        self.__direction = TransmissionDirection.validate_member(value)

    @property
    def transmission_time(self) -> datetime:
        """Time when this packet was transmitted on a bus/network."""
        return self.__transmission_time

    @transmission_time.setter
    def transmission_time(self, value: datetime) -> None:
        """
        Set time value when this packet was transmitted on a bus/network.

        Both naive and timezone-aware datetime values are accepted. If the provided value lies in the future,
        a RuntimeWarning is issued and the current time (in the same timezone as the provided value)
        is stored instead.

        :param value: Value of transmission time to set.

        :raise TypeError: Provided value is not datetime type.
        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        if not isinstance(value, datetime):
            raise TypeError(f"Provided value is not datetime type. Actual type: {type(value)}.")
        if hasattr(self, "_AbstractPacketRecord__transmission_time"):
            raise ReassignmentError("Value of 'transmission_time' attribute cannot be changed once assigned.")
        # Naive and aware datetimes cannot be ordered against each other (TypeError), therefore "now"
        # is taken in the timezone of the provided value (naive "now" for a naive value).
        reference_tz = value.tzinfo if value.utcoffset() is not None else None
        time_now = datetime.now(tz=reference_tz)
        if value > time_now:
            warn(message="Future time provided as `transmission_time` to a packet record. "
                         "Current time was used instead.",
                 category=RuntimeWarning)
            value = time_now
        self.__transmission_time = value

    @property
    def transmission_timestamp(self) -> float:
        """Timestamp when this packet was transmitted on a bus/network."""
        return self.__transmission_timestamp

    @transmission_timestamp.setter
    def transmission_timestamp(self, value: float) -> None:
        """
        Set timestamp value when this packet was transmitted on a bus/network.

        The value is compared with `time.perf_counter()`. If it is greater, a RuntimeWarning is issued
        and the current `perf_counter` value is stored instead.

        :param value: Value of transmission timestamp to set.

        :raise TypeError: Provided value is not float type.
        :raise ReassignmentError: An attempt to change the value after object creation.
        """
        # sampled before the checks, so the reference point is as close to the call as possible
        timestamp_now = perf_counter()
        if not isinstance(value, float):
            raise TypeError(f"Provided value is not float type. Actual type: {type(value)}.")
        if hasattr(self, "_AbstractPacketRecord__transmission_timestamp"):
            raise ReassignmentError("Value of 'transmission_timestamp' attribute cannot be changed once assigned.")
        if value > timestamp_now:
            warn(message="Future timestamp provided as `transmission_timestamp` to a packet record. "
                         "Current timestamp was used instead.",
                 category=RuntimeWarning)
            value = timestamp_now
        self.__transmission_timestamp = value

    @staticmethod
    @abstractmethod
    def _validate_frame(value: Any) -> None:
        """
        Validate a frame argument.

        Called by the `frame` setter before the value is stored.

        :param value: Value to validate.
        """

    @abstractmethod
    def _validate_attributes(self) -> None:
        """
        Validate whether attributes that were set are valid for a packet record.

        Called at the end of `__init__`, after all attributes have been assigned.
        """
# Type aliases for annotating collections of the classes defined above.
# Any sequence whose items are AbstractPacketContainer objects (packets or packet records).
PacketsContainersSequence = Sequence[AbstractPacketContainer]
"""Alias for a sequence filled with packet or packet record objects."""
# Tuple of any length that contains only AbstractPacket objects.
PacketsTuple = Tuple[AbstractPacket, ...]
"""Alias for a packet objects tuple."""
# Tuple of any length that contains only AbstractPacketRecord objects.
PacketsRecordsTuple = Tuple[AbstractPacketRecord, ...]
"""Alias for a packet record objects tuple."""
# Any sequence whose items are AbstractPacketRecord objects.
PacketsRecordsSequence = Sequence[AbstractPacketRecord]
"""Alias for a packet record objects sequence."""