Skip to content
125 changes: 125 additions & 0 deletions dandiapi/api/management/commands/correct_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

from copy import deepcopy
import sys
import typing
from typing import Any

from django.db import transaction
import djclick as click

from dandiapi.api.manifests import write_dandiset_jsonld, write_dandiset_yaml
from dandiapi.api.models import Version
from dandiapi.api.services import audit


@click.command(
    help='Correct corrupted metadata. If `--all` is provided, apply the correction to '
    'all Dandiset versions. Otherwise, provide the Dandiset to '
    'apply the correction to.'
)
@click.argument('dandiset', required=False)
@click.option(
    '--all',
    'apply_to_all',
    is_flag=True,
    default=False,
    help='Apply the correction to all Dandiset versions '
    '(cannot be combined with dandiset argument).',
)
@click.option(
    '--check',
    is_flag=True,
    help="Don't perform any changes, just check for corrupted metadata.",
)
def correct_metadata(  # noqa: C901
    *, dandiset: str | None, apply_to_all: bool, check: bool
):
    """
    Find and (unless `check` is set) fix corrupted metadata on Dandiset versions.

    :param dandiset: Identifier of a single Dandiset; only its `draft` version is examined.
        Must be convertible to an integer. Mutually exclusive with `apply_to_all`.
    :param apply_to_all: Examine every version of every Dandiset.
    :param check: Only report the versions with corrupted metadata; modify nothing.
    :raises click.UsageError: If both or neither of `dandiset` / `apply_to_all` are given, or
        if `dandiset` is not an integer.

    Exits the process with status 1 if, after applying the corrections, any examined version
    still has corrupted metadata.
    """
    if apply_to_all:
        if dandiset is not None:
            raise click.UsageError('Cannot specify `--all` together with `dandiset` argument.')
    elif dandiset is None:
        raise click.UsageError('Either `--all` or `dandiset` argument must be provided.')

    # Get version queryset
    vers = Version.objects.all()
    if not apply_to_all:
        dandiset = typing.cast(str, dandiset)
        try:
            dandiset_id = int(dandiset)
        except ValueError as e:
            # Report bad input as a usage error rather than an unhandled traceback
            raise click.UsageError(
                f'Dandiset identifier must be an integer, got {dandiset!r}.'
            ) from e
        vers = vers.filter(dandiset=dandiset_id, version='draft')

    if not vers.exists():
        click.echo('No matching versions found')
        return

    # For each version, find and fix metadata corruptions, along with saving out manifest files
    for ver in vers.iterator():
        new_meta = correct_affiliation_corruption(ver.metadata)
        if new_meta is None:
            continue

        click.echo(f'Found corruption in {ver}')
        if check:
            continue

        # Save each version in a separate transaction to avoid de-sync with dandiset yaml/jsonld
        with transaction.atomic():
            # Persist the corrected metadata *before* regenerating the manifests; writing them
            # first would produce manifests from the still-corrupted metadata on `ver`.
            ver.metadata = new_meta
            ver.save()

            audit.update_metadata(
                dandiset=ver.dandiset,
                metadata=new_meta,
                user=None,
                admin=True,
                description='Apply metadata correction from https://github.com/dandi/dandi-schema/issues/276',
            )

            # Written last so that a failure here rolls back the database changes above
            write_dandiset_yaml(ver)
            write_dandiset_jsonld(ver)
            click.echo(f'\tWrote dandiset yaml and json for version {ver}')

    # Remaining check is not needed since no data was modified
    if check:
        return

    # If we find any un-fixed instances, raise exception
    remaining = [
        ver for ver in vers.iterator() if correct_affiliation_corruption(ver.metadata) is not None
    ]
    if remaining:
        click.echo(
            click.style(f'\nFound remaining corrupted versions: {remaining}', fg='red', bold=True)
        )
        sys.exit(1)


