
DM-54070: Add support for APDB record updates#33

Draft
JeremyMcCormick wants to merge 44 commits into main from tickets/DM-54070

Conversation

@JeremyMcCormick (Contributor)

No description provided.

Copilot AI left a comment


Pull request overview

Adds end-to-end support for exporting, uploading, deduplicating, and merging APDB update records into PPDB BigQuery tables, along with supporting SQL resources and test coverage updates.

Changes:

  • Introduces BigQuery “updates” subsystem (expand → load → deduplicate → merge) with SQL MERGE resources.
  • Extends chunk export/upload metadata to track update-record presence and GCS location (gcs_uri), and adds promotable-chunk SQL/query utilities.
  • Refactors tests/config to use a resource-based test schema path and adds multiple BigQuery/GCS integration tests.
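The deduplicate stage of the expand → load → deduplicate → merge pipeline above can be sketched in plain Python. Field names follow the expanded-record schema used elsewhere in this PR (`record_id_hash`, `field_name`, `update_order`, `update_time_ns`); the actual implementation performs this step in BigQuery SQL, so this is only an illustrative model of the semantics:

```python
# Sketch: keep only the most recent update per (record_id_hash, field_name).
# Not the PR's real code, which deduplicates inside BigQuery.
def deduplicate(rows: list[dict]) -> list[dict]:
    latest: dict[tuple[str, str], dict] = {}
    # Iterate in ascending update order so later updates overwrite earlier ones.
    for row in sorted(rows, key=lambda r: (r["update_order"], r["update_time_ns"])):
        latest[(row["record_id_hash"], row["field_name"])] = row
    return list(latest.values())


rows = [
    {"record_id_hash": "abc", "field_name": "flags",
     "update_order": 0, "update_time_ns": 1, "value_json": "1"},
    {"record_id_hash": "abc", "field_name": "flags",
     "update_order": 1, "update_time_ns": 2, "value_json": "2"},
]
deduped = deduplicate(rows)  # only the update_order == 1 row survives
```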

Reviewed changes

Copilot reviewed 36 out of 41 changed files in this pull request and generated 10 comments.

File Description
tests/test_updates_table.py BigQuery UpdatesTable integration tests
tests/test_updates_merger.py BigQuery MERGE integration tests
tests/test_updates_manager.py End-to-end updates manager test
tests/test_update_records.py UpdateRecords JSON + GCS uploader tests
tests/test_update_record_expander.py Unit tests for update expansion
tests/test_ppdb_sql.py Switch tests to schema resource URI
tests/test_ppdb_bigquery.py Adds BigQuery test case scaffolding
requirements.txt Removes ppdbx-gcp from base reqs
python/lsst/dax/ppdb/tests/config/__init__.py Test config package init
python/lsst/dax/ppdb/tests/_updates.py Shared synthetic update-record fixtures
python/lsst/dax/ppdb/tests/_ppdb.py Adds test schema URI + mixin refactor
python/lsst/dax/ppdb/tests/_bigquery.py BigQuery test mixins + uploader stub helpers
python/lsst/dax/ppdb/sql/_ppdb_sql.py Engine creation API change for DB init
python/lsst/dax/ppdb/sql/_ppdb_sql_base.py Refactors engine/connect-args helpers
python/lsst/dax/ppdb/ppdb.py Imports config from new module
python/lsst/dax/ppdb/ppdb_config.py New pydantic-based config loader
python/lsst/dax/ppdb/config/sql/select_promotable_chunks.sql New SQL for promotable chunk selection
python/lsst/dax/ppdb/config/sql/merge_diasource_updates.sql New MERGE SQL for DiaSource updates
python/lsst/dax/ppdb/config/sql/merge_diaobject_updates.sql New MERGE SQL for DiaObject updates
python/lsst/dax/ppdb/config/sql/merge_diaforcedsource_updates.sql New MERGE SQL for DiaForcedSource updates
python/lsst/dax/ppdb/config/schemas/test_apdb_schema.yaml Adds test schema as package data
python/lsst/dax/ppdb/bigquery/updates/updates_table.py Creates/loads/dedups updates table
python/lsst/dax/ppdb/bigquery/updates/updates_merger.py Merger classes for applying updates
python/lsst/dax/ppdb/bigquery/updates/updates_manager.py Orchestrates download/expand/load/merge
python/lsst/dax/ppdb/bigquery/updates/update_records.py Pydantic model for update records JSON
python/lsst/dax/ppdb/bigquery/updates/update_record_expander.py Expands logical updates into field updates
python/lsst/dax/ppdb/bigquery/updates/expanded_update_record.py Model for a single expanded update row
python/lsst/dax/ppdb/bigquery/updates/__init__.py Exports updates public API
python/lsst/dax/ppdb/bigquery/sql_resource.py Loads SQL from package resources
python/lsst/dax/ppdb/bigquery/replica_chunk_promoter.py Promotion workflow for staged chunks
python/lsst/dax/ppdb/bigquery/query_runner.py Utility for running/logging BQ jobs
python/lsst/dax/ppdb/bigquery/ppdb_replica_chunk_extended.py Adds gcs_uri to chunk metadata
python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py Writes update_records.json; adds promotable-chunks query
python/lsst/dax/ppdb/bigquery/manifest.py Tracks whether updates are included
python/lsst/dax/ppdb/bigquery/chunk_uploader.py Uploads update_records.json; stores gcs_uri
python/lsst/dax/ppdb/bigquery/__init__.py Exports ChunkUploader
python/lsst/dax/ppdb/_factory.py Updates config import location
python/lsst/dax/ppdb/__init__.py Re-exports new config module
pyproject.toml Adds package data + gcp extra dep
docker/Dockerfile.replication Adds build deps for Python packages
.gitignore Ignores .scratch directory
Comments suppressed due to low confidence (1)

python/lsst/dax/ppdb/tests/_bigquery.py:33

  • This module imports google.cloud.storage unconditionally. Because _bigquery.py is used by multiple test cases/mixins, this can break test collection in environments where optional GCP dependencies aren’t installed. Wrap these imports in try/except (similar to other tests) and skip/disable GCP-dependent helpers when unavailable.
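A guard along the lines the comment suggests (the pattern below is a sketch mirroring the other tests it mentions, not the PR's actual code) keeps test collection working when the optional GCP extras are not installed:

```python
# Guarded import pattern for optional GCP dependencies: the module still
# imports cleanly when google-cloud-storage is absent, and GCP-dependent
# tests are skipped instead of breaking collection.
import unittest

try:
    from google.cloud import storage
except ImportError:
    storage = None


@unittest.skipIf(storage is None, "google-cloud-storage not installed")
class GcsUploaderTestCase(unittest.TestCase):
    """GCP-dependent tests; skipped when the optional extra is missing."""
```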


Comment on lines +296 to 317
        # Next two steps are inapplicable to empty chunks.
        if not is_empty:
            # 3) Update status and GCS URI in the database
            gcs_prefix = posixpath.join(self.bucket_name, gcs_prefix)
            updated_replica_chunk = replica_chunk.with_new_status(ChunkStatus.UPLOADED).with_new_gcs_uri(
                f"gs://{gcs_prefix}"
            )
            try:
                self._bq.store_chunk(updated_replica_chunk, True)
                _LOG.info(
                    "Updated replica chunk %d in database with status 'uploaded' and GCS URI: %s",
                    chunk_id,
                    gcs_prefix,
                )
            except Exception as e:
                raise ChunkUploadError(
                    chunk_id, "Failed to update replica chunk in database"
                ) from e

            # 4) Publish Pub/Sub staging message to trigger BigQuery load.
            # (Empty chunks cannot be staged.)
            try:
                self._post_to_stage_chunk_topic(self.bucket_name, gcs_prefix, chunk_id)
            except Exception as e:
Comment on lines +45 to +92
    def __init__(self, client: bigquery.Client, target_table_name: str | None = None) -> None:
        """
        Parameters
        ----------
        client
            BigQuery client.
        target_table_name
            Optional name of the target table. If not provided, the
            class-level TABLE_NAME will be used.
        """
        self._client: bigquery.Client = client
        self._target_table_name = target_table_name or self.TABLE_NAME

    @property
    def target_table_name(self) -> str:
        """Get the name of the target table this merger applies to."""
        return self._target_table_name

    @target_table_name.setter
    def target_table_name(self, value: str) -> None:
        """Set the name of the target table this merger applies to."""
        self._target_table_name = value

    def merge(self, *, updates_table_fqn: str, target_dataset_fqn: str) -> bigquery.QueryJob:
        """
        Apply updates from the updates table specified by `updates_table_fqn`
        to the target table in the `target_dataset_fqn` dataset.

        Parameters
        ----------
        updates_table_fqn
            Fully-qualified BigQuery table name containing updates.
        target_dataset_fqn
            Fully-qualified BigQuery dataset name containing the target table.

        Returns
        -------
        google.cloud.bigquery.job.QueryJob
            The completed BigQuery job.
        """
        sql = SqlResource(
            self.SQL_RESOURCE_NAME,
            format_args={"updates_table": updates_table_fqn, "target_dataset": target_dataset_fqn},
        ).sql
        job = self._client.query(sql)
        job.result()

        return job
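The SQL resources consumed by `merge()` presumably follow a templated shape along these lines. Everything here is illustrative: the column names, match predicate, and SET clause are assumptions, not the contents of the PR's actual merge_*_updates.sql files; only the `{updates_table}` and `{target_dataset}` placeholders are taken from the code above.

```python
# Hypothetical stand-in for one of the merge_*_updates.sql package resources.
MERGE_TEMPLATE = """\
MERGE `{target_dataset}.DiaSource` AS target
USING `{updates_table}` AS updates
ON TO_JSON_STRING([target.diaSourceId]) = TO_JSON_STRING(updates.record_id)
WHEN MATCHED THEN
  UPDATE SET target.flags = CAST(updates.value_json AS INT64)
"""

# SqlResource presumably substitutes the placeholders like this:
sql = MERGE_TEMPLATE.format(
    updates_table="my-project.staging.updates",
    target_dataset="my-project.ppdb",
)
```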
Comment on lines +59 to +66
        self._bq_client = bigquery.Client()

        self._updates_table = UpdatesTable(
            self._bq_client,
            f"{self._ppdb._config.project_id}.{self._ppdb._config.dataset_id}.{updates_table_name}",
        )
        self._updates_table.create()

Comment on lines +144 to +161
        rows: list[dict[str, Any]] = [
            {
                "table_name": r.table_name,
                "record_id": r.record_id,
                "record_id_hash": self._compute_record_id_hash(r.record_id),
                "field_name": r.field_name,
                "value_json": r.value_json,
                "replica_chunk_id": r.replica_chunk_id,
                "update_order": r.update_order,
                "update_time_ns": r.update_time_ns,
            }
            for r in records
        ]

        _LOG.debug("Inserting rows into BigQuery: %s", rows)

        job = self._client.load_table_from_json(
            rows,
Comment on lines +50 to +66
    @staticmethod
    def _compute_record_id_hash(record_id: list[int]) -> str:
        """Compute MD5 hash of a record_id list for deduplication.

        Parameters
        ----------
        record_id : list[int]
            The record ID as a list of integers.

        Returns
        -------
        str
            32-character hexadecimal MD5 hash of the record_id list.
        """
        record_id_str = ",".join(str(x) for x in record_id)
        return hashlib.md5(record_id_str.encode()).hexdigest()
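A quick illustration of the properties the deduplication relies on, using the same comma-joined MD5 scheme as the snippet above (the standalone function name here is for demonstration only):

```python
import hashlib


def compute_record_id_hash(record_id: list[int]) -> str:
    # Same scheme as _compute_record_id_hash above.
    return hashlib.md5(",".join(str(x) for x in record_id).encode()).hexdigest()


h = compute_record_id_hash([200001, 12345, 42])
assert len(h) == 32  # MD5 hex digests are always 32 characters
assert h == compute_record_id_hash([200001, 12345, 42])  # deterministic
# The comma separator keeps distinct IDs distinct: "1,23" vs "12,3".
assert compute_record_id_hash([1, 23]) != compute_record_id_hash([12, 3])
```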

Comment on lines +169 to +191
        # Get the record ID
        record_id = cls._get_record_id(update_record)
        record_id_hash = cls._compute_record_id_hash(record_id)

        expanded_records = []
        for field_name in field_names:
            if not hasattr(update_record, field_name):
                raise ValueError(
                    f"Update record of type {update_type} is missing expected field {field_name}"
                )

            value = getattr(update_record, field_name)

            expanded_record = ExpandedUpdateRecord(
                table_name=table_name,
                record_id=record_id,
                record_id_hash=record_id_hash,
                field_name=field_name,
                value_json=value,
                replica_chunk_id=replica_chunk_id,
                update_order=update_record.update_order,
                update_time_ns=update_record.update_time_ns,
            )
Comment on lines +139 to +149
        # WHERE table_name = 'DiaForcedSource'
        results = list(self.client.query(query).result())

        # Should have one DiaForcedSource record
        self.assertEqual(len(results), 1)
        row = results[0]
        self.assertEqual(row.table_name, "DiaForcedSource")
        self.assertEqual(row.record_id, [200001, 12345, 42])
        self.assertEqual(row.field_name, "timeWithdrawnMjdTai")
        self.assertEqual(row.replica_chunk_id, self.replica_chunk_id)
Comment on lines +291 to +299
        # Should have 8 total expanded records:
        # - 1 from ApdbReassignDiaSourceToDiaObjectRecord
        # - 2 from ApdbReassignDiaSourceToSSObjectRecord
        # - 1 from ApdbWithdrawDiaSourceRecord
        # - 1 from ApdbWithdrawDiaForcedSourceRecord
        # - 2 from ApdbCloseDiaObjectValidityRecord
        # - 1 from ApdbUpdateNDiaSourcesRecord
        self.assertEqual(len(expanded), 10)

This adds an option for getting the PPDB Postgres password from the
Google Secrets Manager if the `PPDB_USE_SECRETS_MANAGER` environment
variable is set to `true`.
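A minimal sketch of that option, assuming the standard google-cloud-secret-manager client; only the `PPDB_USE_SECRETS_MANAGER` variable is named in the PR, so the function names and the `PPDB_PASSWORD` fallback below are illustrative:

```python
import os


def use_secrets_manager() -> bool:
    # PPDB_USE_SECRETS_MANAGER gates the Secret Manager lookup.
    return os.environ.get("PPDB_USE_SECRETS_MANAGER", "").lower() == "true"


def get_db_password(project_id: str, secret_id: str) -> str:
    """Fetch the Postgres password, from Secret Manager when enabled."""
    if use_secrets_manager():
        # Requires the optional google-cloud-secret-manager package.
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
        response = client.access_secret_version(request={"name": name})
        return response.payload.data.decode("utf-8")
    # Fall back to a conventional environment variable (assumed name).
    return os.environ.get("PPDB_PASSWORD", "")
```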
This follows DM naming conventions, since the module defines the class
`PpdbConfig`.
This functionality is moved into this repository, so that the cloud
functions may access it.
Some test modules require valid Google credentials. This adds a check so that, if credentials are not present, the tests are skipped (e.g., in GitHub CI, where they should not run but should not fail either).
from lsst.dax.ppdb.tests._updates import _create_test_update_records


@unittest.skipIf(updates is None, "Google Cloud environment not available")
JeremyMcCormick (Contributor, Author)

This should check the Google credentials instead.
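A credential check along these lines could replace the `skipIf` condition above (a sketch: the helper name and import guard are illustrative, not the PR's final code):

```python
import unittest

# Guard the import so environments without google-auth still collect tests.
try:
    import google.auth
    from google.auth.exceptions import DefaultCredentialsError
except ImportError:
    google = None


def have_google_credentials() -> bool:
    """Return True only when default Google credentials can be resolved."""
    if google is None:
        return False
    try:
        google.auth.default()
        return True
    except DefaultCredentialsError:
        return False


@unittest.skipUnless(have_google_credentials(), "Google credentials not available")
class UpdatesIntegrationTest(unittest.TestCase):
    """Skipped automatically in CI environments without credentials."""
```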
