Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applySharedScheduledExecutor)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applySharedScheduledExecutor)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties()::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties()::applyServiceConfigurations)
.applyMutation(s3FileIOProperties()::applyRetryConfigurations)
.applyMutation(s3FileIOProperties()::applySharedScheduledExecutor)
.credentialsProvider(
new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn()))
.region(Region.of(region()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applySharedScheduledExecutor)
.build();
}

Expand Down
153 changes: 153 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.xml.stream.XMLStreamException;
import org.apache.iceberg.EnvironmentContext;
Expand Down Expand Up @@ -506,6 +514,24 @@ public class S3FileIOProperties implements Serializable {

public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true;

/**
* Controls whether a shared ScheduledExecutorService is used across S3 clients to prevent
* unbounded thread accumulation. When enabled, all S3 clients share a single executor instead of
* each creating its own sdk-ScheduledExecutor threads that are never shut down.
*/
public static final String S3_SHARED_SCHEDULED_EXECUTOR_ENABLED =
"s3.shared-scheduled-executor.enabled";

public static final boolean S3_SHARED_SCHEDULED_EXECUTOR_ENABLED_DEFAULT = true;

/**
* Pool size for the shared ScheduledExecutorService. Matches the AWS SDK v2 default. The SDK uses
* this executor for scheduling async retry attempts and timeout tasks, not for data transfer.
*
* <p>See https://github.com/aws/aws-sdk-java-v2/issues/1690
*/
private static final int SHARED_SCHEDULED_EXECUTOR_POOL_SIZE = 5;

private String sseType;
private String sseKey;
private String sseMd5;
Expand Down Expand Up @@ -547,6 +573,7 @@ public class S3FileIOProperties implements Serializable {
private long s3RetryMaxWaitMs;

private boolean s3DirectoryBucketListPrefixAsDirectory;
private boolean isSharedScheduledExecutorEnabled;
private final Map<String, String> allProperties;

public S3FileIOProperties() {
Expand Down Expand Up @@ -586,6 +613,7 @@ public S3FileIOProperties() {
this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
this.s3DirectoryBucketListPrefixAsDirectory =
S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT;
this.isSharedScheduledExecutorEnabled = S3_SHARED_SCHEDULED_EXECUTOR_ENABLED_DEFAULT;
this.isS3AnalyticsAcceleratorEnabled = S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT;
this.s3AnalyticsacceleratorProperties = Maps.newHashMap();
this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT;
Expand Down Expand Up @@ -705,6 +733,11 @@ public S3FileIOProperties(Map<String, String> properties) {
properties,
S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY,
S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT);
this.isSharedScheduledExecutorEnabled =
PropertyUtil.propertyAsBoolean(
properties,
S3_SHARED_SCHEDULED_EXECUTOR_ENABLED,
S3_SHARED_SCHEDULED_EXECUTOR_ENABLED_DEFAULT);
this.isS3AnalyticsAcceleratorEnabled =
PropertyUtil.propertyAsBoolean(
properties, S3_ANALYTICS_ACCELERATOR_ENABLED, S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
Expand Down Expand Up @@ -1169,6 +1202,126 @@ public <T extends S3ClientBuilder> void applyUserAgentConfigurations(T builder)
.build());
}

/**
* Applies a shared ScheduledExecutorService to the S3Client to prevent unbounded thread
* accumulation.
*/
public <T extends S3ClientBuilder> void applySharedScheduledExecutor(T builder) {
if (!isSharedScheduledExecutorEnabled) {
return;
}

// Without this, each S3Client creates its own sdk-ScheduledExecutor threads that are never
// shut down when the client becomes unreachable (threads are GC roots).
// https://github.com/apache/iceberg/issues/15898
ClientOverrideConfiguration.Builder configBuilder =
null != builder.overrideConfiguration()
? builder.overrideConfiguration().toBuilder()
: ClientOverrideConfiguration.builder();
builder.overrideConfiguration(
configBuilder
.scheduledExecutorService(
SharedScheduledExecutorHolder.get(SHARED_SCHEDULED_EXECUTOR_POOL_SIZE))
.build());
}

/**
* Lazy holder for the shared ScheduledExecutorService. Uses daemon threads so they don't prevent
* JVM shutdown.
*
* <p>The returned executor is wrapped to prevent shutdown by the AWS SDK. When S3Client.close()
* is called, the SDK's AttributeMap.closeIfPossible() calls shutdown() on any ExecutorService
* stored in the client configuration. The wrapper makes shutdown/shutdownNow no-ops so the shared
* executor remains usable across multiple S3 client lifecycles.
*/
private static final class SharedScheduledExecutorHolder {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
private static ScheduledExecutorService instance;

static synchronized ScheduledExecutorService get(int poolSize) {
if (instance == null) {
ScheduledExecutorService delegate =
Executors.newScheduledThreadPool(
poolSize,
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("iceberg-shared-sdk-executor-" + THREAD_COUNT.getAndIncrement());
thread.setDaemon(true);
return thread;
});
instance = new NonClosingScheduledExecutorService(delegate);
}
return instance;
}
}

/**
* A wrapper around a ScheduledExecutorService that prevents shutdown. This is necessary because
* the AWS SDK v2 calls ExecutorService.shutdown() on client close via
* AttributeMap.closeIfPossible().
*/
private static class NonClosingScheduledExecutorService extends AbstractExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService delegate;

NonClosingScheduledExecutorService(ScheduledExecutorService delegate) {
this.delegate = delegate;
}

@Override
public void shutdown() {
// no-op: prevent SDK from shutting down the shared executor
}

@Override
public List<Runnable> shutdownNow() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a test for shutdown being a no-op?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added, thanks!

// no-op: prevent SDK from shutting down the shared executor
return Collections.emptyList();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
delegate.execute(command);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

public S3CrtAsyncClientBuilder applyS3CrtConfigurations(S3CrtAsyncClientBuilder builder) {
return builder.maxConcurrency(s3CrtMaxConcurrency());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient;
Expand Down Expand Up @@ -320,4 +321,58 @@ public void testIsTreatS3DirectoryBucketListPrefixAsDirectoryEnabled() {
S3FileIOProperties properties = new S3FileIOProperties(map);
assertThat(properties.isS3DirectoryBucketListPrefixAsDirectory()).isEqualTo(false);
}

@Test
public void testApplySharedScheduledExecutorIsSingleton() {
S3FileIOProperties s3Properties = new S3FileIOProperties(Maps.newHashMap());
S3ClientBuilder builder1 = S3Client.builder();
S3ClientBuilder builder2 = S3Client.builder();

s3Properties.applySharedScheduledExecutor(builder1);
s3Properties.applySharedScheduledExecutor(builder2);

assertThat(builder1.overrideConfiguration().scheduledExecutorService()).isPresent();
assertThat(builder1.overrideConfiguration().scheduledExecutorService().get())
.isSameAs(builder2.overrideConfiguration().scheduledExecutorService().get());
}

@Test
public void testSharedScheduledExecutorPreservesExistingConfig() {
S3FileIOProperties s3Properties = new S3FileIOProperties(Maps.newHashMap());
S3ClientBuilder builder = S3Client.builder();

s3Properties.applyRetryConfigurations(builder);
s3Properties.applySharedScheduledExecutor(builder);

assertThat(builder.overrideConfiguration().retryPolicy()).isPresent();
assertThat(builder.overrideConfiguration().scheduledExecutorService()).isPresent();
}

@Test
public void testSharedScheduledExecutorDisabled() {
Map<String, String> properties = Maps.newHashMap();
properties.put(S3FileIOProperties.S3_SHARED_SCHEDULED_EXECUTOR_ENABLED, "false");
S3FileIOProperties s3Properties = new S3FileIOProperties(properties);
S3ClientBuilder builder = S3Client.builder();

s3Properties.applySharedScheduledExecutor(builder);

assertThat(builder.overrideConfiguration().scheduledExecutorService()).isNotPresent();
}

@Test
public void testSharedScheduledExecutorSurvivesShutdown() {
S3FileIOProperties s3Properties = new S3FileIOProperties(Maps.newHashMap());
S3ClientBuilder builder = S3Client.builder();

s3Properties.applySharedScheduledExecutor(builder);

ScheduledExecutorService executor =
builder.overrideConfiguration().scheduledExecutorService().get();
executor.shutdown();
executor.shutdownNow();

assertThat(executor.isShutdown()).isFalse();
assertThat(executor.isTerminated()).isFalse();
}
}
Loading