# Copyright 2020-2022, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import logging
import threading
from ipaddress import IPv4Address
from digi.xbee.exception import TimeoutException
from digi.xbee.models.address import XBee64BitAddress, XBee16BitAddress
from digi.xbee.models.atcomm import ATStringCommand
from digi.xbee.models.mode import OperatingMode
from digi.xbee.models.options import RemoteATCmdOptions
from digi.xbee.models.protocol import XBeeProtocol
from digi.xbee.models.status import ATCommandStatus
from digi.xbee.packets.aft import ApiFrameType
from digi.xbee.packets.base import XBeeAPIPacket
from digi.xbee.util import utils
[docs]class PacketSender:
"""
Class to send XBee packets.
"""
_LOG_PATTERN = "{comm_iface:s} - {event:s} - {opmode:s}: {content:s}"
"""
Pattern used to log packet events.
"""
_log = logging.getLogger(__name__)
"""
Logger.
"""
def __init__(self, xbee):
"""
Class constructor. Instantiates a new :class:`.PacketSender` object
with the provided parameters.
Args:
xbee (:class:`.XBeeDevice`): The XBee.
"""
self.__xbee = xbee
self._at_cmds_sent = {}
self._future_apply = {}
[docs] def send_packet(self, packet):
"""
Sends a packet to the XBee. The packet to send is escaped depending on
the current operating mode.
Args:
packet (:class:`.XBeePacket`): The packet to send.
Raises:
InvalidOperatingModeException: If the XBee device's operating mode
is not API or ESCAPED API. This method only checks the cached
value of the operating mode.
XBeeException: if the XBee device's communication interface is closed.
.. seealso::
| :class:`.XBeePacket`
"""
f_type = packet.get_frame_type()
# Do not allow to set a non API operating mode in the local XBee
if (f_type in (ApiFrameType.AT_COMMAND, ApiFrameType.AT_COMMAND_QUEUE)
and packet.parameter
and packet.command.upper() == ATStringCommand.AP.command
and not self.is_op_mode_valid(packet.parameter)):
return
comm_iface = self.__xbee.comm_iface
op_mode = self.__xbee.operating_mode
if self.__xbee._serial_port:
self.__xbee._update_tx_stats(packet)
out = packet.output(escaped=op_mode == OperatingMode.ESCAPED_API_MODE)
comm_iface.write_frame(out)
self._log.debug(self._LOG_PATTERN.format(comm_iface=str(comm_iface),
event="SENT",
opmode=op_mode,
content=utils.hex_to_string(out)))
# Refresh cached parameters if this method modifies some of them.
if self.__xbee.serial_port and f_type in (ApiFrameType.AT_COMMAND,
ApiFrameType.AT_COMMAND_QUEUE,
ApiFrameType.REMOTE_AT_COMMAND_REQUEST):
node = self.__xbee
# Get remote node in case of a remote at command
if (f_type == ApiFrameType.REMOTE_AT_COMMAND_REQUEST
and XBee64BitAddress.is_known_node_addr(packet.x64bit_dest_addr)):
node = self.__xbee.get_network().get_device_by_64(packet.x64bit_dest_addr)
# Store the sent AT command packet
if node:
if not node.get_64bit_addr():
return
key = str(node.get_64bit_addr())
if key not in self._at_cmds_sent:
self._at_cmds_sent[key] = {}
self._at_cmds_sent[key].update({packet.frame_id: packet})
[docs] def is_op_mode_valid(self, value):
"""
Returns `True` if the provided value is a valid operating mode for
the library.
Args:
value (Bytearray): The value to check.
Returns:
Boolean: `True` for a valid value, `False` otherwise.
"""
op_mode_value = utils.bytes_to_int(value)
op_mode = OperatingMode.get(op_mode_value)
if op_mode not in (OperatingMode.API_MODE,
OperatingMode.ESCAPED_API_MODE):
self._log.error(
"Operating mode '%d' (%s) not set not to loose XBee connection",
op_mode_value, op_mode.description if op_mode else "Unknown")
return False
return True
[docs] def at_response_received_cb(self, response):
"""
Callback to deal with AT command responses and update the
corresponding node. Only for internal use.
Args:
response (:class: `.XBeeAPIPacket`): The received API packet.
"""
f_type = response.get_frame_type()
if f_type not in (ApiFrameType.AT_COMMAND_RESPONSE,
ApiFrameType.REMOTE_AT_COMMAND_RESPONSE):
return
node = self.__xbee
# Get remote node in case of a remote at command
if (f_type == ApiFrameType.REMOTE_AT_COMMAND_RESPONSE
and XBee64BitAddress.is_known_node_addr(response.x64bit_source_addr)):
node = self.__xbee.get_network().get_device_by_64(response.x64bit_source_addr)
if not node:
return
key = str(node.get_64bit_addr())
requests = self._at_cmds_sent.get(key, {})
req = requests.pop(response.frame_id, None)
if not req or response.status != ATCommandStatus.OK:
return
def is_req_apply(at_req):
fr_type = at_req.get_frame_type()
return (at_req.command.upper() == ATStringCommand.AC.command
or fr_type == ApiFrameType.AT_COMMAND
or (fr_type == ApiFrameType.REMOTE_AT_COMMAND_REQUEST
and at_req.transmit_options & RemoteATCmdOptions.APPLY_CHANGES.value))
def is_node_info_param(at_pkt):
at_cmd = at_pkt.command.upper()
return at_cmd in (ATStringCommand.NI.command,
ATStringCommand.MY.command)
def is_port_param(at_pkt):
at_cmd = at_pkt.command.upper()
return at_cmd in (ATStringCommand.AP.command,
ATStringCommand.BD.command,
ATStringCommand.NB.command,
ATStringCommand.SB.command)
apply = is_req_apply(req)
if apply:
if key not in self._future_apply:
self._future_apply[key] = {}
node_fut_apply = self._future_apply.get(key, {})
node_fut_apply.pop(req.command.upper(), None)
for key, value in list(node_fut_apply.items()):
self._refresh_if_cached(node, key, value, apply=True)
if req.parameter and (is_port_param(req) or is_node_info_param(req)):
self._refresh_if_cached(node, req.command.upper(), req.parameter,
apply=apply)
def _refresh_if_cached(self, node, parameter, value, apply=True):
"""
Refreshes the proper cached parameter depending on `parameter` value.
If `parameter` is not a cached parameter, this method does nothing.
Args:
node (:class:`.AbstractXBeeDevice`): The XBee to refresh.
parameter (String): the parameter to refresh its value.
value (Bytearray): the new value of the parameter.
apply (Boolean, optional, default=`True`): `True` to apply
immediately, `False` otherwise.
"""
updated = False
param = parameter.upper()
key = str(node.get_64bit_addr())
if key not in self._future_apply:
self._future_apply[key] = {}
node_fut_apply = self._future_apply.get(key, {})
# Node identifier
if param == ATStringCommand.NI.command:
node_id = str(value, encoding='utf8', errors='ignore')
changed = node.get_node_id() != node_id
updated = changed and apply
if updated:
node._node_id = node_id
node_fut_apply.pop(param, None)
elif changed:
node_fut_apply.update({param: value})
# 16-bit address / IP address
elif param == ATStringCommand.MY.command:
if XBeeProtocol.is_ip_protocol(node.get_protocol()):
ip_addr = IPv4Address(utils.bytes_to_int(value))
changed = node.get_ip_addr() != ip_addr
updated = changed and apply
if updated:
node._ip_addr = ip_addr
node_fut_apply.pop(param, None)
elif changed:
node_fut_apply.update({param: value})
elif XBee16BitAddress.is_valid(value):
x16bit_addr = XBee16BitAddress(value)
changed = node.get_16bit_addr() != x16bit_addr
updated = changed and apply
if updated:
node._16bit_addr = x16bit_addr
node_fut_apply.pop(param, None)
elif changed:
node_fut_apply.update({param: value})
elif not node.is_remote():
updated = self._refresh_serial_params(node, param, value, apply=apply)
if updated:
network = node.get_local_xbee_device().get_network() if node.is_remote() \
else node.get_network()
if (network
and (not node.is_remote()
or network.get_device_by_64(node.get_64bit_addr())
or network.get_device_by_16(node.get_16bit_addr()))):
from digi.xbee.devices import NetworkEventType, NetworkEventReason
network._network_modified(
NetworkEventType.UPDATE, NetworkEventReason.READ_INFO, node=node)
def _refresh_serial_params(self, node, parameter, value, apply=True):
"""
Refreshes the proper cached parameter depending on `parameter` value.
If `parameter` is not a cached parameter, this method does nothing.
Args:
node (:class:`.AbstractXBeeDevice`): The XBee to refresh.
parameter (String): the parameter to refresh its value.
value (Bytearray): the new value of the parameter.
apply (Boolean, optional, default=`True`): `True` to apply
immediately, `False` otherwise.
Returns:
Boolean: `True` if a network event must be sent, `False` otherwise.
"""
node_fut_apply = self._future_apply.get(str(node.get_64bit_addr()), {})
if parameter == ATStringCommand.AP.command:
new_op_mode = OperatingMode.get(utils.bytes_to_int(value))
changed = bool(
new_op_mode != node.operating_mode
and new_op_mode in (OperatingMode.API_MODE,
OperatingMode.ESCAPED_API_MODE))
if changed and apply:
node._operating_mode = new_op_mode
node_fut_apply.pop(parameter, None)
elif changed:
node_fut_apply.update({parameter: value})
return changed and apply
if not node.serial_port or parameter not in (ATStringCommand.BD.command,
ATStringCommand.NB.command,
ATStringCommand.SB.command):
return False
if parameter == ATStringCommand.BD.command:
from digi.xbee.profile import FirmwareBaudrate
new_bd = utils.bytes_to_int(value)
baudrate = FirmwareBaudrate.get(new_bd)
new_bd = baudrate.baudrate if baudrate else new_bd
changed = new_bd != node.serial_port.baudrate
parameter = "baudrate" if changed and apply else parameter
value = new_bd if changed and apply else value
elif parameter == ATStringCommand.NB.command:
from digi.xbee.profile import FirmwareParity
new_parity = FirmwareParity.get(utils.bytes_to_int(value))
new_parity = new_parity.parity if new_parity else None
changed = new_parity != node.serial_port.parity
parameter = "parity" if changed and apply else parameter
value = new_parity if changed and apply else value
else:
from digi.xbee.profile import FirmwareStopbits
new_sbits = FirmwareStopbits.get(utils.bytes_to_int(value))
new_sbits = new_sbits.stop_bits if new_sbits else None
changed = new_sbits != node.serial_port.stopbits
parameter = "stopbits" if changed and apply else parameter
value = new_sbits if changed and apply else value
if changed and apply:
node.serial_port.apply_settings({parameter: value})
node_fut_apply.pop(parameter, None)
elif changed:
node_fut_apply.update({parameter: value})
return False
[docs]class SyncRequestSender:
"""
Class to synchronously send XBee packets. This means after sending
the packet it waits for its response, if the package includes a frame ID,
otherwise it does not wait.
"""
def __init__(self, xbee, packet_to_send, timeout):
"""
Class constructor. Instantiates a new :class:`.SyncRequestSender` object
with the provided parameters.
Args:
xbee (:class:`.XBeeDevice`): The local XBee to send the packet.
packet_to_send (:class:`.XBeePacket`): The packet to transmit.
timeout (Integer): Number of seconds to wait. -1 to wait indefinitely.
"""
self._xbee = xbee
self._packet = packet_to_send
self._timeout = timeout
self._lock = threading.Condition()
self._response_list = []
[docs] def send(self):
"""
Sends the packet and waits for its corresponding response.
Returns:
:class:`.XBeePacket`: Received response packet.
Raises:
InvalidOperatingModeException: If the XBee device's operating mode
is not API or ESCAPED API. This method only checks the cached
value of the operating mode.
TimeoutException: If the response is not received in the configured
timeout.
XBeeException: If the XBee device's communication interface is closed.
.. seealso::
| :class:`.XBeePacket`
"""
# Add the packet received callback.
if self._packet.needs_id():
self._xbee.add_packet_received_callback(self._packet_received_cb)
try:
# Send the packet.
self._xbee.send_packet(self._packet, sync=False)
if not self._packet.needs_id():
return None
# Wait for response or timeout.
self._lock.acquire()
if self._timeout == -1:
self._lock.wait()
else:
self._lock.wait(self._timeout)
self._lock.release()
# After waiting check if we received any response, if not throw a
# timeout exception.
if not self._response_list:
raise TimeoutException(
message="Response not received in the configured timeout.")
# Return the received packet.
return self._response_list[0]
finally:
# Always remove the packet listener from the list.
if self._packet.needs_id():
self._xbee.del_packet_received_callback(self._packet_received_cb)
@property
def xbee(self):
"""
Returns the local XBee to send the packet.
Returns:
:class:`.XBeeDevice`: Local XBee device.
"""
return self._xbee
@property
def packet(self):
"""
Returns the packet to send.
Returns:
:class:`.XBeePacket`: Packet to send.
"""
return self._packet
@property
def timeout(self):
"""
Returns the maximum number of seconds to wait for a response.
Returns:
Integer: Timeout to wait for a response.
"""
return self._timeout
def _packet_received_cb(self, rcv_packet):
"""
Callback to receive XBee packets. It filters the received packets to
find the response that corresponds to the sent packet: by id, by
command (for local or remote AT commands), by socket ID, etc.
Args:
rcv_packet (:class:`.XBeePacket`): Received packet.
"""
# Verify that the sent packet is not the received one!
# This can happen when the echo mode is enabled in the serial port.
if self._packet == rcv_packet:
return
if (not isinstance(self._packet, XBeeAPIPacket)
or not isinstance(rcv_packet, XBeeAPIPacket)):
return
# Check if it is the packet we are waiting for.
if (not rcv_packet.needs_id()
or rcv_packet.frame_id != self._packet.frame_id):
return
s_f_type = self._packet.get_frame_type()
r_f_type = rcv_packet.get_frame_type()
if s_f_type in (ApiFrameType.AT_COMMAND, ApiFrameType.AT_COMMAND_QUEUE):
received_response = self._is_valid_at_response(rcv_packet)
elif s_f_type == ApiFrameType.REMOTE_AT_COMMAND_REQUEST:
received_response = self._is_valid_remote_at_response(rcv_packet)
elif s_f_type in (ApiFrameType.TRANSMIT_REQUEST,
ApiFrameType.EXPLICIT_ADDRESSING):
received_response = (r_f_type == ApiFrameType.TRANSMIT_STATUS)
elif s_f_type in (ApiFrameType.TX_64, ApiFrameType.TX_16,
ApiFrameType.USER_DATA_RELAY_REQUEST):
# User data relay requests only receive a tx status frame for errors
# This means successful user data relay requests throw a
# TimeoutException using this method
received_response = (r_f_type == ApiFrameType.TX_STATUS)
elif s_f_type == ApiFrameType.FILE_SYSTEM_REQUEST:
received_response = self._is_valid_fs_response(rcv_packet)
elif s_f_type == ApiFrameType.REMOTE_FILE_SYSTEM_REQUEST:
# A remote file system request may receive 2 frames: the remote file
# system response and a transmit status
received_response = self._is_valid_remote_fs_response(rcv_packet)
elif s_f_type == ApiFrameType.SOCKET_CREATE:
received_response = (r_f_type == ApiFrameType.SOCKET_CREATE_RESPONSE)
elif s_f_type == ApiFrameType.SOCKET_OPTION_REQUEST:
received_response = self._is_valid_socket_opt_response(rcv_packet)
elif s_f_type == ApiFrameType.SOCKET_CONNECT:
received_response = self._is_valid_socket_conn_response(rcv_packet)
elif s_f_type == ApiFrameType.SOCKET_CLOSE:
received_response = self._is_valid_socket_close_response(rcv_packet)
elif s_f_type == ApiFrameType.SOCKET_BIND:
received_response = self._is_valid_socket_bind_response(rcv_packet)
elif s_f_type == ApiFrameType.REGISTER_JOINING_DEVICE:
received_response = (
r_f_type == ApiFrameType.REGISTER_JOINING_DEVICE_STATUS)
else:
received_response = True
if received_response:
# Add the received packet to the list and notify the lock.
self._lock.acquire()
self._response_list.append(rcv_packet)
self._lock.notify()
self._lock.release()
def _is_valid_at_response(self, packet):
"""
Checks if the provided packet is the AT command response packet that
matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the AT command response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is an AT command, verify that the received one is
# an AT command response and the command matches in both packets.
return (packet.get_frame_type() == ApiFrameType.AT_COMMAND_RESPONSE
and self._packet.command.upper() == packet.command.upper())
def _is_valid_remote_at_response(self, packet):
"""
Checks if the provided packet is the remote AT command response packet
that matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the remote AT command response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a remote AT command, verify that the received
# one is a remote AT command response and their commands match.
return (packet.get_frame_type() == ApiFrameType.REMOTE_AT_COMMAND_RESPONSE
and self._packet.command.upper() == packet.command.upper()
and (not XBee64BitAddress.is_known_node_addr(self._packet.x64bit_dest_addr)
or self._packet.x64bit_dest_addr == packet.x64bit_source_addr)
and (not XBee16BitAddress.is_known_node_addr(self._packet.x16bit_dest_addr)
or not XBee16BitAddress.is_known_node_addr(packet.x16bit_source_addr)
or self._packet.x16bit_dest_addr == packet.x16bit_source_addr))
def _is_valid_fs_response(self, packet):
"""
Checks if the provided packet is the file system response packet that
matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the file system response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a file system command, verify that the received
# one is a file system response and their commands match.
return (packet.get_frame_type() == ApiFrameType.FILE_SYSTEM_RESPONSE
and self._packet.command.type == packet.command.type)
def _is_valid_remote_fs_response(self, packet):
"""
Checks if the provided packet is the remote file system response packet
that matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the remote file system response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a remote file system command, verify that the
# received one is a remote file system response and their commands match.
return (packet.get_frame_type() == ApiFrameType.REMOTE_FILE_SYSTEM_RESPONSE
and self._packet.command.type == packet.command.type
and (not XBee64BitAddress.is_known_node_addr(self._packet.x64bit_dest_addr)
or self._packet.x64bit_dest_addr == packet.x64bit_source_addr))
def _is_valid_socket_opt_response(self, packet):
"""
Checks if the provided packet is the Socket Option Response packet
that matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the Socket Option Response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a Socket Option request, verify that the
# received one is a Socket Option response and their commands match.
return (packet.get_frame_type() == ApiFrameType.SOCKET_OPTION_RESPONSE
and self._packet.socket_id == packet.socket_id)
def _is_valid_socket_conn_response(self, packet):
"""
Checks if the provided packet is the Socket Connect Response packet
that matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the Socket Connect Response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a Socket Connect, verify that the received one
# is a Socket Connect Response and their socket IDs match.
return (packet.get_frame_type() == ApiFrameType.SOCKET_CONNECT_RESPONSE
and self._packet.socket_id == packet.socket_id)
def _is_valid_socket_close_response(self, packet):
"""
Checks if the provided packet is the Socket Close Response packet that
matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the Socket Close Response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a Socket Close, verify that the received one is
# a Socket Close Response and their socket IDs match.
return (packet.get_frame_type() == ApiFrameType.SOCKET_CLOSE_RESPONSE
and self._packet.socket_id == packet.socket_id)
def _is_valid_socket_bind_response(self, packet):
"""
Checks if the provided packet is the Socket Listen Response packet that
matches the sent package.
Args:
packet (:class:`.XBeeAPIPacket`): Packet to check.
Returns:
Boolean: `True` if packet is the Socket Listen Response packet
corresponding to the sent package, `False` otherwise.
"""
# If the sent packet is a Socket Bind, verify that the received one is
# a Socket Listen Response and their socket IDs match.
return (packet.get_frame_type() == ApiFrameType.SOCKET_LISTEN_RESPONSE
and self._packet.socket_id == packet.socket_id)