Source code for digi.xbee.packets.wifi

# Copyright 2017-2022, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from ipaddress import IPv4Address

from digi.xbee.packets.base import XBeeAPIPacket, DictKeys
from digi.xbee.util import utils
from digi.xbee.exception import InvalidOperatingModeException, InvalidPacketException
from digi.xbee.packets.aft import ApiFrameType
from digi.xbee.models.mode import OperatingMode
from digi.xbee.models.status import ATCommandStatus
from digi.xbee.io import IOSample, IOLine


[docs]class IODataSampleRxIndicatorWifiPacket(XBeeAPIPacket): """ This class represents a IO data sample RX indicator (Wi-Fi) packet. Packet is built using the parameters of the constructor or providing a valid API payload. When the module receives an IO sample frame from a remote device, it sends the sample out the UART or SPI using this frame type. Only modules running API mode will be able to receive IO samples. Among received data, some options can also be received indicating transmission parameters. .. seealso:: | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 16 def __init__(self, src_address, rssi, rx_options, rf_data=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.IODataSampleRxIndicatorWifiPacket` object with the provided parameters. Args: src_address (:class:`ipaddress.IPv4Address`): the 64-bit source address. rssi (Integer): received signal strength indicator. rx_options (Integer): bitfield indicating the receive options. rf_data (Bytearray, optional): received RF data. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. Raises: ValueError: if `rf_data` is not `None` and it's not valid for create an :class:`.IOSample`. .. seealso:: | :class:`.IOSample` | :class:`ipaddress.IPv4Address` | :class:`.ReceiveOptions` | :class:`.XBeeAPIPacket` """ super().__init__(ApiFrameType.IO_DATA_SAMPLE_RX_INDICATOR_WIFI, op_mode=op_mode) self.__src_addr = src_address self.__rssi = rssi self.__rx_opts = rx_options self.__data = rf_data self.__io_sample = IOSample(rf_data) if rf_data is not None and len(rf_data) >= 5 else None
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.IODataSampleRxIndicatorWifiPacket`. Raises: InvalidPacketException: if the bytearray length is less than 16. (start delim. + length (2 bytes) + frame type + source addr. (4 bytes) + rssi + receive options + rf data (5 bytes) + checksum = 16 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is not :attr:`.ApiFrameType.IO_DATA_SAMPLE_RX_INDICATOR_WIFI`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet( raw, min_length=IODataSampleRxIndicatorWifiPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.IO_DATA_SAMPLE_RX_INDICATOR_WIFI.code: raise InvalidPacketException( message="This packet is not an IO data sample RX indicator Wi-Fi packet.") return IODataSampleRxIndicatorWifiPacket( IPv4Address(bytes(raw[4:8])), raw[7], raw[8], rf_data=raw[9:-1], op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return False
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = bytearray(self.__src_addr.packed) ret += utils.int_to_bytes(self.__rssi, num_bytes=1) ret += utils.int_to_bytes(self.__rx_opts, num_bytes=1) if self.__data is not None: ret += self.__data return ret def _get_api_packet_spec_data_dict(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ base = {DictKeys.SRC_IPV4_ADDR: "%s (%s)" % (self.__src_addr.packed, self.__src_addr.exploded), DictKeys.RSSI: self.__rssi, DictKeys.RECEIVE_OPTIONS: self.__rx_opts} if self.__io_sample is not None: base[DictKeys.NUM_SAMPLES] = 1 base[DictKeys.DIGITAL_MASK] = self.__io_sample.digital_mask base[DictKeys.ANALOG_MASK] = self.__io_sample.analog_mask # Digital values for i in range(16): if self.__io_sample.has_digital_value(IOLine.get(i)): base[IOLine.get(i).description + " digital value"] = \ self.__io_sample.get_digital_value(IOLine.get(i)).name # Analog values for i in range(6): if self.__io_sample.has_analog_value(IOLine.get(i)): base[IOLine.get(i).description + " analog value"] = \ self.__io_sample.get_analog_value(IOLine.get(i)) # Power supply if self.__io_sample.has_power_supply_value(): base["Power supply value "] = "%02X" % self.__io_sample.power_supply_value elif self.__data is not None: base[DictKeys.RF_DATA] = utils.hex_to_string(self.__data) return base @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Remove 64-bit address @property def source_address(self): """ Returns the IPv4 address of the source device. Returns: :class:`ipaddress.IPv4Address`: the IPv4 address of the source device. .. seealso:: | :class:`ipaddress.IPv4Address` """ return self.__src_addr @source_address.setter def source_address(self, source_address): """ Sets the IPv4 source address. Args: source_address (:class:`ipaddress.IPv4Address`): The new IPv4 source address. .. seealso:: | :class:`ipaddress.IPv4Address` """ if source_address is not None: self.__src_addr = source_address @property def rssi(self): """ Returns the received Signal Strength Indicator (RSSI). Returns: Integer: the received Signal Strength Indicator (RSSI). """ return self.__rssi @rssi.setter def rssi(self, rssi): """ Sets the received Signal Strength Indicator (RSSI). Args: rssi (Integer): the new received Signal Strength Indicator (RSSI). """ self.__rssi = rssi @property def receive_options(self): """ Returns the receive options bitfield. Returns: Integer: the receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ return self.__rx_opts @receive_options.setter def receive_options(self, receive_options): """ Sets the receive options bitfield. Args: receive_options (Integer): the new receive options bitfield. .. seealso:: | :class:`.ReceiveOptions` """ self.__rx_opts = receive_options @property def rf_data(self): """ Returns the received RF data. Returns: Bytearray: the received RF data. """ if self.__data is None: return None return self.__data.copy() @rf_data.setter def rf_data(self, rf_data): """ Sets the received RF data. Args: rf_data (Bytearray): the new received RF data. """ if rf_data is None: self.__data = None else: self.__data = rf_data.copy() # Modify the IO sample accordingly if rf_data is not None and len(rf_data) >= 5: self.__io_sample = IOSample(self.__data) else: self.__io_sample = None @property def io_sample(self): """ Returns the IO sample corresponding to the data contained in the packet. Returns: :class:`.IOSample`: the IO sample of the packet, `None` if the packet has not any data or if the sample could not be generated correctly. .. seealso:: | :class:`.IOSample` """ return self.__io_sample @io_sample.setter def io_sample(self, io_sample): """ Sets the IO sample of the packet. Args: io_sample (:class:`.IOSample`): the new IO sample to set. .. seealso:: | :class:`.IOSample` """ self.__io_sample = io_sample
[docs]class RemoteATCommandWifiPacket(XBeeAPIPacket): """ This class represents a remote AT command request (Wi-Fi) packet. Packet is built using the parameters of the constructor or providing a valid API payload. Used to query or set module parameters on a remote device. For parameter changes on the remote device to take effect, changes must be applied, either by setting the apply changes options bit, or by sending an `AC` command to the remote node. Remote command options are set as a bitfield. If configured, command response is received as a :class:`.RemoteATCommandResponseWifiPacket`. .. seealso:: | :class:`.RemoteATCommandResponseWifiPacket` | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 17 def __init__(self, frame_id, dest_address, tx_options, command, parameter=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.RemoteATCommandWifiPacket` object with the provided parameters. Args: frame_id (integer): the frame ID of the packet. dest_address (:class:`ipaddress.IPv4Address`): the IPv4 address of the destination device. tx_options (Integer): bitfield of supported transmission options. command (String): AT command to send. parameter (Bytearray, optional): AT command parameter. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. Raises: ValueError: if `frame_id` is less than 0 or greater than 255. ValueError: if length of `command` is different than 2. .. seealso:: | :class:`ipaddress.IPv4Address` | :class:`.RemoteATCmdOptions` | :class:`.XBeeAPIPacket` """ if frame_id < 0 or frame_id > 255: raise ValueError("Frame id must be between 0 and 255.") if len(command) != 2: raise ValueError("Invalid command " + command) super().__init__(ApiFrameType.REMOTE_AT_COMMAND_REQUEST_WIFI, op_mode=op_mode) self._frame_id = frame_id self.__dest_addr = dest_address self.__tx_opts = tx_options self.__cmd = command self.__param = parameter
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RemoteATCommandWifiPacket` Raises: InvalidPacketException: if the Bytearray length is less than 17. (start delim. + length (2 bytes) + frame type + frame id + dest. addr. (8 bytes) + transmit options + command (2 bytes) + checksum = 17 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is not :attr:`.ApiFrameType.REMOTE_AT_COMMAND_REQUEST_WIFI`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet( raw, min_length=RemoteATCommandWifiPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.REMOTE_AT_COMMAND_REQUEST_WIFI.code: raise InvalidPacketException( message="This packet is not a remote AT command request Wi-Fi packet.") return RemoteATCommandWifiPacket( raw[4], IPv4Address(bytes(raw[9:13])), raw[13], raw[14:16].decode("utf8"), parameter=raw[16:-1] if len(raw) > RemoteATCommandWifiPacket.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return True
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = bytearray(self.__dest_addr.packed) ret += utils.int_to_bytes(self.__tx_opts, num_bytes=1) ret += bytearray(self.__cmd, "utf8") if self.__param is not None: ret += self.__param return ret def _get_api_packet_spec_data_dict(self): """ Override method. See: :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict` """ return {DictKeys.DEST_IPV4_ADDR: "%s (%s)" % (self.__dest_addr.packed, self.__dest_addr.exploded), DictKeys.TRANSMIT_OPTIONS: self.__tx_opts, DictKeys.COMMAND: self.__cmd, DictKeys.PARAMETER: list(self.__param) if self.__param is not None else None} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Destination address @property def dest_address(self): """ Returns the IPv4 address of the destination device. Returns: :class:`ipaddress.IPv4Address`: the IPv4 address of the destination device. .. seealso:: | :class:`ipaddress.IPv4Address` """ return self.__dest_addr @dest_address.setter def dest_address(self, dest_address): """ Sets the IPv4 destination address. Args: dest_address (:class:`ipaddress.IPv4Address`): The new IPv4 destination address. .. seealso:: | :class:`ipaddress.IPv4Address` """ if dest_address is not None: self.__dest_addr = dest_address @property def transmit_options(self): """ Returns the transmit options bitfield. Returns: Integer: the transmit options bitfield. .. seealso:: | :class:`.RemoteATCmdOptions` """ return self.__tx_opts @transmit_options.setter def transmit_options(self, transmit_options): """ Sets the transmit options bitfield. Args: transmit_options (Integer): the new transmit options bitfield. .. seealso:: | :class:`.RemoteATCmdOptions` """ self.__tx_opts = transmit_options @property def command(self): """ Returns the AT command. Returns: String: the AT command. """ return self.__cmd @command.setter def command(self, command): """ Sets the AT command. Args: command (String): the new AT command. """ self.__cmd = command @property def parameter(self): """ Returns the AT command parameter. Returns: Bytearray: the AT command parameter. """ return self.__param @parameter.setter def parameter(self, parameter): """ Sets the AT command parameter. Args: parameter (Bytearray): the new AT command parameter. """ self.__param = parameter
[docs]class RemoteATCommandResponseWifiPacket(XBeeAPIPacket): """ This class represents a remote AT command response (Wi-Fi) packet. Packet is built using the parameters of the constructor or providing a valid API payload. If a module receives a remote command response RF data frame in response to a Remote AT Command Request, the module will send a Remote AT Command Response message out the UART. Some commands may send back multiple frames for example, Node Discover (`ND`) command. This packet is received in response of a :class:`.RemoteATCommandPacket`. Response also includes an :class:`.ATCommandStatus` object with the status of the AT command. .. seealso:: | :class:`.RemoteATCommandWifiPacket` | :class:`.ATCommandStatus` | :class:`.XBeeAPIPacket` """ __MIN_PACKET_LENGTH = 17 def __init__(self, frame_id, src_address, command, resp_status, comm_value=None, op_mode=OperatingMode.API_MODE): """ Class constructor. Instantiates a new :class:`.RemoteATCommandResponseWifiPacket` object with the provided parameters. Args: frame_id (Integer): the frame ID of the packet. src_address (:class:`ipaddress.IPv4Address`): the IPv4 address of the source device. command (String): the AT command of the packet. Must be a string. resp_status (:class:`.ATCommandStatus`): the status of the AT command. comm_value (Bytearray, optional): the AT command response value. op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`): The mode in which the frame was captured. Raises: ValueError: if `frame_id` is less than 0 or greater than 255. ValueError: if length of `command` is different than 2. .. seealso:: | :class:`.ATCommandStatus` | :class:`ipaddress.IPv4Address` """ if frame_id > 255 or frame_id < 0: raise ValueError("frame_id must be between 0 and 255.") if len(command) != 2: raise ValueError("Invalid command " + command) super().__init__(ApiFrameType.REMOTE_AT_COMMAND_RESPONSE_WIFI, op_mode=op_mode) self._frame_id = frame_id self.__src_addr = src_address self.__cmd = command self.__resp_status = resp_status self.__comm_val = comm_value
[docs] @staticmethod def create_packet(raw, operating_mode): """ Override method. Returns: :class:`.RemoteATCommandResponseWifiPacket`. Raises: InvalidPacketException: if the bytearray length is less than 17. (start delim. + length (2 bytes) + frame type + frame id + source addr. (8 bytes) + command (2 bytes) + receive options + checksum = 17 bytes). InvalidPacketException: if the length field of 'raw' is different from its real length. (length field: bytes 2 and 3) InvalidPacketException: if the first byte of 'raw' is not the header byte. See :class:`.SpecialByte`. InvalidPacketException: if the calculated checksum is different from the checksum field value (last byte). InvalidPacketException: if the frame type is not :attr:`.ApiFrameType.REMOTE_AT_COMMAND_RESPONSE_WIFI`. InvalidOperatingModeException: if `operating_mode` is not supported. .. seealso:: | :meth:`.XBeePacket.create_packet` | :meth:`.XBeeAPIPacket._check_api_packet` """ if operating_mode not in (OperatingMode.ESCAPED_API_MODE, OperatingMode.API_MODE): raise InvalidOperatingModeException(op_mode=operating_mode) XBeeAPIPacket._check_api_packet( raw, min_length=RemoteATCommandResponseWifiPacket.__MIN_PACKET_LENGTH) if raw[3] != ApiFrameType.REMOTE_AT_COMMAND_RESPONSE_WIFI.code: raise InvalidPacketException( message="This packet is not a remote AT command response Wi-Fi packet.") return RemoteATCommandResponseWifiPacket( raw[4], IPv4Address(bytes(raw[9:13])), raw[13:15].decode("utf8"), ATCommandStatus.get(raw[15]), comm_value=raw[16:-1] if len(raw) > RemoteATCommandResponseWifiPacket.__MIN_PACKET_LENGTH else None, op_mode=operating_mode)
[docs] def needs_id(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.needs_id` """ return True
def _get_api_packet_spec_data(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket._get_api_packet_spec_data` """ ret = bytearray(self.__src_addr.packed) ret += bytearray(self.__cmd, "utf8") ret += utils.int_to_bytes(self.__resp_status.code, num_bytes=1) if self.__comm_val is not None: ret += self.__comm_val return ret def _get_api_packet_spec_data_dict(self): return {DictKeys.SRC_IPV4_ADDR: "%s (%s)" % (self.__src_addr.packed, self.__src_addr.exploded), DictKeys.COMMAND: self.__cmd, DictKeys.AT_CMD_STATUS: self.__resp_status, DictKeys.RF_DATA: list(self.__comm_val) if self.__comm_val is not None else None} @property def effective_len(self): """ Override method. .. seealso:: | :meth:`.XBeeAPIPacket.effective_len` """ return len(self) - 8 # Remove source address @property def source_address(self): """ Returns the IPv4 address of the source device. Returns: :class:`ipaddress.IPv4Address`: the IPv4 address of the source device. .. seealso:: | :class:`ipaddress.IPv4Address` """ return self.__src_addr @source_address.setter def source_address(self, source_address): """ Sets the IPv4 source address. Args: source_address (:class:`ipaddress.IPv4Address`): The new IPv4 source address. .. seealso:: | :class:`ipaddress.IPv4Address` """ if source_address is not None: self.__src_addr = source_address @property def command(self): """ Returns the AT command of the packet. Returns: String: the AT command of the packet. """ return self.__cmd @command.setter def command(self, command): """ Sets the AT command of the packet. Args: command (String): the new AT command of the packet. Must have length = 2. Raises: ValueError: if length of `command` is different than 2. """ if len(command) != 2: raise ValueError("Invalid command " + command) self.__cmd = command @property def status(self): """ Returns the AT command response status of the packet. Returns: :class:`.ATCommandStatus`: the AT command response status of the packet. .. seealso:: | :class:`.ATCommandStatus` """ return self.__resp_status @status.setter def status(self, response_status): """ Sets the AT command response status of the packet Args: response_status (:class:`.ATCommandStatus`) : the new AT command response status of the packet. .. seealso:: | :class:`.ATCommandStatus` """ self.__resp_status = response_status @property def command_value(self): """ Returns the AT command response value. Returns: Bytearray: the AT command response value. """ return self.__comm_val @command_value.setter def command_value(self, comm_value): """ Sets the AT command response value. Args: comm_value (Bytearray): the new AT command response value. """ self.__comm_val = comm_value