Source code for PythonCK.ioutils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""

Collection of tools related to I/O.

"""

import sys
import os
import hashlib
import tempfile
import contextlib
import subprocess

## Python2/3 compat
from six import StringIO

## Local
from . import logger
from .decorators import cache_to_file

#===============================================================================

def dump_to_file(obj, destination=None, suffix=''):
    """
    Quickly dump an object to a file, creating folder/file along the way.

    Useful, for instance, when a python/xml option file has to be created
    on-the-fly in order to supply the path of that file to another process.

    Args:
      obj (object): Arbitrary object to write. ``str(obj)`` is what gets
                    written to the file.
      destination (str): Destination location. If empty/None, the file is
                         created in the system's temp dir, and its name is
                         the SHA-1 of ``repr(obj)``; this makes the path
                         reproducible, which is handy for reusing an
                         already dumped object.
      suffix (str): Suffix (extension) to append to the output file, unless
                    the destination already ends with it. Needed in some
                    cases where the consumer looks at the extension.

    Returns:
      String of the absolute path to the dumped file.

    Raises:
      OSError: If the destination directory cannot be created.

    >>> tmpdir = getfixture('tmpdir')
    >>> oldcwd = tmpdir.chdir()

    >>> dump_to_file('some_data').split('/')[-1]
    '256be736caed19be589e439b0d5b8392340d82bc'
    >>> dump_to_file('some_data', suffix='.py').split('/')[-1]
    '256be736caed19be589e439b0d5b8392340d82bc.py'
    >>> dump_to_file('some_data', 'target').replace(str(tmpdir), '...')
    '.../target'
    >>> dump_to_file('some_data', 'target', suffix='.py').replace(str(tmpdir), '...')
    '.../target.py'
    >>> dump_to_file('some_data', 'target.py', suffix='.py').replace(str(tmpdir), '...')
    '.../target.py'
    >>> dump_to_file('some_data', 'dname/target.py').replace(str(tmpdir), '...')
    '.../dname/target.py'

    >>> _ = oldcwd.chdir()

    """
    ## Prepare the destination.
    if not destination:
        fname = hashlib.sha1(repr(obj).encode('utf-8')).hexdigest()
        destination = os.path.join(tempfile.gettempdir(), fname)

    ## Add requested suffix
    if suffix and not destination.endswith(suffix):
        destination += suffix

    ## Make sure the destination dir exists.
    ## Create-then-check (instead of check-then-create) so that a concurrent
    ## process creating the same directory does not make this call fail.
    destination = os.path.abspath(destination)
    dname = os.path.dirname(destination)
    try:
        os.makedirs(dname)
    except OSError:
        if not os.path.isdir(dname):
            raise

    ## Finally, write out the output
    with open(destination, 'w') as fout:
        fout.write(str(obj))
    return destination
#===============================================================================
def checksum(filepath):
    """
    Return the SHA-1 checksum of the file at the given path.

    The file is read in fixed-size binary chunks, so large files are hashed
    without being loaded into memory at once.

    Args:
      filepath (str): Path to the file to hash.

    Return:
      Hex string of the SHA-1 digest.

    >>> checksum('tests/res/ioutils/checksum.txt')
    '220e9a9970406e4c688e2c27b8858073f6e2bd33'

    """
    chunk_size = 65536
    digest = hashlib.sha1()
    with open(filepath, 'rb') as stream:
        ## iter() keeps calling read() until it returns the empty-bytes sentinel.
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
#===============================================================================

@cache_to_file
def _get_size_and_date(path):
    """
    Helper to retrieve both the total file size under `path` and the
    modification time of `path` itself. Used internally by other functions.

    The function is wrapped by `cache_to_file`; callers in this module pass
    `force_reload` / `early_giveup` keyword arguments, which are presumably
    consumed by that decorator (verify against `.decorators`).

    Args:
      path (str): Path to the directory to measure.

    Returns:
      Tuple ``(size, mtime)``: the summed size in bytes (5th column of
      ``ls -l``) of every regular file found under `path`, and
      ``os.stat(path).st_mtime``.

    Raises:
      OSError: If `path` cannot be stat'ed, or `find` cannot be launched.
    """
    ## List every regular file with `ls -l`. The command is given as an
    ## argument list (no shell), so paths containing spaces or shell
    ## metacharacters are passed through untouched.
    cmd = ['find', path, '-type', 'f', '-exec', 'ls', '-l', '{}', ';']
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    stdout, _ = proc.communicate()
    ## NOTE: the exit status of `find` is deliberately not checked, so that
    ## partially unreadable trees still yield a best-effort size.

    ## Sum the size column in Python (exact integers) rather than in awk,
    ## which may print large sums in exponent notation.
    size = 0
    for line in stdout.splitlines():
        fields = line.split()
        if len(fields) < 5:
            continue
        try:
            size += int(fields[4])
        except ValueError:
            continue  # Not a regular `ls -l` entry line

    mtime = os.stat(path).st_mtime
    return size, mtime
def size(path, force_reload=False, early_giveup=False):
    """
    Retrieve the total size of the given directory.

    The directory size is cached, and the cached value is invalidated by
    comparing the cached modification time against the current `st_mtime`
    of that path.

    Args:
      path (str): Path to the directory to check.
      force_reload (bool): If True, a new result is calculated regardless of
                           the cache.
      early_giveup (bool): If True, return 0 immediately if this request has
                           no previous result cached.

    Return:
      int representing size in BYTES

    Caveats:
      - Because disc usage != file size, `ls` is used instead of `du`
        (compatibility with OSX's BSD `du`).

    Usage::

      >>> import time
      >>> tmpdir = getfixture('chtmpdir')

      ## In case of non-existent path, return zero.
      >>> size('non/existent/path')
      0

      ## New content
      >>> tmpdir.join("hello.txt").write("content")
      >>> size(tmpdir)
      7

      ## Second file in the same directory, invalidate the cache.
      >>> time.sleep(1)  # Need at least 1 second for st_mtime to propagate
      >>> tmpdir.join("hello2.txt").write("contents".encode('utf-8'))
      >>> size(tmpdir)
      15

      ## Make sure that if directory disappear, the cache will be deleted.
      >>> tmpdir.mkdir('sub').join('hello3.txt').write('content')
      >>> size(tmpdir.join('sub'))
      7
      >>> tmpdir.join('sub').remove()
      >>> size(tmpdir.join('sub'))
      0

      ## DEV: early_giveup, force_reload
      >>> tmpdir.mkdir('sub2').join('test.txt').write('foobar')
      >>> size(tmpdir.join('sub2'), early_giveup=True)
      0
      >>> size(tmpdir.join('sub2'), force_reload=True)
      6

    REF: http://superuser.com/questions/22460/how-do-i-get-the-size-of-a-linux-or-mac-os-x-directory-from-the-command-line

    """
    path = str(path)  # Pytest compat
    logger.debug('Sizing: %r'%path)

    ## Nothing to measure for a missing path.
    if not os.path.exists(path):
        logger.debug('Path not existed: '+path)
        return 0

    ## Ask the helper, which may answer from its cache.
    cached = _get_size_and_date(path, force_reload=force_reload, early_giveup=early_giveup)
    if cached is None:
        ## The helper gave up early: no previous result available.
        return 0
    nbytes, cached_mtime = cached

    ## A forced reload is fresh by construction.
    if force_reload:
        logger.debug('Sizing: return with force_reload=True')
        return nbytes

    ## Stale cache: the folder changed since the value was stored.
    current_mtime = os.stat(path).st_mtime
    if current_mtime != cached_mtime:
        logger.debug('outdated: %r != %r'%(current_mtime, cached_mtime))
        return _get_size_and_date(path, force_reload=True)[0]

    ## Cache is still valid because the folder hasn't changed.
    logger.debug('Sizing: return with good cache: %r' % current_mtime)
    return nbytes
#===============================================================================

_suffixes = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')

def humansize(nbytes):
    """
    Convert a size in bytes into a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit (PB) is reached, then formatted with two decimals.

    Args:
      nbytes (int): Size in BYTES

    Return:
      str: Human-readable size, or None if `nbytes` is None.

    Usage:
      >>> humansize(None) is None
      True
      >>> humansize(0)
      '0 B'
      >>> humansize(12)
      '12.00 B'
      >>> humansize(12000)
      '11.72 KB'
      >>> humansize(1E10)
      '9.31 GB'

    """
    if nbytes is None:
        return None
    if nbytes == 0:
        return '0 B'

    scaled, unit_index = nbytes, 0
    largest = len(_suffixes) - 1
    ## Step up one unit at a time; stop at the last available suffix.
    while unit_index < largest and scaled >= 1024:
        scaled, unit_index = scaled / 1024., unit_index + 1
    return '%.2f %s' % (scaled, _suffixes[unit_index])
#===============================================================================
@contextlib.contextmanager
def capture():
    r"""
    Context manager capturing everything written to stdout/stderr.

    Yields a two-item list. Inside the `with` block the items are the
    in-memory streams standing in for `sys.stdout` and `sys.stderr`; once the
    block exits, the original streams are restored and the items are
    replaced by the captured text.

    REF: http://stackoverflow.com/questions/5136611/capture-stdout-from-a-script-in-python

    Usage:
      >>> with capture() as std_out_err:
      ...   print('hi')
      >>> out,err = std_out_err
      >>> out
      'hi\n'
      >>> err
      ''

    """
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    try:
        streams = [StringIO(), StringIO()]
        sys.stdout, sys.stderr = streams
        yield streams
    finally:
        ## Restore the real streams first, then swap buffers for their text
        ## in place so the caller's reference sees the captured strings.
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr
        streams[:] = [stream.getvalue() for stream in streams]
#===============================================================================