Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
"""
Cockroach source module
"""
import re
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple
from typing import Iterable, List, Optional, Tuple

from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.engine.reflection import Inspector

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
Expand Down Expand Up @@ -69,6 +71,13 @@

PGDialect.ischema_names = ischema_names

# Compiled regex for the names of CockroachDB hidden shard columns.
# These columns are created for hash-sharded indexes with names like
# 'crdb_internal_id_shard_16': the literal prefix 'crdb_internal_', any
# text, then '_shard_' followed by one or more digits. They are marked as
# NOT VISIBLE and should be filtered from constraint columns.
# See: https://www.cockroachlabs.com/docs/stable/hash-sharded-indexes
HIDDEN_SHARD_COLUMN_PATTERN = re.compile(r"^crdb_internal_.*_shard_\d+$")


class CockroachSource(CommonDbSourceService, MultiDBSource):
"""
Expand Down Expand Up @@ -107,6 +116,47 @@ def get_schema_description(self, schema_name: str) -> Optional[str]:
"""
return self.schema_desc_map.get((self.context.get().database, schema_name))

@staticmethod
def _is_hidden_shard_column(column_name: str) -> bool:
    """
    Check if a column is a CockroachDB hidden shard column.

    CockroachDB creates hidden virtual columns for hash-sharded indexes
    with names like 'crdb_internal_id_shard_16'. These columns are marked
    as NOT VISIBLE and are not returned by get_columns(), but they appear
    in primary key constraints, causing validation errors.

    See: https://www.cockroachlabs.com/docs/stable/hash-sharded-indexes

    Args:
        column_name: Column name as it appears in a constraint definition.

    Returns:
        True when the whole name matches HIDDEN_SHARD_COLUMN_PATTERN,
        False otherwise.
    """
    # fullmatch instead of match: with match(), the pattern's trailing '$'
    # also succeeds just before a final newline, so a name such as
    # 'crdb_internal_id_shard_16\n' would be misclassified as hidden.
    return HIDDEN_SHARD_COLUMN_PATTERN.fullmatch(column_name) is not None

def _get_columns_with_constraints(
    self, schema_name: str, table_name: str, inspector: Inspector
) -> Tuple[List, List, List]:
    """
    Return the primary key, unique and foreign key columns of a table,
    with hidden shard columns removed from the primary key columns.

    Hash-sharded indexes in CockroachDB rely on hidden virtual columns
    (NOT VISIBLE). They show up in primary key constraints while being
    absent from the column list, and that mismatch makes the server
    reject the table with 'Invalid column name found in table constraint'.
    """
    parent_result = super()._get_columns_with_constraints(
        schema_name, table_name, inspector
    )
    pk_columns, unique_columns, foreign_columns = parent_result

    # Nothing to filter when the parent reported no primary key columns;
    # hand the value back untouched.
    if not pk_columns:
        return pk_columns, unique_columns, foreign_columns

    # Keep only the primary key columns that are not hidden shard columns
    visible_pk_columns = [
        name for name in pk_columns if not self._is_hidden_shard_column(name)
    ]
    return visible_pk_columns, unique_columns, foreign_columns

def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
Expand Down
166 changes: 166 additions & 0 deletions ingestion/tests/unit/topology/database/test_cockroach.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
ConstraintType,
DataType,
TableType,
)
Expand Down Expand Up @@ -267,3 +268,168 @@ def test_datatype(self):
def test_close_connection(self, engine, connection):
connection.return_value = True
self.cockroach_source.close()

def test_is_hidden_shard_column(self):
    """
    Verify which column names are recognised as hidden shard columns.

    For hash-sharded indexes CockroachDB creates hidden virtual columns
    named like 'crdb_internal_id_shard_16'. They are marked as NOT VISIBLE
    and should be filtered from constraint columns.

    See: https://www.cockroachlabs.com/docs/stable/hash-sharded-indexes
    """
    is_hidden = CockroachSource._is_hidden_shard_column

    # Names that must be recognised as hidden shard columns
    shard_column_names = (
        "crdb_internal_id_shard_16",
        "crdb_internal_user_id_shard_8",
        "crdb_internal_my_column_name_shard_32",
        "crdb_internal_start_time_end_time_shard_4",
    )
    for name in shard_column_names:
        self.assertTrue(is_hidden(name))

    # Names that must NOT be recognised: plain columns, incomplete
    # prefixes, a missing bucket number, and extra text on either side.
    regular_column_names = (
        "id",
        "user_id",
        "crdb_internal",
        "crdb_internal_shard",
        "crdb_internal_id_shard",
        "my_crdb_internal_id_shard_16",
        "crdb_internal_id_shard_16_extra",
    )
    for name in regular_column_names:
        self.assertFalse(is_hidden(name))

