diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 239eedf6d3c..98f29b48c0f 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1822,9 +1822,9 @@ def melt( Arguments correspond to pandas.melt arguments. """ # TODO: Implement col_level and ignore_index - value_labels: pd.Index = pd.Index( - [self.col_id_to_label[col_id] for col_id in value_vars] - ) + value_labels: pd.Index = self.column_labels[ + [self.value_columns.index(col_id) for col_id in value_vars] + ] id_labels = [self.col_id_to_label[col_id] for col_id in id_vars] unpivot_expr, (var_col_ids, unpivot_out, passthrough_cols) = unpivot( @@ -3417,6 +3417,7 @@ def unpivot( joined_array, (labels_mapping, column_mapping) = labels_array.relational_join( array_value, type="cross" ) + new_passthrough_cols = [column_mapping[col] for col in passthrough_columns] # Last column is offsets index_col_ids = [labels_mapping[col] for col in labels_array.column_ids[:-1]] @@ -3426,20 +3427,24 @@ def unpivot( unpivot_exprs: List[ex.Expression] = [] # Supports producing multiple stacked ouput columns for stacking only part of hierarchical index for input_ids in unpivot_columns: - # row explode offset used to choose the input column - # we use offset instead of label as labels are not necessarily unique - cases = itertools.chain( - *( - ( - ops.eq_op.as_expr(explode_offsets_id, ex.const(i)), - ex.deref(column_mapping[id_or_null]) - if (id_or_null is not None) - else ex.const(None), + col_expr: ex.Expression + if not input_ids: + col_expr = ex.const(None) + else: + # row explode offset used to choose the input column + # we use offset instead of label as labels are not necessarily unique + cases = itertools.chain( + *( + ( + ops.eq_op.as_expr(explode_offsets_id, ex.const(i)), + ex.deref(column_mapping[id_or_null]) + if (id_or_null is not None) + else ex.const(None), + ) + for i, id_or_null in enumerate(input_ids) ) - for i, id_or_null in enumerate(input_ids) ) - ) - col_expr = ops.case_when_op.as_expr(*cases) + col_expr = ops.case_when_op.as_expr(*cases) unpivot_exprs.append(col_expr) joined_array, unpivot_col_ids = joined_array.compute_values(unpivot_exprs) @@ -3457,19 +3462,48 @@ def _pd_index_to_array_value( Create an ArrayValue from a list of label tuples. The last column will be row offsets. """ + id_gen = bigframes.core.identifiers.standard_id_strings() + col_ids = [next(id_gen) for _ in range(index.nlevels)] + offset_id = next(id_gen) + rows = [] labels_as_tuples = utils.index_as_tuples(index) for row_offset in range(len(index)): - id_gen = bigframes.core.identifiers.standard_id_strings() row_label = labels_as_tuples[row_offset] row_label = (row_label,) if not isinstance(row_label, tuple) else row_label row = {} - for label_part, id in zip(row_label, id_gen): - row[id] = label_part if pd.notnull(label_part) else None - row[next(id_gen)] = row_offset + for label_part, col_id in zip(row_label, col_ids): + row[col_id] = label_part if pd.notnull(label_part) else None + row[offset_id] = row_offset rows.append(row) - return core.ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=session) + import pyarrow as pa + + if not rows: + from bigframes.dtypes import bigframes_dtype_to_arrow_dtype + + dtypes_list = getattr(index, "dtypes", None) + if dtypes_list is None: + dtypes_list = ( + [index.dtype] if hasattr(index, "dtype") else [pd.Float64Dtype()] + ) + + fields = [] + for col_id, dtype in zip(col_ids, dtypes_list): + try: + pa_type = bigframes_dtype_to_arrow_dtype(dtype) + except Exception: + pa_type = pa.string() + fields.append(pa.field(col_id, pa_type)) + fields.append(pa.field(offset_id, pa.int64())) + schema = pa.schema(fields) + pt = pa.Table.from_pylist([], schema=schema) + else: + pt = pa.Table.from_pylist(rows) + # Ensure correct column names + pt = pt.rename_columns([*col_ids, offset_id]) + + return core.ArrayValue.from_pyarrow(pt, session=session) def _resolve_index_col( diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 25cedda8f4a..c4f1eefc6a5 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4301,6 +4301,32 @@ def to_gbq( result_table = result.query_job.destination assert result_table is not None + obj_ref_dest_cols = [] + for col_id in id_overrides.keys(): + try: + if ( + export_array.get_column_type(col_id) + == bigframes.dtypes.OBJ_REF_DTYPE + ): + obj_ref_dest_cols.append(id_overrides[col_id]) + except Exception: + pass + + if obj_ref_dest_cols: + table = self._session.bqclient.get_table(result_table) + new_schema = [] + for field in table.schema: + if field.name in obj_ref_dest_cols: + field_dict = field.to_api_repr() + field_dict["description"] = "bigframes_dtype: OBJ_REF_DTYPE" + new_schema.append( + google.cloud.bigquery.SchemaField.from_api_repr(field_dict) + ) + else: + new_schema.append(field) + table.schema = new_schema + self._session.bqclient.update_table(table, ["schema"]) + if temp_table_ref: bigframes.session._io.bigquery.set_table_expiration( self._session.bqclient, diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index a2abe9b817a..37ee904b60e 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -757,6 +757,13 @@ def convert_schema_field( ) -> typing.Tuple[str, Dtype]: is_repeated = field.mode == "REPEATED" if field.field_type == "RECORD": + if field.description == "bigframes_dtype: OBJ_REF_DTYPE": + bf_dtype = OBJ_REF_DTYPE # type: ignore + if is_repeated: + pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(bf_dtype)) + bf_dtype = pd.ArrowDtype(pa_type) + return field.name, bf_dtype + mapped_fields = map(convert_schema_field, field.fields) fields = [] for name, dtype in mapped_fields: diff --git a/bigframes/pandas/core/methods/describe.py b/bigframes/pandas/core/methods/describe.py index 6fd7960daf3..34c116ba27d 100644 --- a/bigframes/pandas/core/methods/describe.py +++ b/bigframes/pandas/core/methods/describe.py @@ -56,9 +56,10 @@ def describe( "max", ] ).intersection(describe_block.column_labels.get_level_values(-1)) - describe_block = describe_block.stack(override_labels=stack_cols) - - return dataframe.DataFrame(describe_block).droplevel(level=0) + if not stack_cols.empty: + describe_block = describe_block.stack(override_labels=stack_cols) + return dataframe.DataFrame(describe_block).droplevel(level=0) + return dataframe.DataFrame(describe_block) def _describe( @@ -120,5 +121,7 @@ def _get_aggs_for_dtype(dtype) -> list[aggregations.UnaryAggregateOp]: dtypes.TIME_DTYPE, ]: return [aggregations.count_op, aggregations.nunique_op] + elif dtypes.is_json_like(dtype) or dtype == dtypes.OBJ_REF_DTYPE: + return [aggregations.count_op] else: return [] diff --git a/tests/system/small/pandas/test_describe.py b/tests/system/small/pandas/test_describe.py index 6f288115128..def5da632eb 100644 --- a/tests/system/small/pandas/test_describe.py +++ b/tests/system/small/pandas/test_describe.py @@ -352,3 +352,41 @@ def test_series_groupby_describe(scalars_dfs): check_dtype=False, check_index_type=False, ) + + +def test_describe_json_and_obj_ref_returns_count(session): + # Test describe() works on JSON and OBJ_REF types (without nunique, which fails) + sql = """ + SELECT + PARSE_JSON('{"a": 1}') AS json_col, + 'gs://cloud-samples-data/vision/ocr/sign.jpg' AS uri_col + """ + df = session.read_gbq(sql) + + df["obj_ref_col"] = df["uri_col"].str.to_blob() + df = df.drop(columns=["uri_col"]) + + res = df.describe(include="all").to_pandas() + + assert "count" in res.index + assert res.loc["count", "json_col"] == 1.0 + assert res.loc["count", "obj_ref_col"] == 1.0 + + +def test_describe_with_unsupported_type_returns_empty_dataframe(session): + df = session.read_gbq("SELECT ST_GEOGPOINT(1.0, 2.0) AS geo_col") + + res = df.describe().to_pandas() + + assert len(res.columns) == 0 + assert len(res.index) == 1 + + +def test_describe_empty_dataframe_returns_empty_dataframe(session): + df = session.read_gbq("SELECT 1 AS int_col LIMIT 0") + df = df.drop(columns=["int_col"]) + + res = df.describe().to_pandas() + + assert len(res.columns) == 0 + assert len(res.index) == 1 diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 8caeabb98bc..9fba01aa396 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -5876,6 +5876,31 @@ def test_to_gbq_table_labels(scalars_df_index): assert table.labels["test"] == "labels" +def test_to_gbq_obj_ref_persists(session): + # Test that saving and loading an Object Reference retains its dtype + bdf = session.from_glob_path( + "gs://cloud-samples-data/vision/ocr/*.jpg", name="uris" + ).head(1) + + destination_table = "bigframes-dev.bigframes_tests_sys.test_obj_ref_persistence" + bdf.to_gbq(destination_table, if_exists="replace") + + loaded_df = session.read_gbq(destination_table) + assert loaded_df["uris"].dtype == dtypes.OBJ_REF_DTYPE + + +def test_dataframe_melt_multiindex(session): + # Tests that `melt` operations via count do not cause MultiIndex drops in Arrow + df = pd.DataFrame({"A": [1], "B": ["string"], "C": [3]}) + df.columns = pd.MultiIndex.from_tuples( + [("Group1", "A"), ("Group2", "B"), ("Group1", "C")] + ) + bdf = session.read_pandas(df) + + count_df = bdf.count().to_pandas() + assert count_df.shape[0] == 3 + + @pytest.mark.parametrize( ("col_names", "ignore_index"), [ diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index cce230ae17d..58b0affe6e3 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -1002,6 +1002,31 @@ def test_to_gbq_timedelta_tag_ignored_when_appending(bigquery_client, dataset_id assert table.schema[0].description is None +def test_to_gbq_obj_ref(session, dataset_id: str, bigquery_client): + destination_table = f"{dataset_id}.test_to_gbq_obj_ref" + sql = """ + SELECT + 'gs://cloud-samples-data/vision/ocr/sign.jpg' AS uri_col + """ + df = session.read_gbq(sql) + df["obj_ref_col"] = df["uri_col"].str.to_blob() + df = df.drop(columns=["uri_col"]) + + # Save the dataframe to bigquery + df.to_gbq(destination_table) + + # Verify the table schema description is added + table = bigquery_client.get_table(destination_table) + obj_ref_field = next(f for f in table.schema if f.name == "obj_ref_col") + assert obj_ref_field.field_type == "RECORD" + assert obj_ref_field.description == "bigframes_dtype: OBJ_REF_DTYPE" + + # Verify reloading it correctly restores the dtype + reloaded_df = session.read_gbq(destination_table) + assert reloaded_df["obj_ref_col"].dtype == dtypes.OBJ_REF_DTYPE + assert len(reloaded_df) == 1 + + @pytest.mark.parametrize( ("index"), [True, False], diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py index b95e4280538..341b25ca1c5 100644 --- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py +++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py @@ -1394,9 +1394,17 @@ def _generate_groups(groups): return map(sge.convert, range(1, len(groups) + 1)) def visit_Aggregate(self, op, *, parent, groups, metrics): - sel = sg.select( - *self._cleanup_names(groups), *self._cleanup_names(metrics), copy=False - ).from_(parent, copy=False) + exprs = [] + if groups: + exprs.extend(self._cleanup_names(groups)) + if metrics: + exprs.extend(self._cleanup_names(metrics)) + + if not exprs: + # Empty aggregated projections are invalid in BigQuery + exprs = [sge.Literal.number(1)] + + sel = sg.select(*exprs, copy=False).from_(parent, copy=False) if groups: sel = sel.group_by(*self._generate_groups(groups.values()), copy=False) diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py index 1fa5432a166..cd462f9e8f5 100644 --- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py +++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py @@ -540,6 +540,15 @@ def visit_TimestampFromUNIX(self, op, *, arg, unit): def visit_Cast(self, op, *, arg, to): from_ = op.arg.dtype + if to.is_null(): + return sge.Null() + if arg is NULL or ( + isinstance(arg, sge.Cast) + and getattr(arg, "to", None) is not None + and str(arg.to).upper() == "NULL" + ): + if to.is_struct() or to.is_array(): + return sge.Cast(this=NULL, to=self.type_mapper.from_ibis(to)) if from_.is_timestamp() and to.is_integer(): return self.f.unix_micros(arg) elif from_.is_integer() and to.is_timestamp():