diff --git a/.agent/skills/README.md b/.agent/skills/README.md
new file mode 100644
index 000000000000..d02e89c9f2e8
--- /dev/null
+++ b/.agent/skills/README.md
@@ -0,0 +1,62 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam Skills
+
+This directory contains skills that help the agent perform specialized tasks in the Apache Beam codebase.
+
+## Available Skills
+
+| Skill | Description |
+|-------|-------------|
+| [beam-concepts](beam-concepts/SKILL.md) | Core Beam programming model (PCollections, PTransforms, windowing, triggers) |
+| [ci-cd](ci-cd/SKILL.md) | GitHub Actions workflows, debugging CI failures, triggering tests |
+| [contributing](contributing/SKILL.md) | PR workflow, issue management, code review, release cycles |
+| [gradle-build](gradle-build/SKILL.md) | Build commands, flags, publishing, troubleshooting |
+| [io-connectors](io-connectors/SKILL.md) | 51+ I/O connectors, testing patterns, usage examples |
+| [java-development](java-development/SKILL.md) | Java SDK development, building, testing, project structure |
+| [license-compliance](license-compliance/SKILL.md) | Apache 2.0 license headers for all new files |
+| [python-development](python-development/SKILL.md) | Python SDK environment setup, testing, building pipelines |
+| [runners](runners/SKILL.md) | Direct, Dataflow, Flink, Spark runner configuration |
+
+## How Skills Work
+
+1. **Discovery**: The agent scans skill descriptions to find relevant ones
+2. **Activation**: When a skill matches the task, the agent reads the full `SKILL.md`
+3. **Execution**: The agent follows the skill's instructions
+
+## Skill Structure
+
+Each skill folder contains:
+- `SKILL.md` - Main instruction file with YAML frontmatter
+
+```yaml
+---
+name: skill-name
+description: Concise description for when to use this skill
+---
+# Skill Content
+Detailed instructions...
+```
+
+## Adding New Skills
+
+1. Create a new folder under `.agent/skills/`
+2. Add a `SKILL.md` with YAML frontmatter (`name`, `description`)
+3. Write clear, actionable instructions in the markdown body
diff --git a/.agent/skills/beam-concepts/SKILL.md b/.agent/skills/beam-concepts/SKILL.md
new file mode 100644
index 000000000000..da3dd9fbf319
--- /dev/null
+++ b/.agent/skills/beam-concepts/SKILL.md
@@ -0,0 +1,246 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: beam-concepts
+description: Explains core Apache Beam programming model concepts including PCollections, PTransforms, Pipelines, and Runners. Use when learning Beam fundamentals or explaining pipeline concepts.
+---
+
+# Apache Beam Core Concepts
+
+## The Beam Model
+The Beam model evolved from Google's MapReduce, FlumeJava, and MillWheel projects; it was originally called the "Dataflow Model."
+
+## Key Abstractions
+
+### Pipeline
+A Pipeline encapsulates the entire data processing task, including reading, transforming, and writing data.
+
+```java
+// Java
+Pipeline p = Pipeline.create(options);
+p.apply(...)
+ .apply(...)
+ .apply(...);
+p.run().waitUntilFinish();
+```
+
+```python
+# Python
+with beam.Pipeline(options=options) as p:
+  (p | 'Read' >> beam.io.ReadFromText('input.txt')
+     | 'Transform' >> beam.Map(process)
+     | 'Write' >> beam.io.WriteToText('output'))
+```
+
+### PCollection
+A distributed dataset that can be bounded (batch) or unbounded (streaming).
+
+#### Properties
+- **Immutable** - Once created, cannot be modified
+- **Distributed** - Elements processed in parallel
+- **May be bounded or unbounded**
+- **Timestamped** - Each element has an event timestamp
+- **Windowed** - Elements assigned to windows
+
+### PTransform
+A data processing operation that transforms PCollections.
+
+```java
+// Java
+PCollection<String> output = input.apply(MyTransform.create());
+```
+
+```python
+# Python
+output = input | 'Name' >> beam.ParDo(MyDoFn())
+```
+
+## Core Transforms
+
+### ParDo
+General-purpose parallel processing.
+
+```java
+// Java
+input.apply(ParDo.of(new DoFn<String, Integer>() {
+  @ProcessElement
+  public void processElement(@Element String element, OutputReceiver<Integer> out) {
+    out.output(element.length());
+  }
+}));
+```
+
+```python
+# Python
+class LengthFn(beam.DoFn):
+  def process(self, element):
+    yield len(element)
+
+input | beam.ParDo(LengthFn())
+# Or simpler:
+input | beam.Map(len)
+```
+
+### GroupByKey
+Groups elements by key.
+
+```java
+PCollection<KV<String, Integer>> input = ...;
+PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());
+```
+
+### CoGroupByKey
+Joins multiple PCollections by key.
+
+### Combine
+Combines elements (sum, mean, etc.).
+
+```java
+// Global combine
+input.apply(Combine.globally(Sum.ofIntegers()));
+
+// Per-key combine
+input.apply(Combine.perKey(Sum.ofIntegers()));
+```
+
+### Flatten
+Merges multiple PCollections.
+
+```java
+PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
+PCollection<String> merged = collections.apply(Flatten.pCollections());
+```
+
+### Partition
+Splits a PCollection into multiple PCollections.
+
+## Windowing
+
+### Types
+- **Fixed Windows** - Regular, non-overlapping intervals
+- **Sliding Windows** - Overlapping intervals
+- **Session Windows** - Gaps of inactivity define boundaries
+- **Global Window** - All elements in one window (default)
+
+```java
+input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));
+```
+
+```python
+input | beam.WindowInto(beam.window.FixedWindows(300))
+```
+
+## Triggers
+Control when results are emitted.
+
+```java
+input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5)))
+    .triggering(AfterWatermark.pastEndOfWindow()
+        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(1))))
+    .withAllowedLateness(Duration.standardHours(1))
+    .accumulatingFiredPanes());
+```
+
+## Side Inputs
+Additional inputs to ParDo.
+
+```java
+PCollectionView<Map<String, String>> sideInput =
+    lookupTable.apply(View.asMap());
+
+mainInput.apply(ParDo.of(new DoFn<String, String>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    Map<String, String> lookup = c.sideInput(sideInput);
+    // Use lookup...
+  }
+}).withSideInputs(sideInput));
+```
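+
+The same lookup pattern in Python passes the side input as an extra argument; a minimal sketch, assuming `lookup_table` is a PCollection of key/value pairs (names here are illustrative):
+
+```python
+# Python (sketch): AsDict materializes the side input as a dict per window.
+enriched = main_input | beam.Map(
+    lambda element, lookup: (element, lookup.get(element)),
+    lookup=beam.pvalue.AsDict(lookup_table))
+```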
+
+## Pipeline Options
+Configure pipeline execution.
+
+```java
+public interface MyOptions extends PipelineOptions {
+  @Description("Input file")
+  @Validation.Required
+  String getInput();
+  void setInput(String value);
+}
+
+MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+```
+
+## Schema
+Strongly-typed access to structured data.
+
+```java
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class User {
+  public abstract String getName();
+  public abstract int getAge();
+}
+
+PCollection<User> users = ...;
+PCollection<Row> rows = users.apply(Convert.toRows());
+```
+
+## Error Handling
+
+### Dead Letter Queue Pattern
+```java
+TupleTag<Output> successTag = new TupleTag<>() {};
+TupleTag<Input> failureTag = new TupleTag<>() {};
+
+PCollectionTuple results = input.apply(ParDo.of(new DoFn<Input, Output>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    try {
+      c.output(process(c.element()));
+    } catch (Exception e) {
+      c.output(failureTag, c.element());
+    }
+  }
+}).withOutputTags(successTag, TupleTagList.of(failureTag)));
+
+results.get(successTag).apply(WriteToSuccess());
+results.get(failureTag).apply(WriteToDeadLetter());
+```
+
+## Cross-Language Pipelines
+Use transforms from other SDKs.
+
+```python
+# Use Java Kafka connector from Python
+from apache_beam.io.kafka import ReadFromKafka
+
+result = pipeline | ReadFromKafka(
+    consumer_config={'bootstrap.servers': 'localhost:9092'},
+    topics=['my-topic']
+)
+```
+
+## Best Practices
+1. **Prefer built-in transforms** over custom DoFns
+2. **Use schemas** for type-safe operations
+3. **Minimize side inputs** for performance
+4. **Handle late data** explicitly
+5. **Test with DirectRunner** before deploying
+6. **Use TestPipeline** for unit tests
diff --git a/.agent/skills/ci-cd/SKILL.md b/.agent/skills/ci-cd/SKILL.md
new file mode 100644
index 000000000000..6467c16e4744
--- /dev/null
+++ b/.agent/skills/ci-cd/SKILL.md
@@ -0,0 +1,193 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: ci-cd
+description: Guides understanding and working with Apache Beam's CI/CD system using GitHub Actions. Use when debugging CI failures, understanding test workflows, or modifying CI configuration.
+---
+
+# CI/CD in Apache Beam
+
+## Overview
+Apache Beam uses GitHub Actions for CI/CD. Workflows are located in `.github/workflows/`.
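+
+If the GitHub CLI (`gh`) is available, workflow runs can also be inspected from a terminal; a sketch (the workflow file name is an example):
+
+```bash
+# List recent runs of one workflow, then show only the failed-step logs of a run
+gh run list --workflow beam_PreCommit_Java.yml --limit 5
+gh run view <run-id> --log-failed
+```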
+
+## Workflow Types
+
+### PreCommit Workflows
+- Run on PRs and merges
+- Validate code changes before merge
+- Naming: `beam_PreCommit_*.yml`
+
+### PostCommit Workflows
+- Run after merge and on schedule
+- More comprehensive testing
+- Naming: `beam_PostCommit_*.yml`
+
+### Scheduled Workflows
+- Run nightly on master
+- Check for external dependency impacts
+- Tag master with `nightly-master`
+
+## Key Workflows
+
+### PreCommit
+| Workflow | Description |
+|----------|-------------|
+| `beam_PreCommit_Java.yml` | Java build and tests |
+| `beam_PreCommit_Python.yml` | Python tests |
+| `beam_PreCommit_Go.yml` | Go tests |
+| `beam_PreCommit_RAT.yml` | License header checks |
+| `beam_PreCommit_Spotless.yml` | Code formatting |
+
+### PostCommit - Java
+| Workflow | Description |
+|----------|-------------|
+| `beam_PostCommit_Java.yml` | Full Java test suite |
+| `beam_PostCommit_Java_ValidatesRunner_*.yml` | Runner validation tests |
+| `beam_PostCommit_Java_Examples_*.yml` | Example pipeline tests |
+
+### PostCommit - Python
+| Workflow | Description |
+|----------|-------------|
+| `beam_PostCommit_Python.yml` | Full Python test suite |
+| `beam_PostCommit_Python_ValidatesRunner_*.yml` | Runner validation |
+| `beam_PostCommit_Python_Examples_*.yml` | Examples |
+
+### Load & Performance Tests
+| Workflow | Description |
+|----------|-------------|
+| `beam_LoadTests_*.yml` | Load testing |
+| `beam_PerformanceTests_*.yml` | I/O performance |
+
+## Triggering Tests
+
+### Automatic
+- PRs trigger PreCommit tests
+- Merges trigger PostCommit tests
+
+### Manual via PR Comment
+```
+retest this please
+```
+
+### Specific Test Suites
+Use trigger phrases from the [catalog](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
+
+### Workflow Dispatch
+Most workflows support manual triggering via GitHub UI.
+
+## Understanding Test Results
+
+### Finding Logs
+1. Go to PR → Checks tab
+2. Click on failed workflow
+3. Expand failed job
+4. View step logs
+
+### Common Failure Patterns
+
+#### Flaky Tests
+- Random failures unrelated to change
+- Solution: Comment `retest this please`
+
+#### Timeout
+- Increase timeout in workflow if justified
+- Or optimize test
+
+#### Resource Exhaustion
+- GCP quota issues
+- Check project settings
+
+## GCP Credentials
+
+Workflows requiring GCP access use these secrets:
+- `GCP_PROJECT_ID` - Project ID (e.g., `apache-beam-testing`)
+- `GCP_REGION` - Region (e.g., `us-central1`)
+- `GCP_TESTING_BUCKET` - Temp storage bucket
+- `GCP_PYTHON_WHEELS_BUCKET` - Python wheels bucket
+- `GCP_SA_EMAIL` - Service account email
+- `GCP_SA_KEY` - Base64-encoded service account key
+
+Required IAM roles:
+- Storage Admin
+- Dataflow Admin
+- Artifact Registry Writer
+- BigQuery Data Editor
+- Service Account User
+
+## Self-hosted vs GitHub-hosted Runners
+
+### Self-hosted (majority of workflows)
+- Pre-configured with dependencies
+- GCP credentials pre-configured
+- Naming: `beam_*.yml`
+
+### GitHub-hosted
+- Used for cross-platform testing (Linux, macOS, Windows)
+- May need explicit credential setup
+
+## Workflow Structure
+
+```yaml
+name: Workflow Name
+on:
+  push:
+    branches: [master]
+  pull_request:
+    branches: [master]
+  schedule:
+    - cron: '0 0 * * *'
+  workflow_dispatch:
+
+jobs:
+  build:
+    runs-on: [self-hosted, ...]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Run Gradle
+        run: ./gradlew :task:name
+```
+
+## Local Debugging
+
+### Run Same Commands as CI
+Check workflow file's `run` commands:
+```bash
+./gradlew :sdks:java:core:test
+./gradlew :sdks:python:test
+```
+
+### Common Issues
+- Clean gradle cache: `rm -rf ~/.gradle .gradle`
+- Remove build directory: `rm -rf build`
+- Check Java version matches CI
+
+## Snapshot Builds
+
+### Locations
+- Java SDK: https://repository.apache.org/content/groups/snapshots/org/apache/beam/
+- SDK Containers: https://gcr.io/apache-beam-testing/beam-sdk
+- Portable Runners: https://gcr.io/apache-beam-testing/beam_portability
+- Python SDK: gs://beam-python-nightly-snapshots
+
+## Release Workflows
+| Workflow | Purpose |
+|----------|---------|
+| `cut_release_branch.yml` | Create release branch |
+| `build_release_candidate.yml` | Build RC |
+| `finalize_release.yml` | Finalize release |
+| `publish_github_release_notes.yml` | Publish notes |
diff --git a/.agent/skills/contributing/SKILL.md b/.agent/skills/contributing/SKILL.md
new file mode 100644
index 000000000000..bac50c5d0cd5
--- /dev/null
+++ b/.agent/skills/contributing/SKILL.md
@@ -0,0 +1,149 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: contributing
+description: Guides the contribution workflow for Apache Beam, including creating PRs, issue management, code review process, and release cycles. Use when contributing code, creating PRs, or understanding the contribution process.
+---
+
+# Contributing to Apache Beam
+
+## Getting Started
+
+### Prerequisites
+- GitHub account
+- Java JDK 11 (preferred, or 8, 17, 21)
+- Latest Go 1.x
+- Docker
+- Python (any supported version for manual testing, all versions for running test suites)
+- For large contributions: signed ICLA to Apache Software Foundation
+
+### Environment Setup Options
+
+#### Local Setup (automated)
+```bash
+./local-env-setup.sh
+```
+
+#### Docker-based Setup
+```bash
+./start-build-env.sh
+```
+
+## Contribution Workflow
+
+### 1. Find or Create an Issue
+- Search existing issues at https://github.com/apache/beam/issues
+- Create new issue using appropriate template
+
+### 2. Claim the Issue
+```
+.take-issue   # Assigns issue to you
+.free-issue   # Unassigns issue from you
+.close-issue  # Closes the issue
+```
+
+### 3. For Large Changes
+- Discuss on dev@beam.apache.org mailing list
+- Create design doc using [template](https://s.apache.org/beam-design-doc-template)
+- Review [existing design docs](https://s.apache.org/beam-design-docs)
+
+### 4. Make Your Changes
+- Every source file needs Apache license header
+- New dependencies must have Apache-compatible open source licenses
+- Add unit tests for your changes
+- Use descriptive commit messages
+
+### 5. Create Pull Request
+- Link to the issue in PR description
+- Pre-commit tests run automatically
+- If tests fail unrelated to your change, comment: `retest this please`
+
+### 6. Code Review
+- Reviewers are auto-assigned within a few hours
+- Use `R: @username` to request specific reviewer
+- No response in 3 days? Email dev@beam.apache.org
+
+## Code Review Best Practices
+
+### For Authors
+- Provide context in issue and PR description
+- Avoid huge mega-changes
+- Add follow-up changes as "fixup" commits (don't squash until approved)
+- Squash fixup commits after approval
+
+### For Reviewers
+- PRs can only be merged by [Beam committers](https://home.apache.org/phonebook.html?pmc=beam)
+
+## Testing Workflows
+
+### Pre-commit Tests
+Run automatically on PRs. To run locally:
+```bash
+./gradlew javaPreCommit    # Java
+./gradlew :sdks:python:test    # Python
+./gradlew :sdks:go:test    # Go
+```
+
+### Post-commit Tests
+Run after merge. Trigger phrases in PR comments start specific test suites.
+See [trigger phrase catalog](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md).
+
+## Formatting
+
+### Java
+```bash
+./gradlew spotlessApply
+```
+
+### Python
+```bash
+# Uses yapf, isort, pylint
+pre-commit run --all-files
+```
+
+### CHANGES.md
+```bash
+./gradlew formatChanges
+```
+
+## Release Cycle
+- Minor releases every 6 weeks
+- Check [release calendar](https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com)
+- Changes must be in master before release branch is cut
+
+## Stale PRs
+- PRs become stale after 60 days of author inactivity
+- Community will close stale PRs
+- Authors can reopen closed PRs
+
+## Key Resources
+- [Contribution Guide](https://beam.apache.org/contribute/)
+- [PTransform Style Guide](https://beam.apache.org/contribute/ptransform-style-guide)
+- [Runner Authoring Guide](https://beam.apache.org/contribute/runner-guide/)
+- [Wiki Tips](https://cwiki.apache.org/confluence/display/BEAM/)
+  - [Git Tips](https://cwiki.apache.org/confluence/display/BEAM/Git+Tips)
+  - [Java Tips](https://cwiki.apache.org/confluence/display/BEAM/Java+Tips)
+  - [Python Tips](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips)
+  - [Go Tips](https://cwiki.apache.org/confluence/display/BEAM/Go+Tips)
+  - [Gradle Tips](https://cwiki.apache.org/confluence/display/BEAM/Gradle+Tips)
+
+## Communication
+- User mailing list: user@beam.apache.org
+- Dev mailing list: dev@beam.apache.org
+- Slack: [#beam channel](https://s.apache.org/beam-slack-channel)
+- Issues: https://github.com/apache/beam/issues
diff --git a/.agent/skills/gradle-build/SKILL.md b/.agent/skills/gradle-build/SKILL.md
new file mode 100644
index 000000000000..a408a263d66d
--- /dev/null
+++ b/.agent/skills/gradle-build/SKILL.md
@@ -0,0 +1,224 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: gradle-build
+description: Guides understanding and using the Gradle build system in Apache Beam. Use when building projects, understanding dependencies, or troubleshooting build issues.
+---
+
+# Gradle Build System in Apache Beam
+
+## Overview
+Apache Beam is a mono-repo using Gradle as its build system. The entire project (Java, Python, Go, website) is managed as a single Gradle project.
+
+## Key Files
+- `build.gradle.kts` - Root build configuration
+- `settings.gradle.kts` - Project structure and module definitions
+- `gradle.properties` - Global properties and versions
+- `buildSrc/` - Custom Gradle plugins including BeamModulePlugin
+
+## BeamModulePlugin
+Located at `buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy`
+
+### Purpose
+- Manages Java dependencies centrally
+- Configures project types (Java, Python, Go, Proto, Docker, etc.)
+- Defines common custom tasks
+
+### Java Project Configuration
+```groovy
+apply plugin: 'org.apache.beam.module'
+applyJavaNature(
+  automaticModuleName: 'org.apache.beam.sdk.io.kafka'
+)
+```
+
+## Common Commands
+
+### Build
+```bash
+# Build entire project
+./gradlew build
+
+# Build specific project
+./gradlew :sdks:java:core:build
+./gradlew -p sdks/java/core build
+
+# Compile only (no tests)
+./gradlew :sdks:java:core:compileJava
+```
+
+### Test
+```bash
+# Run tests
+./gradlew :sdks:java:core:test
+
+# Run specific test
+./gradlew :sdks:java:core:test --tests *MyTest
+
+# Skip tests
+./gradlew build -x test
+```
+
+### Clean
+```bash
+# Clean specific project
+./gradlew :sdks:java:core:clean
+
+# Clean everything
+./gradlew clean
+```
+
+### Formatting
+```bash
+# Java formatting (Spotless)
+./gradlew spotlessApply
+
+# Check formatting
+./gradlew spotlessCheck
+
+# Format CHANGES.md
+./gradlew formatChanges
+```
+
+### Publishing
+```bash
+# Publish to Maven Local
+./gradlew -Ppublishing :sdks:java:core:publishToMavenLocal
+
+# Publish all Java artifacts
+./gradlew -Ppublishing publishToMavenLocal
+```
+
+## Pre-commit Tasks
+
+### Java
+```bash
+./gradlew javaPreCommit
+```
+
+### Python
+```bash
+./gradlew pythonPreCommit
+```
+
+### Combined
+```bash
+./gradlew :checkSetup  # Validates Go, Java, Python environments
+```
+
+## Useful Flags
+
+| Flag | Description |
+|------|-------------|
+| `-p <dir>` | Run task in specific project directory |
+| `-x <task>` | Exclude task |
+| `--tests <pattern>` | Filter tests |
+| `-Ppublishing` | Enable publishing tasks |
+| `-PdisableSpotlessCheck=true` | Disable formatting check |
+| `-PdisableCheckStyle=true` | Disable checkstyle |
+| `-PskipCheckerFramework` | Skip Checker Framework |
+| `--continue` | Continue after failures |
+| `--info` | Verbose output |
+| `--debug` | Debug output |
+| `--scan` | Generate build scan |
+| `--parallel` | Parallel execution |
+
+## GCP-related Properties
+
+```bash
+-PgcpProject=my-project
+-PgcpRegion=us-central1
+-PgcpTempRoot=gs://bucket/temp
+-PgcsTempRoot=gs://bucket/temp
+```
+
+## Docker Tasks
+
+```bash
+# Build Java SDK container
+./gradlew :sdks:java:container:java11:docker
+
+# Build Python SDK container
+./gradlew :sdks:python:container:py39:docker
+
+# With custom repository
+./gradlew :sdks:java:container:java11:docker \
+  -Pdocker-repository-root=gcr.io/project \
+  -Pdocker-tag=custom
+```
+
+## Dependency Management
+
+### View Dependencies
+```bash
+./gradlew :sdks:java:core:dependencies
+./gradlew :sdks:java:core:dependencies --configuration runtimeClasspath
+```
+
+### Force Dependency Version
+In `build.gradle`:
+```groovy
+configurations.all {
+  resolutionStrategy.force 'com.google.guava:guava:32.0.0-jre'
+}
+```
+
+## Troubleshooting
+
+### Clean Gradle Cache
+```bash
+rm -rf ~/.gradle/caches
+rm -rf .gradle
+rm -rf build
+```
+
+### Common Errors
+
+#### NoClassDefFoundError
+- Run `./gradlew clean`
+- Delete gradle cache
+
+#### Proto-related Errors
+- Regenerate protos: `./gradlew generateProtos`
+
+#### Dependency Conflicts
+- Check dependencies: `./gradlew dependencies`
+- Use `--scan` for detailed analysis
+
+### Useful Tasks
+
+```bash
+# List all tasks
+./gradlew tasks
+
+# List tasks for a project
+./gradlew :sdks:java:core:tasks
+
+# Show project structure
+./gradlew projects
+```
+
+## IDE Integration
+
+### IntelliJ
+1. Open repository root as Gradle project
+2. Wait for indexing
+3. Gradle tool window shows all tasks
+
+### VS Code
+Install Gradle extension for task discovery
diff --git a/.agent/skills/io-connectors/SKILL.md b/.agent/skills/io-connectors/SKILL.md
new file mode 100644
index 000000000000..596b602add6c
--- /dev/null
+++ b/.agent/skills/io-connectors/SKILL.md
@@ -0,0 +1,197 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: io-connectors
+description: Guides development and usage of I/O connectors in Apache Beam. Use when working with I/O connectors, creating new connectors, or debugging data source/sink issues.
+---
+
+# I/O Connectors in Apache Beam
+
+## Overview
+I/O connectors enable reading from and writing to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.
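+
+For orientation, a minimal Python pipeline that reads, filters, and writes with the built-in text connector (bucket paths are placeholders):
+
+```python
+import apache_beam as beam
+
+# Read lines, drop empty ones, write the rest back out.
+with beam.Pipeline() as p:
+  (p | beam.io.ReadFromText('gs://my-bucket/input*.txt')
+     | beam.Filter(lambda line: line.strip())
+     | beam.io.WriteToText('gs://my-bucket/output', file_name_suffix='.txt'))
+```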
+
+## Java I/O Connectors Location
+`sdks/java/io/`
+
+### Available Connectors
+| Category | Connectors |
+|----------|------------|
+| Cloud Storage | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
+| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
+| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
+| File Formats | parquet, csv, json, xml, thrift, iceberg |
+| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |
+
+## Testing I/O Connectors
+
+### Unit Tests
+```bash
+./gradlew :sdks:java:io:kafka:test
+./gradlew :sdks:java:io:jdbc:test
+```
+
+### Integration Tests
+
+#### On Direct Runner
+```bash
+./gradlew :sdks:java:io:google-cloud-platform:integrationTest
+```
+
+#### With Custom GCP Settings
+```bash
+./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
+  -PgcpProject=<project> \
+  -PgcpTempRoot=gs://<bucket>/path
+```
+
+#### With Explicit Pipeline Options
+```bash
+./gradlew :sdks:java:io:jdbc:integrationTest \
+  -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
+```
+
+## Integration Test Framework
+Located in the `it/` directory:
+- `it/common/` - Common test utilities
+- `it/google-cloud-platform/` - GCP-specific test infrastructure
+- `it/jdbc/` - JDBC test infrastructure
+- `it/kafka/` - Kafka test infrastructure
+- `it/testcontainers/` - Testcontainers support
+
+## Writing Integration Tests
+
+### Basic Structure
+```java
+@RunWith(JUnit4.class)
+public class MyIOIT {
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Test
+  public void testWriteAndRead() {
+    // Write data
+    writePipeline.apply(Create.of(testData))
+        .apply(MyIO.write().to(destination));
+    writePipeline.run().waitUntilFinish();
+
+    // Read and verify
+    PCollection<String> results = readPipeline.apply(MyIO.read().from(destination));
+    PAssert.that(results).containsInAnyOrder(expectedData);
+    readPipeline.run().waitUntilFinish();
+  }
+}
+```
+
+### Using TestPipeline
+```java
+@Rule public TestPipeline pipeline = TestPipeline.create();
+```
+
+TestPipeline:
+- Blocks on run by default (on TestDataflowRunner)
+- Has 15-minute default timeout
+- Reads options from `beamTestPipelineOptions` system property
+
+## GCP I/O Connectors
+
+### BigQuery
+```java
+// Read
+pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));
+
+// Write
+data.apply(BigQueryIO.writeTableRows()
+    .to("project:dataset.table")
+    .withSchema(schema)
+    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+```
+
+### Pub/Sub
+```java
+// Read
+pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));
+
+// Write
+data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));
+```
+
+### Cloud Storage (TextIO)
+```java
+// Read
+pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));
+
+// Write
+data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
+```
+
+## Kafka Connector
+```java
+// Read
+pipeline.apply(KafkaIO.<String, String>read()
+    .withBootstrapServers("localhost:9092")
+    .withTopic("topic")
+    .withKeyDeserializer(StringDeserializer.class)
+    .withValueDeserializer(StringDeserializer.class));
+
+// Write
+data.apply(KafkaIO.<String, String>write()
+    .withBootstrapServers("localhost:9092")
+    .withTopic("topic")
+    .withKeySerializer(StringSerializer.class)
+    .withValueSerializer(StringSerializer.class));
+```
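+
+`KafkaIO.read()` yields `KafkaRecord` elements; when only the key/value pairs matter, `withoutMetadata()` converts the read to plain `KV`s, as in this sketch:
+
+```java
+// Drop Kafka metadata, keeping only KV<String, String> elements.
+PCollection<KV<String, String>> kvs = pipeline.apply(
+    KafkaIO.<String, String>read()
+        .withBootstrapServers("localhost:9092")
+        .withTopic("topic")
+        .withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializer(StringDeserializer.class)
+        .withoutMetadata());
+```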
+
+## JDBC Connector
+```java
+// Read (in practice a RowMapper mapping ResultSet rows to MyRecord is also required)
+pipeline.apply(JdbcIO.<MyRecord>read()
+    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
+        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
+    .withQuery("SELECT * FROM table"));
+
+// Write
+data.apply(JdbcIO.<MyRecord>write()
+    .withDataSourceConfiguration(config)
+    .withStatement("INSERT INTO table VALUES (?, ?)"));
+```
+
+## Python I/O Location
+`sdks/python/apache_beam/io/`
+
+### Common Python I/Os
+- `textio` - Text files
+- `fileio` - General file operations
+- `avroio` - Avro files
+- `parquetio` - Parquet files
+- `gcp/` - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)
+
+## Cross-language I/O
+Beam supports using I/O connectors from one SDK in another via the expansion service.
+
+```bash
+# Start Java expansion service
+./gradlew :sdks:java:io:expansion-service:runExpansionService
+```
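+
+A Python pipeline can then point a cross-language transform at that service; a sketch, assuming the expansion service above is listening on port 8097 (check the port it prints at startup):
+
+```python
+from apache_beam.io.kafka import ReadFromKafka
+
+# Use the already-running Java expansion service instead of auto-starting one.
+records = pipeline | ReadFromKafka(
+    consumer_config={'bootstrap.servers': 'localhost:9092'},
+    topics=['my-topic'],
+    expansion_service='localhost:8097')
+```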
+
+## Creating New Connectors
+See [Developing I/O connectors](https://beam.apache.org/documentation/io/developing-io-overview)
+
+Key components:
+1. **Source** - Reads data (bounded or unbounded)
+2. **Sink** - Writes data
+3. **Read/Write transforms** - User-facing API
diff --git a/.agent/skills/java-development/SKILL.md b/.agent/skills/java-development/SKILL.md
new file mode 100644
index 000000000000..f7e89beb895d
--- /dev/null
+++ b/.agent/skills/java-development/SKILL.md
@@ -0,0 +1,153 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: java-development
+description: Guides Java SDK development in Apache Beam, including building, testing, running examples, and understanding the project structure. Use when working with Java code in sdks/java/, runners/, or examples/java/.
+---
+
+# Java Development in Apache Beam
+
+## Project Structure
+
+### Key Directories
+- `sdks/java/core` - Core Java SDK (PCollection, PTransform, Pipeline)
+- `sdks/java/harness` - SDK harness (container entrypoint)
+- `sdks/java/io/` - I/O connectors (51+ connectors including BigQuery, Kafka, JDBC, etc.)
+- `sdks/java/extensions/` - Extensions (SQL, ML, protobuf, etc.)
+- `runners/` - Runner implementations:
+  - `runners/direct-java` - Direct Runner (local execution)
+  - `runners/flink/` - Flink Runner
+  - `runners/spark/` - Spark Runner
+  - `runners/google-cloud-dataflow-java/` - Dataflow Runner
+- `examples/java/` - Java examples including WordCount
+
+### Build System
+Apache Beam uses Gradle with a custom `BeamModulePlugin`. Every Java project's `build.gradle` starts with:
+```groovy
+apply plugin: 'org.apache.beam.module'
+applyJavaNature(
+  ...
+)
+```
+
+## Common Commands
+
+### Build Commands
+```bash
+# Compile a specific project
+./gradlew -p sdks/java/core compileJava
+
+# Build a project (compile + tests)
+./gradlew :sdks:java:harness:build
+
+# Run WordCount example
+./gradlew :examples:java:wordCount
+```
+
+### Running Unit Tests
+```bash
+# Run all tests in a project
+./gradlew :sdks:java:harness:test
+
+# Run a specific test class
+./gradlew :sdks:java:harness:test --tests org.apache.beam.fn.harness.CachesTest
+
+# Run tests matching a pattern
+./gradlew :sdks:java:harness:test --tests *CachesTest
+
+# Run a specific test method
+./gradlew :sdks:java:harness:test --tests *CachesTest.testClearableCache
+```
+
+### Running Integration Tests
+Integration tests have filenames ending in `IT.java` and use `TestPipeline`.
+
+```bash
+# Run I/O integration tests on Direct Runner
+./gradlew :sdks:java:io:google-cloud-platform:integrationTest
+
+# Run with custom GCP project
+./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
+  -PgcpProject=<project> -PgcpTempRoot=gs://<bucket>/path
+
+# Run on Dataflow Runner
+./gradlew :runners:google-cloud-dataflow-java:examplesJavaRunnerV2IntegrationTest \
+  -PdisableSpotlessCheck=true -PdisableCheckStyle=true -PskipCheckerFramework \
+  -PgcpProject=<project> -PgcpRegion=us-central1 -PgcsTempRoot=gs://<bucket>/tmp
+```
+
+### Code Formatting
+```bash
+# Format Java code
+./gradlew spotlessApply
+```
+
+## Writing Integration Tests
+
+```java
+@Rule public TestPipeline pipeline = TestPipeline.create();
+
+@Test
+public void testSomething() {
+  pipeline.apply(...);
+  pipeline.run().waitUntilFinish();
+}
+```
+
+Set pipeline options via `-DbeamTestPipelineOptions='[...]'`:
+```bash
+-DbeamTestPipelineOptions='["--runner=TestDataflowRunner","--project=myproject","--region=us-central1","--stagingLocation=gs://bucket/path"]'
+```
+
+## Using Modified Beam Code
+
+### Publish to Maven Local
+```bash
+# Publish a specific module
+./gradlew -Ppublishing -p sdks/java/io/kafka publishToMavenLocal
+
+# Publish all modules
+./gradlew -Ppublishing publishToMavenLocal
+```
+
+### Building SDK Container
+```bash
+# Build Java SDK container (for Runner v2)
+./gradlew :sdks:java:container:java11:docker
+
+# Tag and push
+docker tag apache/beam_java11_sdk:2.XX.0.dev \
+  "us-docker.pkg.dev/your-project/beam/beam_java11_sdk:custom"
+docker push "us-docker.pkg.dev/your-project/beam/beam_java11_sdk:custom"
+```
+
+### Building Dataflow Worker Jar
+```bash
+./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
+```
+
+## Test Naming Conventions
+- Unit tests: `*Test.java`
+- Integration tests: `*IT.java`
+
+## JUnit Report Location
+After running tests, find HTML reports at:
+`<project>/build/reports/tests/test/index.html`
+
+## IDE Setup (IntelliJ)
+1. Open `<path-to-clone>/beam` (the repository root, NOT `sdks/java`)
+2. Wait for indexing to complete
+3. Find `examples/java/build.gradle` and click Run next to wordCount task to verify setup
diff --git a/.agent/skills/license-compliance/SKILL.md b/.agent/skills/license-compliance/SKILL.md
new file mode 100644
index 000000000000..d2fc50e541f3
--- /dev/null
+++ b/.agent/skills/license-compliance/SKILL.md
@@ -0,0 +1,199 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: license-compliance
+description: Ensures all new files include proper Apache 2.0 license headers. Use when creating any new file in the Apache Beam repository.
+---
+
+# License Compliance in Apache Beam
+
+## Overview
+Every source file in Apache Beam **MUST** include the Apache 2.0 license header. The RAT (Release Audit Tool) check will fail if any file is missing the required license.
+
+## License Headers by File Type
+
+### Java, Groovy, Kotlin, Scala
+```java
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+```
+
+### Python
+```python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+```
+
+### Go
+```go
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+```
+
+### Markdown (.md)
+```markdown
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+```
+
+### YAML (.yml, .yaml) and YAML Frontmatter
+```yaml
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+```
+
+### Shell Scripts (.sh, .bash)
+```bash
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+```
+
+### XML, HTML
+```xml
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+```
+
+## RAT Check
+
+### Running Locally
+```bash
+./gradlew rat
+```
+
+### Checking Results
+If the RAT check fails, view the report:
+```
+build/reports/rat/index.html
+```
+
+## Key Rules
+
+1. **Every new file needs a license header** - No exceptions for source files
+2. **Place header at the very top** - Before any code, imports, or declarations
+3. **Use correct comment style** - Match the file type's comment syntax
+4. **YAML frontmatter exception** - For files with YAML frontmatter (like SKILL.md), place the license as YAML comments inside the frontmatter block, after the opening `---`
+5. **Dependencies must be Apache-compatible** - New dependencies need licenses compatible with Apache 2.0
+
+## Common Mistakes
+
+- Forgetting to add headers to new test files
+- Missing headers on configuration files (.yaml, .json, .xml)
+- Adding HTML comments before YAML frontmatter (breaks parsing)
diff --git a/.agent/skills/python-development/SKILL.md b/.agent/skills/python-development/SKILL.md
new file mode 100644
index 000000000000..f8c6c030e9a8
--- /dev/null
+++ b/.agent/skills/python-development/SKILL.md
@@ -0,0 +1,183 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: python-development
+description: Guides Python SDK development in Apache Beam, including environment setup, testing, building, and running pipelines. Use when working with Python code in sdks/python/.
+---
+
+# Python Development in Apache Beam
+
+## Project Structure
+
+### Key Directories
+- `sdks/python/` - Python SDK root
+  - `apache_beam/` - Main Beam package
+    - `transforms/` - Core transforms (ParDo, GroupByKey, etc.)
+    - `io/` - I/O connectors
+    - `ml/` - Beam ML code (RunInference, etc.)
+    - `runners/` - Runner implementations and wrappers
+    - `runners/worker/` - SDK worker harness
+  - `container/` - Docker container configuration
+  - `test-suites/` - Test configurations
+  - `scripts/` - Utility scripts
+
+### Configuration Files
+- `setup.py` - Package configuration
+- `pyproject.toml` - Build configuration
+- `tox.ini` - Test automation
+- `pytest.ini` - Pytest configuration
+- `.pylintrc` - Linting rules
+- `.isort.cfg` - Import sorting
+- `mypy.ini` - Type checking
+
+## Environment Setup
+
+### Using pyenv (Recommended)
+```bash
+# Install Python
+pyenv install 3.X  # Use supported version from gradle.properties
+
+# Create virtual environment
+pyenv virtualenv 3.X beam-dev
+pyenv activate beam-dev
+```
+
+### Install in Editable Mode
+```bash
+cd sdks/python
+pip install -e .[gcp,test]
+```
+
+### Enable Pre-commit Hooks
+```bash
+pip install pre-commit
+pre-commit install
+
+# To disable
+pre-commit uninstall
+```
+
+## Running Tests
+
+### Unit Tests (filename: `*_test.py`)
+```bash
+# Run all tests in a file
+pytest -v apache_beam/io/textio_test.py
+
+# Run tests in a class
+pytest -v apache_beam/io/textio_test.py::TextSourceTest
+
+# Run a specific test
+pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
+```
+
+### Integration Tests (filename: `*_it_test.py`)
+
+#### On Direct Runner
+```bash
+python -m pytest -o log_cli=True -o log_level=Info \
+  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
+  --test-pipeline-options='--runner=TestDirectRunner'
+```
+
+#### On Dataflow Runner
+```bash
+# First build SDK tarball
+pip install build && python -m build --sdist
+
+# Run integration test
+python -m pytest -o log_cli=True -o log_level=Info \
+  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
+  --test-pipeline-options='--runner=TestDataflowRunner --project=<project>
+  --temp_location=gs://<bucket>/tmp
+  --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz
+  --region=us-central1'
+```
+
+## Building Python SDK
+
+### Build Source Distribution
+```bash
+cd sdks/python
+pip install build && python -m build --sdist
+# Output: sdks/python/dist/apache-beam-X.XX.0.dev0.tar.gz
+```
+
+### Build Wheel (faster installation)
+```bash
+./gradlew :sdks:python:bdistPy311linux  # For Python 3.11 on Linux
+```
+
+### Build SDK Container
+```bash
+./gradlew :sdks:python:container:py39:docker \
+  -Pdocker-repository-root=gcr.io/your-project -Pdocker-tag=custom
+```
+
+## Running Pipelines with Modified Code
+
+```bash
+# Install modified SDK
+pip install /path/to/apache-beam.tar.gz[gcp]
+
+# Run pipeline
+python my_pipeline.py \
+  --runner=DataflowRunner \
+  --sdk_location=/path/to/apache-beam.tar.gz \
+  --project=my_project \
+  --region=us-central1 \
+  --temp_location=gs://my-bucket/temp
+```
+
+## Common Issues
+
+### `NameError` when running DoFn
+Global imports, functions, and variables in the main pipeline module are not serialized by default. Use:
+```bash
+--save_main_session
+```
+
+### Specifying Additional Dependencies
+Use `--requirements_file=requirements.txt` or custom containers.
+
+## Test Markers
+- `@pytest.mark.it_postcommit` - Include in PostCommit test suite
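+
+A marked test looks like the following sketch (test name and body are illustrative); the marker lets a suite select it with `pytest -m it_postcommit`:
+
+```python
+import pytest
+
+@pytest.mark.it_postcommit
+def test_wordcount_on_dataflow():
+    ...  # pipeline assertions here
+```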
+
+## Gradle Commands for Python
+```bash
+# Run WordCount
+./gradlew :sdks:python:wordCount
+
+# Check environment
+./gradlew :checkSetup
+```
+
+## Code Quality Tools
+```bash
+# Linting
+pylint apache_beam/
+
+# Type checking
+mypy apache_beam/
+
+# Formatting (via yapf)
+yapf -i apache_beam/file.py
+
+# Import sorting
+isort apache_beam/file.py
+```
diff --git a/.agent/skills/runners/SKILL.md b/.agent/skills/runners/SKILL.md
new file mode 100644
index 000000000000..f92943ab097c
--- /dev/null
+++ b/.agent/skills/runners/SKILL.md
@@ -0,0 +1,244 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: runners
+description: Guides understanding and working with Apache Beam runners (Direct, Dataflow, Flink, Spark, etc.). Use when configuring pipelines for different execution environments or debugging runner-specific issues.
+---
+
+# Apache Beam Runners
+
+## Overview
+Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
+
+## Available Runners
+
+| Runner | Location | Description |
+|--------|----------|-------------|
+| Direct | `runners/direct-java/` | Local execution for testing |
+| Prism | `runners/prism/` | Portable local runner |
+| Dataflow | `runners/google-cloud-dataflow-java/` | Google Cloud Dataflow |
+| Flink | `runners/flink/` | Apache Flink |
+| Spark | `runners/spark/` | Apache Spark |
+| Samza | `runners/samza/` | Apache Samza |
+| Jet | `runners/jet/` | Hazelcast Jet |
+| Twister2 | `runners/twister2/` | Twister2 |
+
+## Direct Runner
+For local development and testing.
+
+### Java
+```java
+PipelineOptions options = PipelineOptionsFactory.create();
+options.setRunner(DirectRunner.class);
+Pipeline p = Pipeline.create(options);
+```
+
+### Python
+```python
+options = PipelineOptions()
+options.view_as(StandardOptions).runner = 'DirectRunner'
+p = beam.Pipeline(options=options)
+```
+
+### Command Line
+```bash
+--runner=DirectRunner
+```
+
+## Dataflow Runner
+
+### Prerequisites
+- GCP project with Dataflow API enabled
+- Service account with Dataflow Admin role
+- GCS bucket for staging
+
+### Java Usage
+```java
+DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+options.setRunner(DataflowRunner.class);
+options.setProject("my-project");
+options.setRegion("us-central1");
+options.setTempLocation("gs://my-bucket/temp");
+```
+
+### Python Usage
+```python
+options = PipelineOptions([
+    '--runner=DataflowRunner',
+    '--project=my-project',
+    '--region=us-central1',
+    '--temp_location=gs://my-bucket/temp'
+])
+```
+
+### Runner v2
+```bash
+--experiments=use_runner_v2
+```
+
+### Custom SDK Container
+```bash
+--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
+```
+
+## Flink Runner
+
+### Embedded Mode
+```java
+FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+options.setRunner(FlinkRunner.class);
+options.setFlinkMaster("[local]");
+```
+
+### Cluster Mode
+```java
+options.setFlinkMaster("host:port");
+```
+
+### Portable Mode (Python)
+```python
+options = PipelineOptions([
+    '--runner=FlinkRunner',
+    '--flink_master=host:port',
+    '--environment_type=LOOPBACK'  # or DOCKER, EXTERNAL
+])
+```
+
+## Spark Runner
+
+### Java
+```java
+SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+options.setRunner(SparkRunner.class);
+options.setSparkMaster("local[*]");  // or spark://host:port
+```
+
+### Python (Portable)
+```python
+options = PipelineOptions([
+    '--runner=SparkRunner',
+    '--spark_master_url=local[*]'
+])
+```
+
+## Testing with Runners
+
+### ValidatesRunner Tests
+Tests that validate runner correctness:
+```bash
+# Direct Runner
+./gradlew :runners:direct-java:validatesRunner
+
+# Flink Runner
+./gradlew :runners:flink:1.18:validatesRunner
+
+# Spark Runner
+./gradlew :runners:spark:3:validatesRunner
+
+# Dataflow Runner
+./gradlew :runners:google-cloud-dataflow-java:validatesRunner
+```
+
+### TestPipeline with Runners
+```java
+@Rule public TestPipeline pipeline = TestPipeline.create();
+
+// Set runner via system property
+-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
+```
+
+## Portable Runners
+
+### Concept
+- SDK-independent execution via Fn API
+- SDK runs in container, communicates via gRPC
+
+### Environment Types
+- `DOCKER` - SDK in Docker container
+- `LOOPBACK` - SDK in same process (testing)
+- `EXTERNAL` - SDK at specified address
+- `PROCESS` - SDK in subprocess
+
+### Job Server
+Start Flink job server:
+```bash
+./gradlew :runners:flink:1.18:job-server:runShadow
+```
+
+Start Spark job server:
+```bash
+./gradlew :runners:spark:3:job-server:runShadow
+```
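+
+Once a job server is running, a Python pipeline can be submitted to it with the portable runner; a sketch, assuming the Flink job server above is listening on its default `localhost:8099` endpoint:
+
+```bash
+python my_pipeline.py \
+  --runner=PortableRunner \
+  --job_endpoint=localhost:8099 \
+  --environment_type=LOOPBACK
+```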
+
+## Runner-Specific Options
+
+### Dataflow
+| Option | Description |
+|--------|-------------|
+| `--project` | GCP project |
+| `--region` | GCP region |
+| `--tempLocation` | GCS temp location |
+| `--stagingLocation` | GCS staging |
+| `--numWorkers` | Initial workers |
+| `--maxNumWorkers` | Max workers |
+| `--workerMachineType` | VM type |
+
+### Flink
+| Option | Description |
+|--------|-------------|
+| `--flinkMaster` | Flink master address |
+| `--parallelism` | Default parallelism |
+| `--checkpointingInterval` | Checkpoint interval |
+
+### Spark
+| Option | Description |
+|--------|-------------|
+| `--sparkMaster` | Spark master URL |
+| `--sparkConf` | Additional Spark config |
+
+## Building Runner Artifacts
+
+### Dataflow Worker Jar
+```bash
+./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
+```
+
+### Flink Job Server
+```bash
+./gradlew :runners:flink:1.18:job-server:shadowJar
+```
+
+### Spark Job Server
+```bash
+./gradlew :runners:spark:3:job-server:shadowJar
+```
+
+## Debugging
+
+### Direct Runner
+- Enable logging: `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug`
+- Use `--targetParallelism=1` for deterministic execution
+
+### Dataflow
+- Check Dataflow UI: console.cloud.google.com/dataflow
+- Use `--experiments=upload_graph` for graph debugging
+- Worker logs in Cloud Logging
+
+### Portable Runners
+- Enable debug logging on job server
+- Check SDK harness logs in worker containers
diff --git a/build.gradle.kts b/build.gradle.kts
index 3ae49afa3908..2465d581228a 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -218,7 +218,10 @@ tasks.rat {
     "learning/prompts/**/*.md",
 
     // Ignore terraform lock files
-    "**/.terraform.lock.hcl"
+    "**/.terraform.lock.hcl",
+
+    // Ignore pytest cache files
+    "**/.pytest_cache/**"
   )
 
   // Add .gitignore excludes to the Apache Rat exclusion list. We re-create the behavior