Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [FEATURE] Querier: Add experimental per-tenant cardinality API (`GET /api/v1/cardinality`) that exposes top-N metrics by series count, label names by distinct value count, and label-value pairs by series count from ingester TSDB heads (`source=head`) and compacted blocks (`source=blocks`). Gated behind `-querier.cardinality-api-enabled` (default `false`). #7384
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
Expand Down
20 changes: 20 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4311,6 +4311,26 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -limits.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

# [Experimental] Enables the per-tenant cardinality API endpoint. When disabled,
# the endpoint returns HTTP 403.
# CLI flag: -querier.cardinality-api-enabled
[cardinality_api_enabled: <boolean> | default = false]

# [Experimental] Maximum allowed time range (end - start) for source=blocks
# cardinality queries.
# CLI flag: -querier.cardinality-max-query-range
[cardinality_max_query_range: <duration> | default = 1d]

# [Experimental] Maximum number of concurrent cardinality requests per tenant.
# Excess requests are rejected with HTTP 429.
# CLI flag: -querier.cardinality-max-concurrent-requests
[cardinality_max_concurrent_requests: <int> | default = 2]

# [Experimental] Per-request timeout for cardinality computation. On timeout,
# partial results are returned.
# CLI flag: -querier.cardinality-query-timeout
[cardinality_query_timeout: <duration> | default = 1m]

# The maximum number of rows that can be fetched when querying parquet storage.
# Each row maps to a series in a parquet file. This limit applies before
# materializing chunks. 0 to disable.
Expand Down
4 changes: 4 additions & 0 deletions docs/getting-started/cortex-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ compactor:
frontend_worker:
match_max_concurrent: true

# https://cortexmetrics.io/docs/configuration/configuration-file/#limits_config
limits:
cardinality_api_enabled: true

# https://cortexmetrics.io/docs/configuration/configuration-file/#ruler_config
ruler:
enable_api: true
Expand Down
175 changes: 175 additions & 0 deletions integration/cardinality_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//go:build requires_docker

package integration

