Source code for digi.xbee.filesystem

# Copyright 2019-2021, Digi International Inc.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import functools
import logging
import os
import re
import string
import threading
import time
from abc import ABCMeta, abstractmethod
from enum import Enum
from os import listdir
from os.path import isfile
from pathlib import PurePosixPath
from serial.serialutil import SerialException

from digi.xbee.exception import XBeeException, OperationNotSupportedException
from digi.xbee.models.atcomm import ATStringCommand
from digi.xbee.models.filesystem import FSCmd, GetPathIdCmdRequest, \
    CreateDirCmdRequest, OpenDirCmdRequest, DeleteCmdRequest, VolStatCmdRequest, \
    VolFormatCmdRequest, HashFileCmdRequest, ReadDirCmdRequest, \
    OpenFileCmdRequest, CloseFileCmdRequest, ReadFileCmdRequest, \
    WriteFileCmdRequest, CloseDirCmdRequest, RenameCmdRequest
from digi.xbee.models.hw import HardwareVersion
from digi.xbee.models.options import TransmitOptions, DirResponseFlag, FileOpenRequestOption
from digi.xbee.models.protocol import XBeeProtocol
from digi.xbee.models.status import TransmitStatus, FSCommandStatus
from digi.xbee.packets.filesystem import RemoteFSRequestPacket, FSRequestPacket
from digi.xbee.util import xmodem, utils
from digi.xbee.util.xmodem import XModemException

# Text fragments built from the FS AT command name and the "sha256" keyword.
# NOTE(review): their consumers are outside this view; presumably they are
# looked for in AT command mode answers -- confirm.
_ANSWER_ATFS = "AT%s" % ATStringCommand.FS.command
_ANSWER_SHA256 = "sha256"

# AT command mode strings. Every command ends with a carriage return.
_COMMAND_AT = "AT\r"
# Template "AT<FS> %s\r": one '%s' placeholder is intentionally left
# unresolved so the text following the FS command can be filled in later.
_COMMAND_ATFS = "AT%s %s" % (ATStringCommand.FS.command, "%s\r")
_COMMAND_FILE_SYSTEM = "AT%s\r" % ATStringCommand.FS.command
_COMMAND_MODE_ANSWER_OK = "OK"
_COMMAND_MODE_CHAR = "+"
_COMMAND_MODE_EXIT = "AT%s\r" % ATStringCommand.CN.command
_COMMAND_MODE_TIMEOUT = 2

# Error message templates ('%s' placeholders are filled by the callers).
_ERROR_CONNECT_FILESYSTEM = "Error connecting file system manager: %s"
_ERROR_ENTER_CMD_MODE = "Could not enter AT command mode"
_ERROR_EXECUTE_COMMAND = "Error executing command '%s': %s"
_ERROR_FUNCTION_NOT_SUPPORTED = "Function not supported: %s"
_ERROR_TIMEOUT = "Timeout executing command"
ERROR_FILESYSTEM_NOT_SUPPORTED = "The device does not support file system feature"

_FORMAT_TIMEOUT = 10  # Seconds.

_FUNCTIONS_SEPARATOR = " "

_GUARD_TIME = 2  # In seconds.

_NAK_TIMEOUT = 10  # Seconds.

_PATH_SEPARATOR = "/"
# Line with leading spaces, the "<DIR>" tag and a name ending in '/'.
# Group 1 captures the name without the trailing slash.
_PATTERN_FILE_SYSTEM_DIRECTORY = "^ +<DIR> (.+)/$"
# Line containing a token made of 'E' followed by uppercase letters/digits
# (group 2), optionally preceded (group 1) and followed (group 3) by text.
_PATTERN_FILE_SYSTEM_ERROR = "^(.*\\s)?(E[A-Z0-9]+)( .*)?\\s*$"
# Line with leading spaces, a number (group 1) and the remaining text
# (group 2).
_PATTERN_FILE_SYSTEM_FILE = "^ +([0-9]+) (.+)$"
# Captures (group 1) the text that follows "AT<FS> commands: ".
_PATTERN_FILE_SYSTEM_FUNCTIONS = "^.*AT%s %s" % (ATStringCommand.FS.command, "commands: (.*)$")
# Optional leading spaces, an optional number (group 1) and the remaining
# text (group 2).
_PATTERN_FILE_SYSTEM_INFO = "^ *([0-9]*) (.*)$"

_READ_BUFFER = 256
_READ_DATA_TIMEOUT = 3  # Seconds.
_READ_EMPTY_DATA_RETRIES = 10
_READ_EMPTY_DATA_RETRIES_DEFAULT = 1
_READ_PORT_TIMEOUT = 0.05  # Seconds.

_SECURE_ELEMENT_SUFFIX = "#"

# Hardware version codes listed as supported for remote nodes.
REMOTE_SUPPORTED_HW_VERSIONS = (HardwareVersion.XBEE3.code,
                                HardwareVersion.XBEE3_SMT.code,
                                HardwareVersion.XBEE3_TH.code)
# Local nodes: the remote list plus the XBee 3 RR variants.
LOCAL_SUPPORTED_HW_VERSIONS = REMOTE_SUPPORTED_HW_VERSIONS \
                              + (HardwareVersion.XBEE3_RR.code,
                                 HardwareVersion.XBEE3_RR_TH.code)

# Firmware version per protocol associated with File System API frames
# support. Update these values when File System API frames are supported.
XB3_MIN_FW_VERSION_FS_API_SUPPORT = {
    XBeeProtocol.ZIGBEE: 0x10FF,
    XBeeProtocol.DIGI_MESH: 0x30FF,
    XBeeProtocol.RAW_802_15_4: 0x20FF
}

# Firmware version per protocol associated with File System OTA support.
# Update these values when the File System OTA support is deprecated.
XB3_MAX_FW_VERSION_FS_OTA_SUPPORT = {
    XBeeProtocol.ZIGBEE: 0x10FF,
    XBeeProtocol.DIGI_MESH: 0x30FF,
    XBeeProtocol.RAW_802_15_4: 0x20FF
}

# Fallback block size (see the "set 64" note in FileProcess._get_block_size).
_DEFAULT_BLOCK_SIZE = 64

_TRANSFER_TIMEOUT = 5  # Seconds.

_log = logging.getLogger(__name__)
# Bytes of every character in `string.printable`.
_printable_ascii_bytes = string.printable.encode(encoding='utf8')


class _FilesystemFunction(Enum):
    """
    This class lists the available file system functions for XBee devices.

    | Inherited properties:
    |     **name** (String): The name of this _FilesystemFunction.
    |     **value** (Integer): The ID of this _FilesystemFunction.
    """
    PWD = ("PWD", "pwd")
    CD = ("CD", "cd %s")
    MD = ("MD", "md %s")
    LS = ("LS", "ls")
    LS_DIR = ("LS", "ls %s")
    PUT = ("PUT", "put %s")
    XPUT = ("XPUT", "xput %s")
    GET = ("GET", "get %s")
    MV = ("MV", "mv %s %s")
    RM = ("RM", "rm %s")
    HASH = ("HASH", "hash %s")
    INFO = ("INFO", "info")
    FORMAT = ("FORMAT", "format confirm")

    def __init__(self, name, command):
        self.__name = name
        self.__command = command

    @classmethod
    def get(cls, name):
        """
        Returns the `_FilesystemFunction` for the given name.

        Args:
            name (String): Name of the `_FilesystemFunction` to get.

        Returns:
            :class:`._FilesystemFunction`: `_FilesystemFunction` with the given
                name, `None` if there is not a `_FilesystemFunction` with the
                provided name.
        """
        for value in _FilesystemFunction:
            if value.cmd_name == name:
                return value

        return None

    @property
    def cmd_name(self):
        """
        Returns the name of the `_FilesystemFunction` element.

        Returns:
            String: Name of the `_FilesystemFunction` element.
        """
        return self.__name

    @property
    def command(self):
        """
        Returns the command of the `_FilesystemFunction` element.

        Returns:
            String: Command of the `_FilesystemFunction` element.
        """
        return self.__command


class FileSystemElement:
    """
    Class used to represent XBee file system elements (files and
    directories).

    Name and path are stored internally as bytes and exposed as strings.
    """

    def __init__(self, name, path=None, is_dir=False, size=0, is_secure=False):
        """
        Class constructor. Instantiates a new :class:`.FileSystemElement`
        object with the given parameters.

        Args:
            name (String or bytearray): Name of the file system element.
            path (String or bytearray, optional, default=`None`): Absolute
                path of the element.
            is_dir (Boolean, optional, default=`False`): `True` if the
                element is a directory, `False` for a file.
            size (Integer, optional, default=0): Element size in bytes.
                Only for files.
            is_secure (Boolean, optional, default=`False`): `True` for a
                secure element, `False` otherwise.

        Raises:
            ValueError: If any of the parameters are invalid.
        """
        if not name or not isinstance(name, (str, bytearray, bytes)):
            raise ValueError("Name must be a non-empty string or bytearray")
        if not isinstance(size, int):
            raise ValueError("Size must be a integer")
        if path and not isinstance(path, (str, bytearray, bytes)):
            raise ValueError("Path must be a string or bytearray")

        if isinstance(name, str):
            self._name = name.encode('utf8', errors='ignore')
        else:
            self._name = name
        self._path = self._path_to_bytes(path)
        self._is_dir = is_dir
        # Directories always report a size of 0.
        self._size = size if not is_dir else 0
        self._is_secure = is_secure

    def __str__(self):
        return "{:s} {:10s} {:25s} {:s}".format(
            "d" if self._is_dir else "*" if self._is_secure else "-",
            self.size_pretty, self.name, self.path)

    @staticmethod
    def _path_to_bytes(path):
        """
        Converts a path to the internal bytes representation.

        Args:
            path (String or bytearray): The path to convert, it may be
                `None`.

        Returns:
            Bytearray: The UTF-8 encoded path for a string, an empty
                bytearray for `None`, the same object otherwise.
        """
        if isinstance(path, str):
            return path.encode('utf8', errors='ignore')
        return path if path is not None else bytearray()

    @property
    def name(self):
        """
        Returns the file system element name.

        Returns:
            String: File system element name.
        """
        return self._name.decode(encoding='utf8', errors='ignore')

    @property
    def path(self):
        """
        Returns the file system element absolute path.

        Returns:
            String: File system element absolute path.
        """
        return self._path.decode(encoding='utf8', errors='ignore')

    @path.setter
    def path(self, element_path):
        """
        Sets the file system element absolute path.

        Args:
            element_path (String or bytearray): File system element absolute
                path.
        """
        # The getter decodes the stored value, so it must be kept as bytes:
        # storing a string verbatim would make reading `path` fail.
        self._path = self._path_to_bytes(element_path)

    @property
    def is_dir(self):
        """
        Returns whether the file system element is a directory.

        Returns:
            Boolean: `True` for a directory, `False` otherwise.
        """
        return self._is_dir

    @property
    def size(self):
        """
        Returns the size in bytes of the element.

        Returns:
            Integer: The size in bytes of the file, 0 for a directory.
        """
        return self._size

    @property
    def size_pretty(self):
        """
        Returns a human readable size (e.g., 1K 234M 2G).

        Returns:
            String: Human readable size.
        """
        units = [(1 << 50, 'PB'), (1 << 40, 'TB'), (1 << 30, 'GB'),
                 (1 << 20, 'MB'), (1 << 10, 'KB'), (1, 'B')]
        # Pick the biggest unit not greater than the size; if none matches
        # (sizes below 1) the loop ends on the smallest unit, bytes.
        factor, suffix = units[-1]
        for factor, suffix in units:
            if self._size >= factor:
                break
        amount = round(self._size / factor, 2)
        return "%5.2f%s" % (amount, suffix)

    @property
    def is_secure(self):
        """
        Returns whether the element is secure.

        Returns:
            Boolean: `True` for a secure element, `False` otherwise.
        """
        return self._is_secure

    @staticmethod
    def from_data(name, size, flags, path=None):
        """
        Creates a file element from its name and the bytearray with info and
        size.

        Args:
            name (String or bytearray): The name of the element to create.
            size (Bytearray): Byte array containing file size.
            flags (Integer): Integer with file system element information.
            path (String or bytearray, optional, default=`None`): The
                absolute path of the element (without its name).

        Returns:
            :class:`.FileSystemElement`: The new file system element.
        """
        return FileSystemElement(
            name, path=path, is_dir=bool(flags & DirResponseFlag.IS_DIR),
            size=utils.bytes_to_int(size),
            is_secure=bool(flags & DirResponseFlag.IS_SECURE))
class FileSystemException(XBeeException):
    """
    Exception raised when any problem related with the XBee file system
    occurs.

    Besides the functionality inherited from `Exception
    <https://docs.python.org/3/library/exceptions.html#Exception>`_, it
    keeps the file system status given at construction in its `status`
    attribute.
    """

    def __init__(self, message, fs_status=None):
        """
        Class constructor.

        Args:
            message (String): Description of the error.
            fs_status (optional, default=`None`): Value stored in the
                `status` attribute of the exception.
        """
        super().__init__(message)
        self.status = fs_status
class FileSystemNotSupportedException(FileSystemException):
    """
    Exception raised when the file system feature is not supported in the
    device.

    It adds nothing to its parent class: all its functionality is inherited
    from :class:`.FileSystemException` and, through it, from `Exception
    <https://docs.python.org/3/library/exceptions.html#Exception>`_.
    """
