Source code for hashtree.cli

"""hashtree command"""

import atexit
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile

import click
import click.core

from .clio import CLIO
from .exception_handler import ExceptionHandler
from .hash import DEFAULT_HASH, HASHES, HashDigest
from .progress import ProgressReader
from .shell import _shell_completion
from .version import __timestamp__, __version__

header = f"{__name__.split('.')[0]} v{__version__} {__timestamp__}"


def _ehandler(ctx, option, debug):
    ctx.obj = dict(ehandler=ExceptionHandler(debug))
    ctx.obj["debug"] = debug


HASH_CHOICES = list(HASHES)

DEFAULT_SORT_ARGS = "-ifdk1.1"


@click.command("hashtree", context_settings={"auto_envvar_prefix": "HASHTREE", "show_default": True})
@click.version_option(message=header)
@click.option(
    "-d",
    "--debug",
    is_eager=True,
    is_flag=True,
    callback=_ehandler,
    help="debug mode",
)
@click.option(
    "--shell-completion",
    is_flag=False,
    flag_value="[auto]",
    callback=_shell_completion,
    help="configure shell completion",
)
@click.option(
    "-h",
    "--hash",
    type=click.Choice(HASH_CHOICES),
    default=DEFAULT_HASH,
    help="select checksum hash",
)
@click.option("-p/-P", "--progress/--no-progress", is_flag=True, help="show/hide progress bar")
@click.option("-a", "--ascii", is_flag=True, help="ASCII progress bar")
@click.option("-w", "--width", type=int, help="progress bar width")
@click.option("-f/-F", "--find/--no-find", is_flag=True, default=True, help="generate file list with 'find'")
@click.option(
    "-s/-S", "--sort-files/--no-sort-files", is_flag=True, default=True, help="sort input/generated file list"
)
@click.option("-o/-O", "--sort-output/--no-sort-output", is_flag=True, default=False, help="sort output with 'sort'")
@click.option("-k", "--sort-args", default=DEFAULT_SORT_ARGS, help="args passed to sort")
@click.option(
    "-b",
    "--base-dir",
    type=click.Path(file_okay=False, readable=True),
    default=".",
    help="base directory for file list",
)
@click.argument("INFILE", type=click.Path(dir_okay=False), default="-")
@click.argument("OUTFILE", type=click.Path(dir_okay=False, writable=True), default="-")
@click.pass_context
def cli(
    ctx,
    debug,
    shell_completion,
    base_dir,
    ascii,
    find,
    hash,
    sort_files,
    sort_output,
    sort_args,
    progress,
    width,
    infile,
    outfile,
):
    """generate hash digest for list of files"""

    return hashtree(
        base_dir,
        infile,
        outfile,
        ascii=ascii,
        find=find,
        hash=hash,
        sort_files=sort_files,
        sort_output=sort_output,
        sort_args=sort_args,
        progress=progress,
        width=width
    )


[docs] def hashtree( base_dir, infile, outfile, *, ascii=False, find=True, hash=None, sort_files=True, sort_output=False, sort_args=None, progress=False, width=None, ): base = Path(base_dir) if sort_args is None: sort_args = DEFAULT_SORT_ARGS if progress is None and is_stdio(outfile): progress = False if find: infile = find_files(base, infile) elif is_stdio(infile): infile = spool_stdin() if sort_files: sort_file(infile, sort_args) # if sorting output, redirect to tempfile if sort_output: sorted_outfile = outfile outfile = create_tempfile() else: sorted_outfile = None progress_kwargs = {} if ascii: progress_kwargs["ascii"] = True if progress is not None: progress_kwargs["disable"] = not progress if width is not None: progress_kwargs["ncols"] = width generate_hash_digests(base, infile, outfile, hash, progress_kwargs) if sorted_outfile: write_sorted_output(outfile, sorted_outfile, sort_args)
[docs] def generate_hash_digests(base, infile, outfile, hash, progress_kwargs): hasher = HashDigest(base, hash) with CLIO(infile, outfile) as clio: with ProgressReader(clio.ifp, **progress_kwargs) as reader: for filename in reader.readlines(): if filename.startswith(str(base)): filename = Path(filename).relative_to(base) digest = hasher.file_digest(filename) clio.ofp.write(digest + "\n")
[docs] def write_sorted_output(spool, outfile, sort_args): sort_file(spool, sort_args) with Path(spool).open("r") as ifp: if is_stdio(outfile): copy_stream(ifp, sys.stdout) else: with Path(outfile).open("w") as ofp: copy_stream(ifp, ofp)
[docs] def copy_stream(in_stream, out_stream): while True: buf = in_stream.read() if buf: out_stream.write(buf) else: return
[docs] def spool_stdin(): tempfile = create_tempfile() with Path(tempfile).open("w") as ofp: copy_stream(sys.stdin, ofp) return tempfile
[docs] def is_stdio(filename): return filename in ("-", None)
[docs] def sort_file(filename, sort_args): if is_stdio(filename): raise RuntimeError("cannot sort stdio") cmd = shlex.split(sort_args) cmd.insert(0, "sort") # use system sort to handle huge streams with NamedTemporaryFile(delete=False, dir=".") as ofp: with Path(filename).open("r") as ifp: subprocess.run(cmd, stdin=ifp, stdout=ofp, check=True, text=True) ofp.close() tempfile = Path(ofp.name) try: tempfile.rename(filename) except OSError as exc: if exc.errno == 18: # Invalid cross-device link shutil.copyfile(tempfile, filename) tempfile.unlink() else: raise
[docs] def create_tempfile(): filename = NamedTemporaryFile(delete=False, dir=".", prefix="hashtree_file_list").name atexit.register(reaper, filename) return filename
[docs] def find_files(base, filename): if filename in [".", "-", None]: filename = create_tempfile() with Path(filename).open("w") as ofp: subprocess.run(["find", ".", "-type", "f"], cwd=str(base), stdout=ofp, check=True, text=True) return filename
[docs] def reaper(filename): """delete a file""" Path(filename).unlink()
if __name__ == "__main__": sys.exit(cli()) # pragma: no cover