Source code for digi.xbee.packets.raw

# Copyright 2017-2022, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

from digi.xbee.packets.base import XBeeAPIPacket, DictKeys
from digi.xbee.models.address import XBee64BitAddress, XBee16BitAddress
from digi.xbee.models.status import TransmitStatus
from digi.xbee.exception import InvalidOperatingModeException, InvalidPacketException
from digi.xbee.packets.aft import ApiFrameType
from digi.xbee.models.mode import OperatingMode
from digi.xbee.io import IOSample, IOLine
from digi.xbee.util import utils


[docs]class TX64Packet(XBeeAPIPacket): """ This class represents a TX (Transmit) 64 Request packet. Packet is built using the parameters of the constructor or providing a valid byte array. A TX Request message will cause the module to transmit data as an RF Packet. .. seealso:: | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 15 def __init__(self, frame_id, x64bit_addr, tx_opts, rf_data=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.TX64Packet` object with the provided parameters. Args: frame_id (Integer): the frame ID of the packet. x64bit_addr (:class:`.XBee64BitAddress`): the 64-bit destination address. tx_opts (Integer): bitfield of supported transmission options. rf_data (Bytearray, optional): RF data that is sent to the destination device. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.TransmitOptions` | :class:`.XBee64BitAddress` | :class:`.XBeeAPIPacket` Raises: ValueError: if `frame_id` is less than 0 or greater than 255. """ if frame_id < 0 or frame_id > 255: raise ValueError("Frame id must be between 0 and 255.") super().__init__(ApiFrameType.TX_64, op_mode=op_mode) self._frame_id = frame_id self.__x64bit_addr = x64bit_addr self.__tx_opts = tx_opts self.__data = rf_data
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.TX64Packet`. Raises: InvalidPacketException: if the bytearray length is less than 15. (start delim. + length (2 bytes) + frame type + frame id + 64bit addr. + transmit options + checksum = 15 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.TX_64`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in(OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=TX64Packet.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.TX_64.code: raise InvalidPacketException(message="This packet is not a TX 64 packet.") return TX64Packet(raw[4], XBee64BitAddress(raw[5:13]), raw[13], rf_data=raw[14:-1] if len(raw) > TX64Packet.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return True
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x64bit_addr.address ret.append(self.__tx_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.X64BIT_ADDR: self.__x64bit_addr.address, DictKeys.TRANSMIT_OPTIONS: self.__tx_opts, DictKeys.RF_DATA: self.__data} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Remove 64-bit address @property def x64bit_dest_addr(self): """ Returns the 64-bit destination address. Returns: :class:`.XBee64BitAddress`: the 64-bit destination address. .. seealso:: | :class:`.XBee64BitAddress` """ return self.__x64bit_addr @x64bit_dest_addr.setter def x64bit_dest_addr(self, x64bit_addr): """ Sets the 64-bit destination address. Args: x64bit_addr (:class:`.XBee64BitAddress`): the new 64-bit destination address. .. seealso:: | :class:`.XBee64BitAddress` """ self.__x64bit_addr = x64bit_addr @property def transmit_options(self): """ Returns the transmit options bitfield. Returns: Integer: the transmit options bitfield. .. seealso:: | :class:`.TransmitOptions` """ return self.__tx_opts @transmit_options.setter def transmit_options(self, transmit_options): """ Sets the transmit options bitfield. Args: transmit_options (Integer): the new transmit options bitfield. .. seealso:: | :class:`.TransmitOptions` """ self.__tx_opts = transmit_options @property def rf_data(self): """ Returns the RF data to send. Returns: Bytearray: the RF data to send. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the RF data to send. Args: rf_data (Bytearray): the new RF data to send. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy()
[docs]class TX16Packet(XBeeAPIPacket): """ This class represents a TX (Transmit) 16 Request packet. Packet is built using the parameters of the constructor or providing a valid byte array. A TX request message will cause the module to transmit data as an RF packet. .. seealso:: | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 9 def __init__(self, frame_id, x16bit_addr, tx_opts, rf_data=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.TX16Packet` object with the provided parameters. Args: frame_id (Integer): the frame ID of the packet. x16bit_addr (:class:`.XBee16BitAddress`): the 16-bit destination address. tx_opts (Integer): bitfield of supported transmission options. rf_data (Bytearray, optional): RF data that is sent to the destination device. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.TransmitOptions` | :class:`.XBee16BitAddress` | :class:`.XBeeAPIPacket` Raises: ValueError: if `frame_id` is less than 0 or greater than 255. """ if frame_id < 0 or frame_id > 255: raise ValueError("Frame id must be between 0 and 255.") super().__init__(ApiFrameType.TX_16, op_mode=op_mode) self._frame_id = frame_id self.__x16bit_addr = x16bit_addr self.__tx_opts = tx_opts self.__data = rf_data
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.TX16Packet`. Raises: InvalidPacketException: if the bytearray length is less than 9. (start delim. + length (2 bytes) + frame type + frame id + 16bit addr. + transmit options + checksum = 9 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.TX_16`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=TX16Packet.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.TX_16.code: raise InvalidPacketException(message="This packet is not a TX 16 packet.") return TX16Packet(raw[4], XBee16BitAddress(raw[5:7]), raw[7], rf_data=raw[8:-1] if len(raw) > TX16Packet.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return True
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x16bit_addr.address ret.append(self.__tx_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.X16BIT_ADDR: self.__x16bit_addr, DictKeys.TRANSMIT_OPTIONS: self.__tx_opts, DictKeys.RF_DATA: self.__data} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 2 # Remove 16-bit address @property def x16bit_dest_addr(self): """ Returns the 16-bit destination address. Returns: :class:`.XBee16BitAddress`: the 16-bit destination address. .. seealso:: | :class:`.XBee16BitAddress` """ return self.__x16bit_addr @x16bit_dest_addr.setter def x16bit_dest_addr(self, x16bit_addr): """ Sets the 16-bit destination address. Args: x16bit_addr (:class:`.XBee16BitAddress`): the new 16-bit destination address. .. seealso:: | :class:`.XBee16BitAddress` """ self.__x16bit_addr = x16bit_addr @property def transmit_options(self): """ Returns the transmit options bitfield. Returns: Integer: the transmit options bitfield. .. seealso:: | :class:`.TransmitOptions` """ return self.__tx_opts @transmit_options.setter def transmit_options(self, transmit_options): """ Sets the transmit options bitfield. Args: transmit_options (Integer): the new transmit options bitfield. .. seealso:: | :class:`.TransmitOptions` """ self.__tx_opts = transmit_options @property def rf_data(self): """ Returns the RF data to send. Returns: Bytearray: the RF data to send. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the RF data to send. Args: rf_data (Bytearray): the new RF data to send. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy()
[docs]class TXStatusPacket(XBeeAPIPacket): """ This class represents a TX (Transmit) status packet. Packet is built using the parameters of the constructor or providing a valid API payload. When a TX request is completed, the module sends a TX status message. This message will indicate if the packet was transmitted successfully or if there was a failure. .. seealso:: | :class:`.TX16Packet` | :class:`.TX64Packet` | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 7 def __init__(self, frame_id, tx_status, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.TXStatusPacket` object with the provided parameters. Args: frame_id (Integer): the frame ID of the packet. tx_status (:class:`.TransmitStatus`): transmit status. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. Raises: ValueError: if `frame_id` is less than 0 or greater than 255. .. seealso:: | :class:`.TransmitStatus` | :class:`.XBeeAPIPacket` """ if frame_id < 0 or frame_id > 255: raise ValueError("Frame id must be between 0 and 255.") super().__init__(ApiFrameType.TX_STATUS, op_mode=op_mode) self._frame_id = frame_id self.__tx_status = tx_status
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.TXStatusPacket`. Raises: InvalidPacketException: if the bytearray length is less than 7. (start delim. + length (2 bytes) + frame type + frame id + transmit status + checksum = 7 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.TX_16`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=TXStatusPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.TX_STATUS.code: raise InvalidPacketException(message="This packet is not a TX status packet.") return TXStatusPacket(raw[4], TransmitStatus.get(raw[5]), op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return True
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ return utils.int_to_bytes(self.__tx_status.code, num_bytes=1) def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.TS_STATUS: self.__tx_status} @property def transmit_status(self): """ Returns the transmit status. Returns: :class:`.TransmitStatus`: the transmit status. .. seealso:: | :class:`.TransmitStatus` """ return self.__tx_status @transmit_status.setter def transmit_status(self, transmit_status): """ Sets the transmit status. Args: transmit_status (:class:`.TransmitStatus`): the new transmit status to set. .. seealso:: | :class:`.TransmitStatus` """ self.__tx_status = transmit_status
[docs]class RX64Packet(XBeeAPIPacket): """ This class represents an RX (Receive) 64 request packet. Packet is built using the parameters of the constructor or providing a valid API byte array. When the module receives an RF packet, it is sent out the UART using this message type. This packet is the response to TX (transmit) 64 request packets. .. seealso:: | :class:`.ReceiveOptions` | :class:`.TX64Packet` | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 15 def __init__(self, x64bit_addr, rssi, rx_opts, rf_data=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a :class:`.RX64Packet` object with the provided parameters. Args: x64bit_addr (:class:`.XBee64BitAddress`): the 64-bit source address. rssi (Integer): received signal strength indicator. rx_opts (Integer): bitfield indicating the receive options. rf_data (Bytearray, optional): received RF data. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.ReceiveOptions` | :class:`.XBee64BitAddress` | :class:`.XBeeAPIPacket` """ super().__init__(ApiFrameType.RX_64, op_mode=op_mode) self.__x64bit_addr = x64bit_addr self.__rssi = rssi self.__rx_opts = rx_opts self.__data = rf_data
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RX64Packet` Raises: InvalidPacketException: if the bytearray length is less than 15. (start delim. + length (2 bytes) + frame type + 64bit addr. + rssi + receive options + checksum = 15 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.RX_64`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=RX64Packet.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.RX_64.code: raise InvalidPacketException(message="This packet is not an RX 64 packet.") return RX64Packet(XBee64BitAddress(raw[4:12]), raw[12], raw[13], rf_data=raw[14:-1] if len(raw) > RX64Packet.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return False
[docs] def is_broadcast(self): """ Override method. .. seealso:: | :meth:`XBeeAPIPacket.is_broadcast` """ return (utils.is_bit_enabled(self.__rx_opts, 1) or utils.is_bit_enabled(self.__rx_opts, 2))
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x64bit_addr.address ret.append(self.__rssi) ret.append(self.__rx_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.X64BIT_ADDR: self.__x64bit_addr, DictKeys.RSSI: self.__rssi, DictKeys.RECEIVE_OPTIONS: self.__rx_opts, DictKeys.RF_DATA: self.__data} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Remove 64-bit address @property def x64bit_source_addr(self): """ Returns the 64-bit source address. Returns: :class:`.XBee64BitAddress`: the 64-bit source address. .. seealso:: | :class:`.XBee64BitAddress` """ return self.__x64bit_addr @x64bit_source_addr.setter def x64bit_source_addr(self, x64bit_addr): """ Sets the 64-bit source address. Args: x64bit_addr (:class:`.XBee64BitAddress`): the new 64-bit source address. .. seealso:: | :class:`.XBee64BitAddress` """ self.__x64bit_addr = x64bit_addr @property def rssi(self): """ Returns the received Signal Strength Indicator (RSSI). Returns: Integer: the received Signal Strength Indicator (RSSI). """ return self.__rssi @rssi.setter def rssi(self, rssi): """ Sets the received Signal Strength Indicator (RSSI). Args: rssi (Integer): the new received Signal Strength Indicator (RSSI). """ self.__rssi = rssi @property def receive_options(self): """ Returns the receive options bitfield. Returns: Integer: the receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ return self.__rx_opts @receive_options.setter def receive_options(self, receive_options): """ Sets the receive options bitfield. Args: receive_options (Integer): the new receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ self.__rx_opts = receive_options @property def rf_data(self): """ Returns the received RF data. Returns: Bytearray: the received RF data. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the received RF data. Args: rf_data (Bytearray): the new received RF data. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy()
[docs]class RX16Packet(XBeeAPIPacket): """ This class represents an RX (Receive) 16 Request packet. Packet is built using the parameters of the constructor or providing a valid API byte array. When the module receives an RF packet, it is sent out the UART using this message type This packet is the response to TX (Transmit) 16 Request packets. .. seealso:: | :class:`.ReceiveOptions` | :class:`.TX16Packet` | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 9 def __init__(self, x16bit_addr, rssi, rx_opts, rf_data=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a :class:`.RX16Packet` object with the provided parameters. Args: x16bit_addr (:class:`.XBee16BitAddress`): the 16-bit source address. rssi (Integer): received signal strength indicator. rx_opts (Integer): bitfield indicating the receive options. rf_data (Bytearray, optional): received RF data. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.ReceiveOptions` | :class:`.XBee16BitAddress` | :class:`.XBeeAPIPacket` """ super().__init__(ApiFrameType.RX_16, op_mode=op_mode) self.__x16bit_addr = x16bit_addr self.__rssi = rssi self.__rx_opts = rx_opts self.__data = rf_data
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RX16Packet`. Raises: InvalidPacketException: if the bytearray length is less than 9. (start delim. + length (2 bytes) + frame type + 16bit addr. + rssi + receive options + checksum = 9 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.RX_16`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=RX16Packet.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.RX_16.code: raise InvalidPacketException(message="This packet is not an RX 16 Packet") return RX16Packet(XBee16BitAddress(raw[4:6]), raw[6], raw[7], rf_data=raw[8:-1] if len(raw) > RX16Packet.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return False
[docs] def is_broadcast(self): """ Override method. .. seealso:: | :meth:`XBeeAPIPacket.is_broadcast` """ return (utils.is_bit_enabled(self.__rx_opts, 1) or utils.is_bit_enabled(self.__rx_opts, 2))
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x16bit_addr.address ret.append(self.__rssi) ret.append(self.__rx_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.X16BIT_ADDR: self.__x16bit_addr, DictKeys.RSSI: self.__rssi, DictKeys.RECEIVE_OPTIONS: self.__rx_opts, DictKeys.RF_DATA: self.__data} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 2 # Remove 16-bit address @property def x16bit_source_addr(self): """ Returns the 16-bit source address. Returns: :class:`.XBee16BitAddress`: the 16-bit source address. .. seealso:: | :class:`.XBee16BitAddress` """ return self.__x16bit_addr @x16bit_source_addr.setter def x16bit_source_addr(self, x16bit_addr): """ Sets the 16-bit source address. Args: x16bit_addr (:class:`.XBee16BitAddress`): the new 16-bit source address. .. seealso:: | :class:`.XBee16BitAddress` """ self.__x16bit_addr = x16bit_addr @property def rssi(self): """ Returns the received Signal Strength Indicator (RSSI). Returns: Integer: the received Signal Strength Indicator (RSSI). """ return self.__rssi @rssi.setter def rssi(self, rssi): """ Sets the received Signal Strength Indicator (RSSI). Args: rssi (Integer): the new received Signal Strength Indicator (RSSI). """ self.__rssi = rssi @property def receive_options(self): """ Returns the receive options bitfield. Returns: Integer: the receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ return self.__rx_opts @receive_options.setter def receive_options(self, receive_options): """ Sets the receive options bitfield. Args: receive_options (Integer): the new receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ self.__rx_opts = receive_options @property def rf_data(self): """ Returns the received RF data. Returns: Bytearray: the received RF data. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the received RF data. Args: rf_data (Bytearray): the new received RF data. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy()
[docs]class RX64IOPacket(XBeeAPIPacket): """ This class represents an RX64 address IO packet. Packet is built using the parameters of the constructor or providing a valid API payload. I/O data is sent out the UART using an API frame. .. seealso:: | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 20 def __init__(self, x64bit_addr, rssi, rx_opts, data, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates an :class:`.RX64IOPacket` object with the provided parameters. Args: x64bit_addr (:class:`.XBee64BitAddress`): the 64-bit source address. rssi (Integer): received signal strength indicator. rx_opts (Integer): bitfield indicating the receive options. data (Bytearray): received RF data. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.ReceiveOptions` | :class:`.XBee64BitAddress` | :class:`.XBeeAPIPacket` """ super().__init__(ApiFrameType.RX_IO_64, op_mode=op_mode) self.__x64bit_addr = x64bit_addr self.__rssi = rssi self.__rs_opts = rx_opts self.__data = data self.__io_sample = IOSample(data) if data is not None and len(data) >= 5 else None
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RX64IOPacket`. Raises: InvalidPacketException: if the bytearray length is less than 20. (start delim. + length (2 bytes) + frame type + 64bit addr. + rssi + receive options + rf data (5 bytes) + checksum = 20 bytes) InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.RX_IO_64`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=RX64IOPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.RX_IO_64.code: raise InvalidPacketException(message="This packet is not an RX 64 IO packet.") return RX64IOPacket(XBee64BitAddress(raw[4:12]), raw[12], raw[13], raw[14:-1], op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return False
[docs] def is_broadcast(self): """ Override method. .. seealso:: | :meth:`XBeeAPIPacket.is_broadcast` """ return (utils.is_bit_enabled(self.__rs_opts, 1) or utils.is_bit_enabled(self.__rs_opts, 2))
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x64bit_addr.address ret.append(self.__rssi) ret.append(self.__rs_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ base = {DictKeys.X16BIT_ADDR: self.__x64bit_addr.address, DictKeys.RSSI: self.__rssi, DictKeys.RECEIVE_OPTIONS: self.__rs_opts} if self.__io_sample is not None: base[DictKeys.NUM_SAMPLES] = 1 base[DictKeys.DIGITAL_MASK] = self.__io_sample.digital_mask base[DictKeys.ANALOG_MASK] = self.__io_sample.analog_mask # Digital values for i in range(16): if self.__io_sample.has_digital_value(IOLine.get(i)): base[IOLine.get(i).description + "digital value"] = \ utils.hex_to_string(self.__io_sample.get_digital_value(IOLine.get(i))) # Analog values for i in range(6): if self.__io_sample.has_analog_value(IOLine.get(i)): base[IOLine.get(i).description + "analog value"] = \ utils.hex_to_string(self.__io_sample.get_analog_value(IOLine.get(i))) # Power supply if self.__io_sample.has_power_supply_value(): base["Power supply value "] = "%02X" % self.__io_sample.power_supply_value elif self.__data is not None: base[DictKeys.RF_DATA] = utils.hex_to_string(self.__data) return base @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Remove 64-bit address @property def x64bit_source_addr(self): """ Returns the 64-bit source address. Returns: :class:`XBee64BitAddress`: the 64-bit source address. .. seealso:: | :class:`.XBee64BitAddress` """ return self.__x64bit_addr @x64bit_source_addr.setter def x64bit_source_addr(self, x64bit_addr): """ Sets the 64-bit source address. Args: x64bit_addr (:class:`.XBee64BitAddress`): the new 64-bit source address. .. seealso:: | :class:`.XBee64BitAddress` """ self.__x64bit_addr = x64bit_addr @property def rssi(self): """ Returns the received Signal Strength Indicator (RSSI). Returns: Integer: the received Signal Strength Indicator (RSSI). """ return self.__rssi @rssi.setter def rssi(self, rssi): """ Sets the received Signal Strength Indicator (RSSI). Args: rssi (Integer): the new received Signal Strength Indicator (RSSI). """ self.__rssi = rssi @property def receive_options(self): """ Returns the receive options bitfield. Returns: Integer: the receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ return self.__rs_opts @receive_options.setter def receive_options(self, receive_options): """ Sets the receive options bitfield. Args: receive_options (Integer): the new receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ self.__rs_opts = receive_options @property def rf_data(self): """ Returns the received RF data. Returns: Bytearray: the received RF data. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the received RF data. Args: rf_data (Bytearray): the new received RF data. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy() # Modify the ioSample accordingly if rf_data is not None and len(rf_data) >= 5: self.__io_sample = IOSample(self.__data) else: self.__io_sample = None @property def io_sample(self): """ Returns the IO sample corresponding to the data contained in the packet. Returns: :class:`.IOSample`: the IO sample of the packet, `None` if the packet has not any data or if the sample could not be generated correctly. .. seealso:: | :class:`.IOSample` """ return self.__io_sample @io_sample.setter def io_sample(self, io_sample): """ Sets the IO sample of the packet. Args: io_sample (:class:`.IOSample`): the new IO sample to set. .. seealso:: | :class:`.IOSample` """ self.__io_sample = io_sample
[docs]class RX16IOPacket(XBeeAPIPacket): """ This class represents an RX16 address IO packet. Packet is built using the parameters of the constructor or providing a valid byte array. I/O data is sent out the UART using an API frame. .. seealso:: | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 14 def __init__(self, x16bit_addr, rssi, rx_opts, data, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates an :class:`.RX16IOPacket` object with the provided parameters. Args: x16bit_addr (:class:`.XBee16BitAddress`): the 16-bit source address. rssi (Integer): received signal strength indicator. rx_opts (Integer): bitfield indicating the receive options. data (Bytearray): received RF data. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. .. seealso:: | :class:`.ReceiveOptions` | :class:`.XBee16BitAddress` | :class:`.XBeeAPIPacket` """ super().__init__(ApiFrameType.RX_IO_16, op_mode=op_mode) self.__x16bit_addr = x16bit_addr self.__rssi = rssi self.__rx_opts = rx_opts self.__data = data self.__io_sample = IOSample(data) if data is not None and len(data) >= 5 else None
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RX16IOPacket`. Raises: InvalidPacketException: if the bytearray length is less than 14. (start delim. + length (2 bytes) + frame type + 16bit addr. + rssi + receive options + rf data (5 bytes) + checksum = 14 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is different from :attr:`.ApiFrameType.RX_IO_16`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet(raw, min_length=RX16IOPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.RX_IO_16.code: raise InvalidPacketException(message="This packet is not an RX 16 IO packet.") return RX16IOPacket(XBee16BitAddress(raw[4:6]), raw[6], raw[7], raw[8:-1], op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return False
[docs] def is_broadcast(self): """ Override method. .. seealso:: | :meth:`XBeeAPIPacket.is_broadcast` """ return (utils.is_bit_enabled(self.__rx_opts, 1) or utils.is_bit_enabled(self.__rx_opts, 2))
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = self.__x16bit_addr.address ret.append(self.__rssi) ret.append(self.__rx_opts) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ base = {DictKeys.X16BIT_ADDR: self.__x16bit_addr.address, DictKeys.RSSI: self.__rssi, DictKeys.RECEIVE_OPTIONS: self.__rx_opts} if self.__io_sample is not None: base[DictKeys.NUM_SAMPLES] = 1 base[DictKeys.DIGITAL_MASK] = self.__io_sample.digital_mask base[DictKeys.ANALOG_MASK] = self.__io_sample.analog_mask # Digital values for i in range(16): if self.__io_sample.has_digital_value(IOLine.get(i)): base[IOLine.get(i).description + "digital value"] = \ utils.hex_to_string(self.__io_sample.get_digital_value(IOLine.get(i))) # Analog values for i in range(6): if self.__io_sample.has_analog_value(IOLine.get(i)): base[IOLine.get(i).description + "analog value"] = \ utils.hex_to_string(self.__io_sample.get_analog_value(IOLine.get(i))) # Power supply if self.__io_sample.has_power_supply_value(): base["Power supply value "] = "%02X" % self.__io_sample.power_supply_value elif self.__data is not None: base[DictKeys.RF_DATA] = utils.hex_to_string(self.__data) return base @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 2 # Remove 16-bit address @property def x16bit_source_addr(self): """ Returns the 16-bit source address. Returns: :class:`.XBee16BitAddress`: the 16-bit source address. .. seealso:: | :class:`.XBee16BitAddress` """ return self.__x16bit_addr @x16bit_source_addr.setter def x16bit_source_addr(self, x16bit_addr): """ Sets the 16-bit source address. Args: x16bit_addr (:class:`.XBee16BitAddress`): the new 16-bit source address. .. seealso:: | :class:`.XBee16BitAddress` """ self.__x16bit_addr = x16bit_addr @property def rssi(self): """ Returns the received Signal Strength Indicator (RSSI). Returns: Integer: the received Signal Strength Indicator (RSSI). """ return self.__rssi @rssi.setter def rssi(self, rssi): """ Sets the received Signal Strength Indicator (RSSI). Args: rssi (Integer): the new received Signal Strength Indicator (RSSI). """ self.__rssi = rssi @property def receive_options(self): """ Returns the receive options bitfield. Returns: Integer: the receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ return self.__rx_opts @receive_options.setter def receive_options(self, receive_options): """ Sets the receive options bitfield. Args: receive_options (Integer): the new receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ self.__rx_opts = receive_options @property def rf_data(self): """ Returns the received RF data. Returns: Bytearray: the received RF data. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the received RF data. Args: rf_data (Bytearray): the new received RF data. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy() # Modify the ioSample accordingly if rf_data is not None and len(rf_data) >= 5: self.__io_sample = IOSample(self.__data) else: self.__io_sample = None @property def io_sample(self): """ Returns the IO sample corresponding to the data contained in the packet. Returns: :class:`.IOSample`: the IO sample of the packet, `None` if the packet has not any data or if the sample could not be generated correctly. .. seealso:: | :class:`.IOSample` """ return self.__io_sample @io_sample.setter def io_sample(self, io_sample): """ Sets the IO sample of the packet. Args: io_sample (:class:`.IOSample`): the new IO sample to set. .. seealso:: | :class:`.IOSample` """ self.__io_sample = io_sample