class _FSFrameSender: """ Helper class used to send file system frames and wait for the response. """ def __init__(self, xbee): """ Class constructor. Instantiates a new :class:`._FSFrameSender` with the given parameters. Args: xbee (:class:`.AbstractXBeeDevice`): Destination XBee. """ self.__xbee = xbee self.__lock = threading.Event() self.__frame = None self.__resp_cmd = None self.__rec_opts = None def __str__(self): return "File system sender (dst: %s)" % self.__xbee def _fs_frame_cb(self, xbee, frame_id, cmd, receive_opts): """ Callback to execute when a new frame id is received. Args: xbee (:class:`.AbstractXBeeDevice`): The node that sent the file system frame. frame_id (Integer): The received frame id. cmd (:class:`.FSCmd`): The file system command. receive_opts (Integer): Bitfield indicating receive options. See :class:`.ReceiveOptions`. """ if (frame_id != self.__frame.frame_id or cmd.type != self.__frame.command.type or xbee != self.__xbee): return self.__resp_cmd = cmd self.__rec_opts = receive_opts self.__lock.set() def send(self, frame_to_send, timeout=10): """ Sends the file system frame to the provided XBee and waits for its response. Args: frame_to_send (:class:`XBeeAPIPacket`): The file system frame to send. timeout (Float): Maximum number of seconds to wait for the response. Returns: Tuple: Tuple containing route data: - rv_status (Integer): Status of the file system command execution. See :class:`.FSCommandStatus`. - resp_cmd (:class:`.FSCmd`): The response command. - rv_opts (Integer): Bitfield indicating the receive options. See :class:`.ReceiveOptions`. 
""" local_xb = self.__xbee if self.__xbee.is_remote(): local_xb = self.__xbee.get_local_xbee_device() tr_status = None self.__lock.clear() self.__frame = frame_to_send self.__resp_cmd = None self.__rec_opts = None log_msg_fmt = "%s: %s: %s" % (str(self), self.__frame.command.type.description, "%s") local_xb.add_fs_frame_received_callback(self._fs_frame_cb) try: #start = time.time() if self.__xbee.is_remote(): _log.debug(log_msg_fmt, "Sending remote frame") local_xb.send_packet(self.__frame) if not self.__lock.wait(timeout): self._throw_fs_exc(self.__frame.command, "Timeout waiting for remote response") tr_status = TransmitStatus.SUCCESS # Transmit status frame is never received for Zigbee, # DigiMesh is receiving it, 802.15.4 # https://jira.digi.com/browse/XBHAWK-530 #st_frame = local_xb.send_packet_sync_and_get_response( # self.__frame, timeout=timeout) #tr_status = st_frame.transmit_status if st_frame else None #if tr_status in (TransmitStatus.SUCCESS, # TransmitStatus.SELF_ADDRESSED): # if not self.__lock.wait(timeout - (time.time() - start)): # self._throw_fs_exc(self.__frame.command, # "Timeout waiting for remote response") #else: # self._throw_fs_exc(self.__frame.command, # "Remote frame not sent (tr status: %s)" % tr_status) else: _log.debug(log_msg_fmt, "Sending local frame") local_xb.send_packet(self.__frame) if not self.__lock.wait(timeout): self._throw_fs_exc(self.__frame.command, "Timeout waiting for local response") tr_status = TransmitStatus.SUCCESS except FileSystemException: pass except XBeeException as exc: self._throw_fs_exc(self.__frame.command, str(exc)) finally: local_xb.del_fs_frame_received_callback(self._fs_frame_cb) if not tr_status or not self.__resp_cmd: self._throw_fs_exc(self.__frame.command, "Response not received in timeout") status = self.__resp_cmd.status_value if status != FSCommandStatus.SUCCESS.code: fs_status = FSCommandStatus.get(status) msg = str(fs_status) if fs_status else "Unknown file system status (0x%0.2X)" % status 
_log.error("%s: %s: %s", str(self), self.__frame.command.type.description, msg) return status, self.__resp_cmd, self.__rec_opts def _throw_fs_exc(self, cmd, msg, status=None): exc_msg_fmt = "%s error: %s" % (cmd.type.description, "%s") log_msg_fmt = "%s: %s: %s" % (str(self), cmd.type.description, "%s") _log.error(log_msg_fmt, msg) raise FileSystemException(exc_msg_fmt % msg, fs_status=status)
class FileProcess(metaclass=ABCMeta):
    """
    This class represents a file process.
    """

    def __init__(self, f_mng, file, timeout):
        """
        Class constructor. Instantiates a new :class:`.FileProcess` object
        with the provided parameters.

        Args:
            f_mng (class:`.FileSystemManager`): The file system manager.
            file (:class:`.FileSystemElement` or String): File or its
                absolute path.
            timeout(Float): Timeout in seconds.

        Raises:
            ValueError: If `file` is not a string or a
                :class:`.FileSystemElement`, if it is a directory, or if its
                path is one of "/", "\\", "." or "..".
        """
        if not isinstance(file, (str, FileSystemElement)):
            raise ValueError("File must be a string or a FileSystemElement")
        if isinstance(file, FileSystemElement):
            if file.is_dir:
                raise ValueError("File cannot be a directory")
            if file.path in ("/", "\\", ".", ".."):
                raise ValueError("Invalid file path")
        if isinstance(file, str) and file in ("/", "\\", ".", ".."):
            raise ValueError("Invalid file path")

        # Sanitize path
        file_path = file
        if isinstance(file, FileSystemElement):
            file_path = file.path
        file_path = self._sanitize_path(file_path)

        self._f_mng = f_mng
        self._f_path = file_path
        self._timeout = timeout
        self._fid = None
        self._fsize = None
        self._cpid = None
        self._running = False
        self._opened = False
        self._status = None
        self._cb = None

    @staticmethod
    def _sanitize_path(path):
        """
        Normalizes a file path using '/' as separator.

        Args:
            path (String): The path to normalize.

        Returns:
            String: The normalized path.
        """
        # `os.path.normpath` follows the host conventions and on Windows it
        # turns every '/' back into '\\', undoing the replacement. The POSIX
        # flavor keeps '/' on every platform.
        import posixpath

        return posixpath.normpath(path.replace('\\', '/'))

    @property
    def running(self):
        """
        Returns if this file command is running.

        Returns:
            Boolean: `True` if it is running, `False` otherwise.
        """
        return self._running

    @property
    def status(self):
        """
        Returns the status code.

        Returns:
            Integer: The status.
        """
        return self._status

    @property
    def block_size(self):
        """
        Returns the size of the block for this file operation.

        Returns:
            Integer: Size of the block for this file operation.
        """
        return self._get_block_size(0)

    def _next(self, last=True):
        """
        Executes the next action: opens the file if it is not opened yet,
        runs the specific command, and closes the file when required.

        Args:
            last (Boolean, optional, default=`True`): `True` to finish the
                process (close the file) after this step.
        """
        error = bool(self._status not in (None, FSCommandStatus.SUCCESS.code))
        if not self._fid and not self._opened and not error:
            self._start_process()

        if self._fid is None or self._cpid is None:
            return

        r_last = False
        if not error:
            r_last = self._exec_specific_cmd()

        if self._opened and (last or r_last or error):
            self._end_process()

    def _start_process(self):
        """
        Starts the file process: changes to the required directory and
        opens the file. If the file cannot be opened, the path id is
        released and the end of the process is notified.
        """
        self._running = True
        self._status = None
        self._cpid = 0

        # Check length of path, if is too big try to change to a parent
        self._cpid, f_path = self._f_mng._cd_to_execute(
            self._f_path, self._cpid, self._timeout)

        self._status, self._fid, self._fsize = self._f_mng.popen_file(
            f_path, options=self._get_open_flags(), path_id=self._cpid,
            timeout=self._timeout)

        self._opened = bool(self._status == FSCommandStatus.SUCCESS.code)
        if not self._opened:
            if self._cpid:
                self._f_mng.prelease_path_id(self._cpid, self._timeout)
            self._running = False
            self._notify_process_finished()

    def _end_process(self):
        """
        Closes the file and releases the path id.
        """
        cl_st = None
        # Close file and release directory path id
        if self._fid:
            cl_st = self._f_mng.pclose_file(self._fid, timeout=self._timeout)
        if self._cpid:
            self._f_mng.prelease_path_id(self._cpid, self._timeout)
        self._opened = False
        self._running = False
        # Keep a previous truthy status; otherwise use the close status.
        self._status = self._status if self._status else cl_st

        if self._status:
            self._notify_process_finished()

    def _get_block_size(self, extra_data_len):
        """
        Calculates the size of the data block for a file operation.

        Args:
            extra_data_len (Integer): Number of bytes to subtract from the
                `np_value` of the file system manager.

        Returns:
            Integer: Size of the block, always 1 or greater.
        """
        xbee = self._f_mng.xbee
        n_bytes = self._f_mng.np_value
        if not n_bytes:
            n_bytes = _DEFAULT_BLOCK_SIZE
        else:
            n_bytes = n_bytes - extra_data_len

        # For remote nodes, honor the configured OTA maximum (if any).
        if xbee.is_remote():
            cfg_max = xbee.get_ota_max_block_size()
            n_bytes = min(cfg_max, n_bytes) if cfg_max else n_bytes

        # If max block is not configured and NP cannot be read, set 64
        if n_bytes < 1:
            n_bytes = _DEFAULT_BLOCK_SIZE

        return n_bytes

    @abstractmethod
    def _get_open_flags(self):
        """
        Bitmask that specifies the options to open the file.

        Returns:
            :class:`.FileOpenRequestOption`: Options to open the file.
        """

    @abstractmethod
    def _exec_specific_cmd(self):
        """
        Executes the specific file process (read or write).
        """

    @abstractmethod
    def _notify_process_finished(self):
        """
        Notifies that the file process has finished its execution.
        """

    def _log_str(self, msg, *args):
        """
        Builds a log message prefixed with the description of this process.

        Args:
            msg (String): Message with `%` placeholders.
            *args: Values for the placeholders of `msg`.

        Returns:
            String: The formatted message.
        """
        return "%s: %s" % (str(self), msg % args)
class _ReadFileProcess(FileProcess):
    """
    File process that reads the contents of an XBee file in blocks.
    """

    def __init__(self, f_mng, file, offset, timeout, read_callback=None):
        """
        Override.

        Args:
            offset (Integer): File offset to start reading.
            read_callback (Function, optional, default=`None`): Method called
                when new data is read. Receives four arguments:

                    * The read chunk of data.
                    * The progress percentage as float.
                    * The total size of the file.
                    * The completion status code (integer). See
                      `.FSCommandStatus`.

        Raises:
            ValueError: If `offset` is not an integer equal or greater than 0.
        """
        # Check the type first so that `None` is reported as a ValueError
        # instead of failing with a TypeError on the `< 0` comparison.
        if not isinstance(offset, int) or offset < 0:
            raise ValueError("Offset must be 0 or greater")

        super().__init__(f_mng, file, timeout)
        self.__offset = offset
        # Offset of the next byte to read, advanced after every chunk
        self.__l_off = offset
        self._cb = read_callback
        self.__size = 0
        self.__data = bytearray()

        _log.debug(self._log_str("Reading file '%s' (offset: %d)",
                                 self._f_path, offset))

    def __str__(self):
        return "Read file command ('%s')" % self._f_path

    @property
    def block_size(self):
        """
        Returns the size of the block for this file operation.

        Returns:
            Integer: The size of the block for this file operation.
        """
        # cmd_id (1) + f_id (2) + offset (4) + size (2) = 9
        return self._get_block_size(9)

    def next(self, size=-1, last=True):
        """
        Reads from the current offset the provided amount of data.
        The process blocks until all data is read.

        Set `last` to `False` to use subsequents calls to `next` to read more
        data. When no more read is required, close the file setting `last` to
        `True`. If the end of the file is reached it is closed independently
        of `last` value.

        Args:
            size (Integer, optional, default=-1): Number of bytes to read.
                -1 for the complete file. 0 reads nothing, but still closes
                an open file if `last` is `True`.
            last (Boolean, optional, default=`True'): `True` if this is the
                last step, `False` otherwise.

        Returns:
            Bytearray: The total read data bytearray.

        Raises:
            FileSystemException: If there is any error performing the operation
                and no read callback was provided.
            ValueError: If any of the parameters is invalid.
        """
        if not isinstance(size, int) or size < -1:
            raise ValueError("Size must be -1 or greater")

        if not size:
            # Nothing to read. Still honor `last`: a caller that aborts a
            # multi-step read uses `next(size=0, last=True)` to release the
            # file handle and the directory path id.
            if last and self._opened:
                self._end_process()
                failed = self._status not in (None, FSCommandStatus.SUCCESS.code)
                if failed and not self._cb:
                    _raise_exception(
                        self._status, "Error reading file '%s'" % self._f_path)
            return bytearray()

        # The whole file was already read (and closed): do not reopen it
        if self._fsize and self.__l_off >= self._fsize:
            return bytearray()

        self.__size = size
        super()._next(last=last)

        if self._status == FSCommandStatus.SUCCESS.code:
            return self.__data

        if not self._cb:
            _raise_exception(self._status,
                             "Error reading file '%s'" % self._f_path)

        return bytearray()

    def _get_open_flags(self):
        """
        Override.

        .. seealso::
           | :meth:`._FileProcess._get_open_flags`
        """
        return FileOpenRequestOption.READ

    def _exec_specific_cmd(self):
        """
        Override.

        Reads up to the requested size, one block at a time, starting at the
        current read offset.

        Returns:
            Boolean: `True` if the file must be closed (end of file reached or
                read error), `False` otherwise.

        .. seealso::
           | :meth:`._FileProcess._exec_specific_cmd`
        """
        self._status = FSCommandStatus.SUCCESS.code
        self.__data = bytearray()

        # Nothing to read if the initial offset is at or beyond the end
        if self._fsize - self.__offset <= 0:
            return True

        # Calculate remaining (not read) size in file
        remain_in_file = self._fsize - self.__l_off
        if not remain_in_file:
            return True

        # Calculate remaining (not read) to read. A size of -1 wins the
        # `min` and stands for "everything that is left".
        remain_to_read = min(self.__size, remain_in_file)
        if remain_to_read == -1:
            remain_to_read = remain_in_file

        # Calculate chunk length
        chunk_len = min(self.block_size, remain_to_read)
        _log.debug(self._log_str("Block size: %d", chunk_len))

        while (chunk_len and len(self.__data) < remain_to_read
               and self.__l_off < self._fsize):
            _log.debug(self._log_str("Reading, offset: %d, size: %d",
                                     self.__l_off, chunk_len))
            self._status, _fid, _offst, chunk = self._f_mng.pread_file(
                self._fid, offset=self.__l_off, size=chunk_len,
                timeout=self._timeout)
            if self._status != FSCommandStatus.SUCCESS.code:
                return True

            self.__data += chunk
            _log.debug(self._log_str("Read %d (%d/%d)", len(chunk),
                                     len(self.__data), remain_to_read))
            if self._cb:
                self._cb(chunk, len(self.__data) * 100 / remain_to_read,
                         self._fsize, self._status)

            # Recalculate offset
            self.__l_off += len(chunk)
            # Recalculate chunk length
            chunk_len = min(chunk_len, remain_to_read - len(self.__data))

        return self.__l_off >= self._fsize

    def _notify_process_finished(self):
        """
        Override.

        .. seealso::
           | :meth:`._FileProcess._notify_process_finished`
        """
        if self._cb:
            self._cb(bytearray(), 0, self._fsize, self._status)