import (
"encoding/json"
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

// cardinalityEntry is one name/value pair returned by the cardinality API,
// e.g. a metric name and its series count, or a label name and its distinct
// value count.
type cardinalityEntry struct {
	Name  string `json:"name"`
	Value uint64 `json:"value"`
}

// cardinalityAPIResponse mirrors the JSON payload returned by
// GET /api/v1/cardinality. The three top-N lists share the same entry shape,
// so they all decode into []cardinalityEntry.
type cardinalityAPIResponse struct {
	Status string `json:"status"`
	Data   struct {
		NumSeries                   uint64             `json:"numSeries"`
		Approximated                bool               `json:"approximated"`
		SeriesCountByMetricName     []cardinalityEntry `json:"seriesCountByMetricName"`
		LabelValueCountByLabelName  []cardinalityEntry `json:"labelValueCountByLabelName"`
		SeriesCountByLabelValuePair []cardinalityEntry `json:"seriesCountByLabelValuePair"`
	} `json:"data"`
}

// TestCardinalityAPI exercises the experimental per-tenant cardinality API
// end-to-end against a single-binary Cortex backed by Minio:
//   1. source=head returns data for freshly pushed series,
//   2. omitting source defaults to the head path,
//   3. an invalid source is rejected with HTTP 400,
//   4. source=blocks returns data once the head is compacted and shipped,
//   5. source=blocks without start/end is rejected with HTTP 400.
func TestCardinalityAPI(t *testing.T) {
	const blockRangePeriod = 5 * time.Second

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, s.StartAndWaitReady(minio))

	// Configure the blocks storage to frequently compact TSDB head and ship blocks to storage.
	flags := mergeFlags(BlocksStorageFlags(), AlertmanagerLocalFlags(), map[string]string{
		"-blocks-storage.tsdb.block-ranges-period":          blockRangePeriod.String(),
		"-blocks-storage.tsdb.ship-interval":                "1s",
		"-blocks-storage.bucket-store.sync-interval":        "1s",
		"-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
		"-blocks-storage.bucket-store.bucket-index.enabled": "false",
		"-querier.cardinality-api-enabled":                  "true",
		"-alertmanager.web.external-url":                    "http://localhost/alertmanager",
		// Use inmemory ring to avoid needing Consul.
		"-ring.store":                                     "inmemory",
		"-compactor.ring.store":                           "inmemory",
		"-store-gateway.sharding-ring.store":              "inmemory",
		"-store-gateway.sharding-enabled":                 "true",
		"-store-gateway.sharding-ring.replication-factor": "1",
	})

	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Push multiple series with different metric names and labels.
	now := time.Now()
	series1, _ := generateSeries("test_metric_1", now, prompb.Label{Name: "job", Value: "api"})
	series2, _ := generateSeries("test_metric_2", now, prompb.Label{Name: "job", Value: "worker"})
	series3, _ := generateSeries("test_metric_3", now, prompb.Label{Name: "job", Value: "api"}, prompb.Label{Name: "instance", Value: "host1"})

	// The loop variable is deliberately NOT named "s": that would shadow the
	// e2e scenario declared above.
	for _, series := range [][]prompb.TimeSeries{series1, series2, series3} {
		res, err := c.Push(series)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// --- Test 1: Head path ---
	t.Run("head path returns cardinality data", func(t *testing.T) {
		resp, body, err := c.CardinalityRaw("head", 10, time.Time{}, time.Time{})
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))

		var result cardinalityAPIResponse
		require.NoError(t, json.Unmarshal(body, &result))

		assert.Equal(t, "success", result.Status)
		assert.GreaterOrEqual(t, result.Data.NumSeries, uint64(3))
		assert.NotEmpty(t, result.Data.SeriesCountByMetricName, "seriesCountByMetricName should not be empty")
		assert.NotEmpty(t, result.Data.LabelValueCountByLabelName, "labelValueCountByLabelName should not be empty")
		assert.NotEmpty(t, result.Data.SeriesCountByLabelValuePair, "seriesCountByLabelValuePair should not be empty")
	})

	// --- Test 2: Default source (should be head) ---
	t.Run("default source is head", func(t *testing.T) {
		resp, body, err := c.CardinalityRaw("", 10, time.Time{}, time.Time{})
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))

		var result cardinalityAPIResponse
		require.NoError(t, json.Unmarshal(body, &result))
		assert.Equal(t, "success", result.Status)
		assert.GreaterOrEqual(t, result.Data.NumSeries, uint64(3))
	})

	// --- Test 3: Parameter validation ---
	t.Run("invalid source returns 400", func(t *testing.T) {
		resp, _, err := c.CardinalityRaw("invalid", 0, time.Time{}, time.Time{})
		require.NoError(t, err)
		assert.Equal(t, 400, resp.StatusCode)
	})

	// --- Test 4: Blocks path ---
	// Push series at timestamps spanning two block ranges to trigger head compaction and shipping.
	t.Run("blocks path returns cardinality data", func(t *testing.T) {
		// Push a series at a timestamp in a different block range to trigger compaction of the first block.
		series4, _ := generateSeries("test_metric_4", now.Add(blockRangePeriod*2),
			prompb.Label{Name: "job", Value: "scheduler"})
		res, err := c.Push(series4)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)

		// Wait until at least one block is shipped from the ingester.
		require.NoError(t, cortex.WaitSumMetricsWithOptions(
			e2e.Greater(0),
			[]string{"cortex_ingester_shipper_uploads_total"},
			e2e.WaitMissingMetrics,
		))

		// Wait until the store gateway has loaded the shipped blocks.
		require.NoError(t, cortex.WaitSumMetricsWithOptions(
			e2e.Greater(0),
			[]string{"cortex_bucket_store_blocks_loaded"},
			e2e.WaitMissingMetrics,
		))

		// Query the blocks path with retries. The querier's block finder and
		// store gateway may need additional sync cycles before returning data.
		start := now.Add(-1 * time.Hour)
		end := now.Add(1 * time.Hour)
		deadline := time.Now().Add(30 * time.Second)

		var result cardinalityAPIResponse
		for time.Now().Before(deadline) {
			resp, body, err := c.CardinalityRaw("blocks", 10, start, end)
			require.NoError(t, err)
			require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))
			require.NoError(t, json.Unmarshal(body, &result))

			if len(result.Data.LabelValueCountByLabelName) > 0 {
				break
			}
			time.Sleep(1 * time.Second)
		}

		// result holds the last (possibly successful) response; assert on it
		// after the retry loop so a timeout produces a useful failure message.
		assert.Equal(t, "success", result.Status)
		assert.NotEmpty(t, result.Data.LabelValueCountByLabelName, "labelValueCountByLabelName should not be empty after retries")
	})

	// --- Test 5: Blocks path requires start/end ---
	t.Run("blocks path without start/end returns 400", func(t *testing.T) {
		resp, _, err := c.CardinalityRaw("blocks", 0, time.Time{}, time.Time{})
		require.NoError(t, err)
		assert.Equal(t, 400, resp.StatusCode)
	})
}
25 changes: 25 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,31 @@ func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTi
return c.query(u.String(), headers)
}

