Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Database interface {
CheckVersionExists(ctx context.Context, tx pgx.Tx, serverName, version string) (bool, error)
// UnmarkAsLatest marks the current latest version of a server as no longer latest
UnmarkAsLatest(ctx context.Context, tx pgx.Tx, serverName string) error
// SetLatestVersion sets is_latest=true on the given version and false on all other
// versions of the same server. Passing an empty version clears is_latest for all rows.
// Callers must hold the per-server publish lock to avoid races.
SetLatestVersion(ctx context.Context, tx pgx.Tx, serverName, version string) error
// AcquirePublishLock acquires an exclusive advisory lock for publishing a server
// This prevents race conditions when multiple versions are published concurrently
AcquirePublishLock(ctx context.Context, tx pgx.Tx, serverName string) error
Expand Down
51 changes: 51 additions & 0 deletions internal/database/migrations/014_heal_is_latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- Heal servers whose is_latest flag is on a deleted version (or missing entirely)
-- by promoting the highest-semver non-deleted version. Falls back to published_at
-- for non-semver versions and as a tiebreaker for prereleases.
-- See https://github.com/modelcontextprotocol/registry/issues/1081.
--
-- The migration framework wraps each migration in its own transaction, so no explicit
-- BEGIN/COMMIT here.

-- Pre-compute the version to promote per affected server. A temp table is used
-- (rather than CTEs) because the two UPDATEs that follow run as separate statements:
-- the unique partial index idx_unique_latest_per_server is non-deferrable and Postgres
-- checks it row-by-row inside a single UPDATE, so the clear and the set must be split.
CREATE TEMP TABLE _heal_picks ON COMMIT DROP AS
WITH broken AS (
-- Servers where no non-deleted version is flagged is_latest, but at least one
-- non-deleted version exists. Catches "is_latest is on a deleted row" and the
-- defensive case where no row has is_latest at all.
SELECT server_name
FROM servers
GROUP BY server_name
HAVING COUNT(*) FILTER (WHERE is_latest AND status <> 'deleted') = 0
AND COUNT(*) FILTER (WHERE status <> 'deleted') > 0
),
parsed AS (
SELECT s.server_name, s.version, s.published_at,
(regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[1]::int AS major,
(regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[2]::int AS minor,
(regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[3]::int AS patch
FROM servers s
JOIN broken b USING (server_name)
WHERE s.status <> 'deleted'
)
SELECT DISTINCT ON (server_name) server_name, version
FROM parsed
ORDER BY server_name,
major DESC NULLS LAST,
minor DESC NULLS LAST,
patch DESC NULLS LAST,
published_at DESC;

-- Clear stale is_latest=true on rows of affected servers (typically the deleted row).
UPDATE servers s
SET is_latest = false
WHERE s.server_name IN (SELECT server_name FROM _heal_picks)
AND s.is_latest;

-- Promote the chosen version.
UPDATE servers s
SET is_latest = true
FROM _heal_picks p
WHERE s.server_name = p.server_name AND s.version = p.version;
34 changes: 34 additions & 0 deletions internal/database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,40 @@ func (db *PostgreSQL) UnmarkAsLatest(ctx context.Context, tx pgx.Tx, serverName
return nil
}

// SetLatestVersion sets is_latest=true on the given version and false on all other versions
// of the same server. Passing an empty version clears is_latest for all rows.
//
// The clear and set are issued as separate statements because the unique partial index
// idx_unique_latest_per_server is non-deferrable and Postgres checks it row-by-row within
// a single UPDATE, which would trip when flipping one row's flag off and another's on.
func (db *PostgreSQL) SetLatestVersion(ctx context.Context, tx pgx.Tx, serverName, version string) error {
if ctx.Err() != nil {
return ctx.Err()
}

executor := db.getExecutor(tx)

if _, err := executor.Exec(ctx,
`UPDATE servers SET is_latest = false WHERE server_name = $1 AND is_latest = true AND version <> $2`,
serverName, version,
); err != nil {
return fmt.Errorf("failed to clear previous latest version: %w", err)
}

if version == "" {
return nil
}

if _, err := executor.Exec(ctx,
`UPDATE servers SET is_latest = true WHERE server_name = $1 AND version = $2 AND is_latest = false`,
serverName, version,
); err != nil {
return fmt.Errorf("failed to set latest version: %w", err)
}

return nil
}

// Close closes the database connection
func (db *PostgreSQL) Close() error {
db.pool.Close()
Expand Down
147 changes: 147 additions & 0 deletions internal/database/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database_test
import (
"context"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -1734,6 +1735,152 @@ func TestPostgreSQL_IncludeDeletedFilter(t *testing.T) {
})
}

// TestMigration014_HealIsLatest exercises migrations/014_heal_is_latest.sql against
// synthetic broken states by re-running its SQL after seeding rows directly. The migration
// itself ran via the template DB; since it's idempotent (only matches servers with no
// non-deleted is_latest row), re-running it here only acts on the rows we just broke.
func TestMigration014_HealIsLatest(t *testing.T) {
db := database.NewTestDB(t)
ctx := context.Background()

migrationSQL, err := os.ReadFile("migrations/014_heal_is_latest.sql")
require.NoError(t, err)

createVersion := func(t *testing.T, name, version string, publishedAt time.Time, status model.Status, isLatest bool) {
t.Helper()
serverJSON := &apiv0.ServerJSON{
Schema: model.CurrentSchemaURL,
Name: name,
Description: "test",
Version: version,
}
officialMeta := &apiv0.RegistryExtensions{
Status: status,
StatusChangedAt: publishedAt,
PublishedAt: publishedAt,
UpdatedAt: publishedAt,
IsLatest: isLatest,
}
_, err := db.CreateServer(ctx, nil, serverJSON, officialMeta)
require.NoError(t, err)
}

runHeal := func(t *testing.T) {
t.Helper()
err := db.InTransaction(ctx, func(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx, string(migrationSQL))
return err
})
require.NoError(t, err)
}

versionState := func(t *testing.T, name, version string) (model.Status, bool) {
t.Helper()
v, err := db.GetServerByNameAndVersion(ctx, nil, name, version, true)
require.NoError(t, err)
return v.Meta.Official.Status, v.Meta.Official.IsLatest
}

t.Run("kubernetes-mcp-server scenario picks highest semver", func(t *testing.T) {
name := "io.test/k8s-scenario"
base := time.Now().Add(-10 * time.Hour)
// 1.0.0 published first, then 0.0.50, 0.0.51, 0.0.59, 0.0.60, 0.0.61.
// 1.0.0 was the original latest and got soft-deleted, leaving is_latest stranded.
createVersion(t, name, "1.0.0", base, model.StatusDeleted, true)
createVersion(t, name, "0.0.50", base.Add(1*time.Hour), model.StatusActive, false)
createVersion(t, name, "0.0.51", base.Add(2*time.Hour), model.StatusActive, false)
createVersion(t, name, "0.0.59", base.Add(3*time.Hour), model.StatusActive, false)
createVersion(t, name, "0.0.60", base.Add(4*time.Hour), model.StatusActive, false)
createVersion(t, name, "0.0.61", base.Add(5*time.Hour), model.StatusActive, false)

runHeal(t)

_, isLatest := versionState(t, name, "1.0.0")
assert.False(t, isLatest, "deleted 1.0.0 should no longer be latest")
_, isLatest = versionState(t, name, "0.0.61")
assert.True(t, isLatest, "highest active version should become latest")
for _, v := range []string{"0.0.50", "0.0.51", "0.0.59", "0.0.60"} {
_, isLatest := versionState(t, name, v)
assert.False(t, isLatest, "version %s should not be latest", v)
}
})

t.Run("backport scenario picks highest semver not most recent", func(t *testing.T) {
// Published 2.0.0 (deleted), then 1.0.1 hotfix, then 1.0.0 (older patch backported later).
// Most-recent-published would pick 1.0.0; semver-aware picks 1.0.1.
name := "io.test/backport-scenario"
base := time.Now().Add(-10 * time.Hour)
createVersion(t, name, "2.0.0", base, model.StatusDeleted, true)
createVersion(t, name, "1.0.1", base.Add(1*time.Hour), model.StatusActive, false)
createVersion(t, name, "1.0.0", base.Add(2*time.Hour), model.StatusActive, false)

runHeal(t)

_, isLatest := versionState(t, name, "1.0.1")
assert.True(t, isLatest, "1.0.1 should win on semver despite 1.0.0 being published more recently")
_, isLatest = versionState(t, name, "1.0.0")
assert.False(t, isLatest)
})

t.Run("no is_latest row at all gets healed", func(t *testing.T) {
// Defensive case: nothing flagged latest, but active versions exist.
name := "io.test/no-latest-flag"
base := time.Now().Add(-10 * time.Hour)
createVersion(t, name, "1.0.0", base, model.StatusActive, false)
createVersion(t, name, "1.1.0", base.Add(1*time.Hour), model.StatusActive, false)

runHeal(t)

_, isLatest := versionState(t, name, "1.1.0")
assert.True(t, isLatest)
_, isLatest = versionState(t, name, "1.0.0")
assert.False(t, isLatest)
})

t.Run("all-deleted server is left untouched", func(t *testing.T) {
// No non-deleted version → nothing to promote, leave existing flags alone so the
// server remains addressable via includeDeleted=true admin lookups.
name := "io.test/all-deleted"
base := time.Now().Add(-10 * time.Hour)
createVersion(t, name, "1.0.0", base, model.StatusDeleted, true)
createVersion(t, name, "2.0.0", base.Add(1*time.Hour), model.StatusDeleted, false)

runHeal(t)

_, isLatest := versionState(t, name, "1.0.0")
assert.True(t, isLatest, "all-deleted server should keep its existing latest flag")
_, isLatest = versionState(t, name, "2.0.0")
assert.False(t, isLatest)
})

t.Run("healthy server is left untouched", func(t *testing.T) {
name := "io.test/healthy"
base := time.Now().Add(-10 * time.Hour)
createVersion(t, name, "1.0.0", base, model.StatusActive, false)
createVersion(t, name, "2.0.0", base.Add(1*time.Hour), model.StatusActive, true)

runHeal(t)

_, isLatest := versionState(t, name, "2.0.0")
assert.True(t, isLatest)
_, isLatest = versionState(t, name, "1.0.0")
assert.False(t, isLatest)
})

t.Run("non-semver versions fall back to published_at", func(t *testing.T) {
name := "io.test/non-semver"
base := time.Now().Add(-10 * time.Hour)
createVersion(t, name, "rolling", base, model.StatusDeleted, true)
createVersion(t, name, "build-100", base.Add(1*time.Hour), model.StatusActive, false)
createVersion(t, name, "build-200", base.Add(2*time.Hour), model.StatusActive, false)

runHeal(t)

_, isLatest := versionState(t, name, "build-200")
assert.True(t, isLatest, "without semver, most recently published wins")
})
}

// Helper functions for creating pointers to basic types
func stringPtr(s string) *string {
return &s
Expand Down
80 changes: 75 additions & 5 deletions internal/service/registry_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,59 @@ func (s *registryServiceImpl) createServerInTransaction(ctx context.Context, tx
return s.db.CreateServer(ctx, tx, &serverJSON, officialMeta)
}

// recalculateLatest picks the highest non-deleted version of the given server and flags it
// as latest, clearing is_latest on every other row. If every version is deleted, the highest
// deleted version keeps the flag so admin lookups (GetServerByName with includeDeleted=true)
// still find the server. Caller must hold the per-server publish lock.
func (s *registryServiceImpl) recalculateLatest(ctx context.Context, tx pgx.Tx, serverName string) error {
versions, err := s.db.GetAllVersionsByServerName(ctx, tx, serverName, true)
if err != nil {
if errors.Is(err, database.ErrNotFound) {
return s.db.SetLatestVersion(ctx, tx, serverName, "")
}
return fmt.Errorf("failed to load versions for latest recalculation: %w", err)
}

winner := pickLatestVersion(versions, false)
if winner == nil {
// No non-deleted versions — fall back to highest deleted so the server is still
// addressable via includeDeleted=true lookups.
winner = pickLatestVersion(versions, true)
}

winnerVersion := ""
if winner != nil {
winnerVersion = winner.Server.Version
}
return s.db.SetLatestVersion(ctx, tx, serverName, winnerVersion)
}

// pickLatestVersion returns the highest version from the given slice. If allowDeleted is
// false, deleted versions are skipped.
func pickLatestVersion(versions []*apiv0.ServerResponse, allowDeleted bool) *apiv0.ServerResponse {
var winner *apiv0.ServerResponse
for _, v := range versions {
if !allowDeleted && v.Meta.Official != nil && v.Meta.Official.Status == model.StatusDeleted {
continue
}
if winner == nil {
winner = v
continue
}
var winnerPublishedAt, candidatePublishedAt time.Time
if winner.Meta.Official != nil {
winnerPublishedAt = winner.Meta.Official.PublishedAt
}
if v.Meta.Official != nil {
candidatePublishedAt = v.Meta.Official.PublishedAt
}
if CompareVersions(v.Server.Version, winner.Server.Version, candidatePublishedAt, winnerPublishedAt) > 0 {
winner = v
}
}
return winner
}

// validateNoDuplicateRemoteURLs checks that no other server is using the same remote URLs
func (s *registryServiceImpl) validateNoDuplicateRemoteURLs(ctx context.Context, tx pgx.Tx, serverDetail apiv0.ServerJSON) error {
// Check each remote URL in the new server for conflicts
Expand Down Expand Up @@ -237,11 +290,14 @@ func (s *registryServiceImpl) updateServerInTransaction(ctx context.Context, tx

// Handle status change if provided
if statusChange != nil {
updatedWithStatus, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage)
if err != nil {
if _, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage); err != nil {
return nil, err
}
return updatedWithStatus, nil
if err := s.recalculateLatest(ctx, tx, serverName); err != nil {
return nil, err
}
// Re-read to pick up the possibly updated is_latest flag.
return s.db.GetServerByNameAndVersion(ctx, tx, serverName, version, true)
}

return updatedServerResponse, nil
Expand Down Expand Up @@ -279,7 +335,14 @@ func (s *registryServiceImpl) updateServerStatusInTransaction(ctx context.Contex
}

// Update only the status metadata
return s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage)
if _, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage); err != nil {
return nil, err
}
if err := s.recalculateLatest(ctx, tx, serverName); err != nil {
return nil, err
}
// Re-read to pick up the possibly updated is_latest flag.
return s.db.GetServerByNameAndVersion(ctx, tx, serverName, version, true)
}

// UpdateAllVersionsStatus updates the status metadata of all versions of a server in a single transaction
Expand Down Expand Up @@ -319,5 +382,12 @@ func (s *registryServiceImpl) updateAllVersionsStatusInTransaction(ctx context.C
}

// Update all versions' status in a single database call
return s.db.SetAllVersionsStatus(ctx, tx, serverName, statusChange.NewStatus, statusChange.StatusMessage)
if _, err := s.db.SetAllVersionsStatus(ctx, tx, serverName, statusChange.NewStatus, statusChange.StatusMessage); err != nil {
return nil, err
}
if err := s.recalculateLatest(ctx, tx, serverName); err != nil {
return nil, err
}
// Re-read to pick up the possibly updated is_latest flags.
return s.db.GetAllVersionsByServerName(ctx, tx, serverName, true)
}
Loading
Loading