Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 34 additions & 83 deletions build-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,101 +3,52 @@ set -o nounset
set -o pipefail
set -o xtrace

### Versioning and image tagging ###
#
# Three build scenarios:
# 1. CI release build: triggered by git tag
# - Stable (vX.Y.Z): tagged with major, minor, patch, and "latest"
# - Pre-release (vX.Y.Z-suffix): tagged only with exact version
# 2. CI branch build: version + CI job ID, tagged with branch name (+ "latest" if main)
# 3. Local build: version from git describe, tagged with that version

# Get version info from git (used by branch and local builds)
# --tags: use any tag, not just annotated ones
# --match='v[0-9]*': only version tags (starts with v and a digit)
# --always: fall back to commit ID if no tag found
# e.g., v2.1.1-45-ga1b2c3d means commit a1b2c3d, 45 commits ahead of tag v2.1.1
VERSION_FROM_GIT=$(git describe --tags --match='v[0-9]*' --always)

if [[ -n "${CIRCLE_TAG:-}" ]]; then
# Release build (triggered by git tag)
VERSION=${CIRCLE_TAG#"v"}

if [[ "${CIRCLE_TAG}" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# Stable release: tag with major, minor, patch, and latest
# e.g., v2.1.1 -> "2", "2.1", "2.1.1", "latest"
MAJOR_VERSION=${VERSION%%.*}
MINOR_VERSION=${VERSION%.*}
TAGS="$MAJOR_VERSION,$MINOR_VERSION,$VERSION,latest"
else
# Pre-release: only the exact version tag
# e.g., v3.0.0-rc1 -> "3.0.0-rc1"
TAGS="$VERSION"
fi

elif [[ -n "${CIRCLE_BRANCH:-}" ]]; then
# CI branch build
# Version from git describe + CI job ID
# e.g., 2.1.1-45-ga1b2c3d-ci8675309
VERSION="${VERSION_FROM_GIT#'v'}-ci${CIRCLE_BUILD_NUM}"
BRANCH_TAG=${CIRCLE_BRANCH//\//-}
TAGS="${VERSION},branch-${BRANCH_TAG}"

# Main branch builds are tagged "latest" in the private registry
if [[ "${CIRCLE_BRANCH}" == "main" ]]; then
TAGS+=",latest"
fi

else
# Local build
# Version from git describe only
# e.g., 2.1.1-45-ga1b2c3d
VERSION=${VERSION_FROM_GIT#'v'}
TAGS="${VERSION}"
fi

GIT_COMMIT=${CIRCLE_SHA1:-$(git rev-parse HEAD)}
GCLOUD_REGISTRY="gcr.io/sre-team-418623"

# Parse flags
PUSH=false

while [[ $# -gt 0 ]]; do
case "$1" in
--push)
PUSH=true
shift
;;
*)
echo "Usage: $0 [--push]"
echo " --push Build and push to ${GCLOUD_REGISTRY}/refinery"
echo " (default) Build locally only"
exit 1
;;
esac
done

VERSION=$(git describe --tags --match='v[0-9]*' --always)
VERSION=${VERSION#v}
GIT_COMMIT=$(git rev-parse HEAD)

unset GOOS
unset GOARCH
export GOFLAGS="-ldflags=-X=main.BuildID=$VERSION"
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-$(make latest_modification_time)}

# Build the image once, either to a remote registry designated by PRIMARY_DOCKER_REPO
# or to the local repository as "ko.local/refinery:<tags>" if PRIMARY_DOCKER_REPO is not set.
export KO_DOCKER_REPO="${PRIMARY_DOCKER_REPO:-ko.local}"
# Force IPv4 to avoid IPv6 connectivity issues when pulling base image layers
export GODEBUG=preferIPv4=1

if [[ "$PUSH" == "true" ]]; then
export KO_DOCKER_REPO="$GCLOUD_REGISTRY"
else
export KO_DOCKER_REPO="ko.local"
fi

echo "Building image locally with ko for multi-registry push..."
# shellcheck disable=SC2086
IMAGE_REF=$(./ko publish \
--tags "${TAGS}" \
IMAGE_REF=$(ko publish \
--tags "${VERSION}" \
--base-import-paths \
--platform "linux/amd64,linux/arm64" \
--image-label org.opencontainers.image.source=https://github.com/honeycombio/refinery \
--image-label org.opencontainers.image.source=https://github.com/khan/refinery \
--image-label org.opencontainers.image.licenses=Apache-2.0 \
--image-label org.opencontainers.image.revision=${GIT_COMMIT} \
./cmd/refinery)

echo "Built image: ${IMAGE_REF}"

# If COPY_DOCKER_REPOS is set, copy the built image to each of the listed registries.
# This is a comma-separated list of registry/repo names, e.g.
# "public.ecr.aws/honeycombio,ghcr.io/honeycombio/refinery"
if [[ -n "${COPY_DOCKER_REPOS:-}" ]]; then
echo "Pushing to multiple registries: ${COPY_DOCKER_REPOS}"

IFS=',' read -ra REPOS <<< "$COPY_DOCKER_REPOS"
for REPO in "${REPOS[@]}"; do
REPO=$(echo "$REPO" | xargs) # trim whitespace
echo "Tagging and pushing to: $REPO"

# Tag for each tag in the TAGS list
IFS=',' read -ra TAG_LIST <<< "$TAGS"
for TAG in "${TAG_LIST[@]}"; do
TAG=$(echo "$TAG" | xargs) # trim whitespace
TARGET_IMAGE="$REPO/refinery:$TAG"
echo "Copying $IMAGE_REF to $TARGET_IMAGE"
./crane copy "$IMAGE_REF" "$TARGET_IMAGE"
done
done
fi
88 changes: 87 additions & 1 deletion collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ type InMemCollector struct {

hostname string

memMetricSample []rtmetrics.Sample // Memory monitoring using runtime/metrics
memMetricSample []rtmetrics.Sample // Memory monitoring using runtime/metrics
spanCounters []config.SpanCounter
}

// These are the names of the metrics we use to track the number of events sent to peers through the router.
Expand Down Expand Up @@ -171,6 +172,7 @@ func (i *InMemCollector) Start() error {
i.Logger.Info().WithField("num_workers", numWorkers).Logf("Starting InMemCollector with %d workers", numWorkers)

i.StressRelief.UpdateFromConfig()
i.initSpanCounters()
// Set queue capacity metrics for stress relief calculations
i.Metrics.Store(DENOMINATOR_INCOMING_CAP, float64(imcConfig.IncomingQueueSize))
i.Metrics.Store(DENOMINATOR_PEER_CAP, float64(imcConfig.PeerQueueSize))
Expand Down Expand Up @@ -240,6 +242,7 @@ func (i *InMemCollector) reloadConfigs() {
i.SamplerFactory.ClearDynsamplers()

i.StressRelief.UpdateFromConfig()
i.initSpanCounters()

// Send reload signals to all workers to clear their local samplers
// so that the new configuration will be propagated
Expand Down Expand Up @@ -691,13 +694,89 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
}
}

// initSpanCounters loads and initializes span counters from the current config.
// Counters whose Init fails are logged and dropped, so that matching never
// runs against a partially-initialized counter. Must be called at startup and
// on config reload.
func (i *InMemCollector) initSpanCounters() {
	configured := i.Config.GetSpanCounters()
	// Keep only the counters that initialize cleanly; a failed Init may leave
	// the counter in an unusable state, and storing it would let
	// computeCustomCounts call MatchesSpan on it anyway.
	counters := make([]config.SpanCounter, 0, len(configured))
	for j := range configured {
		if err := configured[j].Init(); err != nil {
			i.Logger.Error().WithField("error", err).Logf("failed to initialize span counter %q", configured[j].Key)
			continue
		}
		counters = append(counters, configured[j])
	}
	// Guard spanCounters: it is read concurrently by computeCustomCounts.
	i.mutex.Lock()
	i.spanCounters = counters
	i.mutex.Unlock()
}

// findSuitableRootSpan returns the trace's root span when one has been
// identified. Otherwise it picks the most plausible stand-in: the earliest
// non-annotation span (span events and links are excluded). Returns nil when
// the trace contains no suitable span at all.
func findSuitableRootSpan(t sendableTrace) *types.Span {
	if root := t.RootSpan; root != nil {
		return root
	}
	var earliest *types.Span
	for _, candidate := range t.GetSpans() {
		// Annotation spans (events/links) are never plausible roots.
		switch candidate.AnnotationType() {
		case types.SpanAnnotationTypeSpanEvent, types.SpanAnnotationTypeLink:
			continue
		}
		if earliest != nil && !candidate.Timestamp.Before(earliest.Timestamp) {
			continue
		}
		earliest = candidate
	}
	return earliest
}

// computeCustomCounts evaluates every configured span counter against all
// spans in the trace and returns the span the results should be attached to
// (the root span if present, else the best fallback) together with the
// per-counter tallies. Returns nil, nil when no counters are configured or
// when no suitable target span exists.
//
// Stress relief note: this runs inside sendTraces(), the sole consumer of the
// tracesToSend channel. The work is O(N×M) — N spans × M counters — so large
// traces with many counters slow the consumer and deepen the outgoing queue.
// Because the stress relief system uses queue depth as a stress input, heavy
// custom-count configurations can raise the measured stress level and trigger
// stress relief earlier. Spans handled by ProcessSpanImmediately (the
// stress-relief fast path) bypass the trace buffer and never reach
// sendTraces, so stress-sampled traces never get custom counts attached.
func (i *InMemCollector) computeCustomCounts(t sendableTrace) (*types.Span, map[string]int64) {
	// Snapshot the counter slice under the read lock; reloads replace it.
	i.mutex.RLock()
	active := i.spanCounters
	i.mutex.RUnlock()

	if len(active) == 0 {
		return nil, nil
	}

	target := findSuitableRootSpan(t)
	if target == nil {
		return nil, nil
	}

	var rootData config.SpanData = &target.Data
	tallies := make(map[string]int64, len(active))
	for _, sp := range t.GetSpans() {
		for _, counter := range active {
			if counter.MatchesSpan(&sp.Data, rootData) {
				tallies[counter.Key]++
			}
		}
	}

	return target, tallies
}

func (i *InMemCollector) sendTraces() {
defer i.sendTracesWG.Done()

for t := range i.tracesToSend {
i.Metrics.Histogram("collector_outgoing_queue", float64(len(i.tracesToSend)))
_, span := otelutil.StartSpanMulti(context.Background(), i.Tracer, "sendTrace", map[string]interface{}{"num_spans": t.DescendantCount(), "tracesToSend_size": len(i.tracesToSend)})

customCountTarget, customCounts := i.computeCustomCounts(t)

for _, sp := range t.GetSpans() {

if i.Config.GetAddRuleReasonToTrace() {
Expand All @@ -721,6 +800,13 @@ func (i *InMemCollector) sendTraces() {
}
}

// set custom span counts on the target span (root if present, else best fallback)
if sp == customCountTarget {
for k, v := range customCounts {
sp.Data.Set(k, v)
}
}

isDryRun := i.Config.GetIsDryRun()
if isDryRun {
sp.Data.Set(config.DryRunFieldName, t.shouldSend)
Expand Down
Loading