Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6078a49
add debug endpoint; add test configs
hai719 Jul 29, 2025
4b1eb9b
add shard manager
hai719 Jul 31, 2025
de6633b
proxy routing
hai719 Aug 7, 2025
d797c52
route only in target proxy; remap taskID and track ack in proxy.
hai719 Sep 3, 2025
1b20ad0
update
hai719 Sep 4, 2025
fbc4af4
retry when shard not available
hai719 Sep 5, 2025
9f5cd62
add tags to log
hai719 Sep 5, 2025
b51520a
add intra proxy streams
hai719 Sep 23, 2025
85c6a97
fix incorrect stream pair
hai719 Sep 26, 2025
04046e5
add log for debugging
hai719 Sep 26, 2025
796bef6
fix issues
hai719 Oct 13, 2025
6e98448
fix streams
hai719 Oct 13, 2025
ccdb29d
add ring_max_size to debug
hai719 Oct 13, 2025
939d6b2
fix panic; fix memberlist join issue
hai719 Oct 16, 2025
4ed7e50
fix panic
hai719 Oct 17, 2025
5fb2b88
update regarding cluster_connection
hai719 Dec 8, 2025
9a9535a
Merge branch 'main' into hai719/routing
hai719 Dec 10, 2025
4c716c9
fix for cluster_conn; fix routing
hai719 Dec 16, 2025
cf59b2d
fix test error: handle case when no replication task after shard regi…
hai719 Dec 17, 2025
4813e58
Refactor: move channel management to ShardManager
hai719 Dec 17, 2025
d2d52f2
Merge branch 'main' into hai719/routing
hai719 Dec 17, 2025
286f51f
update helm
hai719 Dec 17, 2025
0b7b2d4
remove clusterConnection from adminServiceProxyServer
hai719 Dec 17, 2025
34e07db
fix unit test
hai719 Dec 17, 2025
2f26e6f
fix test error
hai719 Dec 17, 2025
970b6ad
fix intra proxy streams; add connection debug info.
hai719 Dec 20, 2025
bed6b1d
add tcp_proxy for test. add intra_proxy test file.
hai719 Dec 20, 2025
6822113
update tests
hai719 Dec 21, 2025
949ec25
handle late-registered remote shard
hai719 Dec 23, 2025
e1dc330
use make bins to avoid testcore related error
hai719 Dec 23, 2025
6bd8449
fix test error; refactor test
hai719 Dec 24, 2025
40b222c
fix test error
hai719 Dec 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ jobs:
cache: ${{ github.ref == 'refs/heads/main' }} # only update the cache in main.

- name: Run go build
run: go build ./...
run: make bins

- name: Run go unittest
run: make test

- name: Install helm
uses: azure/setup-helm@v4.3.0
with:
version: v3.17.3
version: v3.19.4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for? do the change in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix the helm error that I didn't see in main branch:

```
Error: plugin is installed but unusable: failed to load plugin at "/home/runner/.local/share/helm/plugins/helm-unittest.git/plugin.yaml": error unmarshaling JSON: while decoding JSON: json: unknown field "platformHooks"
Error: Process completed with exit code 1.
```


- name: Install helm-unittest
run: helm plugin install https://github.com/helm-unittest/helm-unittest.git
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ GO_GET_TOOL = go get -tool -modfile=$(TOOLS_MOD_FILE)

# Disable cgo by default.
CGO_ENABLED ?= 0
TEST_ARG ?= -race -timeout=5m -tags test_dep
TEST_ARG ?= -race -timeout=15m -tags test_dep -count=1
BENCH_ARG ?= -benchtime=5000x

ALL_SRC := $(shell find . -name "*.go")
Expand Down
9 changes: 7 additions & 2 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ func buildCLIOptions() *cli.App {
return app
}

func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig) {
func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig, proxyInstance *proxy.Proxy) {
addr := c.PProfHTTPAddress
if len(addr) == 0 {
return
}

// Add debug endpoint handler
http.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice stuff but prefer having a separate PR for this

proxy.HandleDebugInfo(w, r, proxyInstance, logger)
})