def test_hidden_shard_columns_filtered_from_pk_constraints(self):
    """
    Hidden shard columns must be dropped from primary key constraints.

    For tables using hash-sharded indexes, CockroachDB adds hidden shard
    columns to the primary key. Filtering them out prevents
    'Invalid column name found in table constraint' errors.
    """
    # Stub inspector whose primary key mixes a hidden shard column
    # (expected to be filtered) with a regular column (expected to stay).
    inspector = types.SimpleNamespace(
        get_columns=(
            lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
        ),
        get_pk_constraint=lambda table_name, schema_name: {
            "constrained_columns": [
                "crdb_internal_id_shard_16",
                "username",
            ],
            "name": "test_table_pkey",
        },
        get_unique_constraints=lambda table_name, schema_name: [],
        get_foreign_keys=lambda table_name, schema_name: [],
    )

    columns, table_constraints, _ = self.cockroach_source.get_columns_and_constraints(
        "public", "test_table", "cockroach", inspector, TableType.Regular
    )

    # Locate the 'username' column, if present
    matching = [col for col in columns if col.name.root == "username"]
    username_col = matching[0] if matching else None
    self.assertIsNotNone(username_col)
    # A single pk column remains after filtering, so the primary key is
    # expected as a column-level constraint.
    self.assertEqual(username_col.constraint, Constraint.PRIMARY_KEY)

def test_all_hidden_shard_columns_filtered_results_in_no_pk(self):
    """
    Edge case: every primary key column is a hidden shard column.

    Once all of them are filtered out, no primary key should be reported
    at either the table level or the column level.
    """
    # Stub inspector whose primary key consists solely of hidden shard columns
    inspector = types.SimpleNamespace(
        get_columns=(
            lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
        ),
        get_pk_constraint=lambda table_name, schema_name: {
            "constrained_columns": [
                "crdb_internal_id_shard_16",
                "crdb_internal_timestamp_shard_8",
            ],
            "name": "test_table_pkey",
        },
        get_unique_constraints=lambda table_name, schema_name: [],
        get_foreign_keys=lambda table_name, schema_name: [],
    )

    columns, table_constraints, _ = self.cockroach_source.get_columns_and_constraints(
        "public", "test_table", "cockroach", inspector, TableType.Regular
    )

    # No table-level primary key constraint should be created
    pk_constraints = []
    for constraint in table_constraints or []:
        if constraint.constraintType == ConstraintType.PRIMARY_KEY:
            pk_constraints.append(constraint)
    self.assertEqual(len(pk_constraints), 0)

    # No column should carry a PRIMARY_KEY constraint either
    for column in columns:
        self.assertNotEqual(column.constraint, Constraint.PRIMARY_KEY)

def test_multi_column_pk_with_hidden_shard_column(self):
    """
    Composite primary keys keep only their visible columns.

    Given a composite primary key made of regular columns plus a hidden
    shard column, the constraint left after filtering should list the
    regular columns only.
    """
    # Stub inspector: composite primary key with one hidden shard column
    # (expected to be filtered) followed by two regular columns.
    inspector = types.SimpleNamespace(
        get_columns=(
            lambda table_name, schema_name, table_type, db_name: MOCK_COLUMN_VALUE
        ),
        get_pk_constraint=lambda table_name, schema_name: {
            "constrained_columns": [
                "crdb_internal_id_shard_16",
                "username",
                "geom_c",
            ],
            "name": "test_table_pkey",
        },
        get_unique_constraints=lambda table_name, schema_name: [],
        get_foreign_keys=lambda table_name, schema_name: [],
    )

    columns, table_constraints, _ = self.cockroach_source.get_columns_and_constraints(
        "public", "test_table", "cockroach", inspector, TableType.Regular
    )

    # Exactly one table-level PRIMARY_KEY constraint is expected, holding
    # the two visible columns in their original order.
    pk_constraints = []
    for constraint in table_constraints or []:
        if constraint.constraintType == ConstraintType.PRIMARY_KEY:
            pk_constraints.append(constraint)
    self.assertEqual(len(pk_constraints), 1)
    self.assertEqual(pk_constraints[0].columns, ["username", "geom_c"])
Loading