Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/1188.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Reduced peak memory consumption of repair_metadata by lowering batch size from 1000 to 250,
eliminating double S3 reads for wheel files, and closing artifact file handles after each
iteration. This fixes "Worker has gone missing" errors on repositories with 1000+ packages.
37 changes: 25 additions & 12 deletions pulp_python/app/tasks/repair.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from collections import defaultdict
from gettext import gettext as _
from itertools import groupby
Expand All @@ -8,18 +9,20 @@
from django.db.models.query import QuerySet
from pulp_python.app.models import PythonPackageContent, PythonRepository
from pulp_python.app.utils import (
artifact_to_metadata_artifact,
artifact_to_python_content_data,
copy_artifact_to_temp_file,
extract_wheel_metadata,
fetch_json_release_metadata,
metadata_content_to_artifact,
parse_metadata,
)
from pulpcore.plugin.models import Artifact, ContentArtifact, ProgressReport
from pulpcore.plugin.models import ContentArtifact, ProgressReport
from pulpcore.plugin.util import get_domain

log = logging.getLogger(__name__)


BULK_SIZE = 1000
BULK_SIZE = 250


def repair(repository_pk: UUID) -> None:
Expand Down Expand Up @@ -118,11 +121,21 @@ def repair_metadata(content: QuerySet[PythonPackageContent]) -> tuple[int, set[s
.first()
.artifact
)
new_data = artifact_to_python_content_data(package.filename, main_artifact, domain)
# Copy artifact to temp file once, extract both content data and metadata
temp_path = copy_artifact_to_temp_file(main_artifact, package.filename)
try:
new_data = artifact_to_python_content_data(
package.filename, main_artifact, domain, temp_path=temp_path
)
metadata_content = (
extract_wheel_metadata(temp_path) if package.filename.endswith(".whl") else None
)
finally:
os.unlink(temp_path)
total_metadata_repaired += update_metadata_artifact_if_needed(
package,
new_data.get("metadata_sha256"),
main_artifact,
metadata_content,
metadata_batch,
pkgs_metadata_not_repaired,
)
Expand Down Expand Up @@ -236,7 +249,7 @@ def update_package_if_needed(
def update_metadata_artifact_if_needed(
package: PythonPackageContent,
new_metadata_sha256: str | None,
main_artifact: Artifact,
metadata_content: bytes | None,
metadata_batch: list[tuple],
pkgs_metadata_not_repaired: set[str],
) -> int:
Expand All @@ -248,7 +261,7 @@ def update_metadata_artifact_if_needed(
Args:
package: Package to check for metadata changes.
new_metadata_sha256: The correct metadata_sha256 extracted from the main artifact, or None.
main_artifact: The main package artifact used to generate metadata.
metadata_content: Raw metadata bytes extracted from the wheel, or None.
metadata_batch: List of tuples for batch processing (updated in-place).
pkgs_metadata_not_repaired: Set of package PKs that failed repair (updated in-place).

Expand All @@ -265,13 +278,13 @@ def update_metadata_artifact_if_needed(

# Create missing
if not cas:
metadata_batch.append((package, main_artifact))
metadata_batch.append((package, metadata_content))
# Fix existing
elif new_metadata_sha256 != original_metadata_sha256:
ca = cas.first()
metadata_artifact = ca.artifact
if metadata_artifact is None or (metadata_artifact.sha256 != new_metadata_sha256):
metadata_batch.append((package, main_artifact))
metadata_batch.append((package, metadata_content))

if len(metadata_batch) == BULK_SIZE:
not_repaired = _process_metadata_batch(metadata_batch)
Expand All @@ -288,16 +301,16 @@ def _process_metadata_batch(metadata_batch: list[tuple]) -> set[str]:
and their corresponding ContentArtifacts.

Args:
metadata_batch: List of (package, main_artifact) tuples.
metadata_batch: List of (package, metadata_content) tuples.

Returns:
Set of package PKs for which metadata artifacts could not be created.
"""
not_repaired = set()
content_artifacts = []

for package, main_artifact in metadata_batch:
metadata_artifact = artifact_to_metadata_artifact(package.filename, main_artifact)
for package, metadata_content in metadata_batch:
metadata_artifact = metadata_content_to_artifact(metadata_content)
if metadata_artifact:
ca = ContentArtifact(
artifact=metadata_artifact,
Expand Down
41 changes: 35 additions & 6 deletions pulp_python/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,37 @@ def compute_metadata_sha256(filename: str) -> str | None:
return hashlib.sha256(metadata_content).hexdigest() if metadata_content else None


def artifact_to_python_content_data(filename, artifact, domain=None):
def copy_artifact_to_temp_file(artifact, filename, tmp_dir="."):
    """
    Copy an artifact's file to a temporary file on disk.

    Args:
        artifact: Object whose ``file`` attribute is a readable, seekable
            binary file-like object (e.g. a pulpcore Artifact's file).
        filename: Used as the temp file's suffix so tools that validate the
            extension (e.g. pkginfo) accept the file.
        tmp_dir: Directory in which to create the temp file.

    Returns:
        Path to the temp file. On success the caller is responsible for
        deleting it; on failure the file is removed before the exception
        propagates (the caller never learns its name, so it could not
        clean up otherwise).
    """
    temp_file = tempfile.NamedTemporaryFile("wb", dir=tmp_dir, suffix=filename, delete=False)
    try:
        # The with-block guarantees the handle is closed (and buffers
        # flushed) whether or not the copy succeeds.
        with temp_file:
            artifact.file.seek(0)
            shutil.copyfileobj(artifact.file, temp_file)
    except BaseException:
        import os  # local import: keeps this fix self-contained in utils.py

        os.unlink(temp_file.name)
        raise
    return temp_file.name


def artifact_to_python_content_data(filename, artifact, domain=None, temp_path=None):
"""
Takes the artifact/filename and returns the metadata needed to create a PythonPackageContent.

If temp_path is provided, uses it instead of copying the artifact to a new temp file.
"""
# Copy file to a temp directory under the user provided filename, we do this
# because pkginfo validates that the filename has a valid extension before
# reading it
with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file:
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()
metadata = get_project_metadata_from_file(temp_file.name)
if temp_path:
metadata = get_project_metadata_from_file(temp_path)
else:
with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file:
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()
metadata = get_project_metadata_from_file(temp_file.name)
data = parse_project_metadata(vars(metadata))
data["sha256"] = artifact.sha256
data["size"] = artifact.size
Expand Down Expand Up @@ -280,6 +299,16 @@ def artifact_to_metadata_artifact(
if not metadata_content:
return None

return metadata_content_to_artifact(metadata_content, tmp_dir)


def metadata_content_to_artifact(metadata_content: bytes, tmp_dir: str = ".") -> Artifact | None:
"""
Creates an Artifact from raw metadata content bytes.
"""
if not metadata_content:
return None

with tempfile.NamedTemporaryFile(
"wb", dir=tmp_dir, suffix=".metadata", delete=False
) as temp_md:
Expand Down
Loading