class _WriteFileProcess(FileProcess):
    """
    File process that writes data to an XBee file in blocks.
    """

    def __init__(self, f_mng, file, offset, options, timeout, write_callback=None):
        """
        Override.

        Args:
            offset (Integer): File offset to start writing.
            options (:class:`.FileOpenRequestOption`): Options used to open
                the file.
            write_callback (Function, optional, default=`None`): Method called
                when data is written. Receives three arguments:

                    * The amount of bytes written in the chunk.
                    * The progress percentage as float.
                    * The completion status code (integer). See
                      `.FSCommandStatus`.

        Raises:
            ValueError: If `offset` is not an integer equal or greater than 0.
        """
        # Check the type first so that `None` is reported as a ValueError
        # instead of failing with a TypeError on the `< 0` comparison.
        if not isinstance(offset, int) or offset < 0:
            raise ValueError("Offset must be 0 or greater")

        super().__init__(f_mng, file, timeout)
        self.__offset = offset
        self.__options = options
        self._cb = write_callback
        # Bytes written so far by the `next` call in progress
        self.__n_bytes = 0
        # Bytes written by the last completed `next` call (its return value)
        self.__written = 0
        self.__data = bytearray()

        _log.debug(self._log_str("Writing to file '%s' (offset: %d)",
                                 self._f_path, offset))

    def __str__(self):
        return "Write file command ('%s')" % self._f_path

    @property
    def block_size(self):
        """
        Returns the size of the block for this file operation.

        Returns:
            Integer: Size of the block for this file operation.
        """
        # cmd_id (1) + f_id (2) + offset (4) = 7
        return self._get_block_size(7)

    def next(self, data, last=True):
        """
        Writes the provided data in the current file offset.
        The process blocks until all requested data is written.

        Set `last` to `False` to use subsequents calls to `next` to write more
        data. When no more write is required, close the file setting `last` to
        `True`.

        Args:
            data (Bytearray, bytes, String): Data to write. Strings are
                encoded as UTF-8.
            last (Boolean, optional, default=`True'): 'True' if this is the
                last chunk to write, `False` otherwise.

        Returns:
            Integer: The size written by this call (in bytes), `None` if the
                operation failed and a write callback was provided.

        Raises:
            FileSystemException: If there is any error performing the operation
                and no write callback was provided.
            ValueError: If any of the parameters is invalid.
        """
        if not isinstance(data, (bytearray, bytes, str)):
            raise ValueError("Data must be a bytearray, bytes or a string")

        self.__data = data
        if isinstance(data, str):
            self.__data = bytearray(data, encoding='utf8')

        self.__written = 0
        super()._next(last=last)

        if self._status == FSCommandStatus.SUCCESS.code:
            return self.__written

        if not self._cb:
            _raise_exception(self._status,
                             "Error writing file '%s'" % self._f_path)

        return None

    def _get_open_flags(self):
        """
        Override.

        .. seealso::
           | :meth:`._FileProcess._get_open_flags`
        """
        return self.__options

    def _exec_specific_cmd(self):
        """
        Override.

        Writes the pending data, one block at a time, starting at the current
        write offset.

        Returns:
            Boolean: `True` if the file must be closed (nothing to write or
                write error), `False` otherwise.

        .. seealso::
           | :meth:`._FileProcess._exec_specific_cmd`
        """
        self._status = FSCommandStatus.SUCCESS.code

        # NOTE(review): the size comparison skips the write when the offset is
        # at or beyond the size reported on open. Confirm what size the XBee
        # reports for a newly created file before relying on this.
        if not self.__data or self.__offset + 1 >= self._fsize:
            return True

        last_offset = self.__offset
        data_offset = 0
        # Calculate chunk length
        chunk_len = min(self.block_size, len(self.__data))
        _log.debug(self._log_str("Block size: %d", chunk_len))

        while chunk_len and data_offset < len(self.__data):
            _log.debug(self._log_str("Writing, offset: %d, size: %d",
                                     last_offset, chunk_len))
            self._status, _fid, last_offset = self._f_mng.pwrite_file(
                self._fid,
                data=self.__data[data_offset:data_offset + chunk_len],
                offset=last_offset, timeout=self._timeout)
            if self._status != FSCommandStatus.SUCCESS.code:
                return True

            data_offset += chunk_len
            self.__n_bytes += chunk_len
            if self._cb:
                self._cb(chunk_len, self.__n_bytes * 100 / len(self.__data),
                         self._status)

            # Recalculate chunk length
            chunk_len = min(chunk_len, len(self.__data) - data_offset)

        self.__offset = last_offset
        # Keep the total for `next` before resetting the per-call counter
        self.__written = self.__n_bytes
        self.__n_bytes = 0

        return False

    def _notify_process_finished(self):
        """
        Override.

        .. seealso::
           | :meth:`._FileProcess._notify_process_finished`
        """
        if self._cb:
            self._cb(0, self.__n_bytes, self._status)
class FileSystemManager:
    """
    Helper class used to manage local or remote XBee file system.
    """

    # Default number of seconds to wait for a file system operation
    DEFAULT_TIMEOUT = 20
    # Default number of seconds to wait for a volume format
    DEFAULT_FORMAT_TIMEOUT = 30
    _LOCAL_READ_CHUNK = 1024

    def __init__(self, xbee):
        """
        Class constructor. Instantiates a new :class:`.FileSystemManager` for
        the provided XBee.

        Args:
            xbee (:class:`.AbstractXBeeDevice`): XBee to manage its file system.

        Raises:
            FileSystemNotSupportedException: If the XBee does not support
                filesystem.
            ValueError: If `xbee` is not an :class:`.AbstractXBeeDevice`.
        """
        # Imported here rather than at module level
        from digi.xbee.devices import AbstractXBeeDevice

        is_xbee = isinstance(xbee, AbstractXBeeDevice)
        if not is_xbee:
            raise ValueError("XBee must be an XBee class")

        supported = check_fs_support(
            xbee, min_fw_vers=XB3_MIN_FW_VERSION_FS_API_SUPPORT)
        if not supported:
            raise FileSystemNotSupportedException(ERROR_FILESYSTEM_NOT_SUPPORTED)

        self.__xbee = xbee
        self.__np_val = None
        root_dir = FileSystemElement(name="/", path="/", is_dir=True,
                                     size=0, is_secure=False)
        self.__root = root_dir

    def __str__(self):
        managed = self.__xbee
        return "File system (%s)" % managed

    @property
    def xbee(self):
        """
        The XBee whose file system is handled by this manager.

        Returns:
            :class:`.AbstractXBeeDevice`: XBee to manage its file system.
        """
        return self.__xbee

    @property
    def np_value(self):
        """
        The 'NP' parameter value, as provided by `_get_np()`.

        Returns:
            Integer: The 'NP' value.
        """
        np_val = self._get_np()
        return np_val
def get_root(self):
    """
    Provides the element that represents the root directory ('/').

    Returns:
        :class:`.FileSystemElement`: The root directory.
    """
    root_dir = self.__root
    return root_dir
