diff --git a/cassandra_mirror/backup.py b/cassandra_mirror/backup.py index 98900fd..e80156d 100644 --- a/cassandra_mirror/backup.py +++ b/cassandra_mirror/backup.py @@ -7,11 +7,9 @@ import time import sys -from plumbum import local from plumbum import BG from plumbum import FG from plumbum import LocalPath -from plumbum.commands.processes import CommandNotFound import boto3 from .identity import get_identity @@ -20,35 +18,19 @@ from .obsoletion import cleanup_obsoleted from .obsoletion import mark_obsoleted from .util import MovingTemporaryDirectory +from .util import S3Path from .util import compute_top_prefix from .util import continuity_code +from .util import gof3r from .util import load_config +from .util import reverse_format_nanoseconds from .util import serialize_context from .util import timed_touch from plumbum.cmd import lz4 from plumbum.cmd import tar -gof3r = None -try: - # local.get raises exceptions, violating Python norms of .get() not - # raising exceptions - gof3r = local.get('gof3r') -except CommandNotFound: - pass - logger = logging.getLogger(__name__) -s3 = boto3.resource('s3') - -# despite being class-like, s3.Object is not a class -def S3Path(bucket_name, key): - self = s3.Object(bucket_name, key) - def _with_components(*components): - new_key = '/'.join((self.key,) + components) - return S3Path(self.bucket_name, new_key) - self.with_components = _with_components - - return self # A recursive directory walker def find(path, depth): @@ -161,17 +143,7 @@ def format_generation(self): return '{:010}'.format(self.generation) - -def compute_destination_prefix(top_prefix, cf_path): - return '/'.join(( - top_prefix, - cf_path.dirname.name, - cf_path.name, - )) - def tar_stream_command(dirname, files): - # By this point the files should be safely hard linked, so mtime and size - # can't change from under us. 
size = sum(map(lambda f: f.stat().st_size, files)) max_mtime = max(f.stat().st_mtime_ns for f in files) filenames = tuple(map(lambda f: f.basename, files)) @@ -191,8 +163,8 @@ def upload_s3(cmd, s3_object): cmd | gof3r_cmd & FG else: - future = cmd & BG - s3_object.upload_fileobj(future.proc.stdout) + with cmd.bgrun() as proc: + s3_object.upload_fileobj(proc.stdout) def wrap_with_keypipe(cmd, context, config): # This import patches plumbum's BaseCommand. @@ -291,10 +263,9 @@ def upload_sstable_mutable(sstable, destination, encryption_config, # We namespace the mutable components by their mtime # If we did not, a point-in-time recovery would get incorrect # repairedAt times, and we would not repair these SSTables. - reversed_mtime = (1 << 64) - mtime destination = destination.with_components( 'mutable', - '{:016x}'.format(reversed_mtime) + reverse_format_nanoseconds(mtime) ) upload_sstable_component( @@ -450,13 +421,13 @@ def upload_global_manifest(columnfamilies, destination): t = time.time() t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t)) ns_since_epoch = int(time.time() * 1e9) - label = '{:016x} {}'.format((1 << 64) - ns_since_epoch, t_string) + label = '{} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string) destination = destination.with_components('manifests', label) body = transform_cf_for_manifest(columnfamilies) destination.upload_fileobj(body) - return destination + return label def fix_identity(config, locs): identity = config['context'].get('identity') @@ -473,9 +444,6 @@ def backup_all_sstables(config, locs, destination): class Locations(namedtuple('Locations', 'data_dir sstables_dir state_dir links_dir')): pass -def format_s3_url(o): - return 's3://{}/{}'.format(o.bucket_name, o.key) - def do_backup(): logging.basicConfig(stream=sys.stderr) logger.setLevel(logging.DEBUG) @@ -497,8 +465,8 @@ def do_backup(): destination = S3Path(config['s3']['bucket'], compute_top_prefix(config)) cf_specs = backup_all_sstables(config,
locs, destination) - manifest = upload_global_manifest(cf_specs, destination) - print(format_s3_url(manifest)) + label = upload_global_manifest(cf_specs, destination) + print(label) mark_obsoleted(locs) cleanup_obsoleted(locs, 0) diff --git a/cassandra_mirror/obsoletion.py b/cassandra_mirror/obsoletion.py index 0139ce6..037e2f8 100644 --- a/cassandra_mirror/obsoletion.py +++ b/cassandra_mirror/obsoletion.py @@ -8,7 +8,7 @@ def mark_cf_obsoleted(orig_cf, generation): data_dir = generation / 'data' obsolete_marker = generation / 'obsolete' - if stat_helper(sstable_data_dir) is None: + if stat_helper(data_dir) is None: # We never successfully linked the sstable obsolete_marker.touch() return diff --git a/cassandra_mirror/restore.py b/cassandra_mirror/restore.py index 3e587af..e7d94a6 100644 --- a/cassandra_mirror/restore.py +++ b/cassandra_mirror/restore.py @@ -1,4 +1,5 @@ from collections import namedtuple +from io import BytesIO from subprocess import PIPE from tempfile import TemporaryDirectory import argparse @@ -10,23 +11,25 @@ import sys import time -import keypipe from concurrent import futures from concurrent.futures import ThreadPoolExecutor -from keypipe.plumbum_helpers import ThreadCommand +from plumbum import BG from plumbum import FG from plumbum import LocalPath +from plumbum.commands.modifiers import PIPE import boto3 +from .util import S3Path from .util import MovingTemporaryDirectory from .util import compute_top_prefix from .util import continuity_code +from .util import gof3r from .util import load_config +from .util import reverse_format_nanoseconds from .util import serialize_context from .util import timed_touch -from plumbum.cmd import s3gof3r from plumbum.cmd import lz4 from plumbum.cmd import tar @@ -44,10 +47,43 @@ def get_common_prefixes(bucket, prefix): for i in result.search('CommonPrefixes'): yield i['Prefix'][cut:-1] -def get_columnfamilies(bucket, prefix): - for ks in get_common_prefixes(bucket, '{}/'.format(prefix)): - for cf in 
get_common_prefixes(bucket, '{}/{}/'.format(prefix, ks)): - yield ks, cf +def get_manifest_entries(source, label): + source = source.with_components('manifests', label) + manifest = source.read_utf8() + for i in manifest.splitlines(keepends=False): + max_generation, ks_cf = i.split() + ks, cf = ks_cf.split('/') + yield ks, cf, max_generation + +def keypipe_cmd(provider_args, context): + import keypipe + from keypipe.plumbum_helpers import ThreadCommand + keypipe_partial = functools.partial( + keypipe.unseal, + provider_args, + context, + ) + + return ThreadCommand(keypipe_partial) + +def pipe(): + (r, w) = os.pipe() + return os.fdopen(r, 'rb'), os.fdopen(w, 'wb') + + +def download_s3(cmd, s3_object): + if gof3r: + gof3r_cmd = gof3r[ + 'get', + '--no-md5', + '-b', s3_object.bucket_name, + '-k', s3_object.key, + ] + gof3r_cmd | cmd & FG + else: + with cmd.bgrun(stdin=PIPE) as future: + s3_object.download_fileobj(future.stdin) + def download_to_path( marker_path, @@ -59,30 +95,17 @@ context = serialize_context(encryption_context) logger.debug("Invoking keypipe with context %s", context) - keypipe_partial = functools.partial( - keypipe.unseal, - provider_args, - context, - ) - - gof3r_cmd = s3gof3r[ - 'get', - '--no-md5', - '-b', s3_object.bucket_name, - '-k', s3_object.key, - ] prefix = marker_path.name + '.'
with TemporaryDirectory(prefix=prefix, dir=destination.up()) as d: temp_destination = LocalPath(d) + cmd = lz4['-d'] | tar['-C', temp_destination, '-x'] + if provider_args: + cmd = keypipe_cmd(provider_args, context) | cmd + start_time = time.time() - ( - gof3r_cmd / - keypipe_partial | - lz4['-d'] | - tar['-C', temp_destination, '-x'] - ) & FG + download_s3(cmd, s3_object) finish_time = time.time() elapsed = finish_time - start_time @@ -129,9 +152,11 @@ def download_sstable_to_path( """ return - provider_args = { - config['encryption']['provider']: config['encryption']['args'] - } + provider_args = None + if 'encryption' in config: + provider_args = { + config['encryption']['provider']: config['encryption']['args'] + } uploaded_dir = path / 'uploaded' uploaded_dir.mkdir() @@ -158,16 +183,11 @@ def download_sstable_to_path( ManifestEntry = namedtuple('ManifestEntry', 'generation, mtime') -def get_sstables_to_download_for_cf(config, bucket, prefix, ks, cf): - for i in bucket.objects.filter( - Prefix='{}/{}/{}/manifest/'.format(prefix, ks, cf), - ).limit(1): - manifest_lines = i.get()['Body'].read().decode('utf8').splitlines() - return [ - ManifestEntry(*line.split()) - for line in manifest_lines - ] +def get_sstables_to_download_for_cf(source, ks, cf, max_gen): + manifest = source.with_components('data', ks, cf, max_gen, 'manifest') + manifest_lines = manifest.read_utf8().splitlines(keepends=False) + return [ManifestEntry(*line.split()) for line in manifest_lines] def create_metadata_directories(sstables): for cf_dir, ks, cf, manifest_entries in sstables: @@ -176,47 +196,52 @@ def create_metadata_directories(sstables): uploaded_dir.mkdir() (uploaded_dir / 'manifest').mkdir() +def _get_sstable_download_instructions( + cf_dir, + source, + context, + entry, +): + generation, mutable_mtime = entry + generation_dir = cf_dir / entry.generation + source = source.with_components(entry.generation) + context['generation'] = int(generation) + + mutable_mtime = int(entry.mtime) 
+ reversed_mtime = reverse_format_nanoseconds(mutable_mtime) + + objects = ( + ('mutable', source.with_components('mutable', reversed_mtime)), + ('immutable', source.with_components('immutable')), + ) + + return generation_dir, objects, context, mutable_mtime + def _get_download_instructions_for_cf( identity, - bucket, - prefix, - sstable + source, + sstable, ): cf_dir, ks, cf, manifest_entries = sstable - for entry in manifest_entries: - generation, mutable_mtime = entry - generation_dir = cf_dir / generation - sstable_prefix = '{}/{}/{}/data/{}'.format(prefix, ks, cf, generation) - - mutable_mtime = int(mutable_mtime) - reversed_mtime = (1<<64) - mutable_mtime - - context = dict( - identity=identity, - keyspace=ks, - columnfamily=cf, - generation=int(generation), - ) + source = source.with_components('data', ks, cf) + context = dict( + identity=identity, + keyspace=ks, + columnfamily=cf, + ) - objects = ( - ('mutable', bucket.Object('{}/mutable/{:020}'.format( - sstable_prefix, - reversed_mtime - ))), - ('immutable', bucket.Object('{}/immutable'.format( - sstable_prefix - ))), + for entry in manifest_entries: + yield _get_sstable_download_instructions( + cf_dir, source, dict(context), entry ) - yield generation_dir, objects, context, mutable_mtime - -def get_download_instructions(identity, bucket, prefix, sstables): +def get_download_instructions(identity, source, sstables): for i in sstables: - yield from _get_download_instructions_for_cf(identity, bucket, prefix, i) + yield from _get_download_instructions_for_cf(identity, source, i) -def get_sstables_to_download(config, bucket, prefix): - for ks, cf in get_columnfamilies(bucket, prefix): - tables = get_sstables_to_download_for_cf(config, bucket, prefix, ks, cf) +def get_sstables_to_download(source, label): + for ks, cf, max_gen in get_manifest_entries(source, label): + tables = get_sstables_to_download_for_cf(source, ks, cf, max_gen) yield ks, cf, tables def compute_cf_dirs(base, sstables): @@ -241,21 +266,21 
@@ def copy_back(src, dst): dst ) -def restore(identity): +def restore(identity, manifest_label, workers): config = load_config() s3 = boto3.resource('s3') - bucket = s3.Bucket(config['s3']['bucket']) - prefix = compute_top_prefix(config, identity) + source = S3Path(config['s3']['bucket'], compute_top_prefix(config)) - i = get_sstables_to_download(config, bucket, prefix) + i = get_sstables_to_download(source, manifest_label) sstables = list(compute_cf_dirs(LocalPath('mirrored'), i)) create_metadata_directories(sstables) - instructions = get_download_instructions(identity, bucket, prefix, sstables) + instructions = get_download_instructions(identity, source, sstables) - # It is assumed we'll be disk-bound, so I've chosen a typical disk queue depth. - with futures.ThreadPoolExecutor(max_workers=32) as executor: + # It is assumed we'll be disk-bound, so I've chosen a typical disk queue + # depth. + with futures.ThreadPoolExecutor(workers) as executor: fs = [ executor.submit(download_sstable_to_path, config, *i) for i in instructions @@ -279,9 +304,12 @@ def do_restore(): parser = argparse.ArgumentParser( description='Restore a Cassandra backup into the current working directory.' 
) + parser.add_argument('--workers', '-w', type=int, default=32) parser.add_argument('source_identity', help='The identity (typically UUID) of the node whose backup should be restored' ) + parser.add_argument('manifest_label', nargs='?') + args = parser.parse_args() logging.basicConfig(stream=sys.stderr) @@ -291,7 +319,7 @@ def do_restore(): logger.info('Skipping restoration because a data directory already exists') return - restore(args.source_identity) + restore(args.source_identity, args.manifest_label, args.workers) if __name__ == '__main__': sys.exit(do_restore()) diff --git a/cassandra_mirror/util.py b/cassandra_mirror/util.py index 1cc4990..f817c4f 100644 --- a/cassandra_mirror/util.py +++ b/cassandra_mirror/util.py @@ -9,6 +9,36 @@ import json import yaml +from plumbum import local +from plumbum.commands.processes import CommandNotFound +gof3r = None +try: + # local.get raises exceptions, violating Python norms of .get() not + # raising exceptions + gof3r = local.get('gof3r') +except CommandNotFound: + pass + +s3 = boto3.resource('s3') + +# despite being class-like, s3.Object is not a class +def S3Path(bucket_name, key): + self = s3.Object(bucket_name, key) + def _with_components(*components): + new_key = '/'.join((self.key,) + components) + return S3Path(self.bucket_name, new_key) + + def _read_utf8(): + return self.get()["Body"].read().decode("utf-8") + + self.with_components = _with_components + self.read_utf8 = _read_utf8 + + return self + +def reverse_format_nanoseconds(ns): + return '{:016x}'.format((1 << 64) - ns) + # This is incorporated into the encryption context, representing the program that uploaded the # file. The goal is to disambiguate the files produced by this utility from files produced by some # other utility. 
It is called a continuity code because it will remain the same for the life of @@ -39,7 +69,7 @@ def timed_touch(path, mtime): except FileNotFoundError: with NamedTemporaryFile(dir=path.dirname) as f: os.utime(f.fileno(), ns=(mtime, mtime)) - os.link(f.name, str(path)) + os.link(f.name, str(path)) def compose(f): def wrapper(g): diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index dec7387..0000000 --- a/requirements.txt +++ /dev/null @@ -1,16 +0,0 @@ -Cython==0.25.2 -PyYAML==3.12 -boto3==1.4.2 -botocore==1.4.81 -cffi==1.9.1 -contexter==0.1.3 -docutils==0.12 -jmespath==0.9.0 -plumbum==1.6.3 -git+https://github.com/hashbrowncipher/keypipe.git@ea9455c2129b626c9d63d6101bf6ee854207b8a8 -pycparser==2.17 -python-dateutil==2.6.0 -s3transfer==0.1.9 -six==1.10.0 -git+https://github.com/kivy/pyjnius.git@5b14aaf76213295f91596db99a7b340b2efb9f1a -cachetools==2.0.0 diff --git a/setup.py b/setup.py index 8509486..e168468 100644 --- a/setup.py +++ b/setup.py @@ -18,4 +18,7 @@ 'PyYAML>=3.12', 'plumbum>=1.6.3', ], + dependency_links = [ + 'git+https://github.com/hashbrowncipher/plumbum.git@d57e53955536423857be87ec394e6eb376acaddf#egg=plumbum', + ] )