# Copyright 2019-2021, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from digi.xbee.models.mode import OperatingMode
from digi.xbee.models.options import XBeeLocalInterface
from digi.xbee.packets.aft import ApiFrameType
from digi.xbee.packets.base import XBeeAPIPacket, DictKeys
from digi.xbee.exception import InvalidOperatingModeException, InvalidPacketException
class UserDataRelayPacket(XBeeAPIPacket):
    """
    This class represents a User Data Relay packet. Packet is built using the
    parameters of the constructor.

    The User Data Relay packet allows for data to come in on an interface with
    a designation of the target interface for the data to be output on.

    The destination interface must be one of the interfaces found in the
    corresponding enumerator (see :class:`.XBeeLocalInterface`).

    .. seealso::
       | :class:`.UserDataRelayOutputPacket`
       | :class:`.XBeeAPIPacket`
       | :class:`.XBeeLocalInterface`
    """

    # start delim. + length (2 bytes) + frame type + frame id
    # + relay interface + checksum = 7 bytes.
    __MIN_PACKET_LENGTH = 7

    def __init__(self, frame_id, local_iface, data=None, op_mode=OperatingMode.API_MODE):
        """
        Class constructor. Instantiates a new :class:`.UserDataRelayPacket`
        object with the provided parameters.

        Args:
            frame_id (Integer): the frame ID of the packet.
            local_iface (:class:`.XBeeLocalInterface`): the destination interface.
            data (Bytearray, optional): Data to send to the destination
                interface. The object is stored as given (it is not copied).
            op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`):
                The mode in which the frame was captured.

        Raises:
            ValueError: if `local_iface` is `None`.
            ValueError: if `frame_id` is less than 0 or greater than 255.

        .. seealso::
           | :class:`.XBeeAPIPacket`
           | :class:`.XBeeLocalInterface`
        """
        if local_iface is None:
            raise ValueError("Destination interface cannot be None")
        if frame_id > 255 or frame_id < 0:
            raise ValueError("frame_id must be between 0 and 255.")

        super().__init__(ApiFrameType.USER_DATA_RELAY_REQUEST, op_mode=op_mode)
        self._frame_id = frame_id
        self.__local_iface = local_iface
        self.__data = data

    @staticmethod
    def create_packet(raw, operating_mode):
        """
        Override method.

        Returns:
            :class:`.UserDataRelayPacket`.

        Raises:
            InvalidPacketException: if the bytearray length is less than 7.
                (start delim. + length (2 bytes) + frame type + frame id
                + relay interface + checksum = 7 bytes).
            InvalidPacketException: if the length field of 'raw' is different
                from its real length. (length field: bytes 2 and 3)
            InvalidPacketException: if the first byte of 'raw' is not the
                header byte. See :class:`.SpecialByte`.
            InvalidPacketException: if the calculated checksum is different
                from the checksum field value (last byte).
            InvalidPacketException: if the frame type is not
                :attr:`.ApiFrameType.USER_DATA_RELAY_REQUEST`.
            InvalidOperatingModeException: if `operating_mode` is not supported.

        .. seealso::
           | :meth:`.XBeePacket.create_packet`
           | :meth:`.XBeeAPIPacket._check_api_packet`
        """
        if operating_mode not in (OperatingMode.ESCAPED_API_MODE,
                                  OperatingMode.API_MODE):
            raise InvalidOperatingModeException(op_mode=operating_mode)

        XBeeAPIPacket._check_api_packet(
            raw, min_length=UserDataRelayPacket.__MIN_PACKET_LENGTH)

        if raw[3] != ApiFrameType.USER_DATA_RELAY_REQUEST.code:
            raise InvalidPacketException(
                message="This packet is not a user data relay packet.")

        # Frame layout: raw[4] is the frame ID, raw[5] the interface code and
        # everything between that and the trailing checksum is the payload.
        # A frame of exactly the minimum length carries no payload.
        return UserDataRelayPacket(
            raw[4], XBeeLocalInterface.get(raw[5]),
            data=raw[6:-1] if len(raw) > UserDataRelayPacket.__MIN_PACKET_LENGTH else None,
            op_mode=operating_mode)

    def needs_id(self):
        """
        Override method.

        Returns:
            Boolean: always `True` for this packet type.

        .. seealso::
           | :meth:`.XBeeAPIPacket.needs_id`
        """
        return True

    def _get_api_packet_spec_data(self):
        """
        Override method.

        Returns:
            Bytearray: the destination interface code followed by the data,
                if any.

        .. seealso::
           | :meth:`.XBeeAPIPacket._get_api_packet_spec_data`
        """
        ret = bytearray()
        ret.append(self.__local_iface.code)
        if self.__data is not None:
            return ret + self.__data
        return ret

    def _get_api_packet_spec_data_dict(self):
        """
        Override method.

        Returns:
            Dictionary: the destination interface description and the data as
                a list of integers (`None` if there is no data).

        .. seealso::
           | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict`
        """
        return {DictKeys.DEST_INTERFACE: self.__local_iface.description,
                DictKeys.DATA: list(self.__data) if self.__data is not None else None}

    @property
    def data(self):
        """
        Returns the data to send.

        Returns:
            Bytearray: a copy of the data to send, `None` if there is no data.
        """
        if self.__data is None:
            return None
        # The constructor stores the payload as given, so it may be an
        # immutable `bytes` object, which has no `copy()` method.
        # `bytearray(...)` copies both `bytes` and `bytearray`.
        return bytearray(self.__data)

    @data.setter
    def data(self, data):
        """
        Sets the data to send.

        Args:
            data (Bytearray): the new data to send, `None` to clear it.
                A copy of the provided value is stored.
        """
        # `bytearray(...)` is used instead of `data.copy()` so that `bytes`
        # values are accepted as well.
        self.__data = None if data is None else bytearray(data)

    @property
    def dest_interface(self):
        """
        Returns the destination interface.

        Returns:
            :class:`.XBeeLocalInterface`: the destination interface.

        .. seealso::
           | :class:`.XBeeLocalInterface`
        """
        return self.__local_iface

    @dest_interface.setter
    def dest_interface(self, local_interface):
        """
        Sets the destination interface.

        The value is stored as provided; unlike the constructor, no `None`
        check is performed here.

        Args:
            local_interface (:class:`.XBeeLocalInterface`): the new destination interface.

        .. seealso::
           | :class:`.XBeeLocalInterface`
        """
        self.__local_iface = local_interface