def make_directory(self, dir_path, base=None, mk_parents=True,
                   timeout=DEFAULT_TIMEOUT):
    """
    Creates the provided directory.

    Args:
        dir_path (String): Path of the new directory to create. It is
            relative to the directory specify in base.
        base (:class:`.FileSystemElement`, optional, default=`None): Base
            directory. If not specify it refers to '/flash'.
        mk_parents (Boolean, optional, default=`True`): `True` to make
            parent directories as needed, `False` otherwise.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. If `mk_parents`
            this is the timeout per directory creation. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Returns:
        List: List of :class:`.FileSystemElement` created directories.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(dir_path, str):
        raise ValueError("Directory path must be a non empty string")
    if dir_path in ("/", "\\", ".", ".."):
        raise ValueError("Invalid directory path")
    if base and not isinstance(base, FileSystemElement):
        raise ValueError("Base must be a FileSystemElement")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path_id = 0
    path = posixpath.normpath(dir_path.replace('\\', '/'))
    base_path = "/flash"
    if base:
        base_path = posixpath.normpath(base.path.replace('\\', '/'))
    comp_path = posixpath.join(base_path, path)

    _log.debug(self._log_str("Creating directory '%s' (base: %s)",
                             path, base_path))

    path = PurePosixPath(comp_path)
    dirs = []
    try:
        # XBee create directory command does not make intermediate dir, this
        # method generates them recursively:
        # https://jira.digi.com/browse/XBHAWK-523
        if mk_parents and str(path.parent) not in (path.root, '.', '/flash'):
            dirs += self.make_directory(str(path.parent), mk_parents=True,
                                        timeout=timeout)

        # The timeout applies per directory: start counting once the
        # parents exist.
        start = time.time()

        # Check length of path, if is too big try to change to a parent
        path_id, to_create = self._cd_to_execute(
            comp_path, path_id, timeout - (time.time() - start))
        # Create the directory
        status = self.pmake_directory(
            to_create, path_id=path_id,
            timeout=(timeout - (time.time() - start)))
    finally:
        if path_id:
            self.prelease_path_id(path_id, timeout)

    if status not in (FSCommandStatus.SUCCESS.code,
                      FSCommandStatus.ALREADY_EXISTS.code):
        _raise_exception(status, "Error making directory '%s'" % comp_path)

    dirs.append(
        FileSystemElement(posixpath.basename(comp_path), path=comp_path,
                          is_dir=True, size=0, is_secure=False))

    return dirs
def list_directory(self, directory=None, timeout=DEFAULT_TIMEOUT):
    """
    Lists the contents of the given directory.

    Args:
        directory (:class:`.FileSystemElement` or String): Directory to list
            or its absolute path. Empty, `None` or '.' refer to '/flash',
            '..' refers to '/'.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Returns:
        List: List of `:class:`.FilesystemElement` objects contained in
            the given directory.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if directory:
        if not isinstance(directory, (str, FileSystemElement)):
            raise ValueError("Directory must be a string or a FileSystemElement")
        if isinstance(directory, FileSystemElement) and not directory.is_dir:
            raise ValueError("Directory must be a directory")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path_id = 0
    dir_path = directory
    if isinstance(directory, FileSystemElement):
        dir_path = directory.path
    if dir_path in ("", ".", None):
        dir_path = "/flash"
    elif dir_path == "..":
        dir_path = "/"
    dir_path = posixpath.normpath(dir_path.replace('\\', '/'))

    _log.debug(self._log_str("Listing directory '%s'", dir_path))

    start = time.time()
    try:
        # Check length of path, if is too big try to change to a parent
        path_id, to_list = self._cd_to_execute(dir_path, path_id, timeout)
        status, files = self.plist_directory(
            to_list, path_id=path_id,
            timeout=(timeout - (time.time() - start)))
        # This will store the absolute path of the contents
        for entry in files:
            entry.path = posixpath.join(dir_path, entry.name)
    finally:
        if path_id:
            self.prelease_path_id(path_id, timeout)

    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(status, "Error listing directory '%s'" % dir_path)

    return files
def remove(self, entry, rm_children=True, timeout=DEFAULT_TIMEOUT):
    """
    Removes the given file system entry.

    All files in a directory must be deleted before removing the directory.
    On XBee 3 802.15.4, DigiMesh, and Zigbee, deleted files are marked as
    unusable space unless they are at the "end" of the file system
    (most-recently created). On these products, deleting a file triggers
    recovery of any deleted file space at the end of the file system, and
    can lead to a delayed response.

    Args:
        entry (:class:`.FileSystemElement` or String): File system entry to
            remove or its absolute path.
        rm_children (Boolean, optional, default=`True`): `True` to remove
            directory children if they exist, `False` otherwise.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. If children
            must be removed, this is the timeout per removed entry. Any non
            numeric value is replaced by `DEFAULT_TIMEOUT`.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(entry, (str, FileSystemElement)):
        raise ValueError("Entry must be a string or a FileSystemElement")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path_id = 0
    entry_path = entry
    if isinstance(entry, FileSystemElement):
        entry_path = entry.path
    if entry_path in ("", ".", None):
        entry_path = "/flash"
    elif entry_path == "..":
        entry_path = "/"
    entry_path = posixpath.normpath(entry_path.replace('\\', '/'))

    _log.debug(self._log_str("Removing entry '%s'", entry_path))

    start = time.time()
    try:
        # Check length of path, if is too big try to change to a parent
        path_id, to_rm = self._cd_to_execute(entry_path, path_id, timeout)
        status = self.premove(to_rm, path_id=path_id,
                              timeout=(timeout - (time.time() - start)))
        # To remove a directory, it must be empty beforehand:
        # https://jira.digi.com/browse/XBHAWK-525
        if rm_children and status == FSCommandStatus.DIR_NOT_EMPTY.code:
            # Release the path id
            if path_id:
                self.prelease_path_id(path_id, timeout)
                path_id = 0
            # Remove the directory content. Every child gets the complete
            # timeout: the time needed grows with the number of entries.
            files = self.list_directory(entry_path, timeout=timeout)
            for file in files:
                self.remove(file, rm_children=True, timeout=timeout)
            # Remove the directory. Restart the clock so the time spent on
            # the children does not consume the timeout of this request.
            start = time.time()
            path_id, to_rm = self._cd_to_execute(entry_path, path_id,
                                                 timeout, refresh=False)
            status = self.premove(to_rm, path_id=path_id,
                                  timeout=(timeout - (time.time() - start)))
    finally:
        if path_id:
            self.prelease_path_id(path_id, timeout)

    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(status, "Error removing entry '%s'" % entry_path)
def read_file(self, file, offset=0, progress_cb=None):
    """
    Creates a process to read the provided file from the given offset.

    Nothing is read by this method: data is retrieved through the returned
    process, which uses `DEFAULT_TIMEOUT` as timeout.

    Args:
        file (:class:`.FileSystemElement` or String): File to read or its
            absolute path.
        offset (Integer, optional, default=0): File offset to start reading.
        progress_cb (Function, optional, default=`None`): Function called
            when new data is read. Receives four arguments:

                * The chunk of data read as byte array.
                * The progress percentage as float.
                * The total size of the file.
                * The status when process finishes.

    Returns:
        :class:`.FileProcess`: The process to read data from the file.

    Raises:
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :meth:`.get_file`
    """
    reader = _ReadFileProcess(
        self, file, offset, self.DEFAULT_TIMEOUT, read_callback=progress_cb)
    return reader
def write_file(self, file, offset=0, secure=False, options=None,
               progress_cb=None):
    """
    Creates a process to write to the provided file from the given offset.

    Nothing is written by this method: data is stored through the returned
    process, which uses `DEFAULT_TIMEOUT` as timeout.

    Args:
        file (:class:`.FileSystemElement` or String): File to write or its
            absolute path.
        offset (Integer, optional, default=0): File offset to start writing.
        secure (Boolean, optional, default=`False`): `True` to store the
            file securely (no read access), `False` otherwise.
        options (List, optional): Other write options as a list of strings:
            `exclusive`, `truncate`, `append`. The file is opened with the
            create option unless `exclusive` is requested.
        progress_cb (Function, optional, default=`None`): Function call
            when data is written. Receives three arguments:

                * The amount of bytes written (for each chunk).
                * The progress percentage as float.
                * The status when process finishes.

    Returns:
        :class:`.FileProcess`: The process to write data to the file.

    Raises:
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :meth:`.put_file`
    """
    requested = [] if options is None else options

    flags = FileOpenRequestOption.WRITE
    if secure:
        flags |= FileOpenRequestOption.SECURE
    # 'exclusive' replaces the default 'create' behavior
    if "exclusive" in requested:
        flags |= FileOpenRequestOption.EXCLUSIVE
    else:
        flags |= FileOpenRequestOption.CREATE
    for opt_name, opt_flag in (("truncate", FileOpenRequestOption.TRUNCATE),
                               ("append", FileOpenRequestOption.APPEND)):
        if opt_name in requested:
            flags |= opt_flag

    writer = _WriteFileProcess(self, file, offset, flags,
                               self.DEFAULT_TIMEOUT,
                               write_callback=progress_cb)
    return writer
def get_file(self, src, dest, progress_cb=None):
    """
    Downloads the given XBee file in the specified destination path.

    Args:
        src (:class:`.FileSystemElement` or String): File to download or
            its absolute path.
        dest (String): The absolute path of the destination file.
        progress_cb (Function, optional): Function call when data is being
            downloaded. Receives three arguments:

                * The progress percentage as float.
                * Destination file path.
                * Source file path.

    Raises:
        FileSystemException: If there is any error performing the operation.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(src, (str, FileSystemElement)):
        raise ValueError("Source must be a string or a FileSystemElement")
    if not isinstance(dest, str):
        raise ValueError("Destination path must be a non-empty string")

    src_path = src
    if isinstance(src, FileSystemElement):
        src_path = src.path
    src_path = posixpath.normpath(src_path.replace('\\', '/'))

    total_read = 0

    def p_cb(chunk, _perc, size, status):
        # Turns per-chunk notifications into an overall percentage and
        # reports any error status as an exception.
        nonlocal total_read

        if status not in (None, FSCommandStatus.SUCCESS.code):
            _raise_exception(status, "Error getting file '%s'" % src_path)

        total_read += len(chunk)
        if progress_cb:
            progress_cb(total_read * 100.0 / size, dest, src_path)

    with open(dest, "wb+") as dst_file:
        r_proc = self.read_file(src, offset=0, progress_cb=p_cb)
        size = r_proc.block_size
        while True:
            try:
                data = r_proc.next(size=size, last=False)
                if not data:
                    break
                dst_file.write(data)
            except EnvironmentError:
                # The local write failed: close the remote file explicitly
                # so its handle and directory path id are released.
                if r_proc._opened:
                    r_proc._end_process()
                raise
def put_file(self, src, dest, secure=False, overwrite=False,
             mk_parents=True, progress_cb=None):
    """
    Uploads the given file to the specified destination path of the XBee.

    Args:
        src (String): Absolute path of the file to upload.
        dest (:class:`.FileSystemElement` or String): The file in the XBee
            or its absolute path.
        secure (Boolean, optional, default=`False`): `True` if the file
            should be stored securely, `False` otherwise.
        overwrite (Boolean, optional, default=`False`): `True` to overwrite
            the file if it exists, `False` otherwise.
        mk_parents (Boolean, optional, default=`True`): `True` to make
            parent directories as needed, `False` otherwise.
        progress_cb (Function, optional): Function call when data is being
            uploaded. Receives three arguments:

                * The progress percentage as float.
                * Destination file path.
                * Source file path.

    Returns:
        :class:`.FileSystemElement`: The new created file.

    Raises:
        FileSystemException: If there is any error performing the operation.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(src, str):
        raise ValueError("Source path must be a non-empty string")

    dst_path = dest
    if isinstance(dest, FileSystemElement):
        dst_path = dest.path
    dst_path = posixpath.normpath(dst_path.replace('\\', '/'))

    f_size = os.stat(src).st_size
    wr_bytes = 0

    def p_cb(n_bytes, _percent, status):
        # Turns per-chunk notifications into an overall percentage and
        # reports any error status as an exception.
        nonlocal wr_bytes

        if status not in (None, FSCommandStatus.SUCCESS.code):
            _raise_exception(status, "Error putting file '%s'" % src)

        wr_bytes += n_bytes
        if progress_cb:
            progress_cb(wr_bytes * 100.0 / f_size, dst_path, src)

    # Create intermediate directories if required
    dest_parent = posixpath.dirname(dst_path)
    if mk_parents and dest_parent != "/flash":
        self.make_directory(dest_parent, mk_parents=True)

    # The source is only read: do not require write permission on it
    with open(src, "rb") as src_file:
        wr_opts = []
        if overwrite:
            wr_opts.append("truncate")
        w_proc = self.write_file(dest, offset=0, secure=secure,
                                 options=wr_opts, progress_cb=p_cb)
        try:
            size = w_proc.block_size
            data = src_file.read(size)
            while data:
                try:
                    w_proc.next(data, last=False)
                except FileSystemException as exc:
                    # If write options worked as they are described, we
                    # would not need to remove the file previously
                    # https://jira.digi.com/browse/XBHAWK-531
                    if (not overwrite
                            or exc.status != FSCommandStatus.ALREADY_EXISTS.code):
                        raise
                    self.remove(dest, rm_children=False)
                    w_proc = self.write_file(dest, offset=0, secure=secure,
                                             options=wr_opts, progress_cb=p_cb)
                    w_proc.next(data, last=False)
                data = src_file.read(size)
        finally:
            # An empty write with `last=True` closes the remote file
            w_proc.next("", last=True)

    return FileSystemElement(posixpath.basename(dst_path), path=dst_path,
                             is_dir=False, size=os.stat(src).st_size,
                             is_secure=secure)
def put_dir(self, src, dest="/flash", verify=True, progress_cb=None):
    """
    Uploads the given source directory contents into the given destination
    directory in the XBee.

    Args:
        src (String): Local directory to upload its contents.
        dest (:class:`.FileSystemElement` or String): The destination dir
            in the XBee or its absolute path. Defaults to '/flash', also
            used for `None` or an empty string.
        verify (Boolean, optional, default=`True`): `True` to check the
            hash of the uploaded content. It also applies to the contents
            of sub-directories.
        progress_cb (Function, optional): Function call when data is being
            uploaded. Receives three argument:

                * The progress percentage as float.
                * Destination file path.
                * The absolute path of the local being uploaded as string.

    Raises:
        FileSystemException: If there is any error performing the operation.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor to build them, it uses '\\' as separator on Windows.
    import posixpath

    if not isinstance(src, str):
        raise ValueError("Source path must be a non-empty string")
    if isinstance(dest, FileSystemElement):
        if not dest.is_dir:
            raise ValueError("Destination must be a directory")
        dest_path = dest.path
    elif not dest:
        # Checked before the string case so an empty string also falls
        # back to the default volume.
        dest_path = "/flash"
    elif isinstance(dest, str):
        dest_path = dest
    else:
        raise ValueError("Destination must be string or a FileSystemElement")

    # Create destination directory
    if dest_path != "/flash":
        self.make_directory(dest_path, mk_parents=True)

    # Upload directory contents
    for file in listdir(src):
        src_file_path = os.path.join(src, file)
        dst_file_path = posixpath.join(dest_path, file)

        if not isfile(src_file_path):
            # Sub-directory: upload it with the same verification setting
            self.put_dir(src_file_path, dst_file_path, verify=verify,
                         progress_cb=progress_cb)
            continue

        self.put_file(src_file_path, dst_file_path, overwrite=True,
                      mk_parents=True, progress_cb=progress_cb)
        if not verify:
            continue

        xb_hash = self.get_file_hash(dst_file_path)
        local_hash = get_local_file_hash(src_file_path)
        if xb_hash == local_hash:
            continue

        msg = "Error uploading file '%s': Local hash different from " \
              "remote hash (%s != %s)" % \
              (src_file_path, utils.hex_to_string(local_hash, pretty=False),
               utils.hex_to_string(xb_hash, pretty=False))
        _log.error(msg)
        _raise_exception(None, msg)
def get_file_hash(self, file, timeout=DEFAULT_TIMEOUT):
    """
    Returns the SHA256 hash of the given file.

    Args:
        file (:class:`.FileSystemElement` or String): File to get its hash
            or its absolute path.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Returns:
        Bytearray: SHA256 hash of the given file.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(file, (str, FileSystemElement)):
        raise ValueError("File must be a string or a FileSystemElement")
    if isinstance(file, FileSystemElement):
        # Only regular files can be hashed
        if file.is_dir:
            raise ValueError("Cannot hash a directory")
        if file.path in ("/", "\\", ".", ".."):
            raise ValueError("Invalid file path")
    if isinstance(file, str) and file in ("/", "\\", ".", ".."):
        raise ValueError("Invalid file path")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path_id = 0
    file_path = file
    if isinstance(file, FileSystemElement):
        file_path = file.path
    file_path = posixpath.normpath(file_path.replace('\\', '/'))

    _log.debug(self._log_str("Retrieving SHA256 hash of '%s'", file_path))

    start = time.time()
    try:
        # Check length of path, if is too big try to change to a parent
        path_id, to_hash = self._cd_to_execute(file_path, path_id, timeout)
        status, hash_val = self.pget_file_hash(
            to_hash, path_id=path_id,
            timeout=(timeout - (time.time() - start)))
    finally:
        if path_id:
            self.prelease_path_id(path_id, timeout)

    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(status,
                         "Error getting hash of file '%s'" % file_path)

    return hash_val
def move(self, source, dest, timeout=DEFAULT_TIMEOUT):
    """
    Moves the given source element to the given destination path.

    Args:
        source (:class:`.FileSystemElement` or String): Source entry to move.
        dest (String): Destination path of the element to move.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(source, (str, FileSystemElement)):
        raise ValueError("Source must be a string or a FileSystemElement")
    if not isinstance(dest, str) or not dest:
        raise ValueError("Destination must be a non-empty string")

    src_path = source
    if isinstance(source, FileSystemElement):
        src_path = source.path
    if src_path in ("/", "\\", ".", ".."):
        raise ValueError("Invalid source path")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path_id = 0
    src_path = posixpath.normpath(src_path.replace('\\', '/'))
    dst_path = posixpath.normpath(dest.replace('\\', '/'))

    # Deepest directory that contains both entries. It is computed per
    # path component on the parent directories: a character based prefix
    # of '/flash/dir/a.txt' and '/flash/dir/ab.txt' would be
    # '/flash/dir/a', which is not a directory.
    try:
        common_dir = posixpath.commonpath(
            [posixpath.dirname(src_path), posixpath.dirname(dst_path)])
    except ValueError:
        # Absolute and relative paths mixed: nothing in common
        common_dir = ""
    common_dir = posixpath.normpath(common_dir)

    _log.debug(self._log_str("Moving '%s' to '%s' (path id: %d)",
                             src_path, dst_path, path_id))

    start = time.time()
    try:
        # Change to a common directory
        if common_dir not in ('.', '/'):
            status, new_id, _f_path = self.pget_path_id(
                common_dir, path_id=path_id, timeout=timeout)
            if status != FSCommandStatus.SUCCESS.code:
                _raise_exception(
                    status, "Error changing to directory '%s'" % common_dir)
            # Only keep a valid id, it is released in the `finally` clause
            path_id = new_id
            src_path = posixpath.relpath(src_path, common_dir)
            dst_path = posixpath.relpath(dst_path, common_dir)

        status = self.prename(src_path, dst_path, path_id=path_id,
                              timeout=(timeout - (time.time() - start)))
    finally:
        # Release the path id even if the rename request fails
        if path_id:
            self.prelease_path_id(path_id, timeout)

    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(
            status, "Error moving file '%s' to '%s'" % (src_path, dst_path))
def get_volume_info(self, vol="/flash", timeout=DEFAULT_TIMEOUT):
    """
    Returns the file system volume information. Currently '/flash' is the
    only supported value.

    Args:
        vol (:class:`.FileSystemElement`or String, optional, default=`/flash`):
            Volume name.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Returns:
        Dictionary: Volume information with the keys `used`, `free` and
            `bad`, as reported in the response of the XBee.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(vol, (str, FileSystemElement)):
        raise ValueError("Volume must be a string or a FileSystemElement")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    name = vol
    if isinstance(vol, FileSystemElement):
        name = vol.path
    name = posixpath.normpath(name.replace('\\', '/'))

    _log.info(self._log_str("Reading volume information '%s'", name))

    to_send = FileSystemManager._create_fs_frame(self.__xbee,
                                                 VolStatCmdRequest(name))
    sender = _FSFrameSender(self.__xbee)
    status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)
    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(status, "Error getting volume info '%s'" % name)

    _log.info(self._log_str(
        "Volume info '%s': %s (used), %s (free), %s (bad)", name,
        r_cmd.bytes_used, r_cmd.bytes_free, r_cmd.bytes_bad))

    return {"used": r_cmd.bytes_used,
            "free": r_cmd.bytes_free,
            "bad": r_cmd.bytes_bad}
def format(self, vol="/flash", timeout=DEFAULT_FORMAT_TIMEOUT):
    """
    Formats provided volume. Currently '/flash' is the only supported value.
    Formatting the file system takes time, and any other requests will fail
    until it completes and sends a response.

    Args:
        vol (:class:`.FileSystemElement`or String, optional, default=`/flash`):
            Volume name.
        timeout (Float, optional, default=`DEFAULT_FORMAT_TIMEOUT`): Maximum
            number of seconds to wait for the operation completion. Any non
            numeric value is replaced by `DEFAULT_FORMAT_TIMEOUT`.

    Returns:
        Dictionary: Volume information after the format with the keys
            `used`, `free` and `bad`, as reported in the response of the XBee.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(vol, (str, FileSystemElement)):
        raise ValueError("Volume must be a string or a FileSystemElement")
    # Timeouts are documented as floats: accept them instead of silently
    # replacing them with the default value.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_FORMAT_TIMEOUT

    # Sanitize path
    name = vol
    if isinstance(vol, FileSystemElement):
        name = vol.path
    name = posixpath.normpath(name.replace('\\', '/'))

    _log.info(self._log_str("Formatting volume '%s'", name))

    to_send = FileSystemManager._create_fs_frame(self.__xbee,
                                                 VolFormatCmdRequest(name))
    sender = _FSFrameSender(self.__xbee)
    status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)
    if status != FSCommandStatus.SUCCESS.code:
        _raise_exception(status, "Error formatting volume '%s'" % name)

    _log.info(self._log_str(
        "After format, volume info '%s': %s (used), %s (free), %s (bad)",
        name, r_cmd.bytes_used, r_cmd.bytes_free, r_cmd.bytes_bad))

    return {"used": r_cmd.bytes_used,
            "free": r_cmd.bytes_free,
            "bad": r_cmd.bytes_bad}
def pget_path_id(self, dir_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Returns the directory path id of the given path. Returned directory
    path id expires if not referenced in 2 minutes.

    Args:
        dir_path (String): Path of the directory to get its id. It is
            relative to the directory path id.
        path_id (Integer, optional, default=0): Directory path id. 0 for
            the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non numeric
            value is replaced by `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, Integer, String): Status of the file system command
            execution, new directory path id (-1 if status is not 0) and
            its absolute path (empty if status is not 0). The full path may
            be `None` or empty if it is too long and exceeds the
            communication frames length.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # XBee paths are always POSIX-like: do not use the host `os.path`
    # flavor, which turns '/' into '\\' on Windows.
    import posixpath

    if not isinstance(dir_path, str):
        raise ValueError("Directory path must be a non empty string")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # Timeouts are documented as floats (callers pass the remaining time
    # as a float): accept them instead of replacing them with the default.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    if dir_path not in (".", ".."):
        dir_path = posixpath.normpath(dir_path.replace('\\', '/'))

    _log.info(self._log_str("Getting ID of directory '%s' (path id: %d)",
                            dir_path, path_id))

    # Check length of path, if is too big try to change to a parent
    to_cd = self._get_fit_parent_path(dir_path)

    # Change to directory
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, GetPathIdCmdRequest(path_id, to_cd))
    sender = _FSFrameSender(self.__xbee)
    status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)
    if status != FSCommandStatus.SUCCESS.code:
        return status, -1, ""

    f_id = r_cmd.fs_id
    f_path = r_cmd.full_path
    # If we changed to a parent dir, change now to the final dir
    if len(dir_path) > len(to_cd):
        rel_path = posixpath.relpath(dir_path, to_cd)
        status, f_id, f_path = self.pget_path_id(rel_path, path_id=f_id,
                                                 timeout=timeout)
        if status != FSCommandStatus.SUCCESS.code:
            return status, -1, ""

    _log.info(self._log_str("Path id '%d' (%s)", f_id, f_path))

    return status, f_id, f_path
