Source code for labgrid.util.managedfile

import hashlib
import logging
import os
import subprocess

import attr

from .helper import get_user
from .ssh import sshmanager
from ..resource.common import Resource, NetworkResource
from ..driver.exception import ExecutionError


[docs]class ManagedFileError(Exception): pass
[docs]@attr.s class ManagedFile: """ The ManagedFile allows the synchronisation of a file to a remote host. It has to be created with the to be synced file and the target resource as argument: :: from labgrid.util.managedfile import ManagedFile ManagedFile("/tmp/examplefile", <your-resource>) Synchronisation is done with the sync_to_resource method. """ local_path = attr.ib( validator=attr.validators.instance_of(str), converter=lambda x: os.path.abspath(str(x)) ) resource = attr.ib( validator=attr.validators.instance_of(Resource), ) detect_nfs = attr.ib(default=True, validator=attr.validators.instance_of(bool))
[docs] def __attrs_post_init__(self): if not os.path.isfile(self.local_path): raise FileNotFoundError(f"Local file {self.local_path} not found") self.logger = logging.getLogger(f"{self}") self.hash = None self.rpath = None self._on_nfs_cached = None
[docs] def sync_to_resource(self, symlink=None): """sync the file to the host specified in a resource Raises: ExecutionError: if the SSH connection/copy fails """ if isinstance(self.resource, NetworkResource): host = self.resource.host conn = sshmanager.open(host) if self._on_nfs(conn): self.logger.info("File %s is accessible on %s, skipping copy", self.local_path, host) self.rpath = os.path.dirname(self.local_path) + "/" else: self.rpath = f"/var/cache/labgrid/{get_user()}/{self.get_hash()}/" self.logger.info("Synchronizing %s to %s", self.local_path, host) conn.run_check(f"mkdir -p {self.rpath}") conn.put_file( self.local_path, f"{self.rpath}{os.path.basename(self.local_path)}" ) if symlink is not None: self.logger.info("Linking") try: conn.run_check(f"test ! -e {symlink} -o -L {symlink}") except ExecutionError: raise ManagedFileError(f"Path {symlink} exists but is not a symlink.") conn.run_check( f"ln --symbolic --force --no-dereference {self.rpath}{os.path.basename(self.local_path)} {symlink}" # pylint: disable=line-too-long )
def _on_nfs(self, conn): if self._on_nfs_cached is not None: return self._on_nfs_cached if not self.detect_nfs: return False self._on_nfs_cached = False fmt = "inode=%i,size=%s,modified=%Y" local = subprocess.run(["stat", "--format", fmt, self.local_path], stdout=subprocess.PIPE) if local.returncode != 0: self.logger.debug("local: stat: unsuccessful error code %d", local.returncode) return False remote = conn.run(f"stat --format '{fmt}' {self.local_path}", decodeerrors="backslashreplace") if remote[2] != 0: self.logger.debug("remote: stat: unsuccessful error code %d", remote[2]) return False localout = local.stdout.decode("utf-8", "backslashreplace").split('\n') localout.pop() # remove trailing empty element if remote[0] != localout: self.logger.debug("state: local (%s) and remote (%s) output don't match", remote[0], localout) return False self.rpath = os.path.dirname(self.local_path) + "/" self._on_nfs_cached = True return True
[docs] def get_remote_path(self): """Retrieve the remote file path Returns: str: path to the file on the remote host """ if isinstance(self.resource, NetworkResource): return f"{self.rpath}{os.path.basename(self.local_path)}" return self.local_path
[docs] def get_hash(self): """Retrieve the hash of the file Returns: str: SHA256 hexdigest of the file """ if self.hash is not None: return self.hash hasher = hashlib.sha256() with open(self.local_path, 'rb') as f: for block in iter(lambda: f.read(1048576), b''): hasher.update(block) self.hash = hasher.hexdigest() return self.hash