diff --git a/internal/database/database.go b/internal/database/database.go index c9acf0736..b4a797fae 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -57,6 +57,10 @@ type Database interface { CheckVersionExists(ctx context.Context, tx pgx.Tx, serverName, version string) (bool, error) // UnmarkAsLatest marks the current latest version of a server as no longer latest UnmarkAsLatest(ctx context.Context, tx pgx.Tx, serverName string) error + // SetLatestVersion sets is_latest=true on the given version and false on all other + // versions of the same server. Passing an empty version clears is_latest for all rows. + // Callers must hold the per-server publish lock to avoid races. + SetLatestVersion(ctx context.Context, tx pgx.Tx, serverName, version string) error // AcquirePublishLock acquires an exclusive advisory lock for publishing a server // This prevents race conditions when multiple versions are published concurrently AcquirePublishLock(ctx context.Context, tx pgx.Tx, serverName string) error diff --git a/internal/database/migrations/014_heal_is_latest.sql b/internal/database/migrations/014_heal_is_latest.sql new file mode 100644 index 000000000..30e860ca2 --- /dev/null +++ b/internal/database/migrations/014_heal_is_latest.sql @@ -0,0 +1,51 @@ +-- Heal servers whose is_latest flag is on a deleted version (or missing entirely) +-- by promoting the highest-semver non-deleted version. Falls back to published_at +-- for non-semver versions and as a tiebreaker for prereleases. +-- See https://github.com/modelcontextprotocol/registry/issues/1081. +-- +-- The migration framework wraps each migration in its own transaction, so no explicit +-- BEGIN/COMMIT here. + +-- Pre-compute the version to promote per affected server. A temp table is used +-- (rather than CTEs) because the two UPDATEs that follow run as separate statements: +-- the unique partial index idx_unique_latest_per_server is non-deferrable and Postgres +-- checks it row-by-row inside a single UPDATE, so the clear and the set must be split. +CREATE TEMP TABLE _heal_picks ON COMMIT DROP AS +WITH broken AS ( + -- Servers where no non-deleted version is flagged is_latest, but at least one + -- non-deleted version exists. Catches "is_latest is on a deleted row" and the + -- defensive case where no row has is_latest at all. + SELECT server_name + FROM servers + GROUP BY server_name + HAVING COUNT(*) FILTER (WHERE is_latest AND status <> 'deleted') = 0 + AND COUNT(*) FILTER (WHERE status <> 'deleted') > 0 +), +parsed AS ( + SELECT s.server_name, s.version, s.published_at, + (regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[1]::int AS major, + (regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[2]::int AS minor, + (regexp_match(s.version, '^(\d+)\.(\d+)\.(\d+)'))[3]::int AS patch + FROM servers s + JOIN broken b USING (server_name) + WHERE s.status <> 'deleted' +) +SELECT DISTINCT ON (server_name) server_name, version +FROM parsed +ORDER BY server_name, + major DESC NULLS LAST, + minor DESC NULLS LAST, + patch DESC NULLS LAST, + published_at DESC; + +-- Clear stale is_latest=true on rows of affected servers (typically the deleted row). +UPDATE servers s +SET is_latest = false +WHERE s.server_name IN (SELECT server_name FROM _heal_picks) + AND s.is_latest; + +-- Promote the chosen version. +UPDATE servers s +SET is_latest = true +FROM _heal_picks p +WHERE s.server_name = p.server_name AND s.version = p.version; diff --git a/internal/database/postgres.go b/internal/database/postgres.go index a86e44b13..e40359aa7 100644 --- a/internal/database/postgres.go +++ b/internal/database/postgres.go @@ -877,6 +877,40 @@ func (db *PostgreSQL) UnmarkAsLatest(ctx context.Context, tx pgx.Tx, serverName return nil } +// SetLatestVersion sets is_latest=true on the given version and false on all other versions +// of the same server. Passing an empty version clears is_latest for all rows. +// +// The clear and set are issued as separate statements because the unique partial index +// idx_unique_latest_per_server is non-deferrable and Postgres checks it row-by-row within +// a single UPDATE, which would trip when flipping one row's flag off and another's on. +func (db *PostgreSQL) SetLatestVersion(ctx context.Context, tx pgx.Tx, serverName, version string) error { + if ctx.Err() != nil { + return ctx.Err() + } + + executor := db.getExecutor(tx) + + if _, err := executor.Exec(ctx, + `UPDATE servers SET is_latest = false WHERE server_name = $1 AND is_latest = true AND version <> $2`, + serverName, version, + ); err != nil { + return fmt.Errorf("failed to clear previous latest version: %w", err) + } + + if version == "" { + return nil + } + + if _, err := executor.Exec(ctx, + `UPDATE servers SET is_latest = true WHERE server_name = $1 AND version = $2 AND is_latest = false`, + serverName, version, + ); err != nil { + return fmt.Errorf("failed to set latest version: %w", err) + } + + return nil +} + // Close closes the database connection func (db *PostgreSQL) Close() error { db.pool.Close() diff --git a/internal/database/postgres_test.go b/internal/database/postgres_test.go index 9b6a5daea..d0e4b9700 100644 --- a/internal/database/postgres_test.go +++ b/internal/database/postgres_test.go @@ -3,6 +3,7 @@ package database_test import ( "context" "fmt" + "os" "testing" "time" @@ -1734,6 +1735,152 @@ func TestPostgreSQL_IncludeDeletedFilter(t *testing.T) { }) } +// TestMigration014_HealIsLatest exercises migrations/014_heal_is_latest.sql against +// synthetic broken states by re-running its SQL after seeding rows directly. The migration +// itself ran via the template DB; since it's idempotent (only matches servers with no +// non-deleted is_latest row), re-running it here only acts on the rows we just broke. +func TestMigration014_HealIsLatest(t *testing.T) { + db := database.NewTestDB(t) + ctx := context.Background() + + migrationSQL, err := os.ReadFile("migrations/014_heal_is_latest.sql") + require.NoError(t, err) + + createVersion := func(t *testing.T, name, version string, publishedAt time.Time, status model.Status, isLatest bool) { + t.Helper() + serverJSON := &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, + Name: name, + Description: "test", + Version: version, + } + officialMeta := &apiv0.RegistryExtensions{ + Status: status, + StatusChangedAt: publishedAt, + PublishedAt: publishedAt, + UpdatedAt: publishedAt, + IsLatest: isLatest, + } + _, err := db.CreateServer(ctx, nil, serverJSON, officialMeta) + require.NoError(t, err) + } + + runHeal := func(t *testing.T) { + t.Helper() + err := db.InTransaction(ctx, func(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, string(migrationSQL)) + return err + }) + require.NoError(t, err) + } + + versionState := func(t *testing.T, name, version string) (model.Status, bool) { + t.Helper() + v, err := db.GetServerByNameAndVersion(ctx, nil, name, version, true) + require.NoError(t, err) + return v.Meta.Official.Status, v.Meta.Official.IsLatest + } + + t.Run("kubernetes-mcp-server scenario picks highest semver", func(t *testing.T) { + name := "io.test/k8s-scenario" + base := time.Now().Add(-10 * time.Hour) + // 1.0.0 published first, then 0.0.50, 0.0.51, 0.0.59, 0.0.60, 0.0.61. + // 1.0.0 was the original latest and got soft-deleted, leaving is_latest stranded. + createVersion(t, name, "1.0.0", base, model.StatusDeleted, true) + createVersion(t, name, "0.0.50", base.Add(1*time.Hour), model.StatusActive, false) + createVersion(t, name, "0.0.51", base.Add(2*time.Hour), model.StatusActive, false) + createVersion(t, name, "0.0.59", base.Add(3*time.Hour), model.StatusActive, false) + createVersion(t, name, "0.0.60", base.Add(4*time.Hour), model.StatusActive, false) + createVersion(t, name, "0.0.61", base.Add(5*time.Hour), model.StatusActive, false) + + runHeal(t) + + _, isLatest := versionState(t, name, "1.0.0") + assert.False(t, isLatest, "deleted 1.0.0 should no longer be latest") + _, isLatest = versionState(t, name, "0.0.61") + assert.True(t, isLatest, "highest active version should become latest") + for _, v := range []string{"0.0.50", "0.0.51", "0.0.59", "0.0.60"} { + _, isLatest := versionState(t, name, v) + assert.False(t, isLatest, "version %s should not be latest", v) + } + }) + + t.Run("backport scenario picks highest semver not most recent", func(t *testing.T) { + // Published 2.0.0 (deleted), then 1.0.1 hotfix, then 1.0.0 (older patch backported later). + // Most-recent-published would pick 1.0.0; semver-aware picks 1.0.1. + name := "io.test/backport-scenario" + base := time.Now().Add(-10 * time.Hour) + createVersion(t, name, "2.0.0", base, model.StatusDeleted, true) + createVersion(t, name, "1.0.1", base.Add(1*time.Hour), model.StatusActive, false) + createVersion(t, name, "1.0.0", base.Add(2*time.Hour), model.StatusActive, false) + + runHeal(t) + + _, isLatest := versionState(t, name, "1.0.1") + assert.True(t, isLatest, "1.0.1 should win on semver despite 1.0.0 being published more recently") + _, isLatest = versionState(t, name, "1.0.0") + assert.False(t, isLatest) + }) + + t.Run("no is_latest row at all gets healed", func(t *testing.T) { + // Defensive case: nothing flagged latest, but active versions exist. + name := "io.test/no-latest-flag" + base := time.Now().Add(-10 * time.Hour) + createVersion(t, name, "1.0.0", base, model.StatusActive, false) + createVersion(t, name, "1.1.0", base.Add(1*time.Hour), model.StatusActive, false) + + runHeal(t) + + _, isLatest := versionState(t, name, "1.1.0") + assert.True(t, isLatest) + _, isLatest = versionState(t, name, "1.0.0") + assert.False(t, isLatest) + }) + + t.Run("all-deleted server is left untouched", func(t *testing.T) { + // No non-deleted version → nothing to promote, leave existing flags alone so the + // server remains addressable via includeDeleted=true admin lookups. + name := "io.test/all-deleted" + base := time.Now().Add(-10 * time.Hour) + createVersion(t, name, "1.0.0", base, model.StatusDeleted, true) + createVersion(t, name, "2.0.0", base.Add(1*time.Hour), model.StatusDeleted, false) + + runHeal(t) + + _, isLatest := versionState(t, name, "1.0.0") + assert.True(t, isLatest, "all-deleted server should keep its existing latest flag") + _, isLatest = versionState(t, name, "2.0.0") + assert.False(t, isLatest) + }) + + t.Run("healthy server is left untouched", func(t *testing.T) { + name := "io.test/healthy" + base := time.Now().Add(-10 * time.Hour) + createVersion(t, name, "1.0.0", base, model.StatusActive, false) + createVersion(t, name, "2.0.0", base.Add(1*time.Hour), model.StatusActive, true) + + runHeal(t) + + _, isLatest := versionState(t, name, "2.0.0") + assert.True(t, isLatest) + _, isLatest = versionState(t, name, "1.0.0") + assert.False(t, isLatest) + }) + + t.Run("non-semver versions fall back to published_at", func(t *testing.T) { + name := "io.test/non-semver" + base := time.Now().Add(-10 * time.Hour) + createVersion(t, name, "rolling", base, model.StatusDeleted, true) + createVersion(t, name, "build-100", base.Add(1*time.Hour), model.StatusActive, false) + createVersion(t, name, "build-200", base.Add(2*time.Hour), model.StatusActive, false) + + runHeal(t) + + _, isLatest := versionState(t, name, "build-200") + assert.True(t, isLatest, "without semver, most recently published wins") + }) +} + // Helper functions for creating pointers to basic types func stringPtr(s string) *string { return &s diff --git a/internal/service/registry_service.go b/internal/service/registry_service.go index 05c4f168b..4f4b8a00d 100644 --- a/internal/service/registry_service.go +++ b/internal/service/registry_service.go @@ -164,6 +164,59 @@ func (s *registryServiceImpl) createServerInTransaction(ctx context.Context, tx return s.db.CreateServer(ctx, tx, &serverJSON, officialMeta) } +// recalculateLatest picks the highest non-deleted version of the given server and flags it +// as latest, clearing is_latest on every other row. If every version is deleted, the highest +// deleted version keeps the flag so admin lookups (GetServerByName with includeDeleted=true) +// still find the server. Caller must hold the per-server publish lock. +func (s *registryServiceImpl) recalculateLatest(ctx context.Context, tx pgx.Tx, serverName string) error { + versions, err := s.db.GetAllVersionsByServerName(ctx, tx, serverName, true) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + return s.db.SetLatestVersion(ctx, tx, serverName, "") + } + return fmt.Errorf("failed to load versions for latest recalculation: %w", err) + } + + winner := pickLatestVersion(versions, false) + if winner == nil { + // No non-deleted versions — fall back to highest deleted so the server is still + // addressable via includeDeleted=true lookups. + winner = pickLatestVersion(versions, true) + } + + winnerVersion := "" + if winner != nil { + winnerVersion = winner.Server.Version + } + return s.db.SetLatestVersion(ctx, tx, serverName, winnerVersion) +} + +// pickLatestVersion returns the highest version from the given slice. If allowDeleted is +// false, deleted versions are skipped. +func pickLatestVersion(versions []*apiv0.ServerResponse, allowDeleted bool) *apiv0.ServerResponse { + var winner *apiv0.ServerResponse + for _, v := range versions { + if !allowDeleted && v.Meta.Official != nil && v.Meta.Official.Status == model.StatusDeleted { + continue + } + if winner == nil { + winner = v + continue + } + var winnerPublishedAt, candidatePublishedAt time.Time + if winner.Meta.Official != nil { + winnerPublishedAt = winner.Meta.Official.PublishedAt + } + if v.Meta.Official != nil { + candidatePublishedAt = v.Meta.Official.PublishedAt + } + if CompareVersions(v.Server.Version, winner.Server.Version, candidatePublishedAt, winnerPublishedAt) > 0 { + winner = v + } + } + return winner +} + // validateNoDuplicateRemoteURLs checks that no other server is using the same remote URLs func (s *registryServiceImpl) validateNoDuplicateRemoteURLs(ctx context.Context, tx pgx.Tx, serverDetail apiv0.ServerJSON) error { // Check each remote URL in the new server for conflicts @@ -237,11 +290,14 @@ func (s *registryServiceImpl) updateServerInTransaction(ctx context.Context, tx // Handle status change if provided if statusChange != nil { - updatedWithStatus, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage) - if err != nil { + if _, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage); err != nil { return nil, err } - return updatedWithStatus, nil + if err := s.recalculateLatest(ctx, tx, serverName); err != nil { + return nil, err + } + // Re-read to pick up the possibly updated is_latest flag. + return s.db.GetServerByNameAndVersion(ctx, tx, serverName, version, true) } return updatedServerResponse, nil @@ -279,7 +335,14 @@ func (s *registryServiceImpl) updateServerStatusInTransaction(ctx context.Contex } // Update only the status metadata - return s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage) + if _, err := s.db.SetServerStatus(ctx, tx, serverName, version, statusChange.NewStatus, statusChange.StatusMessage); err != nil { + return nil, err + } + if err := s.recalculateLatest(ctx, tx, serverName); err != nil { + return nil, err + } + // Re-read to pick up the possibly updated is_latest flag. + return s.db.GetServerByNameAndVersion(ctx, tx, serverName, version, true) } // UpdateAllVersionsStatus updates the status metadata of all versions of a server in a single transaction @@ -319,5 +382,12 @@ func (s *registryServiceImpl) updateAllVersionsStatusInTransaction(ctx context.C } // Update all versions' status in a single database call - return s.db.SetAllVersionsStatus(ctx, tx, serverName, statusChange.NewStatus, statusChange.StatusMessage) + if _, err := s.db.SetAllVersionsStatus(ctx, tx, serverName, statusChange.NewStatus, statusChange.StatusMessage); err != nil { + return nil, err + } + if err := s.recalculateLatest(ctx, tx, serverName); err != nil { + return nil, err + } + // Re-read to pick up the possibly updated is_latest flags. + return s.db.GetAllVersionsByServerName(ctx, tx, serverName, true) } diff --git a/internal/service/registry_service_test.go b/internal/service/registry_service_test.go index 63b652127..05ec8477a 100644 --- a/internal/service/registry_service_test.go +++ b/internal/service/registry_service_test.go @@ -982,6 +982,160 @@ func TestUpdateServerStatus_NoConflictWhenRestoringWithUniqueURLs(t *testing.T) assert.Equal(t, model.StatusActive, result.Meta.Official.Status) } +func TestRecalculateLatest_PromotesNextHighestWhenLatestSoftDeleted(t *testing.T) { + ctx := context.Background() + testDB := database.NewTestDB(t) + service := NewRegistryService(testDB, &config.Config{EnableRegistryValidation: false}) + + serverName := "com.example/delete-latest-server" + + // Publish 1.0.0, then 0.0.59. 1.0.0 is latest. + _, err := service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v1", Version: "1.0.0", + }) + require.NoError(t, err) + _, err = service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v0.0.59", Version: "0.0.59", + }) + require.NoError(t, err) + + // Soft-delete 1.0.0. + _, err = service.UpdateServerStatus(ctx, serverName, "1.0.0", &StatusChangeRequest{ + NewStatus: model.StatusDeleted, + }) + require.NoError(t, err) + + // 0.0.59 should now be latest. + latest, err := service.GetServerByName(ctx, serverName, false) + require.NoError(t, err) + assert.Equal(t, "0.0.59", latest.Server.Version) + assert.True(t, latest.Meta.Official.IsLatest) +} + +func TestRecalculateLatest_ReproFromIssue1081(t *testing.T) { + ctx := context.Background() + testDB := database.NewTestDB(t) + service := NewRegistryService(testDB, &config.Config{EnableRegistryValidation: false}) + + serverName := "com.example/issue-1081-repro" + + // 1. Publish 1.0.0. + _, err := service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v1", Version: "1.0.0", + }) + require.NoError(t, err) + + // 2. Publish 0.0.59 — expected not latest at this point. + _, err = service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v0.0.59", Version: "0.0.59", + }) + require.NoError(t, err) + + // 3. Delete 1.0.0. + _, err = service.UpdateServerStatus(ctx, serverName, "1.0.0", &StatusChangeRequest{ + NewStatus: model.StatusDeleted, + }) + require.NoError(t, err) + + // 4. /versions/latest must not 404. + latest, err := service.GetServerByName(ctx, serverName, false) + require.NoError(t, err) + assert.Equal(t, "0.0.59", latest.Server.Version) + + // 5. Publish 0.0.60 — should become latest because compare now excludes deleted 1.0.0. + _, err = service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v0.0.60", Version: "0.0.60", + }) + require.NoError(t, err) + + latest, err = service.GetServerByName(ctx, serverName, false) + require.NoError(t, err) + assert.Equal(t, "0.0.60", latest.Server.Version) + assert.True(t, latest.Meta.Official.IsLatest) + + // Only one row should be flagged latest. + all, err := service.GetAllVersionsByServerName(ctx, serverName, true) + require.NoError(t, err) + latestCount := 0 + for _, v := range all { + if v.Meta.Official.IsLatest { + latestCount++ + } + } + assert.Equal(t, 1, latestCount) +} + +func TestRecalculateLatest_AllDeletedKeepsHighestAsLatest(t *testing.T) { + ctx := context.Background() + testDB := database.NewTestDB(t) + service := NewRegistryService(testDB, &config.Config{EnableRegistryValidation: false}) + + serverName := "com.example/all-deleted-server" + + _, err := service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v1", Version: "1.0.0", + }) + require.NoError(t, err) + _, err = service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v2", Version: "2.0.0", + }) + require.NoError(t, err) + + // Delete all versions in one call. + _, err = service.UpdateAllVersionsStatus(ctx, serverName, &StatusChangeRequest{ + NewStatus: model.StatusDeleted, + }) + require.NoError(t, err) + + // Public lookup (includeDeleted=false) must now 404. + _, err = service.GetServerByName(ctx, serverName, false) + require.ErrorIs(t, err, database.ErrNotFound) + + // Admin lookup (includeDeleted=true) must still find the server: the highest deleted + // version keeps is_latest=true so the server remains addressable for restore flows. + latest, err := service.GetServerByName(ctx, serverName, true) + require.NoError(t, err) + assert.Equal(t, "2.0.0", latest.Server.Version) + assert.True(t, latest.Meta.Official.IsLatest) +} + +func TestRecalculateLatest_RestoringHigherVersionPromotesIt(t *testing.T) { + ctx := context.Background() + testDB := database.NewTestDB(t) + service := NewRegistryService(testDB, &config.Config{EnableRegistryValidation: false}) + + serverName := "com.example/restore-server" + + // Publish 2.0.0 and 1.0.0 — 2.0.0 is latest. + _, err := service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v2", Version: "2.0.0", + }) + require.NoError(t, err) + _, err = service.CreateServer(ctx, &apiv0.ServerJSON{ + Schema: model.CurrentSchemaURL, Name: serverName, Description: "v1", Version: "1.0.0", + }) + require.NoError(t, err) + + // Delete 2.0.0 — 1.0.0 gets promoted. + _, err = service.UpdateServerStatus(ctx, serverName, "2.0.0", &StatusChangeRequest{ + NewStatus: model.StatusDeleted, + }) + require.NoError(t, err) + latest, err := service.GetServerByName(ctx, serverName, false) + require.NoError(t, err) + require.Equal(t, "1.0.0", latest.Server.Version) + + // Restore 2.0.0 — it should reclaim latest. + _, err = service.UpdateServerStatus(ctx, serverName, "2.0.0", &StatusChangeRequest{ + NewStatus: model.StatusActive, + }) + require.NoError(t, err) + latest, err = service.GetServerByName(ctx, serverName, false) + require.NoError(t, err) + assert.Equal(t, "2.0.0", latest.Server.Version) + assert.True(t, latest.Meta.Official.IsLatest) +} + // Helper functions func stringPtr(s string) *string { return &s