def correct_affiliation_corruption(meta: dict) -> dict | None:
    """
    Strip the corrupting fields from every `"Affiliation"` object within Dandiset metadata.

    The cleanup is performed on a deep copy, so `meta` itself is never modified.

    :param meta: The Dandiset metadata instance to inspect.
    :return: A corrected copy of `meta` if the cleanup changed anything; otherwise, `None`.

    Note: The corruption being corrected is described in
    https://github.com/dandi/dandi-schema/issues/276
    """
    corrected = deepcopy(meta)
    correct_objs(corrected)

    if corrected == meta:
        # The cleanup removed nothing, so there is no correction to report
        return None
    return corrected


def correct_objs(data: Any) -> None:
    """
    Recursively clean, in place, every dict whose `"schemaKey"` is `"Affiliation"`.

    The keys `contactPoint`, `includeInCitation` and `roleName` are removed from each such
    dict. Dicts and lists are traversed to any depth; every other value is left untouched.

    :param data: The (possibly nested) structure to clean. Mutated in place.
    """
    if isinstance(data, list):
        for element in data:
            correct_objs(element)
        return

    if not isinstance(data, dict):
        # Scalars and any other types can neither be nor contain an Affiliation object
        return

    if data.get('schemaKey') == 'Affiliation':
        for unwanted in ('contactPoint', 'includeInCitation', 'roleName'):
            data.pop(unwanted, None)

    # Descend into the values that are left after the cleanup
    for child in data.values():
        correct_objs(child)
74 changes: 74 additions & 0 deletions dandiapi/api/tests/test_correct_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from __future__ import annotations

from copy import deepcopy

import pytest

from dandiapi.api.management.commands.correct_metadata import correct_affiliation_corruption


@pytest.mark.parametrize(
    ('input_meta', 'expected_output'),
    [
        # Metadata without any Affiliation object needs no correction.
        ({'key': 'value'}, None),
        # An Affiliation object that is already clean needs no correction.
        ({'affiliation': {'schemaKey': 'Affiliation', 'name': 'Alice'}}, None),
        # A lone unwanted field ("contactPoint") is stripped.
        (
            {
                'affiliation': {
                    'schemaKey': 'Affiliation',
                    'name': 'Alice',
                    'contactPoint': 'info',
                }
            },
            {'affiliation': {'schemaKey': 'Affiliation', 'name': 'Alice'}},
        ),
        # All three unwanted fields are stripped together.
        (
            {
                'affiliation': {
                    'schemaKey': 'Affiliation',
                    'name': 'Test',
                    'contactPoint': 'a',
                    'includeInCitation': 'b',
                    'roleName': 'c',
                }
            },
            {'affiliation': {'schemaKey': 'Affiliation', 'name': 'Test'}},
        ),
        # Affiliation objects nested in lists/dicts are corrected; other objects are not.
        (
            {
                'users': [
                    {
                        'profile': {
                            'schemaKey': 'Affiliation',
                            'name': 'Bob',
                            'roleName': 'Member',
                        }
                    },
                    {'profile': {'schemaKey': 'Affiliation', 'name': 'Charlie'}},
                ],
                'data': {
                    'schemaKey': 'NotAffiliation',
                    'contactPoint': 'should not be touched',
                },
            },
            {
                'users': [
                    {'profile': {'schemaKey': 'Affiliation', 'name': 'Bob'}},
                    {'profile': {'schemaKey': 'Affiliation', 'name': 'Charlie'}},
                ],
                'data': {
                    'schemaKey': 'NotAffiliation',
                    'contactPoint': 'should not be touched',
                },
            },
        ),
    ],
)
def test_correct_affiliation_corruption(input_meta, expected_output):
    """
    Exercise `correct_affiliation_corruption()` against a range of metadata shapes.

    Each case checks the returned value (the corrected metadata, or `None` when no
    correction is needed) and that the argument passed in was not mutated.
    """
    # Snapshot the input so that any in-place mutation can be detected afterwards.
    snapshot = deepcopy(input_meta)

    corrected = correct_affiliation_corruption(input_meta)
    assert corrected == expected_output

    # The function must operate on a copy and leave its argument untouched.
    assert input_meta == snapshot, 'The input metadata should remain unchanged.'