go func() {
logger.Info("Start pprof http server", tag.NewStringTag("address", addr))
if err := http.ListenAndServe(addr, nil); err != nil {
Expand Down Expand Up @@ -101,7 +106,7 @@ func startProxy(c *cli.Context) error {
}

cfg := proxyParams.ConfigProvider.GetS2SProxyConfig()
startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig)
startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig, proxyParams.Proxy)

if err := proxyParams.Proxy.Start(); err != nil {
return err
Expand Down
37 changes: 37 additions & 0 deletions common/intra_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package common

import (
"context"

"google.golang.org/grpc/metadata"
)

// gRPC metadata keys and values used to identify and trace intra-proxy calls.
const (
// IntraProxyHeaderKey is the metadata key that marks a call as intra-proxy.
// IsIntraProxy reads it from incoming metadata and WithIntraProxyHeaders
// writes it to outgoing metadata.
IntraProxyHeaderKey = "x-s2s-intra-proxy"
// IntraProxyHeaderValue is the value IntraProxyHeaderKey must carry for
// IsIntraProxy to report true.
IntraProxyHeaderValue = "1"
// NOTE(review): the three keys below are not read or written anywhere in
// this file; presumably callers supply them through the headers argument of
// WithIntraProxyHeaders (origin proxy identity, hop counter, trace
// correlation ID) — confirm against the call sites.
IntraProxyOriginProxyIDHeader = "x-s2s-origin-proxy-id"
IntraProxyHopCountHeader = "x-s2s-hop-count"
IntraProxyTraceIDHeader = "x-s2s-trace-id"
)

// IsIntraProxy reports whether the incoming gRPC metadata attached to ctx
// carries the intra-proxy marker. Only the first value stored under
// IntraProxyHeaderKey is compared against IntraProxyHeaderValue; a context
// with no incoming metadata, or without the marker key, is not intra-proxy.
func IsIntraProxy(ctx context.Context) bool {
	incoming, found := metadata.FromIncomingContext(ctx)
	if !found {
		return false
	}

	marker := incoming.Get(IntraProxyHeaderKey)
	if len(marker) == 0 {
		return false
	}
	return marker[0] == IntraProxyHeaderValue
}

// WithIntraProxyHeaders returns a context derived from ctx whose outgoing gRPC
// metadata carries the intra-proxy marker plus every entry of headers.
//
// Outgoing metadata already attached to ctx is kept; an entry in headers
// replaces any existing values stored under the same key. The marker is
// written last so that an entry in headers using the marker key (metadata keys
// are matched case-insensitively) cannot overwrite it; otherwise the receiving
// side's IsIntraProxy check would fail. A nil or empty headers map is valid
// and yields a context that carries only the marker in addition to the
// pre-existing metadata.
func WithIntraProxyHeaders(ctx context.Context, headers map[string]string) context.Context {
	md, ok := metadata.FromOutgoingContext(ctx)
	if ok {
		// Work on a private copy so metadata shared with the caller is never mutated.
		md = md.Copy()
	} else {
		// No outgoing metadata yet: start from an empty, writable set.
		md = metadata.MD{}
	}

	for k, v := range headers {
		md.Set(k, v)
	}
	// Set the marker after the caller-supplied headers so it always wins.
	md.Set(IntraProxyHeaderKey, IntraProxyHeaderValue)

	return metadata.NewOutgoingContext(ctx, md)
}
1 change: 1 addition & 0 deletions config/cluster_conn_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type (
OutboundHealthCheck HealthCheckConfig `yaml:"outboundHealthCheck"`
InboundHealthCheck HealthCheckConfig `yaml:"inboundHealthCheck"`
ShardCountConfig ShardCountConfig `yaml:"shardCount"`
MemberlistConfig *MemberlistConfig `yaml:"memberlist"`
}
StringTranslator struct {
Mappings []StringMapping `yaml:"mappings"`
Expand Down
32 changes: 31 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ShardCountMode string
const (
ShardCountDefault ShardCountMode = ""
ShardCountLCM ShardCountMode = "lcm"
ShardCountRouting ShardCountMode = "routing"
)

type HealthCheckProtocol string
Expand Down Expand Up @@ -153,7 +154,11 @@ type (
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
HealthCheck *HealthCheckConfig `yaml:"healthCheck"`
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
OutboundHealthCheck *HealthCheckConfig `yaml:"outboundHealthCheck"`
OutboundHealthCheck *HealthCheckConfig `yaml:"outboundHealthCheck"`
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
ShardCountConfig ShardCountConfig `yaml:"shardCount"`
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
MemberlistConfig *MemberlistConfig `yaml:"memberlist"`
NamespaceNameTranslation NameTranslationConfig `yaml:"namespaceNameTranslation"`
SearchAttributeTranslation SATranslationConfig `yaml:"searchAttributeTranslation"`
Metrics *MetricsConfig `yaml:"metrics"`
Expand Down Expand Up @@ -217,6 +222,31 @@ type (
LoggingConfig struct {
ThrottleMaxRPS float64 `yaml:"throttleMaxRPS"`
}

// MemberlistConfig holds the settings read from a `memberlist` YAML section.
// NOTE(review): how zero values are treated (e.g. whether an unset port or
// probe setting falls back to a library default) is decided by the code that
// consumes this struct, which is not visible here — confirm before relying on it.
MemberlistConfig struct {
// Enable distributed shard management using memberlist
Enabled bool `yaml:"enabled"`
// Node name for this proxy instance in the cluster
NodeName string `yaml:"nodeName"`
// Bind address for memberlist cluster communication
BindAddr string `yaml:"bindAddr"`
// Bind port for memberlist cluster communication
BindPort int `yaml:"bindPort"`
// List of existing cluster members to join
JoinAddrs []string `yaml:"joinAddrs"`
// Shard assignment strategy (deprecated - now uses actual ownership tracking)
ShardStrategy string `yaml:"shardStrategy"`
// Map of node names to their proxy service addresses for forwarding
ProxyAddresses map[string]string `yaml:"proxyAddresses"`
// Use TCP-only transport (disables UDP) for restricted networks
TCPOnly bool `yaml:"tcpOnly"`
// Disable TCP pings when using TCP-only mode
DisableTCPPings bool `yaml:"disableTCPPings"`
// Probe timeout for memberlist health checks
// (presumably in milliseconds, per the Ms suffix — confirm)
ProbeTimeoutMs int `yaml:"probeTimeoutMs"`
// Probe interval for memberlist health checks
// (presumably in milliseconds, per the Ms suffix — confirm)
ProbeIntervalMs int `yaml:"probeIntervalMs"`
}
)

func FromServerTLSConfig(cfg ServerTLSConfig) encryption.TLSConfig {
Expand Down
2 changes: 2 additions & 0 deletions config/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func ToClusterConnConfig(config S2SProxyConfig) S2SProxyConfig {
SearchAttributeTranslation: config.SearchAttributeTranslation,
OutboundHealthCheck: flattenNilHealthCheck(config.OutboundHealthCheck),
InboundHealthCheck: flattenNilHealthCheck(config.HealthCheck),
ShardCountConfig: config.ShardCountConfig,
MemberlistConfig: config.MemberlistConfig,
},
},
Metrics: config.Metrics,
Expand Down
23 changes: 23 additions & 0 deletions develop/config/cluster-a-mux-client-proxy-1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
inbound:
name: "a-inbound-server"
server:
type: "mux"
mux: "muxed"
client:
tcp:
serverAddress: "localhost:7233"
outbound:
name: "a-outbound-server"
server:
tcp:
listenAddress: "0.0.0.0:6133"
client:
type: "mux"
mux: "muxed"
mux:
- name: "muxed"
mode: "client"
client:
serverAddress: "localhost:7003"
profiling:
pprofAddress: "localhost:6060"
23 changes: 23 additions & 0 deletions develop/config/cluster-a-mux-client-proxy-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
inbound:
name: "a-inbound-server"
server:
type: "mux"
mux: "muxed"
client:
tcp:
serverAddress: "localhost:7233"
outbound:
name: "a-outbound-server"
server:
tcp:
listenAddress: "0.0.0.0:6233"
client:
type: "mux"
mux: "muxed"
mux:
- name: "muxed"
mode: "client"
client:
serverAddress: "localhost:7003"
profiling:
pprofAddress: "localhost:6061"
46 changes: 46 additions & 0 deletions develop/config/cluster-b-mux-server-proxy-1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
inbound:
name: "b-inbound-server"
server:
type: "mux"
mux: "muxed"
client:
tcp:
serverAddress: "localhost:8233"
outbound:
name: "b-outbound-server"
server:
tcp:
listenAddress: "0.0.0.0:6333"
client:
type: "mux"
mux: "muxed"
mux:
- name: "muxed"
mode: "server"
server:
listenAddress: "0.0.0.0:6334"
# shardCount:
# mode: "lcm"
# localShardCount: 3
# remoteShardCount: 2
Comment on lines +22 to +25
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove comment code? we can create separate config if needed.

shardCount:
mode: "routing"
localShardCount: 3
remoteShardCount: 2
profiling:
pprofAddress: "localhost:6070"
memberlist:
enabled: true
nodeName: "proxy-node-b-1"
bindAddr: "127.0.0.1"
bindPort: 6335
# joinAddrs:
# - "localhost:6435"
proxyAddresses:
"proxy-node-b-1": "localhost:6333"
"proxy-node-b-2": "localhost:6433"
# # TCP-only configuration for restricted networks
tcpOnly: true # Use TCP transport only, disable UDP
# disableTCPPings: true # Disable TCP pings for faster convergence
# probeTimeoutMs: 1000 # Longer timeout for network latency
# probeIntervalMs: 2000 # Less frequent probes to reduce network noise
46 changes: 46 additions & 0 deletions develop/config/cluster-b-mux-server-proxy-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
inbound:
name: "b-inbound-server"
server:
type: "mux"
mux: "muxed"
client:
tcp:
serverAddress: "localhost:8233"
outbound:
name: "b-outbound-server"
server:
tcp:
listenAddress: "0.0.0.0:6433"
client:
type: "mux"
mux: "muxed"
mux:
- name: "muxed"
mode: "server"
server:
listenAddress: "0.0.0.0:6434"
# shardCount:
# mode: "lcm"
# localShardCount: 3
# remoteShardCount: 2
shardCount:
mode: "routing"
localShardCount: 3
remoteShardCount: 2
profiling:
pprofAddress: "localhost:6071"
memberlist:
enabled: true
nodeName: "proxy-node-b-2"
bindAddr: "127.0.0.1"
bindPort: 6435
joinAddrs:
- "localhost:6335"
proxyAddresses:
"proxy-node-b-1": "localhost:6333"
"proxy-node-b-2": "localhost:6433"
# # TCP-only configuration for restricted networks
# tcpOnly: true # Use TCP transport only, disable UDP
# disableTCPPings: true # Disable TCP pings for faster convergence
# probeTimeoutMs: 1000 # Longer timeout for network latency
# probeIntervalMs: 2000 # Less frequent probes to reduce network noise
8 changes: 6 additions & 2 deletions develop/config/dynamic-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ history.ReplicationEnableUpdateWithNewTaskMerge:
history.enableWorkflowExecutionTimeoutTimer:
- value: true
history.EnableReplicationTaskTieredProcessing:
- value: true
- value: false
history.persistenceMaxQPS:
- value: 100000
constraints: {}
frontend.persistenceMaxQPS:
- value: 100000
constraints: {}
constraints: {}
history.shardUpdateMinInterval:
- value: 1s
history.ReplicationStreamSendEmptyTaskDuration:
Comment on lines +30 to +32
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these config for?

- value: 10s
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gogo/status v1.1.1
github.com/golang/mock v1.7.0-rc.1
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
github.com/hashicorp/memberlist v0.5.1
github.com/hashicorp/yamux v0.1.2
github.com/keilerkonzept/visit v1.1.1
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -40,6 +41,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/aws/aws-sdk-go v1.55.6 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand All @@ -66,6 +68,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -76,6 +79,12 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand All @@ -89,6 +98,7 @@ require (
github.com/lib/pq v1.10.9 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
Expand All @@ -106,6 +116,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand Down
Loading