def pmake_directory(self, dir_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Creates the provided directory. Parent directories of the one to be
    created must exist. Separate requests must be done to make intermediate
    directories.

    Args:
        dir_path (String): Path of the new directory to create. It is
            relative to the directory path id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Integer: Status of the file system command execution
            (see :class:`.FSCommandStatus`).

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(dir_path, str):
        raise ValueError("Directory path must be a non empty string")
    if dir_path in ("/", "\\", ".", ".."):
        raise ValueError("Invalid directory path")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path = PurePosixPath(posixpath.normpath(dir_path.replace('\\', '/')))

    _log.info(self._log_str("Creating directory '%s' (path id: %d)",
                            str(path), path_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, CreateDirCmdRequest(path_id, str(path)))
    sender = _FSFrameSender(self.__xbee)
    rv_status, _r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    return rv_status
def plist_directory(self, dir_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Lists the contents of the given directory.

    Args:
        dir_path (String): Path of the directory to list. It is relative
            to the directory path id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. The same time
            budget is shared by all the requests needed to read the whole
            directory. Any non-numeric value falls back to
            `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, List): Status of the file system command execution
            and a list of :class:`.FileSystemElement` objects contained in
            the given directory, empty list if status is not 0.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(dir_path, str):
        raise ValueError("Directory path must be a non empty string")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    if dir_path not in (".", ".."):
        dir_path = posixpath.normpath(dir_path.replace('\\', '/'))

    _log.info(self._log_str("Listing directory '%s' (path id: %d)",
                            dir_path, path_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, OpenDirCmdRequest(path_id, dir_path))
    sender = _FSFrameSender(self.__xbee)
    start = time.time()
    rv_status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)
    if rv_status != FSCommandStatus.SUCCESS.code:
        return rv_status, []

    # Remember the id of the open directory from the last successful
    # response: a failed response cannot be trusted to carry it.
    dir_id = r_cmd.fs_id
    # Copy, so accumulating entries does not alter the response object
    dir_list = list(r_cmd.fs_entries)
    while not r_cmd.is_last:
        to_send = FileSystemManager._create_fs_frame(
            self.__xbee, ReadDirCmdRequest(dir_id))
        rv_status, r_cmd, _rv_opts = sender.send(
            to_send, timeout=(timeout - (time.time() - start)))
        if rv_status != FSCommandStatus.SUCCESS.code:
            # Try to close the directory
            to_send = FileSystemManager._create_fs_frame(
                self.__xbee, CloseDirCmdRequest(dir_id))
            sender.send(to_send,
                        timeout=(timeout - (time.time() - start)))
            return rv_status, []

        dir_id = r_cmd.fs_id
        dir_list += r_cmd.fs_entries

    # This will store the path relative to the directory path id
    for entry in dir_list:
        entry.path = posixpath.join(dir_path.replace('\\', '/'), entry.name)

    _log.info(self._log_str("List directory '%s' (%d):\n%s", dir_path,
                            path_id, '\n'.join(map(str, dir_list))))

    return rv_status, dir_list
def premove(self, entry_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Removes the given file system entry.

    All files in a directory must be deleted before removing the directory.
    On XBee 3 802.15.4, DigiMesh, and Zigbee, deleted files are marked as
    unusable space unless they are at the "end" of the file system
    (most-recently created). On these products, deleting a file triggers
    recovery of any deleted file space at the end of the file system, and
    can lead to a delayed response.

    Args:
        entry_path (String): Path of the entry to remove. It is relative to
            the directory path id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Integer: Status of the file system command execution
            (see :class:`.FSCommandStatus`).

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(entry_path, str):
        raise ValueError("Entry path must be a non empty string")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    if entry_path not in (".", ".."):
        entry_path = posixpath.normpath(entry_path.replace('\\', '/'))

    _log.info(self._log_str("Removing entry '%s' (path id: %d)",
                            entry_path, path_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, DeleteCmdRequest(path_id, entry_path))
    sender = _FSFrameSender(self.__xbee)
    rv_status, _r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    return rv_status
def popen_file(self, file_path, path_id=0, options=FileOpenRequestOption.READ,
               timeout=DEFAULT_TIMEOUT):
    """
    Open a file for reading and/or writing. Use the
    `FileOpenRequestOption.SECURE` (0x80) bitmask for options to upload a
    write-only file (one that cannot be downloaded or viewed), useful for
    protecting files on the device.
    Returned file id expires if not referenced in 2 minutes.

    Args:
        file_path (String): Path of the file to open. It is relative to the
            directory path id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        options (Integer, optional, default=`FileOpenRequestOption.READ`):
            Bitmask that specifies the options to open the file. It
            defaults to `FileOpenRequestOption.READ` which means open for
            reading. See :class:`.FileOpenRequestOption` for more options.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, Integer, Integer): Status of the file system command
            execution (see :class:`.FSCommandStatus`), the file id to use
            in later requests, and the size of the file (in bytes),
            0xFFFFFFFF if unknown.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FileOpenRequestOption`
       | :class:`.FSCommandStatus`
       | :meth:`.pclose_file`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(file_path, str):
        raise ValueError("File path must be a string")
    if file_path in ("/", "\\", ".", ".."):
        raise ValueError("Invalid file path")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    if not options:
        options = FileOpenRequestOption.READ
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    path = PurePosixPath(posixpath.normpath(file_path.replace('\\', '/')))

    _log.info(self._log_str("Opening file '%s' (path id: %d) options: 0x%0.2X",
                            str(path), path_id, options))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, OpenFileCmdRequest(path_id, str(path), options))
    sender = _FSFrameSender(self.__xbee)
    rv_status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    # Only report the file as open when the device says so
    if rv_status == FSCommandStatus.SUCCESS.code:
        _log.info(self._log_str("File open '%s' (%d) options 0x%0.2X",
                                str(path), path_id, options))

    return rv_status, r_cmd.fs_id, r_cmd.size
def pclose_file(self, file_id, timeout=DEFAULT_TIMEOUT):
    """
    Closes an open file and releases its file handle.

    Args:
        file_id (Integer): File id returned when opening.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Integer: Status of the file system command execution
            (see :class:`.FSCommandStatus`).

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
       | :meth:`.popen_file`
    """
    if not isinstance(file_id, int):
        raise ValueError("File id must be an integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    _log.info(self._log_str("Closing file '%d'", file_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, CloseFileCmdRequest(file_id))
    sender = _FSFrameSender(self.__xbee)
    rv_status, _r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    # Only report the file as closed when the device says so
    if rv_status == FSCommandStatus.SUCCESS.code:
        _log.info(self._log_str("File closed (%d)", file_id))

    return rv_status
def pread_file(self, file_id, offset=-1, size=-1, timeout=DEFAULT_TIMEOUT):
    """
    Reads from the provided file the given amount of bytes starting at the
    given offset. The file must be opened for reading first.

    Args:
        file_id (Integer): File id returned when opening.
        offset (Integer, optional, default=-1): File offset to start
            reading. -1 (or `None`) to use current position.
        size (Integer, optional, default=-1): Number of bytes to read.
            -1 to read as many as possible.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, Integer, Integer, Bytearray): Status of the file
            system command execution (see :class:`.FSCommandStatus`), the
            file id, the offset of the read data, and the read data.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
       | :meth:`.popen_file`
    """
    if not isinstance(file_id, int):
        raise ValueError("File id must be an integer")
    # `None` means "current position". Normalize it first: comparing
    # `None < -1` raises TypeError instead of the documented ValueError.
    if offset is None:
        offset = -1
    if not isinstance(offset, int) or offset < -1:
        raise ValueError("Offset must be -1 or greater")
    if not isinstance(size, int) or not size or size < -1:
        raise ValueError("Size must be -1 or greater than 0")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    _log.info(self._log_str("Reading file '%d' (offset: %d, size: %d)",
                            file_id, offset, size))
    if offset == -1:
        offset = ReadFileCmdRequest.USE_CURRENT_OFFSET
    if size == -1:
        size = ReadFileCmdRequest.READ_AS_MANY
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, ReadFileCmdRequest(file_id, offset, size))
    sender = _FSFrameSender(self.__xbee)
    rv_status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    # Only report read data when the device says the read succeeded
    if rv_status == FSCommandStatus.SUCCESS.code:
        _log.info(self._log_str("Read %d bytes from '%d' (offset: %d)",
                                len(r_cmd.data), file_id, r_cmd.offset))

    return rv_status, r_cmd.fs_id, r_cmd.offset, r_cmd.data
def pwrite_file(self, file_id, data, offset=-1, timeout=DEFAULT_TIMEOUT):
    """
    Writes to the provided file the given data bytes starting at the given
    offset. The file must be opened for writing first.

    Args:
        file_id (Integer): File id returned when opening.
        data (Bytearray, bytes or String): Data to write. Strings are
            encoded as UTF-8.
        offset (Integer, optional, default=-1): File offset to start
            writing. -1 (or `None`) to use current position.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, Integer, Integer): Status of the file system command
            execution (see :class:`.FSCommandStatus`), the file id, and the
            current offset after writing.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
       | :meth:`.popen_file`
    """
    if not isinstance(file_id, int):
        raise ValueError("File id must be an integer")
    if not isinstance(data, (bytearray, bytes, str)):
        raise ValueError("Data must be a bytearray, bytes or a string")
    if not data:
        raise ValueError("Data cannot be empty")
    # `None` means "current position". Normalize it first: comparing
    # `None < -1` raises TypeError instead of the documented ValueError.
    if offset is None:
        offset = -1
    if not isinstance(offset, int) or offset < -1:
        raise ValueError("Offset must be -1 or greater")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    if isinstance(data, str):
        data = bytearray(data, encoding='utf8')
    elif isinstance(data, bytes):
        data = bytearray(data)

    _log.info(self._log_str("Writing to file '%d' (offset: %d, size: %d)",
                            file_id, offset, len(data)))
    if offset == -1:
        offset = ReadFileCmdRequest.USE_CURRENT_OFFSET
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, WriteFileCmdRequest(file_id, offset, data=data))
    sender = _FSFrameSender(self.__xbee)
    rv_status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    if rv_status == FSCommandStatus.SUCCESS.code:
        _log.info(self._log_str("Written %d bytes to '%d' (offset: %d)",
                                len(data), file_id, r_cmd.actual_offset))

    return rv_status, r_cmd.fs_id, r_cmd.actual_offset
def pget_file_hash(self, file_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Returns the SHA256 hash of the given file.

    Args:
        file_path (String): Path of the file to get its hash. It is
            relative to the directory path id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Tuple (Integer, Bytearray): Status of the file system command
            execution and SHA256 hash of the given file (empty bytearray if
            status is not 0).

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(file_path, str):
        raise ValueError("File path must be a non empty string")
    if file_path in ("/", "\\", ".", ".."):
        raise ValueError("Invalid file path")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    file_path = posixpath.normpath(file_path.replace('\\', '/'))

    _log.info(self._log_str("Retrieving SHA256 hash of '%s' (path id: %d)",
                            file_path, path_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, HashFileCmdRequest(path_id, file_path))
    sender = _FSFrameSender(self.__xbee)
    rv_status, r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)
    if rv_status != FSCommandStatus.SUCCESS.code:
        return rv_status, bytearray()

    _log.info(self._log_str("'%s' hash: %s", file_path,
                            utils.hex_to_string(r_cmd.file_hash, pretty=False)))

    return rv_status, r_cmd.file_hash
def prename(self, current_path, new_path, path_id=0, timeout=DEFAULT_TIMEOUT):
    """
    Rename provided file.

    Args:
        current_path (String): Current path name. It is relative to the
            directory path id.
        new_path (String): New name. It is relative to the directory path
            id.
        path_id (Integer, optional, default=0): Directory path id.
            0 for the root directory.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Integer: Status of the file system command execution
            (see :class:`.FSCommandStatus`).

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    # Device paths are always POSIX-style, whatever the host OS is.
    import posixpath

    if not isinstance(current_path, str):
        raise ValueError("Current path name must be a non empty string")
    if not isinstance(new_path, str):
        raise ValueError("New path name must be a non empty string")
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    # Sanitize path
    if current_path not in (".", ".."):
        current_path = posixpath.normpath(current_path.replace('\\', '/'))
    if new_path not in (".", ".."):
        new_path = posixpath.normpath(new_path.replace('\\', '/'))

    _log.info(self._log_str("Renaming entry '%s' to '%s' (path id: %d)",
                            current_path, new_path, path_id))
    to_send = FileSystemManager._create_fs_frame(
        self.__xbee, RenameCmdRequest(path_id, current_path, new_path))
    sender = _FSFrameSender(self.__xbee)
    rv_status, _r_cmd, _rv_opts = sender.send(to_send, timeout=timeout)

    return rv_status
def prelease_path_id(self, path_id, timeout=DEFAULT_TIMEOUT):
    """
    Releases the provided directory path id.

    The release is performed by requesting the path id of "/" relative to
    the id to release.

    Args:
        path_id (Integer): Directory path id to release.
        timeout (Float, optional, default=`DEFAULT_TIMEOUT`): Maximum number
            of seconds to wait for the operation completion. Any non-numeric
            value falls back to `DEFAULT_TIMEOUT`.

    Returns:
        Integer: Status of the file system command execution.

    Raises:
        FileSystemException: If there is any error performing the operation
            or the function is not supported.
        ValueError: If any of the parameters is invalid.

    .. seealso::
       | :class:`.FSCommandStatus`
    """
    if not isinstance(path_id, int) or path_id < 0:
        raise ValueError("Directory path id must be a positive integer")
    # The timeout is documented as a float: accept any real number.
    if not isinstance(timeout, (int, float)):
        timeout = self.DEFAULT_TIMEOUT

    status, _, _ = self.pget_path_id("/", path_id=path_id, timeout=timeout)
    if status != FSCommandStatus.SUCCESS.code:
        _log.error(self._log_str("Error releasing path id '%d'", path_id))

    return status
def _cd_to_execute(self, path, path_id, timeout, refresh=True):
    """
    Changes to another directory in path if its longer than the allowed
    length for the frame transmission.

    Args:
        path (String): The path to check and to use for changing.
        path_id (Integer): Current directory path id.
        timeout (Float): Maximum number of seconds to wait for the
            operation completion.
        refresh (Boolean, optional, default=`True`): `True` to read the
            NP value of the local XBee, `False` to use the cached one.

    Returns:
        Tuple (Integer, String): The new directory path id and the relative
            path of given path to that new directory path id.

    Raises:
        FileSystemException: If there is any error performing the operation,
            the function is not supported, or the path cannot be shortened
            enough to fit the maximum length.
    """
    # XBee paths always use '/': compute relative paths with the POSIX
    # flavor so the result does not depend on the host operating system.
    import posixpath

    max_len = self._get_np(refresh=refresh)
    if not max_len:
        max_len = _DEFAULT_BLOCK_SIZE

    if len(path) <= max_len:
        return path_id, path

    rel_path = path
    start = time.time()
    while len(rel_path) > max_len:
        to_cd = self._get_fit_parent_path(rel_path)
        shorter_path = posixpath.relpath(rel_path, to_cd)
        # When the only parent that fits is '.', the relative path does not
        # get any shorter (a single component exceeds the limit). Stop here
        # instead of looping forever.
        if len(shorter_path) >= len(rel_path):
            _raise_exception(
                None, "Error changing to directory '%s': path cannot be "
                "shortened to fit the maximum length" % to_cd)
        rel_path = shorter_path
        # Each request only gets the time that is left from the initial
        # timeout.
        status, path_id, _f_path = self.pget_path_id(
            to_cd, path_id=path_id, timeout=(timeout - (time.time() - start)))
        if status != FSCommandStatus.SUCCESS.code:
            _raise_exception(status, "Error changing to directory '%s'" % to_cd)

    return path_id, rel_path


def _get_np(self, refresh=False):
    """
    Returns the 'NP' value of the local XBee.

    The returned value is the one read from the XBee minus the bytes
    reserved for other frame data (and minus the extra bytes of remote
    frames when the managed XBee is remote). It is 0 if the value could not
    be read.

    Args:
        refresh (Boolean, optional, default=`False`): `True` to read the
            NP value of the local XBee, `False` to use the cached one.

    Returns:
        Integer: 'NP' value.
    """
    if self.__np_val and not refresh:
        return self.__np_val

    xbee = self.__xbee
    n_extra_bytes = 0
    if xbee.is_remote():
        xbee = xbee.get_local_xbee_device()
        # 64-bit address (8), send/receive opts (1), and status (1) length
        n_extra_bytes = 10

    cmd = ATStringCommand.NP
    try:
        # Reserve 5 bytes for other frame data
        self.__np_val = utils.bytes_to_int(xbee.get_parameter(cmd, apply=False)) - 5
        # Subtract extra bytes of remote frames
        self.__np_val -= n_extra_bytes
    except XBeeException as exc:
        _log.error(self._log_str(
            "Error getting maximum number of transmission bytes ('%s'): %s",
            cmd, str(exc)))
        # A falsy value makes the callers use their default block size.
        self.__np_val = 0

    return self.__np_val


def _get_fit_parent_path(self, path, refresh=False):
    """
    Returns a parent which length fits the maximum allowed size.

    Args:
        path (String): Path to get a fit parent.
        refresh (Boolean, optional, default=`False`): `True` to read the
            NP value of the local XBee, `False` to use the cached one.

    Returns:
        String: The path that fits the maximum allowed size. The provided
            path itself if it already fits or no parent fits.
    """
    np_val = self._get_np(refresh=refresh)
    if not np_val:
        np_val = _DEFAULT_BLOCK_SIZE

    if len(path) <= np_val:
        return path

    # Reduce the path until is less than 'NP'
    posix_path = PurePosixPath(path)
    for parent in posix_path.parents:
        parent_str = str(parent)
        if len(parent_str) <= np_val:
            return parent_str

    # Always return a string, as documented (never the PurePosixPath).
    return str(posix_path)


@staticmethod
def _create_fs_frame(xbee, cmd, transmit_options=TransmitOptions.NONE.value):
    """
    Creates a local or remote File System Request packet.

    Args:
        xbee (:class:`.AbstractXBeeDevice`): The destination XBee.
        cmd (:class:`.FSCmd` or Bytearray): The command to send.
        transmit_options (Integer, optional, default=`TransmitOptions.NONE.value`):
            Options to transmit the packet if `xbee` is remote.

    Returns:
        :class:`.XBeeAPIPacket`: class:`.FSRequestPacket` or
            class:`.RemoteFSRequestPacket` already formed.

    Raises:
        ValueError: If `xbee` or `cmd` are invalid.
    """
    from digi.xbee.devices import AbstractXBeeDevice
    if not isinstance(xbee, AbstractXBeeDevice):
        raise ValueError("XBee must be a local or remote XBee class")
    if not isinstance(cmd, (bytearray, FSCmd)):
        raise ValueError("Command must be a bytearray or a FSCmd")

    if not xbee.is_remote():
        return FSRequestPacket(xbee.get_next_frame_id(), cmd)

    # Remote frames: add the transmit mode bits required by the protocol.
    if xbee.get_protocol() in (XBeeProtocol.DIGI_MESH, XBeeProtocol.SX):
        transmit_options |= TransmitOptions.DIGIMESH_MODE.value
    elif xbee.get_protocol() == XBeeProtocol.DIGI_POINT:
        transmit_options |= TransmitOptions.POINT_MULTIPOINT_MODE.value

    # The frame id comes from the local XBee, which is the one sending the
    # frame through its interface.
    return RemoteFSRequestPacket(
        xbee.get_local_xbee_device().get_next_frame_id(),
        xbee.get_64bit_addr(), cmd, transmit_options=transmit_options)


def _log_str(self, msg, *args):
    """
    Returns the formatted message prefixed with the string representation
    of this manager.

    Args:
        msg (String): Message with '%' format specifiers.
        args: Values to format the message with.

    Returns:
        String: The message to log.
    """
    return "%s: %s" % (str(self), msg % args)
class LocalXBeeFileSystemManager:
    """
    Helper class used to manage the local XBee file system.
    """

    def __init__(self, xbee_device):
        """
        Class constructor. Instantiates a new
        :class:`.LocalXBeeFileSystemManager` with the given parameters.

        Args:
            xbee_device (:class:`.XBeeDevice`): The local XBee to manage its
                file system.

        Raises:
            OperationNotSupportedException: If the XBee has no serial port.
            FileSystemNotSupportedException: If the XBee is not compatible
                with this file system manager.
        """
        if not xbee_device.serial_port:
            raise OperationNotSupportedException(
                message="Only supported in local XBee connected by serial.")

        # Check target compatibility.
        if not check_fs_support(xbee_device,
                                max_fw_vers=XB3_MAX_FW_VERSION_FS_OTA_SUPPORT):
            raise FileSystemNotSupportedException(
                "LocalXBeeFileSystemManager is not supported, use FileSystemManager")

        self._xbee_device = xbee_device
        self._serial_port = xbee_device.serial_port
        self._supported_functions = []
        self._device_was_connected = False
        self._is_connected = False
        self._old_read_timeout = _READ_PORT_TIMEOUT

    def _read_data(self, timeout=_READ_DATA_TIMEOUT,
                   empty_retries=_READ_EMPTY_DATA_RETRIES_DEFAULT):
        """
        Reads data from the serial port waiting for the provided timeout.

        At least one read is always performed. Reading stops when some data
        has been received and `empty_retries` consecutive reads returned no
        printable data, or when the timeout expires.

        Args:
            timeout (Integer, optional): The maximum time to wait for data
                (seconds). Defaults to `_READ_DATA_TIMEOUT`.
            empty_retries (Integer, optional): The number of consecutive
                zero-bytes read before considering no more data is
                available.

        Returns:
            String: The read data as string.

        Raises:
            SerialException: If there is any problem reading data from the
                serial port.
        """
        answer_string = ""
        empty_attempts = 0
        deadline = _get_milliseconds() + (timeout * 1000)
        # Every chunk is appended right after it is read, so no received
        # data is dropped when the loop finishes.
        while True:
            read_string = _filter_non_printable(self._serial_port.read(_READ_BUFFER))
            answer_string += read_string
            if read_string:
                empty_attempts = 0
            else:
                empty_attempts += 1
            # Got an answer and the line went quiet: nothing else to read.
            if answer_string and empty_attempts >= empty_retries:
                break
            if _get_milliseconds() >= deadline:
                break

        return answer_string

    def _is_in_atcmd_mode(self):
        """
        Returns whether the command mode is active or not.

        Returns:
            Boolean: `True` if the AT command mode is active, `False`
                otherwise.
        """
        _log.debug("Checking AT command mode...")
        try:
            self._serial_port.write(str.encode(_COMMAND_AT, encoding='utf8'))
            answer = self._read_data(timeout=_GUARD_TIME)

            return answer is not None and _COMMAND_MODE_ANSWER_OK in answer
        except SerialException as exc:
            _log.exception(exc)
            return False

    def _enter_atcmd_mode(self):
        """
        Enters in AT command mode.

        Returns:
            Boolean: `True` if entered command mode successfully, `False`
                otherwise.
        """
        _log.debug("Entering AT command mode...")
        try:
            # In some scenarios where the read buffer is constantly being
            # filled with remote data, it is almost impossible to read the
            # 'enter command mode' answer, so purge port before.
            self._serial_port.purge_port()
            for _ in range(3):
                self._serial_port.write(str.encode(_COMMAND_MODE_CHAR, encoding='utf8'))
            answer = self._read_data(timeout=_GUARD_TIME,
                                     empty_retries=_READ_EMPTY_DATA_RETRIES)

            return answer is not None and _COMMAND_MODE_ANSWER_OK in answer
        except SerialException as exc:
            _log.exception(exc)
            return False

    def _exit_atcmd_mode(self):
        """
        Exits from AT command mode.
        """
        _log.debug("Exiting AT command mode...")
        try:
            self._serial_port.write(str.encode(_COMMAND_MODE_EXIT, encoding='utf8'))
        except SerialException as exc:
            _log.exception(exc)
        finally:
            # It is necessary to wait the guard time before sending data again
            time.sleep(_GUARD_TIME)

    def _check_atcmd_mode(self):
        """
        Checks whether AT command mode is active and if not tries to enter
        AT command mode.

        Returns:
            Boolean: `True` if AT command mode is active or entered
                successfully, `False` otherwise (also when the manager is
                not connected).
        """
        if not self._is_connected:
            return False

        if not self._is_in_atcmd_mode():
            time.sleep(_GUARD_TIME)
            return self._enter_atcmd_mode()

        return True

    def _supports_filesystem(self):
        """
        Returns whether the device supports file system or not.

        When supported, the list of supported file system functions is
        refreshed from the device answer.

        Returns:
            Boolean: `True` if the device supports file system, `False`
                otherwise.
        """
        _log.debug("Checking if device supports file system...")
        if not self._check_atcmd_mode():
            return False

        try:
            self._serial_port.write(str.encode(_COMMAND_FILE_SYSTEM, encoding='utf8'))
            answer = self._read_data()
            if answer and _ANSWER_ATFS in answer.upper():
                self._parse_filesystem_functions(answer.replace("\r", ""))
                return True

            return False
        except SerialException as exc:
            _log.exception(exc)
            return False

    def _parse_filesystem_functions(self, filesystem_answer):
        """
        Parses the file system command response to obtain a list of
        supported file system functions.

        The list is only updated when the pattern matches the whole answer.

        Args:
            filesystem_answer (String): The file system command answer to
                parse.
        """
        result = re.match(_PATTERN_FILE_SYSTEM_FUNCTIONS, filesystem_answer,
                          flags=re.M | re.DOTALL)
        # Compare the matched text by value: an identity check ('is not')
        # on strings only works by an implementation detail.
        if (result is None or result.group(0) != result.string
                or len(result.groups()) < 1):
            return

        self._supported_functions = result.groups()[0].split(_FUNCTIONS_SEPARATOR)

    def _is_function_supported(self, function):
        """
        Returns whether the specified file system function is supported or
        not.

        Args:
            function (:class:`._FilesystemFunction`): The file system
                function to check.

        Returns:
            Boolean: `True` if the specified file system function is
                supported, `False` otherwise.
        """
        if not isinstance(function, _FilesystemFunction):
            return False

        return function.cmd_name in self._supported_functions

    @staticmethod
    def _check_function_error(answer, command):
        """
        Checks the given file system command answer and throws an exception
        if it contains an error.

        Args:
            answer (String): The file system command answer to check for
                errors.
            command (String): The file system command executed.

        Raises:
            FileSystemException: If any error is found in the answer.
        """
        result = re.match(_PATTERN_FILE_SYSTEM_ERROR, answer, flags=re.M | re.DOTALL)
        if result is None or len(result.groups()) <= 1:
            return

        groups = result.groups()
        error_text = groups[1]
        # A third group, when the pattern has it, is appended to the error.
        if len(groups) > 2:
            error_text = groups[1] + " >" + groups[2]
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), error_text))

    def _xmodem_write_cb(self, data):
        """
        Callback function used to write data to the serial port when
        requested from the XModem transfer.

        Args:
            data (Bytearray): The data to write to serial port from the
                XModem transfer.

        Returns:
            Boolean: `True` if the data was successfully written, `False`
                otherwise.
        """
        try:
            self._serial_port.purge_port()
            self._serial_port.write(data)
            self._serial_port.flush()
            return True
        except SerialException as exc:
            _log.exception(exc)

        return False

    def _xmodem_read_cb(self, size, timeout=_READ_DATA_TIMEOUT):
        """
        Callback function used to read data from the serial port when
        requested from the XModem transfer.

        Args:
            size (Integer): Size of the data to read.
            timeout (Integer, optional): Maximum time to wait to read the
                requested data (seconds).

        Returns:
            Bytearray: the read data (it may contain less than `size` bytes
                if the timeout expires), `None` if there was an error
                reading from the serial port.
        """
        deadline = _get_milliseconds() + (timeout * 1000)
        data = bytearray()
        try:
            while len(data) < size and _get_milliseconds() < deadline:
                # Only ask for the bytes that are still missing.
                read_bytes = self._serial_port.read(size - len(data))
                if len(read_bytes) > 0:
                    data.extend(read_bytes)
            return data
        except SerialException as exc:
            _log.exception(exc)

        return None

    def _execute_command(self, cmd_type, *args, wait_for_answer=True):
        """
        Executes the given command type with its arguments.

        Args:
            cmd_type (:class:`._FilesystemFunction`): Command type to
                execute.
            args (): Command arguments
            wait_for_answer (Boolean): `True` to wait for command answer,
                `False` otherwise.

        Returns:
            String: the command answer, `None` if `wait_for_answer` is
                `False`.

        Raises:
            FileSystemException: If there is any error executing the
                command.
        """
        # Sanity checks.
        if not self._is_function_supported(cmd_type):
            raise FileSystemException(_ERROR_FUNCTION_NOT_SUPPORTED % cmd_type.cmd_name)
        if not self._check_atcmd_mode():
            raise FileSystemException(_ERROR_ENTER_CMD_MODE)

        command = _COMMAND_ATFS % (cmd_type.command % args)
        try:
            self._serial_port.write(str.encode(command, encoding='utf8', errors='ignore'))
            answer = None
            if wait_for_answer:
                answer = self._read_data()
                if not answer:
                    raise FileSystemException(_ERROR_TIMEOUT)
                self._check_function_error(answer, command)

            return answer
        except SerialException as exc:
            raise FileSystemException(
                _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), str(exc))) from None

    @property
    def is_connected(self):
        """
        Returns whether the file system manager is connected or not.

        Returns:
            Boolean: `True` if the file system manager is connected,
                `False` otherwise.
        """
        return self._is_connected
def connect(self):
    """
    Connects the file system manager.

    Does nothing if the manager is already connected. On failure the serial
    port is closed and its previous read timeout is restored.

    Raises:
        FileSystemException: If there is any error connecting the file
            system manager.
        FileSystemNotSupportedException: If the device does not support
            filesystem feature.
    """
    if self._is_connected:
        return

    def _roll_back():
        # Close port if it is open.
        if self._serial_port.isOpen():
            self._serial_port.close()
        self._is_connected = False
        try:
            # Restore serial port timeout.
            self._serial_port.set_read_timeout(self._old_read_timeout)
        except SerialException:
            # Ignore this error as it is not critical and will not provide
            # much info but confusion.
            pass

    # The file system manager talks directly with the serial port in raw
    # mode, so disconnect the device. Not disconnecting the device will
    # cause the internal XBee device frame reader to consume the data
    # required by the file system manager from the serial port.
    if self._xbee_device.is_open:
        self._xbee_device.close()
        self._device_was_connected = True

    self._old_read_timeout = self._serial_port.get_read_timeout()
    try:
        self._serial_port.set_read_timeout(_READ_PORT_TIMEOUT)
        self._serial_port.open()
        self._is_connected = True
        if not self._supports_filesystem():
            raise FileSystemNotSupportedException(ERROR_FILESYSTEM_NOT_SUPPORTED)
    except SerialException as exc:
        _roll_back()
        raise FileSystemException(_ERROR_CONNECT_FILESYSTEM % str(exc)) from None
    except FileSystemNotSupportedException:
        _roll_back()
        raise
def disconnect(self):
    """
    Disconnects the file system manager and restores the device connection.

    Does nothing if the manager is not connected.

    Raises:
        XBeeException: If there is any error restoring the XBee connection.
    """
    if not self._is_connected:
        return

    # Leave AT command mode before releasing the port.
    self._exit_atcmd_mode()

    port = self._serial_port
    try:
        # Put back the read timeout the port had before connecting.
        port.set_read_timeout(self._old_read_timeout)
    except SerialException:
        pass
    port.close()
    self._is_connected = False

    if not self._device_was_connected:
        return

    # The XBee was open before connecting the manager: open it again after
    # a short pause.
    time.sleep(0.3)
    self._xbee_device.open()
def get_current_directory(self):
    """
    Returns the current device directory.

    Returns:
        String: Current device directory (the command answer without
            carriage returns).

    Raises:
        FileSystemException: If there is any error getting the current
            directory or the function is not supported.
    """
    _log.info("Retrieving working directory")
    raw_answer = self._execute_command(_FilesystemFunction.PWD)
    return raw_answer.replace("\r", "")
def change_directory(self, directory):
    """
    Changes the current device working directory to the given one.

    Args:
        directory (String): New directory to change to. If empty, no change
            is performed and the current directory is returned.

    Returns:
        String: Current device working directory after the directory
            change.

    Raises:
        FileSystemException: If there is any error changing the current
            directory or the function is not supported.
    """
    if directory:
        # Use '/' as the only path separator.
        target = directory.replace('\\', '/')
        _log.info("Navigating to directory '%s'", target)
        raw_answer = self._execute_command(_FilesystemFunction.CD, target)
        return raw_answer.replace("\r", "")

    # Nothing to change to.
    return self.get_current_directory()
def make_directory(self, directory):
    """
    Creates the provided directory.

    Intermediate directories are created when they do not exist. The
    working directory of the device is restored when finished.

    Args:
        directory (String): New directory to create.

    Raises:
        FileSystemException: If there is any error creating the directory
            or the function is not supported.
    """
    # Nothing to create for an empty path or the root directory.
    if not directory or directory in ("/", "\\"):
        return

    # Use '/' as the only path separator.
    directory = directory.replace('\\', '/')
    current_dir = self.get_current_directory()
    try:
        # Absolute paths are walked from the root, relative ones from the
        # current working directory.
        temp_path = "/" if directory.startswith("/") else current_dir
        for chunk in filter(None, directory.split("/")):
            if not temp_path.endswith("/"):
                temp_path += "/"
            temp_path += chunk
            # A directory exists if it is possible to navigate to it.
            try:
                self.change_directory(temp_path)
            except FileSystemException:
                _log.info("Creating directory '%s'", temp_path)
                self._execute_command(_FilesystemFunction.MD, temp_path)
    finally:
        self.change_directory(current_dir)
def list_directory(self, directory=None):
    """
    Lists the contents of the given directory.

    Args:
        directory (String, optional): the directory to list its contents.
            If not provided (or empty), the current directory contents are
            listed.

    Returns:
        List: list of `:class:`.FilesystemElement` objects contained in
            the given (or current) directory.

    Raises:
        FileSystemException: if there is any error listing the directory
            contents or the function is not supported.
    """
    if not directory:
        _log.info("Listing directory contents of current dir")
        answer = self._execute_command(_FilesystemFunction.LS)
        # The element paths must hang from the directory that was really
        # listed: the current one, also when an empty string is provided.
        path = self.get_current_directory()
    else:
        # Sanitize path.
        directory = directory.replace('\\', '/')
        _log.info("Listing directory contents of '%s'", directory)
        answer = self._execute_command(_FilesystemFunction.LS_DIR, directory)
        path = directory

    # Do not duplicate the separator if the path already ends with it.
    if not path.endswith(_PATH_SEPARATOR):
        path += _PATH_SEPARATOR

    filesystem_elements = []
    for line in answer.split("\r"):
        # Ignore empty lines.
        if not line.strip():
            continue

        # Try to parse the line as a directory entry first.
        result = re.match(_PATTERN_FILE_SYSTEM_DIRECTORY, line)
        if result is not None and len(result.groups()) > 0:
            name = result.groups()[0]
            filesystem_elements.append(FileSystemElement(
                name, path + name, is_dir=True,
                is_secure=name.endswith(_SECURE_ELEMENT_SUFFIX)))
            continue

        # Then as a file entry: first group is the size, second the name.
        result = re.match(_PATTERN_FILE_SYSTEM_FILE, line)
        if result is not None and len(result.groups()) > 1:
            name = result.groups()[1]
            size = int(result.groups()[0])
            filesystem_elements.append(FileSystemElement(
                name, path + name, size=size,
                is_secure=name.endswith(_SECURE_ELEMENT_SUFFIX)))
            continue

        _log.warning("Unknown filesystem element entry: %s", line)

    return filesystem_elements
def remove_element(self, element_path):
    """
    Removes the given file system element path.

    Args:
        element_path (String): Path of the file system element to remove.
            Nothing is done if it is empty.

    Raises:
        FileSystemException: If there is any error removing the element or
            the function is not supported.
    """
    if element_path:
        # Use '/' as the only path separator.
        target = element_path.replace('\\', '/')
        _log.info("Removing file '%s'", target)
        self._execute_command(_FilesystemFunction.RM, target)
def move_element(self, source_path, dest_path):
    """
    Moves the given source element to the given destination path.

    Args:
        source_path (String): Source path of the element to move.
        dest_path (String): Destination path of the element to move.
            Nothing is done if any of the paths is empty.

    Raises:
        FileSystemException: If there is any error moving the element or
            the function is not supported.
    """
    if source_path and dest_path:
        # Use '/' as the only path separator.
        origin = source_path.replace('\\', '/')
        target = dest_path.replace('\\', '/')
        _log.info("Moving file '%s' to '%s'", origin, target)
        self._execute_command(_FilesystemFunction.MV, origin, target)
def put_file(self, source_path, dest_path, secure=False, progress_callback=None):
    """
    Transfers the given file in the specified destination path of the XBee.

    Args:
        source_path (String): the path of the file to transfer.
        dest_path (String): the destination path to put the file in.
        secure (Boolean, optional, default=`False`): `True` if the file
            should be stored securely, `False` otherwise.
        progress_callback (Function, optional): Function to execute to
            receive progress information. Takes the following arguments:

                * The progress percentage as integer.

    Raises:
        FileSystemException: If there is any error transferring the file or
            the function is not supported.
    """
    # The secure and the regular uploads only differ in the function used.
    upload_function = _FilesystemFunction.XPUT if secure else _FilesystemFunction.PUT
    if not self._is_function_supported(upload_function):
        raise FileSystemException(
            _ERROR_FUNCTION_NOT_SUPPORTED % upload_function.cmd_name)

    # Use '/' as the only path separator of the destination.
    dest_path = dest_path.replace('\\', '/')

    # Make sure the directory that will contain the file exists.
    dest_parent = os.path.dirname(dest_path)
    if not dest_parent:
        dest_parent = self.get_current_directory()
    self.make_directory(dest_parent)

    # Initial XBee3 firmware does not allow to overwrite existing files.
    # If the file to upload already exists, remove it first.
    if not self._is_function_supported(_FilesystemFunction.MV):
        dest_name = os.path.basename(dest_path)
        for element in self.list_directory(dest_parent):
            if element.is_dir or element.name != dest_name:
                continue
            self.remove_element(element.path)
            break

    _log.info("Uploading file '%s' to '%s'", source_path, dest_path)
    # Text of the command, only used to build error messages.
    command = _COMMAND_ATFS % (upload_function.command % dest_path)
    answer = self._execute_command(upload_function, dest_path)
    if not answer.endswith(xmodem.XMODEM_CRC):
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), "Transfer not ready"))

    # Transfer the file.
    try:
        xmodem.send_file_ymodem(
            source_path, self._xmodem_write_cb, self._xmodem_read_cb,
            progress_cb=progress_callback, log=_log)
    except XModemException as exc:
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), str(exc))) from None

    # Read operation result.
    result = self._read_data(timeout=_READ_DATA_TIMEOUT,
                             empty_retries=_READ_EMPTY_DATA_RETRIES)
    if not result:
        raise FileSystemException(_ERROR_TIMEOUT)
    self._check_function_error(result, command)
def put_dir(self, source_dir, dest_dir=None, progress_callback=None):
    """
    Uploads the given source directory contents into the given destination
    directory in the device.

    Subdirectories are uploaded recursively.

    Args:
        source_dir (String): Local directory to upload its contents.
            Nothing is done if it is empty.
        dest_dir (String, optional): Remote directory to upload the
            contents to. Defaults to current directory.
        progress_callback (Function, optional): Function to execute to
            receive progress information. Takes the following arguments:

                * The file being uploaded as string.
                * The progress percentage as integer.

    Raises:
        FileSystemException: If there is any error uploading the directory
            or the function is not supported.
    """
    if not source_dir:
        return

    # The destination must exist before uploading anything to it.
    if dest_dir is None:
        dest_dir = self.get_current_directory()
    else:
        self.make_directory(dest_dir)

    for entry in listdir(source_dir):
        local_path = str(os.path.join(source_dir, entry))
        remote_path = str(os.path.join(dest_dir, entry))

        if not isfile(local_path):
            # Anything that is not a regular file is uploaded as a directory.
            self.put_dir(local_path, remote_path,
                         progress_callback=progress_callback)
            continue

        # The file upload only reports the percentage: bind the name of the
        # file being uploaded as the first argument of the callback.
        if progress_callback is None:
            file_callback = None
        else:
            file_callback = functools.partial(progress_callback, remote_path)
        self.put_file(local_path, remote_path, progress_callback=file_callback)
def get_file(self, source_path, dest_path, progress_callback=None):
    """
    Downloads the given XBee device file in the specified destination path.

    Args:
        source_path (String): Path of the XBee device file to download.
        dest_path (String): Destination path to store the file in.
        progress_callback (Function, optional): Function to execute to
            receive progress information. Takes the following arguments:

                * The progress percentage as integer.

    Raises:
        FileSystemException: If there is any error downloading the file or
            the function is not supported.
    """
    # Sanitize path. This is done before composing the command text so the
    # error messages show the command that is really sent to the XBee.
    source_path = source_path.replace('\\', '/')
    # Text of the command, only used to build error messages.
    command = _COMMAND_ATFS % (_FilesystemFunction.GET.command % source_path)

    _log.info("Downloading file '%s' to '%s'", source_path, dest_path)
    self._execute_command(_FilesystemFunction.GET, source_path, wait_for_answer=False)
    try:
        # Consume data until 'NAK' is received.
        deadline = _get_milliseconds() + (_NAK_TIMEOUT * 1000)
        nak_received = False
        while not nak_received and _get_milliseconds() < deadline:
            # Read byte by byte to stop exactly at the 'NAK'.
            data = self._xmodem_read_cb(1, timeout=_TRANSFER_TIMEOUT)
            if data and data[0] == xmodem.XMODEM_NAK:
                nak_received = True
        if not nak_received:
            raise FileSystemException(
                _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), "Transfer not ready"))
    except SerialException as exc:
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), str(exc))) from None

    # Receive the file.
    try:
        xmodem.get_file_ymodem(dest_path, self._xmodem_write_cb, self._xmodem_read_cb,
                               progress_cb=progress_callback, log=_log)
    except XModemException as exc:
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), str(exc))) from None

    # Read operation result.
    answer = self._read_data()
    if not answer:
        raise FileSystemException(_ERROR_TIMEOUT)
    self._check_function_error(answer, command)
def format_filesystem(self):
    """
    Formats the device file system.

    Waits, up to `_FORMAT_TIMEOUT` seconds, until the device answers with
    the command mode 'OK'.

    Raises:
        FileSystemException: If there is any error formatting the file
            system.
    """
    # Text of the command, only used to build error messages.
    command = _COMMAND_ATFS % _FilesystemFunction.FORMAT.command
    _log.info("Formatting file system...")
    self._execute_command(_FilesystemFunction.FORMAT, wait_for_answer=False)
    try:
        deadline = _get_milliseconds() + (_FORMAT_TIMEOUT * 1000)
        while _get_milliseconds() < deadline:
            answer = self._read_data()
            self._check_function_error(answer, command)
            if _COMMAND_MODE_ANSWER_OK in answer:
                # Format finished.
                break
        else:
            # The deadline expired without receiving the 'OK'.
            raise FileSystemException(_ERROR_TIMEOUT)
    except SerialException as exc:
        raise FileSystemException(
            _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), str(exc))) from None
def get_usage_information(self):
    """
    Returns the file system usage information.

    Returns:
        Dictionary: Collection of pair values describing the usage
            information. For every parsed line of the answer, the second
            captured group is the key and the first one its value.

    Raises:
        FileSystemException: If there is any error retrieving the file
            system usage information.
    """
    _log.info("Reading file system usage information...")
    answer = self._execute_command(_FilesystemFunction.INFO)

    # Lines that do not match the pattern are skipped.
    matches = (re.match(_PATTERN_FILE_SYSTEM_INFO, part)
               for part in str.strip(answer).split("\r"))
    return {
        found.groups()[1]: found.groups()[0]
        for found in matches
        if found is not None and len(found.groups()) > 1
    }
def get_file_hash(self, file_path):
    """
    Returns the SHA256 hash of the given file path.

    Args:
        file_path (String): Path of the file to get its hash.

    Returns:
        String: SHA256 hash of the given file path.

    Raises:
        FileSystemException: If there is any error retrieving the file
            hash.
    """
    # Use '/' as the only path separator.
    file_path = file_path.replace('\\', '/')
    _log.info("Retrieving SHA256 hash of file '%s'...", file_path)
    answer = self._execute_command(_FilesystemFunction.HASH, file_path)

    # The hash is the text that follows the hash marker of the answer.
    parts = answer.split(_ANSWER_SHA256)
    if len(parts) > 1:
        return parts[1].strip()

    command = _COMMAND_ATFS % (_FilesystemFunction.HASH.command % file_path)
    raise FileSystemException(
        _ERROR_EXECUTE_COMMAND % (command.replace("\r", ""), "Invalid hash received"))
def update_remote_filesystem_image(remote_device, ota_filesystem_file,
                                   max_block_size=0, timeout=None,
                                   progress_callback=None):
    """
    Performs a remote filesystem update operation in the given target.

    Args:
        remote_device (:class:`.RemoteXBeeDevice`): Remote XBee to update
            its filesystem image.
        ota_filesystem_file (String): Path of the OTA filesystem file to
            upload.
        max_block_size (Integer, optional): Maximum size of the ota block
            to send.
        timeout (Integer, optional): Timeout to wait for remote frame
            requests.
        progress_callback (Function, optional): Function to execute to
            receive progress information. Receives two arguments:

                * The current update task as a String
                * The current update task percentage as an Integer

    Raises:
        FileSystemNotSupportedException: If the target does not support
            filesystem update.
        FileSystemException: If there is any error updating the remote
            filesystem image.
    """
    # Import required firmware update components.
    from digi.xbee.firmware import FirmwareUpdateException, update_remote_filesystem

    # Only targets compatible with the OTA file system update are allowed.
    is_supported = check_fs_support(
        remote_device, max_fw_vers=XB3_MAX_FW_VERSION_FS_OTA_SUPPORT)
    if not is_supported:
        raise FileSystemNotSupportedException(
            "Filesystem image support update is not supported")

    update_options = {
        "max_block_size": max_block_size,
        "timeout": timeout,
        "progress_callback": progress_callback,
    }
    try:
        update_remote_filesystem(remote_device, ota_filesystem_file, **update_options)
    except FirmwareUpdateException as exc:
        # Report firmware update errors as file system errors.
        error_msg = str(exc)
        _log.error("ERROR: %s", error_msg)
        raise FileSystemException(error_msg) from None
def check_fs_support(xbee, min_fw_vers=None, max_fw_vers=None):
    """
    Checks if filesystem API feature is supported.

    The hardware and firmware versions are read from the XBee if they are
    required and not cached. If the hardware version is unknown it is not
    checked, and if the firmware version is unknown the firmware limits are
    not checked.

    Args:
        xbee (:class:`:AbstractXBeeDevice`): The XBee to check.
        min_fw_vers (Dictionary, optional, default=`None`): A dictionary
            with protocol as key, and minimum firmware version with
            filesystem support as value.
        max_fw_vers (Dictionary, optional, default=`None`): A dictionary
            with protocol as key, and maximum firmware version with
            filesystem support as value.

    Returns:
        Boolean: `True` if filesystem is supported, `False` otherwise.
    """
    hw_version = xbee.get_hardware_version()
    fw_version = xbee.get_firmware_version()

    # The firmware version is only needed when there is a limit to check.
    hw_missing = not hw_version
    fw_missing = not fw_version and (min_fw_vers or max_fw_vers)
    if hw_missing or fw_missing:
        try:
            xbee.read_device_info(init=True, fire_event=False)
            hw_version = xbee.get_hardware_version()
            fw_version = xbee.get_firmware_version()
        except XBeeException as exc:
            _log.error(
                "Unable to read XBee hardware/firmware version to check "
                "filesystem support: %s", str(exc))

    # Local and remote XBees have their own list of supported hardware.
    supported_hw_versions = (REMOTE_SUPPORTED_HW_VERSIONS if xbee.is_remote()
                             else LOCAL_SUPPORTED_HW_VERSIONS)
    if hw_version and hw_version.code not in supported_hw_versions:
        return False

    if not fw_version:
        return True

    # Firmware limits for the protocol of the XBee, if any.
    lower_limit = min_fw_vers[xbee.get_protocol()] if min_fw_vers else None
    upper_limit = max_fw_vers[xbee.get_protocol()] if max_fw_vers else None

    version = utils.bytes_to_int(fw_version)
    if lower_limit and version < lower_limit:
        return False

    return not (upper_limit and version > upper_limit)
def get_local_file_hash(local_path):
    """
    Returns the SHA256 hash of the given local file.

    Args:
        local_path (String): Absolute path of the file to get its hash.

    Returns:
        Bytearray: SHA256 hash of the given file.
    """
    import hashlib

    digest = hashlib.sha256()
    with open(local_path, "rb") as stream:
        # Feed the hash in blocks of 4K so the whole file is never held in
        # memory.
        while True:
            chunk = stream.read(4096)
            if chunk == b"":
                break
            digest.update(chunk)

    return digest.digest()
def _raise_exception(status, msg):
    """
    Raises a file system exception with the given message followed by the
    description of the provided status.

    Args:
        status (Integer): File system command status. `None` to not include
            any status description in the message.
        msg (String): Error message.

    Raises:
        FileSystemException: Always.
    """
    st_msg = ""
    if status is not None:
        fs_st = FSCommandStatus.get(status)
        # The ': ' separator applies to both descriptions. Without the
        # parentheses it was only added for known statuses.
        st_msg = ": %s" % (str(fs_st) if fs_st
                           else "Unknown status (0x%0.2X)" % status)

    raise FileSystemException("%s%s" % (msg, st_msg), fs_status=status)


def _get_milliseconds():
    """
    Returns the current time in milliseconds.

    Returns:
        Integer: Current time in milliseconds.
    """
    return int(time.time() * 1000.0)


def _filter_non_printable(byte_array):
    """
    Filters the non printable characters of the given byte array and returns
    the resulting string.

    Args:
        byte_array (Bytearray): Byte array to filter.

    Return:
        String: Resulting string after filtering non printable characters of
            the byte array.
    """
    # Only the bytes included in '_printable_ascii_bytes' are kept.
    return bytes(x for x in byte_array if x in _printable_ascii_bytes).decode(encoding='utf8')