class UserDataRelayOutputPacket(XBeeAPIPacket):
    """
    This class represents a User Data Relay Output packet. Packet is built
    using the parameters of the constructor.

    The User Data Relay Output packet can be received from any relay interface.

    The source interface must be one of the interfaces found in the
    corresponding enumerator (see :class:`.XBeeLocalInterface`).

    .. seealso::
       | :class:`.UserDataRelayPacket`
       | :class:`.XBeeAPIPacket`
       | :class:`.XBeeLocalInterface`
    """

    # start delim. + length (2 bytes) + frame type + relay interface
    # + checksum = 6 bytes.
    __MIN_PACKET_LENGTH = 6

    def __init__(self, local_iface, data=None, op_mode=OperatingMode.API_MODE):
        """
        Class constructor. Instantiates a new
        :class:`.UserDataRelayOutputPacket` object with the provided
        parameters.

        Args:
            local_iface (:class:`.XBeeLocalInterface`): the source interface.
            data (Bytearray, optional): Data received from the source
                interface. The object is stored as given (it is not copied).
            op_mode (:class:`.OperatingMode`, optional, default=`OperatingMode.API_MODE`):
                The mode in which the frame was captured.

        Raises:
            ValueError: if `local_iface` is `None`.

        .. seealso::
           | :class:`.XBeeAPIPacket`
           | :class:`.XBeeLocalInterface`
        """
        if local_iface is None:
            raise ValueError("Source interface cannot be None")

        super().__init__(ApiFrameType.USER_DATA_RELAY_OUTPUT, op_mode=op_mode)
        self.__local_iface = local_iface
        self.__data = data

    @staticmethod
    def create_packet(raw, operating_mode):
        """
        Override method.

        Returns:
            :class:`.UserDataRelayOutputPacket`.

        Raises:
            InvalidPacketException: if the bytearray length is less than 6.
                (start delim. + length (2 bytes) + frame type + relay interface
                + checksum = 6 bytes).
            InvalidPacketException: if the length field of 'raw' is different
                from its real length. (length field: bytes 2 and 3)
            InvalidPacketException: if the first byte of 'raw' is not the
                header byte. See :class:`.SpecialByte`.
            InvalidPacketException: if the calculated checksum is different
                from the checksum field value (last byte).
            InvalidPacketException: if the frame type is not
                :attr:`.ApiFrameType.USER_DATA_RELAY_OUTPUT`.
            InvalidOperatingModeException: if `operating_mode` is not supported.

        .. seealso::
           | :meth:`.XBeePacket.create_packet`
           | :meth:`.XBeeAPIPacket._check_api_packet`
        """
        if operating_mode not in (OperatingMode.ESCAPED_API_MODE,
                                  OperatingMode.API_MODE):
            raise InvalidOperatingModeException(op_mode=operating_mode)

        XBeeAPIPacket._check_api_packet(
            raw, min_length=UserDataRelayOutputPacket.__MIN_PACKET_LENGTH)

        if raw[3] != ApiFrameType.USER_DATA_RELAY_OUTPUT.code:
            raise InvalidPacketException(
                message="This packet is not a user data relay output packet.")

        # Frame layout: raw[4] is the interface code and everything between
        # that and the trailing checksum is the payload. A frame of exactly
        # the minimum length carries no payload.
        return UserDataRelayOutputPacket(
            XBeeLocalInterface.get(raw[4]),
            data=raw[5:-1] if len(raw) > UserDataRelayOutputPacket.__MIN_PACKET_LENGTH else None,
            op_mode=operating_mode)

    def needs_id(self):
        """
        Override method.

        Returns:
            Boolean: always `False` for this packet type.

        .. seealso::
           | :meth:`.XBeeAPIPacket.needs_id`
        """
        return False

    def _get_api_packet_spec_data(self):
        """
        Override method.

        Returns:
            Bytearray: the source interface code followed by the data, if any.

        .. seealso::
           | :meth:`.XBeeAPIPacket._get_api_packet_spec_data`
        """
        ret = bytearray()
        ret.append(self.__local_iface.code)
        if self.__data is not None:
            return ret + self.__data
        return ret

    def _get_api_packet_spec_data_dict(self):
        """
        Override method.

        Returns:
            Dictionary: the source interface description and the data as a
                list of integers (`None` if there is no data).

        .. seealso::
           | :meth:`.XBeeAPIPacket._get_api_packet_spec_data_dict`
        """
        return {DictKeys.SOURCE_INTERFACE: self.__local_iface.description,
                DictKeys.DATA: list(self.__data) if self.__data is not None else None}

    @property
    def data(self):
        """
        Returns the received data.

        Returns:
            Bytearray: a copy of the received data, `None` if there is no data.
        """
        if self.__data is None:
            return None
        # The constructor stores the payload as given, so it may be an
        # immutable `bytes` object, which has no `copy()` method.
        # `bytearray(...)` copies both `bytes` and `bytearray`.
        return bytearray(self.__data)

    @data.setter
    def data(self, data):
        """
        Sets the received data.

        Args:
            data (Bytearray): the new received data, `None` to clear it.
                A copy of the provided value is stored.
        """
        # `bytearray(...)` is used instead of `data.copy()` so that `bytes`
        # values are accepted as well.
        self.__data = None if data is None else bytearray(data)

    @property
    def src_interface(self):
        """
        Returns the source interface.

        Returns:
            :class:`.XBeeLocalInterface`: the source interface.

        .. seealso::
           | :class:`.XBeeLocalInterface`
        """
        return self.__local_iface

    @src_interface.setter
    def src_interface(self, local_interface):
        """
        Sets the source interface.

        The value is stored as provided; unlike the constructor, no `None`
        check is performed here.

        Args:
            local_interface (:class:`.XBeeLocalInterface`): the new source interface.

        .. seealso::
           | :class:`.XBeeLocalInterface`
        """
        self.__local_iface = local_interface