Source code for digi.xbee.packets.digimesh
# Copyright 2019-2021, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
from digi.xbee.exception import InvalidOperatingModeException, InvalidPacketException
from digi.xbee.models.address import XBee64BitAddress
from digi.xbee.models.mode import OperatingMode
from digi.xbee.packets.aft import ApiFrameType
from digi.xbee.packets.base import XBeeAPIPacket, DictKeys
from digi.xbee.util import utils
[docs]class RouteInformationPacket(XBeeAPIPacket):
"""
This class represents a DigiMesh Route Information packet. Packet is built
using the parameters of the constructor or providing a valid API
payload.
A Route Information Packet can be output for DigiMesh unicast transmissions
on which the NACK enable or the Trace Route enable TX option is enabled.
.. seealso::
| :class:`.XBeeAPIPacket`
"""
__MIN_PACKET_LENGTH = 46
def __init__(self, src_event, timestamp, ack_timeout_count, tx_block_count,
dst_addr, src_addr, responder_addr, successor_addr,
additional_data=None, op_mode=OperatingMode.API_MODE):
"""
Class constructor. Instantiates a new
:class:`.RouteInformationPacket` object with the provided
parameters.
Args:
src_event (Integer): Source event identifier.
0x11=NACK, 0x12=Trace route
timestamp (Integer): System timer value on the node generating the
this packet. The timestamp is in microseconds.
ack_timeout_count (Integer): The number of MAC ACK timeouts.
tx_block_count (Integer): The number of times the transmission was
blocked due to reception in progress.
dst_addr (:class:`.XBee64BitAddress`): The 64-bit address of the
final destination node of this network-level transmission.
src_addr (:class:`.XBee64BitAddress`): The 64-bit address of the
source node of this network-level transmission.
responder_addr (:class:`.XBee64BitAddress`): The 64-bit address of
the node that generates this packet after it sends (or attempts
to send) the packet to the next hop (successor node).
successor_addr (:class:`.XBee64BitAddress`): The 64-bit address of
the next node after the responder in the route towards the
destination, whether or not the packet arrived successfully at
the successor node.
additional_data (Bytearray, optional, default=`None`): Additional
data of the packet.
op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`):
The mode in which the frame was captured.
Raises:
ValueError: if `src_event` is not 0x11 or 0x12.
ValueError: if `timestamp` is not between 0 and 0xFFFFFFFF.
ValueError: if `ack_timeout_count` or `tx_block_count` are not
between 0 and 255.
.. seealso::
| :class:`.XBee64BitAddress`
| :class:`.XBeeAPIPacket`
"""
if src_event not in [0x11, 0x12]:
raise ValueError("Source event must be 0x11 or 0x12.")
if timestamp < 0 or timestamp > 0xFFFFFFFF: # 4 bytes
raise ValueError("Timestamp must be between 0 and %d." % 0xFFFFFFFF)
if ack_timeout_count < 0 or ack_timeout_count > 0xFF: # 1 byte
raise ValueError("ACK timeout count must be between 0 and 255")
if tx_block_count < 0 or tx_block_count > 0xFF: # 1 byte
raise ValueError("TX blocked count must be between 0 and 255")
super().__init__(ApiFrameType.DIGIMESH_ROUTE_INFORMATION, op_mode=op_mode)
self.__src_event = src_event
self.__timestamp = timestamp
self.__ack_timeout_count = ack_timeout_count
self.__tx_block_count = tx_block_count
self._reserved = 0
self.__dst_addr = dst_addr
self.__src_addr = src_addr
self.__responder_addr = responder_addr
self.__successor_addr = successor_addr
self.__additional_data = additional_data
[docs] @staticmethod
def create_packet(raw, operating_mode):
"""
Override method.
Returns:
:class:`.RouteInformationPacket`.
Raises:
InvalidPacketException: If the bytearray length is less than 46.
(start delim. + length (2 bytes) + frame type + src_event
+ length + timestamp (4 bytes) + ack timeout count
+ tx blocked count + reserved + dest addr (8 bytes)
+ src addr (8 bytes) + responder addr (8 bytes)
+ successor addr (8 bytes) + checksum = 46 bytes).
InvalidPacketException: If the length field of `raw` is different
from its real length. (length field: bytes 1 and 3)
InvalidPacketException: If the first byte of 'raw' is not the
header byte. See :class:`.SpecialByte`.
InvalidPacketException: If the calculated checksum is different
from the checksum field value (last byte).
InvalidPacketException: If the frame type is not
:attr:`.ApiFrameType.DIGIMESH_ROUTE_INFORMATION`.
InvalidPacketException: If the internal length byte of the rest
of the frame (without the checksum) is different from its real
length.
InvalidOperatingModeException: If `operating_mode` is not
supported.
.. seealso::
| :meth:`.XBeePacket.create_packet`
| :meth:`.XBeeAPIPacket._check_api_packet`
"""
if operating_mode not in (OperatingMode.ESCAPED_API_MODE,
OperatingMode.API_MODE):
raise InvalidOperatingModeException(
operating_mode.name + " is not supported.")
XBeeAPIPacket._check_api_packet(
raw, min_length=RouteInformationPacket.__MIN_PACKET_LENGTH)
if raw[3] != ApiFrameType.DIGIMESH_ROUTE_INFORMATION.code:
raise InvalidPacketException(
"This packet is not a Route Information packet.")
# 7: frame len starting from this byte (index 5) and without the checksum
if raw[5] != len(raw) - 7:
raise InvalidPacketException("Length does not match with the data length")
additional_data = []
if len(raw) > RouteInformationPacket.__MIN_PACKET_LENGTH:
additional_data = raw[45:]
packet = RouteInformationPacket(
raw[4], utils.bytes_to_int(raw[6:10]), raw[10], raw[11],
XBee64BitAddress(raw[13:21]), XBee64BitAddress(raw[21:29]),
XBee64BitAddress(raw[29:37]), XBee64BitAddress(raw[37:45]),
additional_data, op_mode=operating_mode)
packet._reserved = raw[12]
return packet
[docs] def needs_id(self):
"""
Override method.
.. seealso::
| :meth:`.XBeeAPIPacket.needs_id`
"""
return False
def _get_api_packet_spec_data(self):
"""
Override method.
.. seealso::
| :meth:`.XBeeAPIPacket._get_api_packet_spec_data`
"""
ret = bytearray([self.__src_event])
ret.append(self.length)
ret += utils.int_to_bytes(self.__timestamp, num_bytes=4)
ret.append(self.__ack_timeout_count)
ret.append(self.__tx_block_count)
ret.append(self._reserved)
ret += self.__dst_addr.address
ret += self.__src_addr.address
ret += self.__responder_addr.address
ret += self.__successor_addr.address
if self.__additional_data:
ret += self.__additional_data
return ret
def _get_api_packet_spec_data_dict(self):
"""
Override method.
.. seealso::
| :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict`
"""
return {DictKeys.SRC_EVENT: self.__src_event,
DictKeys.LENGTH: self.length,
DictKeys.TIMESTAMP: self.__timestamp,
DictKeys.ACK_TIMEOUT_COUNT: self.__ack_timeout_count,
DictKeys.TX_BLOCKED_COUNT: self.__tx_block_count,
DictKeys.DEST_ADDR: self.__dst_addr,
DictKeys.SRC_ADDR: self.__src_addr,
DictKeys.RESPONDER_ADDR: self.__responder_addr,
DictKeys.SUCCESSOR_ADDR: self.__successor_addr,
DictKeys.ADDITIONAL_DATA: self.__additional_data}
@property
def src_event(self):
"""
Returns the source event.
Returns:
Integer: The source event.
"""
return self.__src_event
@src_event.setter
def src_event(self, src_event):
"""
Sets the source event identifier. 0x11=NACK, 0x12=Trace route
Args:
src_event (Integer): The new source event.
Raises:
ValueError: if `src_event` is not 0x11 or 0x12.
"""
if src_event not in [0x11, 0x12]:
raise ValueError("Source event must be 0x11 or 0x12.")
self.__src_event = src_event
@property
def length(self):
"""
Returns the number of bytes that follow, excluding the checksum.
Returns:
Integer: Data length.
"""
# len: len(additional_data) + 4 MACS + timestamp (4 bytes) + 3 bytes
return len(self.__additional_data) + 8 * 4 + 4 + 3
@property
def timestamp(self):
"""
Returns the system timer value on the node generating this package.
The timestamp is in microseconds.
Returns:
Integer: The system timer value in microseconds.
"""
return self.__timestamp
@timestamp.setter
def timestamp(self, timestamp):
"""
Sets the system timer value on the node generating this package.
The timestamp is in microseconds.
Args:
timestamp (Integer): The number of microseconds.
Raises:
ValueError: if `timestamp` is not between 0 and 0xFFFFFFFF.
"""
if timestamp < 0 or timestamp > 0xFFFFFFFF: # 4 bytes
raise ValueError("Timestamp must be between 0 and %d." % 0xFFFFFFFF)
self.__timestamp = timestamp
@property
def ack_timeout_count(self):
"""
Returns the number of MAC ACK timeouts that occur.
Returns:
Integer: The number of MAC ACK timeouts that occur.
"""
return self.__ack_timeout_count
@ack_timeout_count.setter
def ack_timeout_count(self, ack_timeout_count):
"""
Sets the number of MAC ACK timeouts that occur.
Args:
ack_timeout_count (Integer): The number of MAC ACK timeouts that occur.
Raises:
ValueError: if `ack_timeout_count` is not between 0 and 255.
"""
if ack_timeout_count < 0 or ack_timeout_count > 0xFF: # 1 byte
raise ValueError("ACK timeout count must be between 0 and 255")
self.__ack_timeout_count = ack_timeout_count
@property
def tx_block_count(self):
"""
Returns the number of times the transmission was blocked due to reception
in progress.
Returns:
Integer: The number of times the transmission was blocked due to
reception in progress.
"""
return self.__tx_block_count
@tx_block_count.setter
def tx_block_count(self, tx_block_count):
"""
Sets the number of times the transmission was blocked due to reception
in progress.
Args:
tx_block_count (Integer): The number of times the transmission was
blocked due to reception in progress.
Raises:
ValueError: if `tx_block_count` is not between 0 and 255.
"""
if tx_block_count < 0 or tx_block_count > 0xFF: # 1 byte
raise ValueError("TX blocked count must be between 0 and 255")
self.__tx_block_count = tx_block_count
@property
def dst_addr(self):
"""
Returns the 64-bit source address.
Returns:
:class:`.XBee64BitAddress`: The 64-bit address of the final
destination node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
return self.__dst_addr
@dst_addr.setter
def dst_addr(self, dst_addr):
"""
Sets the 64-bit address of the final destination node of this
network-level transmission.
Args:
dst_addr (:class:`.XBee64BitAddress`): The new 64-bit address of the
final destination node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
self.__dst_addr = dst_addr
@property
def src_addr(self):
"""
Returns the 64-bit address of the source node of this network-level
transmission.
Returns:
:class:`.XBee64BitAddress`: The 64-bit address of the source node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
return self.__src_addr
@src_addr.setter
def src_addr(self, src_addr):
"""
Sets the 64-bit address of the source node of this network-level
transmission.
Args:
src_addr (:class:`.XBee64BitAddress`): The new 64-bit address of the
source node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
self.__src_addr = src_addr
@property
def responder_addr(self):
"""
Returns the 64-bit address of the node that generates this packet after
it sends (or attempts to send) the packet to the next hop (successor node).
Returns:
:class:`.XBee64BitAddress`: The 64-bit address of the responder node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
return self.__responder_addr
@responder_addr.setter
def responder_addr(self, responder_addr):
"""
Sets the 64-bit address of the node that generates this packet after it
sends (or attempts to send) the packet to the next hop (successor node).
Args:
responder_addr (:class:`.XBee64BitAddress`): The new 64-bit address
of the responder node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
self.__responder_addr = responder_addr
@property
def successor_addr(self):
"""
Returns the 64-bit address of the next node after the responder in the
route towards the destination, whether or not the packet arrived
successfully at the successor node.
Returns:
:class:`.XBee64BitAddress`: The 64-bit address of the successor node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
return self.__successor_addr
@successor_addr.setter
def successor_addr(self, successor_addr):
"""
Sets the 64-bit address of the next node after the responder in the
route towards the destination, whether or not the packet arrived
successfully at the successor node.
Args:
successor_addr (:class:`.XBee64BitAddress`): The new 64-bit address
of the successor node.
.. seealso::
| :class:`.XBee64BitAddress`
"""
self.__successor_addr = successor_addr