// CardinalityRaw runs a cardinality request directly against the querier API.
func (c *Client) CardinalityRaw(source string, limit int, start, end time.Time) (*http.Response, []byte, error) {
u := &url.URL{
Scheme: "http",
Path: fmt.Sprintf("%s/api/prom/api/v1/cardinality", c.querierAddress),
}
q := u.Query()

if source != "" {
q.Set("source", source)
}
if limit > 0 {
q.Set("limit", strconv.Itoa(limit))
}
if !start.IsZero() {
q.Set("start", FormatTime(start))
}
if !end.IsZero() {
q.Set("end", FormatTime(end))
}

u.RawQuery = q.Encode()
return c.query(u.String(), nil)
}

// RemoteRead runs a remote read query.
func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) {
startMs := start.UnixMilli()
Expand Down
3 changes: 2 additions & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ type Cortex struct {

// Queryables that the querier should use to query the long
// term storage. It depends on the storage engine used.
StoreQueryables []querier.QueryableWithFilter
StoreQueryables []querier.QueryableWithFilter
BlocksStoreQueryable *querier.BlocksStoreQueryable
}

// New makes a new Cortex.
Expand Down
8 changes: 8 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"net/http"
"path"
"runtime"
"runtime/debug"

Expand Down Expand Up @@ -292,6 +293,12 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)

// Register the cardinality endpoint directly on the external API server.
// This endpoint bypasses the query-frontend and is served directly by the querier.
cardinalityHandler := querier.CardinalityHandler(t.Distributor, t.BlocksStoreQueryable, t.OverridesConfig, prometheus.DefaultRegisterer)
t.API.RegisterRoute(path.Join(t.Cfg.API.PrometheusHTTPPrefix, "/api/v1/cardinality"), cardinalityHandler, true, "GET")
t.API.RegisterRoute(path.Join(t.Cfg.API.LegacyHTTPPrefix, "/api/v1/cardinality"), cardinalityHandler, true, "GET")

return nil, nil
}

Expand Down Expand Up @@ -447,6 +454,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
if q, err := initBlockStoreQueryable(t.Cfg, t.OverridesConfig, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to initialize querier: %v", err)
} else {
t.BlocksStoreQueryable = q
queriable = q
if t.Cfg.Querier.EnableParquetQueryable {
pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.OverridesConfig, q, util_log.Logger, prometheus.DefaultRegisterer)
Expand Down
Loading
Loading