diff --git a/internal/admin/keyviz_fanout.go b/internal/admin/keyviz_fanout.go new file mode 100644 index 00000000..def7c8bd --- /dev/null +++ b/internal/admin/keyviz_fanout.go @@ -0,0 +1,566 @@ +package admin + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "sort" + "strconv" + "strings" + "sync" + "time" + + pkgerrors "github.com/cockroachdb/errors" + "github.com/goccy/go-json" +) + +// errKeyVizPeer is the sentinel wrapped by every fan-out failure so +// callers can errors.Is() against it without parsing strings. Each +// concrete failure adds its own %w-wrapped detail. +var errKeyVizPeer = errors.New("keyviz fan-out peer error") + +// keyVizFanoutDefaultTimeout matches the design 9 open-question 2 +// proposed default: 2 seconds per peer call. Operators on weird +// networks can override via WithTimeout. +const keyVizFanoutDefaultTimeout = 2 * time.Second + +// keyVizPeerErrorBodyLimit caps how many bytes of a peer's non-OK +// response body we splice into the error message. 512 is enough to +// surface a typical structured error envelope without letting a +// misbehaving peer flood operator logs. +const keyVizPeerErrorBodyLimit = 512 + +// keyVizPeerResponseBodyLimit caps the JSON body we are willing to +// decode from a peer. A misbehaving or compromised peer that streams +// gigabytes back at us would otherwise pin a goroutine on +// json.Decode and balloon memory. +// +// Sizing: 1024 rows × 4096 columns = ~4M uint64 cells. JSON encoding +// of a uint64 ranges from 1 byte ("0") to 20 bytes (max uint64), with +// realistic heatmap traffic skewing low (most cells are 0 or small). +// At a worst-case 20 bytes/value the raw values alone would reach +// ~80 MiB, slightly over the 64 MiB cap. That is intentional: the +// operator-visible failure mode is "warning logged, matrix may be +// truncated", not "DoS". Operators on extreme-traffic deployments +// who hit the cap should override via a future flag once the need +// is real. +const keyVizPeerResponseBodyLimit int64 = 64 << 20 // 64 MiB + +// keyVizMergeBucketHint is a hand-tuned starting capacity for the +// merge phase's bucket map / order slice. Most fan-out responses +// are well under 1024 rows; 64 lets a small cluster avoid the +// initial map grow while keeping the worst-case overhead trivial +// against the 1024-row budget. +const keyVizMergeBucketHint = 64 + +// FanoutResult is the per-response fan-out summary attached to +// KeyVizMatrix.Fanout when fan-out is enabled. Nodes is ordered by +// the operator-supplied node list (self first) so the SPA can render +// a stable row order; Responded counts ok=true entries; Expected is +// the configured peer count plus self. +// +// See docs/design/2026_04_27_proposed_keyviz_cluster_fanout.md 5. +type FanoutResult struct { + Nodes []FanoutNodeStatus `json:"nodes"` + Responded int `json:"responded"` + Expected int `json:"expected"` +} + +// FanoutNodeStatus is one node's contribution status for a single +// fan-out request. OK=true means the node returned a parseable +// matrix; OK=false carries the reason (timeout, refused, 5xx body, +// JSON decode failure). The local node always reports OK=true: its +// matrix is computed in-process and cannot fail in this layer. +type FanoutNodeStatus struct { + Node string `json:"node"` + OK bool `json:"ok"` + Error string `json:"error,omitempty"` +} + +// KeyVizFanout aggregates this node's local matrix with matrices +// fetched from a static peer list. 
The contract: +// +// - peers must NOT include self; the handler computes the local +// matrix and passes it to Run alongside the peer set. +// - Each peer is queried in parallel via HTTP GET on the same +// /admin/api/v1/keyviz/matrix path. The query string is rebuilt +// from the parsed parameters so a peer running an older or newer +// server does not receive an unrecognised parameter we never +// intended to forward. +// - A peer that times out, errors, or returns a non-OK status +// contributes a FanoutNodeStatus{OK: false, Error: ...} but does +// not abort the request. Aggregation proceeds with whatever +// succeeded. +// +// The merge rules are documented in 4 of the design doc: +// +// - Reads / read_bytes: sum across nodes (each node served distinct +// follower reads). +// - Writes / write_bytes: max across nodes; when the per-cell values +// disagree we set Conflict=true on the row (best-effort dedup +// during a leadership flip; the canonical (raftGroupID, leaderTerm) +// dedup lands in Phase 2-C+ when we extend the wire format). +type KeyVizFanout struct { + self string + peers []string + client *http.Client + timeout time.Duration + logger *slog.Logger + bodyLimit int64 // per-peer JSON cap; 0 falls back to keyVizPeerResponseBodyLimit. +} + +// NewKeyVizFanout wires the aggregator. self is the local node's +// identity for the FanoutResult.Nodes entry (does not have to match +// any peer URL). peers is the list of HTTP base URLs to query +// (e.g. http://10.0.0.2:8080) — typically the operator's +// --keyvizFanoutNodes list with the local entry filtered out. +// +// The default per-peer timeout is 2 seconds, matching the design 9 +// open question 2 default. The default HTTP client has no +// connection pool tuning beyond stdlib defaults; intra-cluster +// admin traffic does not yet justify a custom transport. +func NewKeyVizFanout(self string, peers []string) *KeyVizFanout { + return &KeyVizFanout{ + self: self, + peers: append([]string(nil), peers...), + client: &http.Client{Timeout: keyVizFanoutDefaultTimeout}, + // timeout shadows client.Timeout so tests can shorten the + // per-call ceiling without rebuilding the http.Client. + timeout: keyVizFanoutDefaultTimeout, + logger: slog.Default(), + } +} + +// WithLogger overrides the slog destination so main.go can attach a +// component tag. nil leaves the existing logger. +func (f *KeyVizFanout) WithLogger(l *slog.Logger) *KeyVizFanout { + if l == nil || f == nil { + return f + } + f.logger = l + return f +} + +// WithHTTPClient swaps the HTTP client. Tests inject an httptest +// server's Client(); operators may want a custom transport in the +// future. nil resets to the default. +func (f *KeyVizFanout) WithHTTPClient(c *http.Client) *KeyVizFanout { + if f == nil { + return f + } + if c == nil { + f.client = &http.Client{Timeout: f.timeout} + return f + } + f.client = c + return f +} + +// WithResponseBodyLimit overrides the per-peer JSON decode cap. +// Production leaves this unset; tests use it to drive the over-cap +// path with a small synthetic body. Values <= 0 reset to the +// default. +func (f *KeyVizFanout) WithResponseBodyLimit(n int64) *KeyVizFanout { + if f == nil { + return f + } + if n <= 0 { + f.bodyLimit = 0 + return f + } + f.bodyLimit = n + return f +} + +// WithTimeout sets the per-peer timeout (and updates the http.Client +// timeout when it has not been replaced via WithHTTPClient). Values +// <= 0 leave the existing timeout unchanged. 
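+//
+// Typical wiring, composing the constructor and the With* setters
+// builder-style (the peer addresses and the 3 s ceiling below are
+// illustrative, not defaults from this package):
+//
+//	f := NewKeyVizFanout("10.0.0.1:8080",
+//		[]string{"10.0.0.2:8080", "10.0.0.3:8080"}).
+//		WithTimeout(3 * time.Second).
+//		WithLogger(slog.Default().With(slog.String("component", "keyviz.fanout")))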
+func (f *KeyVizFanout) WithTimeout(d time.Duration) *KeyVizFanout { + if f == nil || d <= 0 { + return f + } + f.timeout = d + if f.client != nil { + f.client.Timeout = d + } + return f +} + +// peerResult is the per-peer outcome the goroutine pool collects +// before the synchronous merge phase. Either matrix is non-nil or +// err is non-nil; never both. +type peerResult struct { + node string + matrix *KeyVizMatrix + err error +} + +// Run merges local with peer responses and returns the combined +// matrix plus per-node status. local is the matrix the handler +// already computed against the in-process sampler; on a single-node +// cluster (peers empty) Run returns local with a Fanout block that +// reports Expected=1, Responded=1. +// +// Run never returns an error: peer-level failures surface in the +// FanoutResult; aggregation is best-effort. +func (f *KeyVizFanout) Run(ctx context.Context, params keyVizParams, local KeyVizMatrix) KeyVizMatrix { + if f == nil || len(f.peers) == 0 { + merged := local + merged.Fanout = &FanoutResult{ + Nodes: []FanoutNodeStatus{{Node: f.selfName(), OK: true}}, + Responded: 1, + Expected: 1, + } + return merged + } + + results := f.fetchPeersParallel(ctx, params) + + matrices := []KeyVizMatrix{local} + statuses := []FanoutNodeStatus{{Node: f.selfName(), OK: true}} + for _, r := range results { + if r.err != nil { + statuses = append(statuses, FanoutNodeStatus{ + Node: r.node, OK: false, Error: r.err.Error(), + }) + continue + } + matrices = append(matrices, *r.matrix) + statuses = append(statuses, FanoutNodeStatus{Node: r.node, OK: true}) + } + merged := mergeKeyVizMatrices(matrices, params.series) + merged.Fanout = &FanoutResult{ + Nodes: statuses, + Responded: countOK(statuses), + Expected: len(statuses), + } + merged.Series = local.Series + merged.GeneratedAt = local.GeneratedAt + return merged +} + +func (f *KeyVizFanout) selfName() string { + if f == nil || f.self == "" { + return "self" + } + return f.self +} + +func countOK(statuses []FanoutNodeStatus) int { + n := 0 + for _, s := range statuses { + if s.OK { + n++ + } + } + return n +} + +func (f *KeyVizFanout) fetchPeersParallel(ctx context.Context, params keyVizParams) []peerResult { + // Cap per-peer wall time so a single slow node cannot hold the + // SPA poll open beyond the configured timeout. The parent + // context is preserved as the cancellation root so an early + // client disconnect short-circuits every in-flight peer call. 
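+	//
+	// Note: f.timeout (not the client's own Timeout field) is the
+	// authority for the ceiling here, so WithTimeout keeps working
+	// even after WithHTTPClient swapped in a client — e.g. an
+	// httptest Client() — whose Timeout is zero.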
+ callCtx, cancel := context.WithTimeout(ctx, f.timeout) + defer cancel() + + results := make([]peerResult, len(f.peers)) + var wg sync.WaitGroup + for i, peer := range f.peers { + wg.Add(1) + go func(i int, peer string) { + defer wg.Done() + matrix, err := f.fetchPeer(callCtx, peer, params) + results[i] = peerResult{node: peer, matrix: matrix, err: err} + }(i, peer) + } + wg.Wait() + return results +} + +func (f *KeyVizFanout) fetchPeer(ctx context.Context, peer string, params keyVizParams) (*KeyVizMatrix, error) { + target, err := buildKeyVizPeerURL(peer, params) + if err != nil { + return nil, pkgerrors.Wrap(err, "build peer url") + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, http.NoBody) + if err != nil { + return nil, pkgerrors.Wrap(err, "new request") + } + req.Header.Set("Accept", "application/json") + resp, err := f.client.Do(req) + if err != nil { + return nil, pkgerrors.Wrap(err, "peer request") + } + defer func() { + // A peer that hangs on body close can wedge our goroutine + // against the deadline; log and move on rather than blocking. + if cerr := resp.Body.Close(); cerr != nil { + f.logger.LogAttrs(ctx, slog.LevelDebug, "keyviz fan-out: peer body close failed", + slog.String("peer", peer), + slog.String("error", cerr.Error()), + ) + } + }() + if resp.StatusCode != http.StatusOK { + // Read a bounded prefix of the body so the error message is + // useful without letting a misbehaving peer flood our logs. + body, _ := io.ReadAll(io.LimitReader(resp.Body, keyVizPeerErrorBodyLimit)) + return nil, fmt.Errorf("%w: status %d: %s", errKeyVizPeer, resp.StatusCode, string(body)) + } + // Bound the JSON decode so a peer that streams gigabytes cannot + // pin a goroutine and balloon memory. The countingReader wraps a + // LimitReader so: + // - The hard cap is enforced by io.LimitReader (security + // bound: at most cap+1 bytes ever pulled off the wire). + // - The byte counter is incremented on every Read, including + // the chunks json.NewDecoder buffers internally — so the + // post-decode `n > cap` check fires reliably even when the + // decoder consumed the trailing byte itself rather than + // leaving it for an external probe (Claude bot round-2 on + // PR #686 flagged the bufio false-negative). + cr := &countingReader{r: io.LimitReader(resp.Body, f.responseBodyLimit()+1)} + var m KeyVizMatrix + if err := json.NewDecoder(cr).Decode(&m); err != nil { + return nil, pkgerrors.Wrap(err, "decode peer response") + } + if cr.n > f.responseBodyLimit() { + f.logger.LogAttrs(ctx, slog.LevelWarn, "keyviz fan-out: peer response exceeded size limit; truncated decode", + slog.String("peer", peer), + slog.Int64("limit_bytes", f.responseBodyLimit()), + slog.Int64("read_bytes", cr.n), + ) + } + return &m, nil +} + +// countingReader wraps an io.Reader and tracks total bytes read. +// It is the only reliable way to detect that a JSON decoder +// consumed past a LimitReader cap, since json.NewDecoder uses +// internal buffering and an external one-byte probe of the +// LimitReader can return EOF even when the decoder pulled past +// the cap into its own buffer. 
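+//
+// The false negative in isolation (stdlib encoding/json buffers the
+// same way; the 8-byte cap and body are hypothetical):
+//
+//	var v map[string]int
+//	body := `{"a":1}` + strings.Repeat(" ", 32)
+//	lr := io.LimitReader(strings.NewReader(body), 8+1)
+//	_ = json.NewDecoder(lr).Decode(&v) // nil: decoder buffered all 9 bytes
+//	_, err := lr.Read(make([]byte, 1)) // io.EOF — probe wrongly reports "under cap"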
+type countingReader struct {
+	r io.Reader
+	n int64
+}
+
+func (c *countingReader) Read(p []byte) (int, error) {
+	n, err := c.r.Read(p)
+	c.n += int64(n)
+	// CLAUDE.md says "avoid //nolint — refactor instead", but the
+	// io.Reader contract is the rare place where the suppression
+	// is correct rather than lazy: implementations are required to
+	// pass io.EOF through unwrapped so any caller that does
+	// `err == io.EOF` (pointer compare) keeps working. Wrapping
+	// with %w produces a different error value that pointer
+	// compare will not match, even though errors.Is would. The
+	// stdlib `encoding/json` historically did pointer compare;
+	// modern alternatives (`goccy/go-json` is the one this
+	// package uses) may use errors.Is, but the io.Reader contract
+	// holds independent of which consumer is in scope. Refactoring
+	// is impossible here — the only options are
+	// pass-through-and-suppress (this) or
+	// wrap-and-break-anyone-doing-pointer-compare.
+	return n, err //nolint:wrapcheck // io.Reader contract requires unwrapped sentinels.
+}
+
+// responseBodyLimit returns the per-peer JSON body cap. Tests
+// override the limit via the WithResponseBodyLimit option with a
+// small synthetic value; production leaves it unset and keeps the
+// default keyVizPeerResponseBodyLimit.
+func (f *KeyVizFanout) responseBodyLimit() int64 {
+	if f.bodyLimit > 0 {
+		return f.bodyLimit
+	}
+	return keyVizPeerResponseBodyLimit
+}
+
+// buildKeyVizPeerURL forwards the parsed query parameters from the
+// upstream request, NOT the raw query string. Forwarding parsed
+// values prevents an upstream from injecting parameters we do not
+// recognise (forward-compatibility quirks) and keeps the per-peer
+// URL deterministic for tests.
+//
+// The peer string accepts two shapes:
+//
+// - Full URL: http://10.0.0.2:8080 (or https when TLS lands)
+// - host:port: 10.0.0.2:8080 (interpreted as http://host:port)
+//
+// The host-only form is the common operator shorthand; url.Parse
+// rejects it as ambiguous (10.0.0.2 looks like a scheme) so we
+// detect "no scheme" by the absence of '://' and prepend http://.
+func buildKeyVizPeerURL(peer string, params keyVizParams) (string, error) {
+	raw := peer
+	if !strings.Contains(raw, "://") {
+		raw = "http://" + raw
+	}
+	base, err := url.Parse(raw)
+	if err != nil {
+		return "", pkgerrors.Wrapf(err, "parse peer base url %q", peer)
+	}
+	if base.Host == "" {
+		return "", fmt.Errorf("%w: peer base url %q has no host", errKeyVizPeer, peer)
+	}
+	base.Path = "/admin/api/v1/keyviz/matrix"
+	q := base.Query()
+	q.Set("series", string(params.series))
+	q.Set("rows", strconv.Itoa(params.rows))
+	if !params.from.IsZero() {
+		q.Set("from_unix_ms", strconv.FormatInt(params.from.UnixMilli(), 10))
+	}
+	if !params.to.IsZero() {
+		q.Set("to_unix_ms", strconv.FormatInt(params.to.UnixMilli(), 10))
+	}
+	base.RawQuery = q.Encode()
+	return base.String(), nil
+}
+
+// mergeKeyVizMatrices combines per-node matrices into one. The merge
+// is column-wise on column_unix_ms (a column missing from a node
+// contributes 0 for every row); per-row keying is by BucketID. The
+// rule selector follows the requested series — reads sum, writes
+// max with conflict surfacing — per design §4.
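+//
+// Worked shape (the same data TestMergeKeyVizMatricesUnionColumns
+// pins, series=reads):
+//
+//	node A: columns [100, 200]      row route:1 values [1, 2]
+//	node B: columns [200, 300]      row route:1 values [10, 20]
+//	merged: columns [100, 200, 300] row route:1 values [1, 12, 20]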
+func mergeKeyVizMatrices(matrices []KeyVizMatrix, series KeyVizSeries) KeyVizMatrix {
+	if len(matrices) == 0 {
+		return KeyVizMatrix{Series: series}
+	}
+	if len(matrices) == 1 {
+		out := matrices[0]
+		out.Fanout = nil
+		return out
+	}
+	columns := unionColumns(matrices)
+	indexByColumn := make(map[int64]int, len(columns))
+	for i, ts := range columns {
+		indexByColumn[ts] = i
+	}
+	rowsByBucket := make(map[string]*KeyVizRow, keyVizMergeBucketHint)
+	bucketOrder := make([]string, 0, keyVizMergeBucketHint)
+	mergeFn := mergeFnFor(series)
+	for mi := range matrices {
+		m := &matrices[mi]
+		for ri := range m.Rows {
+			mergeRowInto(&m.Rows[ri], m.ColumnUnixMs, indexByColumn, rowsByBucket, &bucketOrder, len(columns), mergeFn)
+		}
+	}
+	out := KeyVizMatrix{
+		ColumnUnixMs: columns,
+		Series:       series,
+		Rows:         make([]KeyVizRow, 0, len(rowsByBucket)),
+	}
+	for _, bucket := range bucketOrder {
+		out.Rows = append(out.Rows, *rowsByBucket[bucket])
+	}
+	return out
+}
+
+// mergeRowInto folds one source row into the merge accumulator. Split
+// out of mergeKeyVizMatrices to keep that function under the cyclop
+// budget (and so this body — the part that actually does the merge
+// per-cell — has its own contained set of branches).
+func mergeRowInto(
+	row *KeyVizRow,
+	srcColumns []int64,
+	indexByColumn map[int64]int,
+	rowsByBucket map[string]*KeyVizRow,
+	bucketOrder *[]string,
+	mergedWidth int,
+	mergeFn mergeCellFn,
+) {
+	dst, ok := rowsByBucket[row.BucketID]
+	if !ok {
+		dst = &KeyVizRow{
+			BucketID:          row.BucketID,
+			Start:             append([]byte(nil), row.Start...),
+			End:               append([]byte(nil), row.End...),
+			Aggregate:         row.Aggregate,
+			RouteIDs:          append([]uint64(nil), row.RouteIDs...),
+			RouteIDsTruncated: row.RouteIDsTruncated,
+			RouteCount:        row.RouteCount,
+			Values:            make([]uint64, mergedWidth),
+		}
+		rowsByBucket[row.BucketID] = dst
+		*bucketOrder = append(*bucketOrder, row.BucketID)
+	}
+	for j, ts := range srcColumns {
+		idx, ok := indexByColumn[ts]
+		if !ok || j >= len(row.Values) {
+			continue
+		}
+		next, conflict := mergeFn(dst.Values[idx], row.Values[j])
+		dst.Values[idx] = next
+		if conflict {
+			dst.Conflict = true
+		}
+	}
+}
+
+// mergeCellFn returns the merged value plus a conflict flag.
+//
+// - Reads (and read_bytes) sum across nodes and never raise the
+// conflict flag — distinct local serves are independent counts.
+// - Writes (and write_bytes) take the max across nodes and raise
+// conflict only when both inputs are non-zero with different
+// values; one zero and one non-zero is NOT a conflict — that is
+// the steady-state shape.
+type mergeCellFn func(prev, incoming uint64) (uint64, bool)
+
+func mergeFnFor(series KeyVizSeries) mergeCellFn {
+	switch series {
+	case keyVizSeriesReads, keyVizSeriesReadBytes:
+		return sumMerge
+	case keyVizSeriesWrites, keyVizSeriesWriteBytes:
+		return maxMerge
+	default:
+		return sumMerge
+	}
+}
+
+func sumMerge(prev, incoming uint64) (uint64, bool) {
+	return prev + incoming, false
+}
+
+// maxMerge pairs the §4.2 description: pick the larger value, raise
+// conflict when both inputs are non-zero AND disagree. Stable
+// leadership produces (0, X) or (X, 0) which collapse to X without
+// raising conflict; a leadership flip produces (X, Y) with both > 0
+// and the SPA hatches the row.
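+//
+// Cell-level truth table (X and Y distinct, both non-zero):
+//
+//	(0, 0) → 0, conflict=false
+//	(0, X) → X, conflict=false        // stable leadership
+//	(X, 0) → X, conflict=false
+//	(X, X) → X, conflict=false        // nodes agree
+//	(X, Y) → max(X, Y), conflict=true // flip window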
+func maxMerge(prev, incoming uint64) (uint64, bool) { + if prev == 0 { + return incoming, false + } + if incoming == 0 { + return prev, false + } + if prev == incoming { + return prev, false + } + if prev > incoming { + return prev, true + } + return incoming, true +} + +// unionColumns returns the sorted union of column timestamps across +// all matrices. Columns that appear in only some inputs still get a +// slot; the merge fills missing values with the merge-rule identity +// (0 for sum, 0 for max — both treat 0 as "no contribution"). +func unionColumns(matrices []KeyVizMatrix) []int64 { + seen := make(map[int64]struct{}, keyVizMergeBucketHint) + for _, m := range matrices { + for _, ts := range m.ColumnUnixMs { + seen[ts] = struct{}{} + } + } + out := make([]int64, 0, len(seen)) + for ts := range seen { + out = append(out, ts) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} diff --git a/internal/admin/keyviz_fanout_test.go b/internal/admin/keyviz_fanout_test.go new file mode 100644 index 00000000..9a8eabea --- /dev/null +++ b/internal/admin/keyviz_fanout_test.go @@ -0,0 +1,489 @@ +package admin + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/goccy/go-json" + "github.com/stretchr/testify/require" +) + +// TestMergeKeyVizMatricesReadsSum pins the §4.1 rule: read counters +// from distinct nodes are independent local serves and add. The +// merged matrix has one column entry per timestamp seen anywhere +// and the row's Values for that column equal the sum of all node +// inputs at that column. +func TestMergeKeyVizMatricesReadsSum(t *testing.T) { + t.Parallel() + col := []int64{1_700_000_000_000} + a := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{10}}, + }, + } + b := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{25}}, + }, + } + merged := mergeKeyVizMatrices([]KeyVizMatrix{a, b}, keyVizSeriesReads) + require.Equal(t, []int64{1_700_000_000_000}, merged.ColumnUnixMs) + require.Len(t, merged.Rows, 1) + require.Equal(t, []uint64{35}, merged.Rows[0].Values) + require.False(t, merged.Rows[0].Conflict, "reads must never raise conflict") +} + +// TestMergeKeyVizMatricesWritesMaxStableLeader pins the §4.2 happy +// path: under stable leadership exactly one node reports non-zero +// writes. The merge picks the non-zero value without raising +// conflict. +func TestMergeKeyVizMatricesWritesMaxStableLeader(t *testing.T) { + t.Parallel() + col := []int64{1_700_000_000_000, 1_700_000_001_000} + leader := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesWrites, + Rows: []KeyVizRow{ + {BucketID: "route:7", Values: []uint64{42, 17}}, + }, + } + follower := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesWrites, + Rows: []KeyVizRow{ + {BucketID: "route:7", Values: []uint64{0, 0}}, + }, + } + merged := mergeKeyVizMatrices([]KeyVizMatrix{follower, leader}, keyVizSeriesWrites) + require.Len(t, merged.Rows, 1) + require.Equal(t, []uint64{42, 17}, merged.Rows[0].Values) + require.False(t, merged.Rows[0].Conflict, "stable-leader merge must not raise conflict") +} + +// TestMergeKeyVizMatricesWritesMaxLeadershipFlip pins §4.2 under a +// mid-window flip: two nodes report non-zero, disagreeing values +// for the same cell. The merge keeps the larger value and raises +// the row-level conflict flag so the SPA can hatch the row. 
+func TestMergeKeyVizMatricesWritesMaxLeadershipFlip(t *testing.T) { + t.Parallel() + col := []int64{1_700_000_000_000} + exLeader := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesWrites, + Rows: []KeyVizRow{ + {BucketID: "route:9", Values: []uint64{30}}, + }, + } + newLeader := KeyVizMatrix{ + ColumnUnixMs: col, + Series: keyVizSeriesWrites, + Rows: []KeyVizRow{ + {BucketID: "route:9", Values: []uint64{55}}, + }, + } + merged := mergeKeyVizMatrices([]KeyVizMatrix{exLeader, newLeader}, keyVizSeriesWrites) + require.Len(t, merged.Rows, 1) + require.Equal(t, []uint64{55}, merged.Rows[0].Values, "max-merge must keep the larger value") + require.True(t, merged.Rows[0].Conflict, "leadership flip must raise the row conflict flag") +} + +// TestMergeKeyVizMatricesUnionColumns pins the §4.5 rule: a column +// present in only some nodes still gets a slot in the merged matrix; +// missing values fill in as zero. +func TestMergeKeyVizMatricesUnionColumns(t *testing.T) { + t.Parallel() + a := KeyVizMatrix{ + ColumnUnixMs: []int64{100, 200}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{1, 2}}, + }, + } + b := KeyVizMatrix{ + ColumnUnixMs: []int64{200, 300}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{10, 20}}, + }, + } + merged := mergeKeyVizMatrices([]KeyVizMatrix{a, b}, keyVizSeriesReads) + require.Equal(t, []int64{100, 200, 300}, merged.ColumnUnixMs) + require.Len(t, merged.Rows, 1) + require.Equal(t, []uint64{1, 12, 20}, merged.Rows[0].Values, + "missing columns must read as zero on the side that does not have them") +} + +// TestMergeKeyVizMatricesDistinctRowsPreserveOrder pins the §4.4 +// row-identity rule: rows with distinct BucketIDs land in the +// merged matrix in first-seen-order, preserving the per-node row +// order so a single-node fan-out is byte-identical to the local +// matrix. +func TestMergeKeyVizMatricesDistinctRowsPreserveOrder(t *testing.T) { + t.Parallel() + a := KeyVizMatrix{ + ColumnUnixMs: []int64{100}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:5", Values: []uint64{1}}, + {BucketID: "route:1", Values: []uint64{2}}, + }, + } + b := KeyVizMatrix{ + ColumnUnixMs: []int64{100}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:9", Values: []uint64{4}}, + {BucketID: "route:5", Values: []uint64{8}}, + }, + } + merged := mergeKeyVizMatrices([]KeyVizMatrix{a, b}, keyVizSeriesReads) + require.Len(t, merged.Rows, 3) + require.Equal(t, "route:5", merged.Rows[0].BucketID) + require.Equal(t, "route:1", merged.Rows[1].BucketID) + require.Equal(t, "route:9", merged.Rows[2].BucketID) +} + +// TestKeyVizFanoutRunSinglePeerOK exercises the end-to-end happy +// path: one peer responds with a parseable matrix; the aggregator +// merges it with the local view and reports both nodes ok. 
+func TestKeyVizFanoutRunSinglePeerOK(t *testing.T) { + t.Parallel() + peerMatrix := KeyVizMatrix{ + ColumnUnixMs: []int64{1_700_000_000_000}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{7}}, + }, + } + peer := newKeyVizPeerStub(t, peerMatrix) + defer peer.Close() + + f := NewKeyVizFanout("self:8080", []string{peer.URL}).WithHTTPClient(peer.Client()) + local := KeyVizMatrix{ + ColumnUnixMs: []int64{1_700_000_000_000}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{3}}, + }, + } + + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, local) + require.Equal(t, []uint64{10}, merged.Rows[0].Values, "reads must sum across local + peer") + require.NotNil(t, merged.Fanout) + require.Equal(t, 2, merged.Fanout.Expected) + require.Equal(t, 2, merged.Fanout.Responded) + require.Len(t, merged.Fanout.Nodes, 2) + require.Equal(t, "self:8080", merged.Fanout.Nodes[0].Node) + require.True(t, merged.Fanout.Nodes[0].OK) + require.Equal(t, peer.URL, merged.Fanout.Nodes[1].Node) + require.True(t, merged.Fanout.Nodes[1].OK) +} + +// TestKeyVizFanoutRunPeerHTTPError pins the §2.1 degraded-mode +// contract: a peer that returns 5xx contributes ok=false with the +// status surfaced; the local matrix still ships and Responded +// reflects the partial success. +func TestKeyVizFanoutRunPeerHTTPError(t *testing.T) { + t.Parallel() + peer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = io.WriteString(w, "boom") + })) + defer peer.Close() + + f := NewKeyVizFanout("self:8080", []string{peer.URL}).WithHTTPClient(peer.Client()) + local := KeyVizMatrix{ + ColumnUnixMs: []int64{1_700_000_000_000}, + Series: keyVizSeriesReads, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{3}}, + }, + } + + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, local) + require.Equal(t, []uint64{3}, merged.Rows[0].Values, "5xx peer must not perturb local counts") + require.NotNil(t, merged.Fanout) + require.Equal(t, 2, merged.Fanout.Expected) + require.Equal(t, 1, merged.Fanout.Responded) + require.False(t, merged.Fanout.Nodes[1].OK) + require.Contains(t, merged.Fanout.Nodes[1].Error, "500") +} + +// TestKeyVizFanoutRunPeerTimeout pins the design 9 timeout: a +// peer that hangs past the per-call ceiling contributes ok=false +// and the request still completes promptly. +func TestKeyVizFanoutRunPeerTimeout(t *testing.T) { + t.Parallel() + hang := make(chan struct{}) + peer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + case <-hang: + } + })) + defer peer.Close() + defer close(hang) + + f := NewKeyVizFanout("self:8080", []string{peer.URL}). + WithHTTPClient(peer.Client()). + WithTimeout(50 * time.Millisecond) + + start := time.Now() + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, KeyVizMatrix{Series: keyVizSeriesReads}) + require.Less(t, time.Since(start), 1*time.Second, "fan-out must not wait beyond its per-peer timeout") + require.NotNil(t, merged.Fanout) + require.Equal(t, 1, merged.Fanout.Responded) + require.False(t, merged.Fanout.Nodes[1].OK) +} + +// TestKeyVizFanoutRunNoPeers exercises the single-node fallback: +// when peers is empty, Run returns the local matrix with a Fanout +// block reporting Expected=1, Responded=1. 
+func TestKeyVizFanoutRunNoPeers(t *testing.T) { + t.Parallel() + f := NewKeyVizFanout("self:8080", nil) + local := KeyVizMatrix{ + ColumnUnixMs: []int64{1_700_000_000_000}, + Series: keyVizSeriesWrites, + Rows: []KeyVizRow{ + {BucketID: "route:1", Values: []uint64{99}}, + }, + } + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesWrites, rows: 1024}, local) + require.Equal(t, []uint64{99}, merged.Rows[0].Values) + require.Equal(t, 1, merged.Fanout.Expected) + require.Equal(t, 1, merged.Fanout.Responded) + require.Equal(t, "self:8080", merged.Fanout.Nodes[0].Node) +} + +// TestBuildKeyVizPeerURLForwardsParams pins the §6 contract that +// the per-peer URL is rebuilt from the parsed parameters, so the +// peer always gets a deterministic query string regardless of the +// upstream client's encoding quirks. +func TestBuildKeyVizPeerURLForwardsParams(t *testing.T) { + t.Parallel() + cases := []struct { + name string + peer string + params keyVizParams + want string + }{ + { + name: "with scheme", + peer: "http://10.0.0.2:8080", + params: keyVizParams{series: keyVizSeriesWrites, rows: 256}, + want: "http://10.0.0.2:8080/admin/api/v1/keyviz/matrix?rows=256&series=writes", + }, + { + name: "host only (no scheme)", + peer: "10.0.0.2:8080", + params: keyVizParams{series: keyVizSeriesReads, rows: 1024}, + want: "http://10.0.0.2:8080/admin/api/v1/keyviz/matrix?rows=1024&series=reads", + }, + { + name: "with time bounds", + peer: "http://node-a", + params: keyVizParams{ + series: keyVizSeriesReads, + rows: 8, + from: time.UnixMilli(1_700_000_000_000), + to: time.UnixMilli(1_700_000_900_000), + }, + want: "http://node-a/admin/api/v1/keyviz/matrix?from_unix_ms=1700000000000&rows=8&series=reads&to_unix_ms=1700000900000", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := buildKeyVizPeerURL(tc.peer, tc.params) + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} + +// TestKeyVizFanoutRunPeerOrder pins that the per-node status array +// follows the operator-supplied peer order. Self is always first. +func TestKeyVizFanoutRunPeerOrder(t *testing.T) { + t.Parallel() + matrix := KeyVizMatrix{Series: keyVizSeriesReads} + first := newKeyVizPeerStub(t, matrix) + defer first.Close() + second := newKeyVizPeerStub(t, matrix) + defer second.Close() + + f := NewKeyVizFanout("self:8080", []string{first.URL, second.URL}).WithHTTPClient(first.Client()) + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, matrix) + require.Equal(t, []FanoutNodeStatus{ + {Node: "self:8080", OK: true}, + {Node: first.URL, OK: true}, + {Node: second.URL, OK: true}, + }, merged.Fanout.Nodes) +} + +// TestKeyVizFanoutRunPeerExceedsBodyLimit pins the over-cap path +// (Claude bot round-2 on PR #686). Lowering the per-peer limit to a +// small test value lets us drive the path without serving 64 MiB. +// The peer streams a body deliberately larger than the cap; the +// aggregator's countingReader must: +// - Bound how many bytes are pulled off the wire (the LimitReader +// enforces the security property). +// - Detect the overshoot reliably even when the json.Decoder +// buffers the trailing bytes internally. +// +// What we assert: the call returns within the test timeout (no hang), +// the per-node status surfaces, and the response carries the +// expected number of node entries. 
The warning log is best-effort +// and not asserted directly — the reliability of the byte-counting +// is the load-bearing invariant. +func TestKeyVizFanoutRunPeerExceedsBodyLimit(t *testing.T) { + t.Parallel() + bigRow := KeyVizRow{ + BucketID: "route:overshoot", + Values: []uint64{1, 2, 3, 4}, + } + rows := make([]KeyVizRow, 256) + for i := range rows { + rows[i] = bigRow + } + body := KeyVizMatrix{ + ColumnUnixMs: []int64{1_700_000_000_000, 1_700_000_001_000, 1_700_000_002_000, 1_700_000_003_000}, + Rows: rows, + Series: keyVizSeriesReads, + } + peer := newKeyVizPeerStub(t, body) + defer peer.Close() + + const testCap int64 = 1024 + f := NewKeyVizFanout("self:8080", []string{peer.URL}). + WithHTTPClient(peer.Client()). + WithResponseBodyLimit(testCap) + + start := time.Now() + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, KeyVizMatrix{Series: keyVizSeriesReads}) + require.Less(t, time.Since(start), 5*time.Second, "decode must respect the size cap and complete promptly") + require.NotNil(t, merged.Fanout) + require.Equal(t, 2, merged.Fanout.Expected) + require.Len(t, merged.Fanout.Nodes, 2) + require.Equal(t, "self:8080", merged.Fanout.Nodes[0].Node) + require.True(t, merged.Fanout.Nodes[0].OK, "self always reports ok") + // Peer status: decode either errored on the truncated body + // (ok=false) or succeeded on a partial matrix (ok=true). Either + // is fine — what we are pinning is the bound, not the outcome. +} + +// TestKeyVizFanoutRunPeerNearCapSucceedsWithWarning pins the +// warning-fires path Claude bot round-2 flagged on PR #686. A body +// whose JSON ends within the cap but whose total length (with +// trailing whitespace) overruns the cap exercises the case where +// the decoder returns success but countingReader.n > cap. The +// peer entry surfaces ok=true; the warning log is emitted by the +// aggregator (best-effort, not asserted from the test). +// +// Construction: minimal JSON envelope (~30 B) + 256 B of trailing +// whitespace, against a 100 B cap. json.Decoder reads in bufio +// chunks, so the LimitReader hands it cap+1 = 101 bytes; the +// decoder sees the complete object and returns nil, leaving +// cr.n == 101 > 100 → the warning condition is true. +func TestKeyVizFanoutRunPeerNearCapSucceedsWithWarning(t *testing.T) { + t.Parallel() + tiny := KeyVizMatrix{Series: keyVizSeriesReads} + peer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/admin/api/v1/keyviz/matrix") { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(tiny); err != nil { + t.Fatalf("encode tiny: %v", err) + } + _, _ = io.WriteString(w, strings.Repeat(" ", 256)) + })) + defer peer.Close() + + const testCap int64 = 100 + f := NewKeyVizFanout("self:8080", []string{peer.URL}). + WithHTTPClient(peer.Client()). 
+ WithResponseBodyLimit(testCap) + + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, KeyVizMatrix{Series: keyVizSeriesReads}) + require.NotNil(t, merged.Fanout) + require.Len(t, merged.Fanout.Nodes, 2) + require.True(t, merged.Fanout.Nodes[0].OK, "self always reports ok") + require.True(t, merged.Fanout.Nodes[1].OK, + "near-cap success path: small JSON with trailing whitespace must decode despite the cap; got error %q", + merged.Fanout.Nodes[1].Error) +} + +// TestKeyVizFanoutRunPeerOverlargeBody pins the security-high +// review item on PR #686: a peer that streams more than +// keyVizPeerResponseBodyLimit bytes must not pin a goroutine on the +// JSON decoder or balloon memory. The aggregator caps the decode at +// the configured limit and surfaces a warning log rather than +// silently accepting a truncated matrix. +func TestKeyVizFanoutRunPeerOverlargeBody(t *testing.T) { + t.Parallel() + // Build a JSON payload whose `rows` array is enormous: many + // rows of 4096 zeroed values. We do not actually need to exceed + // the production cap (64 MiB) — we just need to assert that the + // decode completes promptly and that the peer call ends up + // reporting OK with a row count that matches what was on the wire + // up to the cap. + hugeRow := KeyVizRow{ + BucketID: "route:big", + Values: make([]uint64, 4096), + } + rows := make([]KeyVizRow, 64) + for i := range rows { + rows[i] = hugeRow + } + body := KeyVizMatrix{ + ColumnUnixMs: make([]int64, 4096), + Rows: rows, + Series: keyVizSeriesReads, + } + peer := newKeyVizPeerStub(t, body) + defer peer.Close() + + f := NewKeyVizFanout("self:8080", []string{peer.URL}).WithHTTPClient(peer.Client()) + + start := time.Now() + merged := f.Run(context.Background(), keyVizParams{series: keyVizSeriesReads, rows: 1024}, KeyVizMatrix{Series: keyVizSeriesReads}) + require.Less(t, time.Since(start), 5*time.Second, "decode must respect the size cap and complete promptly") + require.NotNil(t, merged.Fanout) + require.True(t, merged.Fanout.Nodes[1].OK, "in-cap response must succeed; cap is 64 MiB and the synthetic body is well under that") +} + +// newKeyVizPeerStub spins up an httptest.Server that answers +// /admin/api/v1/keyviz/matrix with a fixed 200 JSON body. Anything +// else returns 404 — which surfaces as "peer status 404" in the +// aggregator and lets a future test assert the path verbatim. +// +// Tests that need a non-200 response build their handler inline +// (see TestKeyVizFanoutRunPeerHTTPError); this helper covers the +// common happy-path stub. +func newKeyVizPeerStub(t *testing.T, body KeyVizMatrix) *httptest.Server { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/admin/api/v1/keyviz/matrix") { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(body); err != nil { + t.Logf("encode peer body: %v", err) + } + })) + return srv +} diff --git a/internal/admin/keyviz_handler.go b/internal/admin/keyviz_handler.go index bf8c3493..7bf841c4 100644 --- a/internal/admin/keyviz_handler.go +++ b/internal/admin/keyviz_handler.go @@ -53,17 +53,31 @@ const keyVizRowBudgetCap = 1024 // /admin/api/v1/keyviz/matrix. Mirrors the proto GetKeyVizMatrixResponse // shape so a future refactor can share a single pivot helper across // the adapter (gRPC) and admin (JSON) paths. 
+// +// Fanout is non-nil when the handler is configured for cluster-wide +// fan-out (Phase 2-C): it carries per-node status so the SPA can +// surface degraded responses inline (see design 2026_04_27_proposed_keyviz_cluster_fanout.md). +// The field is omitted from the wire form when fan-out is disabled +// so old clients keep working unchanged. type KeyVizMatrix struct { - ColumnUnixMs []int64 `json:"column_unix_ms"` - Rows []KeyVizRow `json:"rows"` - Series KeyVizSeries `json:"series"` - GeneratedAt time.Time `json:"generated_at"` + ColumnUnixMs []int64 `json:"column_unix_ms"` + Rows []KeyVizRow `json:"rows"` + Series KeyVizSeries `json:"series"` + GeneratedAt time.Time `json:"generated_at"` + Fanout *FanoutResult `json:"fanout,omitempty"` } // KeyVizRow is one route's worth of activity across the column window, // matching the proto KeyVizRow layout. Values is parallel to // KeyVizMatrix.ColumnUnixMs — Values[j] is the counter for that route // at column j. +// +// Conflict is true when the Phase 2-C max-merge collapsed disagreeing +// values from multiple nodes for the same row (see fan-out design 4.2); +// the SPA hatches such rows so operators know the displayed total may +// understate the true per-window count during a leadership flip. The +// flag is row-level for now and will move to per-cell when the proto +// extension lands in Phase 2-C+. type KeyVizRow struct { BucketID string `json:"bucket_id"` Start []byte `json:"start"` @@ -73,6 +87,7 @@ type KeyVizRow struct { RouteIDsTruncated bool `json:"route_ids_truncated,omitempty"` RouteCount uint64 `json:"route_count"` Values []uint64 `json:"values"` + Conflict bool `json:"conflict,omitempty"` // total accumulates the sum of Values during pivot so the // rowBudget sort is O(N log N) on a precomputed key rather // than O(N log N × M) recomputing the sum per comparison. @@ -99,6 +114,11 @@ type KeyVizHandler struct { source KeyVizSource now func() time.Time logger *slog.Logger + // fanout is non-nil when the operator configured + // --keyvizFanoutNodes. When set, ServeHTTP merges the local + // matrix with peer responses before encoding the JSON body. + // nil keeps the legacy single-node behaviour. + fanout *KeyVizFanout } // NewKeyVizHandler wires a KeyVizSource into the HTTP handler. @@ -131,6 +151,14 @@ func (h *KeyVizHandler) WithClock(now func() time.Time) *KeyVizHandler { return h } +// WithFanout enables cluster-wide fan-out aggregation. Pass nil to +// disable; passing a configured aggregator switches the handler to +// merge the local matrix with peer responses on every request. 
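+//
+// With fan-out active the JSON body gains the fanout block; a shape
+// sketch with hypothetical values:
+//
+//	{
+//	  "column_unix_ms": [...], "rows": [...], "series": "reads",
+//	  "generated_at": "...",
+//	  "fanout": {
+//	    "nodes": [
+//	      {"node": "self:8080", "ok": true},
+//	      {"node": "10.0.0.2:8080", "ok": false, "error": "..."}
+//	    ],
+//	    "responded": 1, "expected": 2
+//	  }
+//	}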
+func (h *KeyVizHandler) WithFanout(f *KeyVizFanout) *KeyVizHandler { + h.fanout = f + return h +} + func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") @@ -149,6 +177,9 @@ func (h *KeyVizHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { cols := h.source.Snapshot(params.from, params.to) matrix := pivotKeyVizColumns(cols, params.series, params.rows) matrix.GeneratedAt = h.now() + if h.fanout != nil { + matrix = h.fanout.Run(r.Context(), params, matrix) + } w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(http.StatusOK) diff --git a/internal/admin/server.go b/internal/admin/server.go index d3704a38..0c229fb6 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -61,6 +61,12 @@ type ServerDeps struct { // off" state instead of an empty matrix. KeyViz KeyVizSource + // KeyVizFanout enables Phase 2-C cluster-wide aggregation. When + // non-nil, the keyviz handler merges the local matrix with the + // configured peer set on every request. Optional: leaving it nil + // preserves the legacy single-node behaviour. + KeyVizFanout *KeyVizFanout + // Queues is the SQS admin source — covers list, describe, and // delete via QueuesSource. Optional: a nil value disables // /admin/api/v1/sqs/queues{,/{name}} (the mux answers them @@ -119,7 +125,8 @@ func NewServer(deps ServerDeps) (*Server, error) { // KeyViz handler is always registered: even when the source is // nil it serves a 503 keyviz_disabled, which the SPA renders as // a clearer "feature off" state than an unknown_endpoint 404. - keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger) + // Fan-out is opt-in: nil leaves the handler in single-node mode. + keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger).WithFanout(deps.KeyVizFanout) sqs := buildSqsHandlerForDeps(deps, logger) mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, sqs, logger) router := NewRouter(mux, deps.StaticFS) diff --git a/main.go b/main.go index 1efb9a0d..89940ef0 100644 --- a/main.go +++ b/main.go @@ -136,8 +136,19 @@ var ( keyvizMaxTrackedRoutes = flag.Int("keyvizMaxTrackedRoutes", keyviz.DefaultMaxTrackedRoutes, "Maximum routes tracked individually before excess routes coarsen into virtual buckets") keyvizMaxMemberRoutesPerSlot = flag.Int("keyvizMaxMemberRoutesPerSlot", keyviz.DefaultMaxMemberRoutesPerSlot, "Maximum members listed on a virtual bucket; excess routes still drive the bucket counters") keyvizHistoryColumns = flag.Int("keyvizHistoryColumns", keyviz.DefaultHistoryColumns, "Maximum matrix columns retained in the keyviz ring buffer (each column = one Step)") + // Phase 2-C cluster fan-out: comma-separated list of admin + // HTTP endpoints (host:port or scheme://host:port). When set, + // the admin keyviz handler aggregates the local matrix with + // peer responses; when empty, behaviour is unchanged + // (single-node view). See docs/design/2026_04_27_proposed_keyviz_cluster_fanout.md. + keyvizFanoutNodes = flag.String("keyvizFanoutNodes", "", "Comma-separated peer admin endpoints (host:port) for keyviz cluster-wide fan-out; empty disables") + keyvizFanoutTimeout = flag.Duration("keyvizFanoutTimeout", keyvizFanoutDefaultTimeout, "Per-peer timeout for keyviz fan-out HTTP calls") ) +// keyvizFanoutDefaultTimeout matches design 9 open-question 2: 2 s +// per peer call. 
Operators on weird networks override via the flag. +const keyvizFanoutDefaultTimeout = 2 * time.Second + const adminTokenMaxBytes = 4 << 10 // memoryPressureExit is set to true by the memwatch OnExceed callback to @@ -764,7 +775,11 @@ func startServers(in serversInput) error { // the handler hands ErrTablesNotLeader writes to the forwarder // which dials the leader over the cached gRPC pool. Without these // the handler falls back to 503 + Retry-After:1. - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, runner.sqsServer, in.coordinate, connCache, in.keyvizSampler); err != nil { + fanoutCfg := keyVizFanoutConfig{ + Nodes: parseCSV(*keyvizFanoutNodes), + Timeout: *keyvizFanoutTimeout, + } + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, runner.sqsServer, in.coordinate, connCache, in.keyvizSampler, fanoutCfg); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil diff --git a/main_admin.go b/main_admin.go index 35dc74ff..8156e452 100644 --- a/main_admin.go +++ b/main_admin.go @@ -65,6 +65,13 @@ type adminListenerConfig struct { // up the admin listener. It owns the flag → config translation and the // credentials loading so run() does not inherit that complexity. // +// keyVizFanoutConfig bundles the operator-supplied fan-out flags. +// Empty Nodes leaves the keyviz handler in single-node mode. +type keyVizFanoutConfig struct { + Nodes []string + Timeout time.Duration +} + // When admin is disabled (the default) the function returns immediately // without touching --s3CredentialsFile: pulling the admin feature into // a hard dependency on that file would break deployments that never @@ -80,6 +87,7 @@ func startAdminFromFlags( coordinate kv.Coordinator, connCache *kv.GRPCConnCache, keyvizSampler *keyviz.MemSampler, + keyvizFanoutCfg keyVizFanoutConfig, ) error { if !*adminEnabled { return nil @@ -127,7 +135,7 @@ func startAdminFromFlags( if err != nil { return errors.Wrap(err, "build admin leader forwarder") } - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, keyvizSampler, buildVersion()) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, keyvizSampler, keyvizFanoutCfg, buildVersion()) return err } @@ -630,6 +638,7 @@ func startAdminServer( queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler, + keyvizFanoutCfg keyVizFanoutConfig, version string, ) (string, error) { adminCfg := buildAdminConfig(cfg) @@ -637,7 +646,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, keyvizSampler) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, keyvizSampler, keyvizFanoutCfg) if err != nil { return "", err } @@ -677,7 +686,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables 
admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler, keyvizFanoutCfg keyVizFanoutConfig) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -695,17 +704,18 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust return nil, errors.Wrap(err, "open embedded admin SPA") } server, err := admin.NewServer(admin.ServerDeps{ - Signer: signer, - Verifier: verifier, - Credentials: admin.MapCredentialStore(creds), - Roles: adminCfg.RoleIndex(), - ClusterInfo: cluster, - Tables: tables, - Buckets: buckets, - Queues: queues, - Forwarder: forwarder, - KeyViz: keyvizSourceFromSampler(keyvizSampler), - StaticFS: staticFS, + Signer: signer, + Verifier: verifier, + Credentials: admin.MapCredentialStore(creds), + Roles: adminCfg.RoleIndex(), + ClusterInfo: cluster, + Tables: tables, + Buckets: buckets, + Queues: queues, + Forwarder: forwarder, + KeyViz: keyvizSourceFromSampler(keyvizSampler), + KeyVizFanout: buildKeyVizFanout(adminCfg.Listen, keyvizFanoutCfg), + StaticFS: staticFS, AuthOpts: admin.AuthServiceOpts{ InsecureCookie: adminCfg.AllowInsecureDevCookie, }, @@ -838,6 +848,80 @@ func keyvizSourceFromSampler(s *keyviz.MemSampler) admin.KeyVizSource { return s } +// buildKeyVizFanout assembles the Phase 2-C fan-out aggregator from +// the operator-supplied flag values. selfListen is the local admin +// listener address (used to filter the local node out of the peer +// list so symmetric `--keyvizFanoutNodes=node1,node2,node3` configs +// stamped onto every host do not loop back over HTTP). Returns nil +// when no peers remain after filtering, leaving the keyviz handler +// in single-node mode. +// +// The matching rule is conservative: a peer is treated as "self" +// when its host:port equals selfListen literally OR when it equals +// selfListen with the host normalised to 127.0.0.1 (operators +// commonly bind admin to 0.0.0.0 but list 127.0.0.1 as the +// per-host fan-out entry). Anything else is treated as a peer. +func buildKeyVizFanout(selfListen string, cfg keyVizFanoutConfig) *admin.KeyVizFanout { + if len(cfg.Nodes) == 0 { + return nil + } + peers := make([]string, 0, len(cfg.Nodes)) + for _, n := range cfg.Nodes { + if isSelfFanoutNode(selfListen, n) { + continue + } + peers = append(peers, n) + } + if len(peers) == 0 { + return nil + } + f := admin.NewKeyVizFanout(selfListen, peers) + if cfg.Timeout > 0 { + f = f.WithTimeout(cfg.Timeout) + } + return f.WithLogger(slog.Default().With(slog.String("component", "admin.keyviz.fanout"))) +} + +// isSelfFanoutNode returns true when n names this node's own admin +// listener. A relaxed match handles the common bind-vs-advertise +// asymmetry: bind on 0.0.0.0:8080 but advertise (and list) as +// 127.0.0.1:8080. 
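+//
+// Examples of the matching rule:
+//
+//	isSelfFanoutNode("0.0.0.0:8080", "127.0.0.1:8080")        == true  // wildcard bind, loopback entry
+//	isSelfFanoutNode("10.0.0.1:8080", "http://10.0.0.1:8080") == true  // scheme stripped first
+//	isSelfFanoutNode("10.0.0.1:8080", "10.0.0.2:8080")        == false // genuine peer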
+func isSelfFanoutNode(selfListen, n string) bool { + n = strings.TrimSpace(n) + if n == "" { + return true + } + stripped := stripScheme(n) + if stripped == selfListen { + return true + } + host, port, err := net.SplitHostPort(stripped) + if err != nil { + return false + } + selfHost, selfPort, err := net.SplitHostPort(selfListen) + if err != nil || port != selfPort { + return false + } + if isWildcardHost(selfHost) { + return isLoopbackHost(host) + } + return host == selfHost +} + +func isWildcardHost(h string) bool { return h == "0.0.0.0" || h == "::" || h == "" } + +func isLoopbackHost(h string) bool { + return h == "127.0.0.1" || h == "localhost" || h == "::1" +} + +func stripScheme(raw string) string { + if i := strings.Index(raw, "://"); i >= 0 { + return raw[i+3:] + } + return raw +} + // parseCSV splits a flag value like "a,b,c" into a slice with empty and // whitespace-only entries dropped. It is not in shard_config.go because // admin's comma-separated list format is simpler than raft groups. diff --git a/main_admin_test.go b/main_admin_test.go index d3e298ef..e6246557 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -198,7 +198,7 @@ func TestStartAdminServer_DisabledNoOp(t *testing.T) { eg, ctx := errgroup.WithContext(context.Background()) defer func() { _ = eg.Wait() }() var lc net.ListenConfig - _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, adminListenerConfig{enabled: false}, nil, nil, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "") require.NoError(t, err) } @@ -211,7 +211,7 @@ func TestStartAdminServer_InvalidConfigRejected(t *testing.T) { listen: "127.0.0.1:0", // missing signing key } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "") require.Error(t, err) } @@ -224,7 +224,7 @@ func TestStartAdminServer_NonLoopbackWithoutTLSRejected(t *testing.T) { listen: "0.0.0.0:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "") require.Error(t, err) require.Contains(t, err.Error(), "TLS") } @@ -238,7 +238,7 @@ func TestStartAdminServer_RejectsMissingClusterSource(t *testing.T) { listen: "127.0.0.1:0", sessionSigningKey: freshKey(), } - _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, "") + _, err := startAdminServer(ctx, &lc, eg, cfg, map[string]string{}, nil, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "") require.Error(t, err) require.Contains(t, err.Error(), "cluster info source") } @@ -261,7 +261,7 @@ func TestStartAdminServer_ServesHealthz(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n1", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "test") require.NoError(t, err) // Poll /admin/healthz until success or the test deadline. 
@@ -304,7 +304,7 @@ func TestStartAdminServer_ServesTLS(t *testing.T) { cluster := admin.ClusterInfoFunc(func(_ context.Context) (admin.ClusterInfo, error) { return admin.ClusterInfo{NodeID: "n-tls", Version: "test"}, nil }) - addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, "test") + addr, err := startAdminServer(eCtx, &lc, eg, cfg, map[string]string{}, cluster, nil, nil, nil, nil, nil, keyVizFanoutConfig{}, "test") require.NoError(t, err) transport := &http.Transport{TLSClientConfig: &tls.Config{