Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 10 additions & 42 deletions cassandra_mirror/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
import time
import sys

from plumbum import local
from plumbum import BG
from plumbum import FG
from plumbum import LocalPath
from plumbum.commands.processes import CommandNotFound
import boto3

from .identity import get_identity
Expand All @@ -20,35 +18,19 @@
from .obsoletion import cleanup_obsoleted
from .obsoletion import mark_obsoleted
from .util import MovingTemporaryDirectory
from .util import S3Path
from .util import compute_top_prefix
from .util import continuity_code
from .util import gof3r
from .util import load_config
from .util import reverse_format_nanoseconds
from .util import serialize_context
from .util import timed_touch

from plumbum.cmd import lz4
from plumbum.cmd import tar
# plumbum's local.get() raises CommandNotFound for a missing program instead
# of returning a default (unlike the usual Python convention that .get() does
# not raise), so the lookup is guarded and gof3r stays None when the binary
# cannot be found.
gof3r = None
try:
    gof3r = local.get('gof3r')
except CommandNotFound:
    pass

logger = logging.getLogger(__name__)

s3 = boto3.resource('s3')

# despite being class-like, s3.Object is not a class, so instead of
# subclassing it we attach the extra method to each object we hand out.
def S3Path(bucket_name, key):
    """Return ``s3.Object(bucket_name, key)`` with a ``with_components`` method.

    ``with_components(*components)`` builds a new S3Path in the same bucket
    whose key is this object's key followed by the given string components,
    all joined with ``/``.  The original object is left untouched.
    """
    obj = s3.Object(bucket_name, key)

    def _with_components(*components):
        # Extend the current key; components arrive as a tuple of strings.
        new_key = '/'.join((obj.key,) + components)
        return S3Path(obj.bucket_name, new_key)

    obj.with_components = _with_components
    return obj

# A recursive directory walker
def find(path, depth):
Expand Down Expand Up @@ -161,17 +143,7 @@ def format_generation(self):
return '{:010}'.format(self.generation)



def compute_destination_prefix(top_prefix, cf_path):
    """Return the ``/``-joined key prefix for one column-family directory.

    The result is ``top_prefix``, then the name of ``cf_path``'s parent
    directory (``cf_path.dirname.name``), then ``cf_path.name``.

    ``cf_path`` only needs ``name`` and ``dirname.name`` attributes holding
    strings (presumably a plumbum ``LocalPath`` -- confirm against callers).
    """
    parts = (
        top_prefix,
        cf_path.dirname.name,
        cf_path.name,
    )
    return '/'.join(parts)

def tar_stream_command(dirname, files):
# By this point the files should be safely hard linked, so mtime and size
# can't change from under us.
size = sum(map(lambda f: f.stat().st_size, files))
max_mtime = max(f.stat().st_mtime_ns for f in files)
filenames = tuple(map(lambda f: f.basename, files))
Expand All @@ -191,8 +163,8 @@ def upload_s3(cmd, s3_object):

cmd | gof3r_cmd & FG
else:
future = cmd & BG
s3_object.upload_fileobj(future.proc.stdout)
with cmd.bgrun() as proc:
s3_object.upload_fileobj(proc.stdout)

def wrap_with_keypipe(cmd, context, config):
# This import patches plumbum's BaseCommand.
Expand Down Expand Up @@ -291,10 +263,9 @@ def upload_sstable_mutable(sstable, destination, encryption_config,
# We namespace the mutable components by their mtime
# If we did not, a point-in-time recovery would get incorrect
# repairedAt times, and we would not repair these SSTables.
reversed_mtime = (1 << 64) - mtime
destination = destination.with_components(
'mutable',
'{:016x}'.format(reversed_mtime)
reverse_format_nanoseconds(mtime)
)

upload_sstable_component(
Expand Down Expand Up @@ -450,13 +421,13 @@ def upload_global_manifest(columnfamilies, destination):
t = time.time()
t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t))
ns_since_epoch = int(time.time() * 1e9)
label = '{:016x} {}'.format((1 << 64) - ns_since_epoch, t_string)
label = '{:016x} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string)

destination = destination.with_components('manifests', label)
body = transform_cf_for_manifest(columnfamilies)

destination.upload_fileobj(body)
return destination
return label

def fix_identity(config, locs):
identity = config['context'].get('identity')
Expand All @@ -473,9 +444,6 @@ def backup_all_sstables(config, locs, destination):
class Locations(namedtuple('Locations', 'data_dir sstables_dir state_dir links_dir')):
    """Named tuple with fields data_dir, sstables_dir, state_dir, links_dir.

    NOTE(review): the fields are presumably filesystem locations used by the
    backup run -- confirm against the code that constructs this tuple.
    """

def format_s3_url(o):
    """Return the ``s3://bucket/key`` URL for *o*.

    *o* is any object exposing ``bucket_name`` and ``key`` attributes.
    """
    return 's3://{}/{}'.format(o.bucket_name, o.key)

def do_backup():
logging.basicConfig(stream=sys.stderr)
logger.setLevel(logging.DEBUG)
Expand All @@ -497,8 +465,8 @@ def do_backup():

destination = S3Path(config['s3']['bucket'], compute_top_prefix(config))
cf_specs = backup_all_sstables(config, locs, destination)
manifest = upload_global_manifest(cf_specs, destination)
print(format_s3_url(manifest))
label = upload_global_manifest(cf_specs, destination)
print(label)

mark_obsoleted(locs)
cleanup_obsoleted(locs, 0)
2 changes: 1 addition & 1 deletion cassandra_mirror/obsoletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def mark_cf_obsoleted(orig_cf, generation):
data_dir = generation / 'data'
obsolete_marker = generation / 'obsolete'
if stat_helper(sstable_data_dir) is None:
if stat_helper(data_dir) is None:
# We never successfully linked the sstable
obsolete_marker.touch()
return
Expand Down
Loading