/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.exchange.filesystem.s3;

import com.google.api.gax.paging.Page;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.airlift.units.Duration;
import io.trino.annotation.NotThreadSafe;
import io.trino.plugin.exchange.filesystem.ExchangeSourceFile;
import io.trino.plugin.exchange.filesystem.ExchangeStorageReader;
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.s3.BufferWriteAsyncResponseTransformer;
import io.trino.plugin.exchange.filesystem.s3.ExchangeS3Config;
import io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper;
import io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorageStats;
import jakarta.annotation.PreDestroy;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.endpoint.DefaultServiceEndpointBuilder;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class S3FileSystemExchangeStorage
implements FileSystemExchangeStorage {
    // Class-wide logger; also used by the request-tracking wrapper built in the constructor.
    private static final Logger log = Logger.get(S3FileSystemExchangeStorage.class);
    // Statistics sink that the operations issued by this storage are recorded against.
    private final S3FileSystemExchangeStorageStats stats;
    // Region from config; applied to the S3 client builder and used to derive the endpoint when none is configured.
    private final Optional<Region> region;
    // Explicit endpoint from config; when present it is used as the client's endpoint override.
    private final Optional<String> endpoint;
    // Upload part size in bytes; also handed to readers as their part size and returned by getWriteBufferSize().
    private final int multiUploadPartSize;
    // Client used for all S3 calls; wrapped in the constructor so request start/completion is reported to stats.
    private final S3AsyncClient s3AsyncClient;
    // Storage class passed to writers created by this storage.
    private final StorageClass storageClass;
    // Selects between the S3-only code paths and the GCS-specific recursive delete.
    private final CompatibilityMode compatibilityMode;
    // Server-side encryption settings passed to writers created by this storage.
    private final S3SseContext s3SseContext;
    // Google Cloud Storage client; present only when compatibilityMode is GCP.
    private final Optional<Storage> gcsClient;
    // Executor that runs GCS recursive deletes; present only when compatibilityMode is GCP.
    private final Optional<ListeningExecutorService> gcsDeleteExecutor;

    /**
     * Builds the exchange storage from configuration.
     * <p>
     * An S3 async client is always created and wrapped so that every request is reported to
     * {@code stats}. In {@link CompatibilityMode#GCP} mode a Google Cloud Storage client and a
     * dedicated delete executor are created as well; otherwise both are left empty.
     *
     * @param stats statistics sink; must not be null
     * @param config exchange S3 configuration the clients are built from
     * @param compatibilityMode selects whether the GCS client and executor are created; must not be null
     * @throws IOException if the GCS key file cannot be opened or the credentials cannot be read
     */
    @Inject
    public S3FileSystemExchangeStorage(final S3FileSystemExchangeStorageStats stats, ExchangeS3Config config, CompatibilityMode compatibilityMode) throws IOException {
        this.stats = Objects.requireNonNull(stats, "stats is null");
        // region and endpoint must be assigned before createS3AsyncClient() runs: it reads both fields.
        this.region = config.getS3Region();
        this.endpoint = config.getS3Endpoint();
        this.multiUploadPartSize = Math.toIntExact(config.getS3UploadPartSize().toBytes());
        this.storageClass = config.getStorageClass();
        this.compatibilityMode = Objects.requireNonNull(compatibilityMode, "compatibilityMode is null");
        this.s3SseContext = new S3SseContext(config.getSseType(), config.getSseKmsKeyId());
        AwsCredentialsProvider credentialsProvider = S3FileSystemExchangeStorage.createAwsCredentialsProvider(config);
        ClientOverrideConfiguration overrideConfig = (ClientOverrideConfiguration)ClientOverrideConfiguration.builder().retryStrategy(SdkDefaultRetryStrategy.forRetryMode((RetryMode)config.getRetryMode()).toBuilder().maxAttempts(config.getS3MaxErrorRetries()).build()).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, (Object)"").putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, (Object)"Trino-exchange").build();
        S3AsyncClient client = this.createS3AsyncClient(credentialsProvider, overrideConfig, config.isS3PathStyleAccess(), config.getAsyncClientConcurrency(), config.getAsyncClientMaxPendingConnectionAcquires(), config.getConnectionAcquisitionTimeout());
        // NOTE(review): the explicit `this` argument looks like a decompiler artifact of the anonymous
        // class; confirm against the S3AsyncClientWrapper constructor before compiling.
        this.s3AsyncClient = new S3AsyncClientWrapper(this, client){

            @Override
            protected void handle(S3AsyncClientWrapper.RequestType requestType, CompletableFuture<?> responseFuture) {
                stats.requestStarted(requestType);
                responseFuture.whenComplete((result, failure) -> {
                    // Connection-pool exhaustion is logged together with the active request summary to aid diagnosis.
                    if (failure != null && failure.getMessage() != null && failure.getMessage().contains("Maximum pending connection acquisitions exceeded")) {
                        log.error(failure, "Encountered 'Maximum pending connection acquisitions exceeded' error. Active requests: %s", new Object[]{stats.getActiveRequestsSummary()});
                    }
                    stats.requestCompleted(requestType);
                });
            }
        };
        if (compatibilityMode == CompatibilityMode.GCP) {
            Optional<String> gcsJsonKeyFilePath = config.getGcsJsonKeyFilePath();
            Optional<String> gcsJsonKey = config.getGcsJsonKey();
            Verify.verify(!gcsJsonKeyFilePath.isPresent() || !gcsJsonKey.isPresent(), "gcsJsonKeyFilePath and gcsJsonKey shouldn't be set at the same time");
            if (gcsJsonKeyFilePath.isPresent()) {
                GoogleCredentials credentials;
                // The key file stream is closed here once the credentials have been read;
                // previously it was never closed, leaking a file descriptor.
                try (InputStream keyFileStream = new FileInputStream(gcsJsonKeyFilePath.get())) {
                    credentials = GoogleCredentials.fromStream(keyFileStream);
                }
                this.gcsClient = Optional.of((Storage)((StorageOptions.Builder)StorageOptions.newBuilder().setCredentials((Credentials)credentials)).build().getService());
            } else if (gcsJsonKey.isPresent()) {
                GoogleCredentials credentials = GoogleCredentials.fromStream((InputStream)new ByteArrayInputStream(gcsJsonKey.get().getBytes(StandardCharsets.UTF_8)));
                this.gcsClient = Optional.of((Storage)((StorageOptions.Builder)StorageOptions.newBuilder().setCredentials((Credentials)credentials)).build().getService());
            } else {
                // Neither key source configured: fall back to the library's default instance.
                this.gcsClient = Optional.of((Storage)StorageOptions.getDefaultInstance().getService());
            }
            // Fixed-size pool of 100 threads whose idle threads (core threads included) exit after 60 seconds.
            ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Threads.threadsNamed((String)"gcs-delete-%s"));
            executor.allowCoreThreadTimeOut(true);
            this.gcsDeleteExecutor = Optional.of(MoreExecutors.listeningDecorator((ExecutorService)executor));
        } else {
            this.gcsClient = Optional.empty();
            this.gcsDeleteExecutor = Optional.empty();
        }
    }

    /**
     * No-op: this implementation performs no work when asked to create directories.
     *
     * @param dir directory URI; ignored
     */
    @Override
    public void createDirectories(URI dir) {
    }

    /**
     * Creates a reader over the given source files that shares this storage's stats and S3 client.
     *
     * @param sourceFiles files to read
     * @param maxPageStorageSize forwarded to the reader, which uses it when sizing its buffer
     * @param metricsBuilder source of the metrics the reader reports to
     * @return a new reader
     */
    @Override
    public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder) {
        // The reader's part size is the same value used as the upload part size.
        int readPartSize = multiUploadPartSize;
        ExchangeStorageReader reader = new S3ExchangeStorageReader(stats, s3AsyncClient, readPartSize, sourceFiles, metricsBuilder, maxPageStorageSize);
        return reader;
    }

    /**
     * Creates a writer targeting the object identified by {@code file}.
     *
     * @param file URI whose bucket and key identify the destination object
     * @return a new writer configured with this storage's part size, storage class and SSE settings
     */
    @Override
    public ExchangeStorageWriter createExchangeStorageWriter(URI file) {
        return new S3ExchangeStorageWriter(
                stats,
                s3AsyncClient,
                getBucketName(file),
                keyFromUri(file),
                multiUploadPartSize,
                storageClass,
                s3SseContext);
    }

    /**
     * Uploads a zero-length object at the location identified by {@code file}.
     *
     * @param file URI whose bucket and key identify the object to create
     * @return a future for the upload, with failures passed through
     *         {@code FileSystemExchangeFutures.translateFailures} and recorded in stats
     */
    @Override
    public ListenableFuture<Void> createEmptyFile(URI file) {
        String bucket = getBucketName(file);
        String objectKey = keyFromUri(file);
        PutObjectRequest emptyPut = PutObjectRequest.builder()
                .bucket(bucket)
                .key(objectKey)
                .build();
        ListenableFuture<Void> upload = FileSystemExchangeFutures.translateFailures(
                MoreFutures.toListenableFuture(s3AsyncClient.putObject(emptyPut, AsyncRequestBody.empty())));
        return stats.getCreateEmptyFile().record(upload);
    }

    /**
     * Deletes every object found under the given directory URIs.
     * <p>
     * In GCP compatibility mode the work is delegated to {@code deleteRecursivelyGcp}. Otherwise each
     * directory is listed asynchronously, the listing futures are grouped by bucket, and the keys of
     * each bucket are removed through {@code deleteObjects} once all listings for that bucket are done.
     *
     * @param directories directory URIs to delete; each is passed to {@code listObjectsRecursively},
     *        which rejects URIs that do not end with "/"
     * @return a future that completes when all deletions have finished; failures pass through
     *         {@code FileSystemExchangeFutures.translateFailures}
     */
    @Override
    public ListenableFuture<Void> deleteRecursively(List<URI> directories) {
        if (this.compatibilityMode == CompatibilityMode.GCP) {
            return this.deleteRecursivelyGcp(directories);
        }
        // bucket name -> futures that each resolve to the list of keys under one directory
        ImmutableMultimap.Builder bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder();
        for (URI dir : directories) {
            // Keys are accumulated by the paginator callback; the list is built only after the listing completes.
            ImmutableList.Builder keys2 = ImmutableList.builder();
            ListenableFuture listObjectsFuture = Futures.transform((ListenableFuture)MoreFutures.toListenableFuture((CompletableFuture)this.listObjectsRecursively(dir).subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream().map(S3Object::key).forEach(arg_0 -> ((ImmutableList.Builder)keys2).add(arg_0)))), void_ -> keys2.build(), (Executor)MoreExecutors.directExecutor());
            bucketToListObjectsFuturesBuilder.put((Object)S3FileSystemExchangeStorage.getBucketName(dir), (Object)listObjectsFuture);
        }
        ImmutableMultimap bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build();
        ImmutableList.Builder deleteObjectsFutures = ImmutableList.builder();
        for (String bucketName : bucketToListObjectsFutures.keySet()) {
            // Wait for all listings of this bucket, flatten their key lists, then issue the deletes.
            deleteObjectsFutures.add((Object)Futures.transformAsync((ListenableFuture)Futures.allAsList((Iterable)bucketToListObjectsFutures.get((Object)bucketName)), keys -> this.deleteObjects(bucketName, (List)keys.stream().flatMap(Collection::stream).collect(ImmutableList.toImmutableList())), (Executor)MoreExecutors.directExecutor()));
        }
        // NOTE(review): unlike the GCP path, this path does not record into stats.getDeleteRecursively() — confirm intended.
        return FileSystemExchangeFutures.translateFailures(Futures.allAsList((Iterable)deleteObjectsFutures.build()));
    }

    /**
     * GCS variant of recursive delete: on the dedicated delete executor, lists every blob under each
     * directory prefix, queues a delete for each blob in a single batch, and submits that batch.
     *
     * @param directories directory URIs whose bucket and key prefix select the blobs to delete
     * @return a future for the submitted task, with failures passed through
     *         {@code FileSystemExchangeFutures.translateFailures} and recorded in stats
     * @throws IllegalStateException if the GCS client or delete executor was not initialized
     */
    private ListenableFuture<Void> deleteRecursivelyGcp(List<URI> directories) {
        Storage storage = gcsClient.orElseThrow(() -> new IllegalStateException("gcsClient is expected to be initialized"));
        ListeningExecutorService deleteExecutor = gcsDeleteExecutor.orElseThrow(() -> new IllegalStateException("gcsDeleteExecutor is expected to be initialized"));
        Runnable deleteTask = () -> {
            StorageBatch batch = storage.batch();
            for (URI dir : directories) {
                Page<Blob> blobs = storage.list(getBucketName(dir), Storage.BlobListOption.prefix(keyFromUri(dir)));
                // The per-blob results returned by batch.delete are not inspected here.
                blobs.iterateAll().forEach(blob -> batch.delete(blob.getBlobId()));
            }
            batch.submit();
        };
        return stats.getDeleteRecursively().record(FileSystemExchangeFutures.translateFailures(deleteExecutor.submit(deleteTask)));
    }

    /**
     * Lists all objects under {@code dir} and reports each as a {@code FileStatus} carrying its URI and size.
     *
     * @param dir directory URI; {@code listObjectsRecursively} rejects URIs that do not end with "/"
     * @return a future resolving to the collected file statuses once the listing completes; recorded in stats
     */
    @Override
    public ListenableFuture<List<FileStatus>> listFilesRecursively(URI dir) {
        // Filled by the paginator callback below; built into a list only after the listing future completes.
        ImmutableList.Builder fileStatuses = ImmutableList.builder();
        return this.stats.getListFilesRecursively().record(Futures.transform((ListenableFuture)MoreFutures.toListenableFuture((CompletableFuture)this.listObjectsRecursively(dir).subscribe(listObjectsV2Response -> {
            for (S3Object s3Object : listObjectsV2Response.contents()) {
                URI uri;
                try {
                    // Rebuild a full URI for the object from the directory's scheme, host and fragment plus the object key.
                    uri = new URI(dir.getScheme(), dir.getHost(), "/" + s3Object.key(), dir.getFragment());
                }
                catch (URISyntaxException e) {
                    throw new IllegalArgumentException(e);
                }
                fileStatuses.add((Object)new FileStatus(uri.toString(), s3Object.size()));
            }
        })), void_ -> fileStatuses.build(), (Executor)MoreExecutors.directExecutor()));
    }

    /**
     * Returns the write buffer size, which is the configured multipart upload part size in bytes.
     *
     * @return the upload part size
     */
    @Override
    public int getWriteBufferSize() {
        return this.multiUploadPartSize;
    }

    /**
     * Releases the resources owned by this storage: closes the S3 client and, when present,
     * shuts down the GCS delete executor. Both actions are registered with a {@code Closer}
     * that runs them when the try block exits.
     *
     * @throws IOException if closing a registered resource fails
     */
    @Override
    @PreDestroy
    public void close() throws IOException {
        try (Closer closer = Closer.create()) {
            closer.register(s3AsyncClient::close);
            gcsDeleteExecutor.ifPresent(executor -> closer.register(executor::shutdown));
        }
    }

    /**
     * Starts a paginated listing of all objects whose key begins with the key derived from {@code dir}.
     *
     * @param dir directory URI; must end with "/"
     * @return the paginator publisher for the listing
     * @throws IllegalArgumentException if {@code dir} does not end with "/"
     */
    private ListObjectsV2Publisher listObjectsRecursively(URI dir) {
        Preconditions.checkArgument(isDirectory(dir), "listObjectsRecursively called on file uri %s", dir);
        String bucket = getBucketName(dir);
        String prefix = keyFromUri(dir);
        ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .build();
        return s3AsyncClient.listObjectsV2Paginator(listRequest);
    }

    /**
     * Deletes the given keys from one bucket, issuing one delete request per group of at most 1000 keys.
     *
     * @param bucketName bucket that holds the keys
     * @param keys object keys to delete
     * @return a future resolving to the responses of all delete requests; recorded in stats
     */
    private ListenableFuture<List<DeleteObjectsResponse>> deleteObjects(String bucketName, List<String> keys) {
        List<List<String>> batches = Lists.partition(keys, 1000);
        stats.getDeleteObjectsEntriesCount().add((long) keys.size());
        ImmutableList.Builder<ListenableFuture<DeleteObjectsResponse>> responses = ImmutableList.builder();
        for (List<String> batch : batches) {
            ImmutableList.Builder<ObjectIdentifier> identifiers = ImmutableList.builder();
            for (String key : batch) {
                identifiers.add(ObjectIdentifier.builder().key(key).build());
            }
            Delete delete = Delete.builder().objects(identifiers.build()).build();
            DeleteObjectsRequest request = DeleteObjectsRequest.builder()
                    .bucket(bucketName)
                    .delete(delete)
                    .build();
            responses.add(MoreFutures.toListenableFuture(s3AsyncClient.deleteObjects(request)));
        }
        return stats.getDeleteObjects().record(Futures.allAsList(responses.build()));
    }

    /**
     * Extracts the bucket name from a URI: the host when the URI has one, otherwise the raw
     * authority provided the URI carries no user-info.
     *
     * @param uri URI to inspect
     * @return the host, or the authority when the host is absent and there is no user-info
     * @throws IllegalArgumentException if the host is absent and user-info is present
     */
    private static String getBucketName(URI uri) {
        String host = uri.getHost();
        if (host != null) {
            return host;
        }
        if (uri.getUserInfo() != null) {
            throw new IllegalArgumentException("Unable to determine S3 bucket from URI.");
        }
        return uri.getAuthority();
    }

    /**
     * Derives the object key from a URI's path by dropping one leading "/" and then one trailing "/".
     *
     * @param uri absolute URI
     * @return the path without its leading and trailing slash; empty when the URI has no path
     * @throws IllegalArgumentException if {@code uri} is not absolute
     */
    private static String keyFromUri(URI uri) {
        Preconditions.checkArgument(uri.isAbsolute(), "Uri is not absolute: %s", uri);
        String path = Strings.nullToEmpty(uri.getPath());
        String withoutLeadingSlash = path.startsWith("/") ? path.substring(1) : path;
        if (!withoutLeadingSlash.endsWith("/")) {
            return withoutLeadingSlash;
        }
        return withoutLeadingSlash.substring(0, withoutLeadingSlash.length() - 1);
    }

    /**
     * Tells whether a URI denotes a directory, i.e. whether its string form ends with "/".
     *
     * @param uri URI to test
     * @return {@code true} if the URI text ends with a slash
     */
    private static boolean isDirectory(URI uri) {
        String text = uri.toString();
        return !text.isEmpty() && text.charAt(text.length() - 1) == '/';
    }

    /**
     * Chooses the AWS credentials provider implied by the configuration.
     * <ul>
     * <li>Access key and secret key both set: static credentials. An IAM role or external ID must not be set.</li>
     * <li>IAM role set: STS assume-role credentials with session name "trino-exchange", using the
     *     external ID and region when configured.</li>
     * <li>Otherwise: the SDK default credentials provider.</li>
     * </ul>
     *
     * @param config exchange S3 configuration
     * @return the credentials provider to use for the S3 client
     * @throws IllegalArgumentException if only one of access key / secret key is set, if key-based
     *         authentication is combined with an IAM role or external ID, or if an external ID is
     *         set without an IAM role
     */
    private static AwsCredentialsProvider createAwsCredentialsProvider(ExchangeS3Config config) {
        String accessKey = config.getS3AwsAccessKey();
        String secretKey = config.getS3AwsSecretKey();
        // The two messages below were previously swapped: each named the wrong key as set and
        // pointed the operator at the property that was already configured.
        if (accessKey == null && secretKey != null) {
            throw new IllegalArgumentException("AWS secret key set but access key is not set; make sure you set exchange.s3.aws-access-key config property");
        }
        if (accessKey != null && secretKey == null) {
            throw new IllegalArgumentException("AWS access key set but secret key is not set; make sure you set exchange.s3.aws-secret-key config property");
        }
        if (accessKey != null) {
            Preconditions.checkArgument((boolean)config.getS3IamRole().isEmpty(), (Object)"IAM role is not compatible with access key based authentication; make sure you set only one of exchange.s3.aws-access-key, exchange.s3.iam-role config properties");
            Preconditions.checkArgument((boolean)config.getS3ExternalId().isEmpty(), (Object)"External ID is not compatible with access key based authentication; make sure you set only one of exchange.s3.aws-access-key, exchange.s3.external-id config properties");
            return StaticCredentialsProvider.create((AwsCredentials)AwsBasicCredentials.create((String)accessKey, (String)secretKey));
        }
        if (config.getS3ExternalId().isPresent() && config.getS3IamRole().isEmpty()) {
            throw new IllegalArgumentException("External ID can only be used with IAM role based authentication; make sure you set exchange.s3.iam-role config property");
        }
        if (config.getS3IamRole().isPresent()) {
            AssumeRoleRequest.Builder assumeRoleRequest = AssumeRoleRequest.builder().roleArn(config.getS3IamRole().get()).roleSessionName("trino-exchange");
            config.getS3ExternalId().ifPresent(arg_0 -> ((AssumeRoleRequest.Builder)assumeRoleRequest).externalId(arg_0));
            StsClientBuilder stsClientBuilder = StsClient.builder();
            config.getS3Region().ifPresent(arg_0 -> ((StsClientBuilder)stsClientBuilder).region(arg_0));
            return ((StsAssumeRoleCredentialsProvider.Builder)((StsAssumeRoleCredentialsProvider.Builder)StsAssumeRoleCredentialsProvider.builder().stsClient((StsClient)stsClientBuilder.build())).refreshRequest((AssumeRoleRequest)assumeRoleRequest.build()).asyncCredentialUpdateEnabled(Boolean.valueOf(true))).build();
        }
        return DefaultCredentialsProvider.create();
    }

    /**
     * Builds the S3 async client on a Netty HTTP client.
     * <p>
     * The endpoint override is the configured endpoint when present; otherwise it is derived from the
     * configured region. The region, when present, is also set on the builder.
     *
     * @param credentialsProvider credentials for the client
     * @param overrideConfig client override configuration (retry strategy, user agent)
     * @param isS3PathStyleAccess whether path-style access is enabled
     * @param maxConcurrency maximum concurrency of the HTTP client
     * @param maxPendingConnectionAcquires maximum pending connection acquisitions of the HTTP client
     * @param connectionAcquisitionTimeout connection acquisition timeout, converted to milliseconds
     * @return the built client
     * @throws IllegalArgumentException if neither an endpoint nor a region is configured
     */
    private S3AsyncClient createS3AsyncClient(AwsCredentialsProvider credentialsProvider, ClientOverrideConfiguration overrideConfig, boolean isS3PathStyleAccess, int maxConcurrency, int maxPendingConnectionAcquires, Duration connectionAcquisitionTimeout) {
        // NOTE(review): the region-derived default endpoint is built with the "http" protocol, i.e. not TLS — confirm this is intended.
        S3AsyncClientBuilder clientBuilder = (S3AsyncClientBuilder)((S3AsyncClientBuilder)((S3AsyncClientBuilder)((S3AsyncClientBuilder)((S3AsyncClientBuilder)S3AsyncClient.builder().credentialsProvider(credentialsProvider)).overrideConfiguration(overrideConfig)).serviceConfiguration((S3Configuration)S3Configuration.builder().pathStyleAccessEnabled(Boolean.valueOf(isS3PathStyleAccess)).build())).httpClientBuilder((SdkAsyncHttpClient.Builder)NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.valueOf(maxConcurrency)).maxPendingConnectionAcquires(Integer.valueOf(maxPendingConnectionAcquires)).connectionAcquisitionTimeout(java.time.Duration.ofMillis(connectionAcquisitionTimeout.toMillis())))).endpointOverride(this.endpoint.map(URI::create).orElseGet(() -> new DefaultServiceEndpointBuilder("s3", "http").withRegion(this.region.orElseThrow(() -> new IllegalArgumentException("region is expected to be set"))).getServiceEndpoint()));
        this.region.ifPresent(arg_0 -> ((S3AsyncClientBuilder)clientBuilder).region(arg_0));
        return (S3AsyncClient)clientBuilder.build();
    }

    /**
     * Times {@code future} from the moment of this call and adds the elapsed milliseconds to
     * {@code successMetric} if it succeeds or to {@code failureMetric} if it fails.
     *
     * @param future future to observe
     * @param successMetric distribution that receives the elapsed time on success
     * @param failureMetric distribution that receives the elapsed time on failure
     */
    private static <T> void recordDistributionMetric(ListenableFuture<T> future, final MetricsBuilder.DistributionMetricBuilder successMetric, final MetricsBuilder.DistributionMetricBuilder failureMetric) {
        final Stopwatch timer = Stopwatch.createStarted();
        FutureCallback<T> recorder = new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                successMetric.add(timer.elapsed(TimeUnit.MILLISECONDS));
            }

            @Override
            public void onFailure(Throwable t) {
                failureMetric.add(timer.elapsed(TimeUnit.MILLISECONDS));
            }
        };
        // The callback runs on whichever thread completes the future.
        Futures.addCallback(future, recorder, MoreExecutors.directExecutor());
    }

    /**
     * Selects which object store flavor this exchange storage targets.
     */
    public enum CompatibilityMode {
        /** Only the S3 client is used; no Google Cloud Storage client or delete executor is created. */
        AWS,
        /** A Google Cloud Storage client and delete executor are also created and used for recursive deletes. */
        GCP
    }

    /**
     * Server-side encryption settings handed to writers.
     * <p>
     * A KMS key id must be present exactly when the SSE type is KMS.
     *
     * @param sseType encryption type; must not be null
     * @param sseKmsKeyId KMS key id; present if and only if {@code sseType} is KMS
     */
    private record S3SseContext(ExchangeS3Config.S3SseType sseType, Optional<String> sseKmsKeyId) {
        S3SseContext {
            Objects.requireNonNull(sseType, "sseType is null");
            boolean kmsRequested = sseType == ExchangeS3Config.S3SseType.KMS;
            boolean keyIdProvided = sseKmsKeyId.isPresent();
            Preconditions.checkArgument(kmsRequested == keyIdProvided, (Object) "sseKmsKeyId is supposed to be set only when sseType is KMS");
        }
    }

    /**
     * Reads length-prefixed slices from a sequence of source files by issuing ranged S3 GET
     * requests that write directly into a byte buffer.
     * <p>
     * A buffer fill is started in the constructor and again from {@link #read()} whenever the
     * buffered data cannot satisfy the next slice. {@link #isBlocked()} exposes the future of the
     * fill in progress; {@link #read()} returns {@code null} until that future is done.
     */
    @ThreadSafe
    private static class S3ExchangeStorageReader
    implements ExchangeStorageReader {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(S3ExchangeStorageReader.class);
        private final S3FileSystemExchangeStorageStats stats;
        private final S3AsyncClient s3AsyncClient;
        // Incremented each time a source file has been requested in full.
        MetricsBuilder.CounterMetricBuilder sourceFilesProcessedMetric;
        // Elapsed-time distributions for GET requests, split by outcome.
        MetricsBuilder.DistributionMetricBuilder s3GetObjectRequestsSuccessMetric;
        MetricsBuilder.DistributionMetricBuilder s3GetObjectRequestsFailedMetric;
        // Maximum number of bytes requested by a single ranged GET.
        private final int partSize;
        // Size of each freshly allocated read buffer: maxPageStorageSize + partSize.
        private final int bufferSize;
        // Files not yet started; polled as each current file is exhausted.
        @GuardedBy(value="this")
        private final Queue<ExchangeSourceFile> sourceFiles;
        @GuardedBy(value="this")
        private ExchangeSourceFile currentFile;
        // Offset in currentFile of the next byte to request.
        @GuardedBy(value="this")
        private long fileOffset;
        // View over the filled portion of the current buffer; null before the first fill and after close.
        @GuardedBy(value="this")
        private SliceInput sliceInput;
        // Length of the next slice to return, or -1 when its length prefix has not been read yet.
        @GuardedBy(value="this")
        private int sliceSize = -1;
        private volatile boolean closed;
        private volatile long bufferRetainedSize;
        // Completes when all GET requests of the current fill have completed.
        private volatile ListenableFuture<Void> inProgressReadFuture = Futures.immediateVoidFuture();

        /**
         * Creates the reader and immediately starts the first buffer fill.
         *
         * @param stats statistics sink; must not be null
         * @param s3AsyncClient client used for the GET requests; must not be null
         * @param partSize maximum size in bytes of one ranged GET
         * @param sourceFiles files to read, in order; must not be null
         * @param metricsBuilder source of the metrics reported by this reader; must not be null
         * @param maxPageStorageSize added to {@code partSize} to obtain the buffer size
         */
        public S3ExchangeStorageReader(S3FileSystemExchangeStorageStats stats, S3AsyncClient s3AsyncClient, int partSize, List<ExchangeSourceFile> sourceFiles, MetricsBuilder metricsBuilder, int maxPageStorageSize) {
            this.stats = Objects.requireNonNull(stats, "stats is null");
            this.s3AsyncClient = Objects.requireNonNull(s3AsyncClient, "s3AsyncClient is null");
            this.partSize = partSize;
            this.sourceFiles = new ArrayDeque<ExchangeSourceFile>((Collection)Objects.requireNonNull(sourceFiles, "sourceFiles is null"));
            Objects.requireNonNull(metricsBuilder, "metricsBuilder is null");
            this.sourceFilesProcessedMetric = metricsBuilder.getCounterMetric("FileSystemExchangeSource.filesProcessed");
            this.s3GetObjectRequestsSuccessMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.s3GetObjectRequestsSuccess");
            this.s3GetObjectRequestsFailedMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.s3GetObjectRequestsFailed");
            this.bufferSize = maxPageStorageSize + partSize;
            this.fillBuffer();
        }

        /**
         * Returns the next slice, or {@code null} if the reader is closed or the current fill has
         * not completed yet.
         *
         * @return the next slice, or {@code null} when nothing can be returned right now
         * @throws IOException if the fill in progress failed with a runtime exception
         */
        @Override
        public synchronized Slice read() throws IOException {
            if (this.closed || !this.inProgressReadFuture.isDone()) {
                return null;
            }
            try {
                // The future is already done; this only surfaces a failure of the fill.
                MoreFutures.getFutureValue(this.inProgressReadFuture);
            }
            catch (RuntimeException e) {
                throw new IOException(e);
            }
            // The length prefix may already have been consumed by the previous call (see below).
            if (this.sliceSize < 0) {
                this.sliceSize = this.sliceInput.readInt();
            }
            Slice data = this.sliceInput.readSlice(this.sliceSize);
            if (this.sliceInput.available() > 4) {
                // Peek the next length prefix; refill if the buffer does not hold that whole slice.
                this.sliceSize = this.sliceInput.readInt();
                if (this.sliceInput.available() < this.sliceSize) {
                    this.fillBuffer();
                }
            } else {
                // Too few bytes left to read a length prefix and data: forget the length and refill.
                this.sliceSize = -1;
                this.fillBuffer();
            }
            return data;
        }

        /** Returns the future of the buffer fill currently in progress. */
        @Override
        public ListenableFuture<Void> isBlocked() {
            return this.inProgressReadFuture;
        }

        /** Returns the instance size plus the retained size of the current buffer view. */
        @Override
        public long getRetainedSize() {
            return (long)INSTANCE_SIZE + this.bufferRetainedSize;
        }

        /** Returns whether the reader has been closed; it closes itself once no source files remain. */
        @Override
        public boolean isFinished() {
            return this.closed;
        }

        /**
         * Closes the reader: drops the current file and buffer view, cancels the fill in progress
         * and replaces it with a completed future. Calling it again has no effect.
         */
        @Override
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.currentFile = null;
            this.sliceInput = null;
            this.bufferRetainedSize = 0L;
            this.inProgressReadFuture.cancel(true);
            this.inProgressReadFuture = Futures.immediateVoidFuture();
        }

        /**
         * Allocates a new buffer, carries over the unread bytes of the previous one, and issues
         * ranged GET requests that write the following file data directly into the new buffer.
         * Closes the reader instead when there is no file left to read.
         */
        @GuardedBy(value="this")
        private void fillBuffer() {
            // Move on to the next file when none is selected or the current one is fully requested.
            if (this.currentFile == null || this.fileOffset == this.currentFile.getFileSize()) {
                this.currentFile = this.sourceFiles.poll();
                if (this.currentFile == null) {
                    this.close();
                    return;
                }
                this.fileOffset = 0L;
            }
            byte[] buffer = new byte[this.bufferSize];
            int bufferFill = 0;
            if (this.sliceInput != null) {
                // Copy the unread tail of the previous buffer to the front of the new one.
                int length = this.sliceInput.available();
                this.sliceInput.readBytes(buffer, 0, length);
                bufferFill += length;
            }
            ImmutableList.Builder getObjectFutures = ImmutableList.builder();
            while (true) {
                long fileSize = this.currentFile.getFileSize();
                // Number of whole parts that still fit into the free space of the buffer.
                int readableParts = (buffer.length - bufferFill) / this.partSize;
                if (readableParts == 0) {
                    // Less than one part of space left: stop unless the rest of the file still fits.
                    if ((long)(buffer.length - bufferFill) < fileSize - this.fileOffset) break;
                    readableParts = 1;
                }
                String key = S3FileSystemExchangeStorage.keyFromUri(this.currentFile.getFileUri());
                String bucketName = S3FileSystemExchangeStorage.getBucketName(this.currentFile.getFileUri());
                for (int i = 0; i < readableParts && this.fileOffset < fileSize; ++i) {
                    int length = (int)Math.min((long)this.partSize, fileSize - this.fileOffset);
                    // HTTP byte ranges are inclusive on both ends, hence the "- 1".
                    GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder().key(key).bucket(bucketName).range("bytes=" + this.fileOffset + "-" + (this.fileOffset + (long)length - 1L));
                    // The response body is written into the buffer starting at bufferFill.
                    ListenableFuture getObjectFuture = MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.getObject((GetObjectRequest)getObjectRequestBuilder.build(), BufferWriteAsyncResponseTransformer.toBufferWrite(buffer, bufferFill)));
                    this.stats.getGetObject().record(getObjectFuture);
                    this.stats.getGetObjectDataSizeInBytes().add((long)length);
                    S3FileSystemExchangeStorage.recordDistributionMetric(getObjectFuture, this.s3GetObjectRequestsSuccessMetric, this.s3GetObjectRequestsFailedMetric);
                    getObjectFutures.add((Object)getObjectFuture);
                    bufferFill += length;
                    this.fileOffset += (long)length;
                }
                // File not fully requested yet: loop again to re-evaluate the remaining space.
                if (this.fileOffset != fileSize) continue;
                this.sourceFilesProcessedMetric.increment();
                this.currentFile = this.sourceFiles.poll();
                if (this.currentFile == null) break;
                this.fileOffset = 0L;
            }
            this.inProgressReadFuture = MoreFutures.asVoid((ListenableFuture)Futures.allAsList((Iterable)getObjectFutures.build()));
            // The view covers bytes the GET requests may still be writing; read() does not touch it
            // until inProgressReadFuture is done.
            this.sliceInput = Slices.wrappedBuffer((byte[])buffer, (int)0, (int)bufferFill).getInput();
            this.bufferRetainedSize = this.sliceInput.getRetainedSize();
        }
    }

    @NotThreadSafe
    private static class S3ExchangeStorageWriter
    implements ExchangeStorageWriter {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(S3ExchangeStorageWriter.class);
        private final S3FileSystemExchangeStorageStats stats;
        private final S3AsyncClient s3AsyncClient;
        // Destination object of this writer.
        private final String bucketName;
        private final String key;
        // Slices shorter than this are uploaded with a single PutObject when no multipart upload has started.
        private final int partSize;
        private final StorageClass storageClass;
        private final S3SseContext s3SseContext;
        // Number of the most recently started multipart part; incremented before each part upload.
        private int currentPartNumber;
        // Set once a direct (single PutObject) upload has been started; write() rejects further calls after that.
        private ListenableFuture<Void> directUploadFuture;
        // Created on the first multipart write; resolves to the upload id.
        private ListenableFuture<String> multiPartUploadIdFuture;
        // Presumably collects the futures of the individual part uploads — usage is outside this view; confirm.
        private final List<ListenableFuture<CompletedPart>> multiPartUploadFutures = new ArrayList<ListenableFuture<CompletedPart>>();
        // When true, write() returns an already-completed future without uploading.
        private volatile boolean closed;

        /**
         * Creates a writer for the object {@code key} in {@code bucketName}.
         *
         * @param stats statistics sink; must not be null
         * @param s3AsyncClient client used for uploads; must not be null
         * @param bucketName destination bucket; must not be null
         * @param key destination object key; must not be null
         * @param partSize threshold in bytes below which a slice may be uploaded directly
         * @param storageClass storage class applied to uploads; must not be null
         * @param s3SseContext server-side encryption settings; must not be null
         * @throws NullPointerException if any reference argument is null
         */
        public S3ExchangeStorageWriter(S3FileSystemExchangeStorageStats stats, S3AsyncClient s3AsyncClient, String bucketName, String key, int partSize, StorageClass storageClass, S3SseContext s3SseContext) {
            this.stats = Objects.requireNonNull(stats, "stats is null");
            this.s3AsyncClient = Objects.requireNonNull(s3AsyncClient, "s3AsyncClient is null");
            this.bucketName = Objects.requireNonNull(bucketName, "bucketName is null");
            this.key = Objects.requireNonNull(key, "key is null");
            this.partSize = partSize;
            this.storageClass = Objects.requireNonNull(storageClass, "storageClass is null");
            this.s3SseContext = Objects.requireNonNull(s3SseContext, "s3SseContext is null");
        }

        @Override
        public ListenableFuture<Void> write(Slice slice) {
            Preconditions.checkState((this.directUploadFuture == null ? 1 : 0) != 0, (Object)"Direct upload already started");
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            if (slice.length() < this.partSize && this.multiPartUploadIdFuture == null) {
                PutObjectRequest putObjectRequest = (PutObjectRequest)((PutObjectRequest.Builder)PutObjectRequest.builder().bucket(this.bucketName).key(this.key).storageClass(this.storageClass).applyMutation(builder -> {
                    switch (this.s3SseContext.sseType()) {
                        case NONE: {
                            break;
                        }
                        case S3: {
                            builder.serverSideEncryption(ServerSideEncryption.AES256);
                            break;
                        }
                        case KMS: {
                            builder.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.s3SseContext.sseKmsKeyId().get());
                        }
                    }
                })).build();
                this.directUploadFuture = FileSystemExchangeFutures.translateFailures(MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromByteBufferUnsafe((ByteBuffer)slice.toByteBuffer()))));
                this.stats.getPutObject().record(this.directUploadFuture);
                this.stats.getPutObjectDataSizeInBytes().add((long)slice.length());
                return this.directUploadFuture;
            }
            if (this.multiPartUploadIdFuture == null) {
                this.multiPartUploadIdFuture = Futures.transform(this.createMultipartUpload(), CreateMultipartUploadResponse::uploadId, (Executor)MoreExecutors.directExecutor());
            }
            int partNum = ++this.currentPartNumber;
            ListenableFuture uploadFuture = Futures.transformAsync(this.multiPartUploadIdFuture, uploadId -> this.uploadPart((String)uploadId, slice, partNum), (Executor)MoreExecutors.directExecutor());
            this.multiPartUploadFutures.add((ListenableFuture<CompletedPart>)uploadFuture);
            return FileSystemExchangeFutures.translateFailures(uploadFuture);
        }

        @Override
        public ListenableFuture<Void> finish() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            if (this.multiPartUploadIdFuture == null) {
                return Objects.requireNonNullElseGet(this.directUploadFuture, Futures::immediateVoidFuture);
            }
            ListenableFuture<Void> finishFuture = FileSystemExchangeFutures.translateFailures(Futures.transformAsync((ListenableFuture)Futures.allAsList(this.multiPartUploadFutures), completedParts -> this.completeMultipartUpload((String)MoreFutures.getFutureValue(this.multiPartUploadIdFuture), (List<CompletedPart>)completedParts), (Executor)MoreExecutors.directExecutor()));
            Futures.addCallback(finishFuture, (FutureCallback)new FutureCallback<Void>(){

                public void onSuccess(Void result) {
                    closed = true;
                }

                public void onFailure(Throwable ignored) {
                }
            }, (Executor)MoreExecutors.directExecutor());
            return finishFuture;
        }

        @Override
        public ListenableFuture<Void> abort() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            this.closed = true;
            if (this.multiPartUploadIdFuture == null) {
                if (this.directUploadFuture != null) {
                    this.directUploadFuture.cancel(true);
                }
                return Futures.immediateVoidFuture();
            }
            Verify.verify((this.directUploadFuture == null ? 1 : 0) != 0);
            this.multiPartUploadFutures.forEach(future -> future.cancel(true));
            return FileSystemExchangeFutures.translateFailures(Futures.transformAsync(this.multiPartUploadIdFuture, this::abortMultipartUpload, (Executor)MoreExecutors.directExecutor()));
        }

        @Override
        public long getRetainedSize() {
            return INSTANCE_SIZE;
        }

        private ListenableFuture<CreateMultipartUploadResponse> createMultipartUpload() {
            CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = (CreateMultipartUploadRequest.Builder)CreateMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).storageClass(this.storageClass).applyMutation(builder -> {
                switch (this.s3SseContext.sseType()) {
                    case NONE: {
                        break;
                    }
                    case S3: {
                        builder.serverSideEncryption(ServerSideEncryption.AES256);
                        break;
                    }
                    case KMS: {
                        builder.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.s3SseContext.sseKmsKeyId().get());
                    }
                }
            });
            return this.stats.getCreateMultipartUpload().record(MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.createMultipartUpload((CreateMultipartUploadRequest)createMultipartUploadRequestBuilder.build())));
        }

        private ListenableFuture<CompletedPart> uploadPart(String uploadId, Slice slice, int partNumber) {
            UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder().bucket(this.bucketName).key(this.key).uploadId(uploadId).partNumber(Integer.valueOf(partNumber));
            UploadPartRequest uploadPartRequest = (UploadPartRequest)uploadPartRequestBuilder.build();
            this.stats.getUploadPartDataSizeInBytes().add((long)slice.length());
            return this.stats.getUploadPart().record(Futures.transform((ListenableFuture)MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromByteBufferUnsafe((ByteBuffer)slice.toByteBuffer()))), uploadPartResponse -> (CompletedPart)CompletedPart.builder().eTag(uploadPartResponse.eTag()).partNumber(Integer.valueOf(partNumber)).build(), (Executor)MoreExecutors.directExecutor()));
        }

        private ListenableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(String uploadId, List<CompletedPart> completedParts) {
            CompletedMultipartUpload completedMultipartUpload = (CompletedMultipartUpload)CompletedMultipartUpload.builder().parts(completedParts).build();
            CompleteMultipartUploadRequest completeMultipartUploadRequest = (CompleteMultipartUploadRequest)CompleteMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
            this.stats.getCompleteMultipartUploadPartsCount().add((long)completedParts.size());
            return this.stats.getCompleteMultipartUpload().record(MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest)));
        }

        private ListenableFuture<AbortMultipartUploadResponse> abortMultipartUpload(String uploadId) {
            AbortMultipartUploadRequest abortMultipartUploadRequest = (AbortMultipartUploadRequest)AbortMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).uploadId(uploadId).build();
            return this.stats.getAbortMultipartUpload().record(MoreFutures.toListenableFuture((CompletableFuture)this.s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest)));
        }
    }
}

