/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.operator.PageTransportErrorException;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Periodically fetches serialized result pages from a worker through a
 * {@link PrestoSparkHttpTaskClient} and buffers them in memory until they are consumed via
 * {@link #pollPage()}.
 *
 * <p>A fetch task is scheduled every {@code FETCH_INTERVAL}. A fetch round is skipped while the
 * buffered byte count is at or above {@code MAX_BUFFER_SIZE}; because the check happens before
 * the request is issued, the buffer may temporarily exceed that limit by the size of one response.
 *
 * <p>NOTE(review): {@code started} and {@code schedulerFuture} are plain fields, so
 * {@link #start()} and {@link #stop()} are presumably meant to be called from a single thread —
 * confirm against callers.
 */
public class HttpNativeExecutionTaskResultFetcher {
    private static final Duration FETCH_INTERVAL = new Duration(200.0, TimeUnit.MILLISECONDS);
    private static final Duration POLL_TIMEOUT = new Duration(100.0, TimeUnit.MILLISECONDS);
    private static final DataSize MAX_BUFFER_SIZE = new DataSize(128.0, DataSize.Unit.MEGABYTE);
    private final ScheduledExecutorService scheduler;
    private final PrestoSparkHttpTaskClient workerClient;
    private final LinkedBlockingDeque<SerializedPage> pageBuffer = new LinkedBlockingDeque<>();
    // Sum of getSizeInBytes() over the pages currently held in pageBuffer.
    private final AtomicLong bufferMemoryBytes;
    private ScheduledFuture<?> schedulerFuture;
    private boolean started;

    /**
     * @param scheduler executor on which the periodic fetch task runs
     * @param workerClient client used to fetch, acknowledge and abort results
     * @throws NullPointerException if either argument is null
     */
    public HttpNativeExecutionTaskResultFetcher(ScheduledExecutorService scheduler, PrestoSparkHttpTaskClient workerClient) {
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler is null");
        this.workerClient = Objects.requireNonNull(workerClient, "workerClient is null");
        this.bufferMemoryBytes = new AtomicLong();
    }

    /**
     * Schedules the periodic fetch task, starting immediately.
     *
     * @return a future that completes normally once the worker reports that the client is
     *         complete, or exceptionally with a {@link CompletionException} whose cause is the
     *         failure that stopped fetching. In both cases the periodic task is cancelled first.
     * @throws PrestoException if this fetcher has already been started
     */
    public CompletableFuture<Void> start() {
        if (this.started) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "trying to start an already started TaskResultFetcher");
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        HttpNativeExecutionTaskResultFetcherRunner runner = new HttpNativeExecutionTaskResultFetcherRunner(this.workerClient, future, this.pageBuffer, this.bufferMemoryBytes);
        this.schedulerFuture = this.scheduler.scheduleAtFixedRate(runner, 0L, (long) FETCH_INTERVAL.getValue(), FETCH_INTERVAL.getUnit());
        this.started = true;
        return future.handle((result, throwable) -> {
            this.schedulerFuture.cancel(false);
            if (throwable != null) {
                throw new CompletionException(unwrap(throwable));
            }
            return result;
        });
    }

    /**
     * Cancels the periodic fetch task if it was scheduled, without interrupting a fetch that is
     * currently running. Pages already buffered remain available through {@link #pollPage()}.
     */
    public void stop() {
        if (this.schedulerFuture != null) {
            this.schedulerFuture.cancel(false);
        }
    }

    /**
     * Takes the next buffered page, waiting up to {@code POLL_TIMEOUT} for one to arrive.
     *
     * @return the next page, or {@link Optional#empty()} if none arrived within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public Optional<SerializedPage> pollPage() throws InterruptedException {
        SerializedPage page = this.pageBuffer.poll((long) POLL_TIMEOUT.getValue(), POLL_TIMEOUT.getUnit());
        if (page != null) {
            this.bufferMemoryBytes.addAndGet(-page.getSizeInBytes());
            return Optional.of(page);
        }
        return Optional.empty();
    }

    /**
     * Strips an {@link ExecutionException} / {@link CompletionException} wrapper when it carries
     * a cause; any other throwable (including one with a null cause) is returned unchanged so
     * that the caller never ends up with a cause-less {@link CompletionException}.
     */
    private static Throwable unwrap(Throwable throwable) {
        boolean isWrapper = throwable instanceof ExecutionException || throwable instanceof CompletionException;
        if (isWrapper && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    /**
     * One fetch round: requests pages starting at the current token, validates their checksums,
     * appends them to the shared buffer, acknowledges them, and completes the future when the
     * response says the client is complete.
     *
     * <p>A {@link PageTransportErrorException} is tolerated until
     * {@code MAX_TRANSPORT_ERROR_RETRIES} of them have been seen in total (the counter is never
     * reset); the same token is then simply requested again on the next scheduled run. Every
     * other failure aborts the results on the worker and completes the future exceptionally.
     */
    private static class HttpNativeExecutionTaskResultFetcherRunner
    implements Runnable {
        private static final DataSize MAX_RESPONSE_SIZE = new DataSize(32.0, DataSize.Unit.MEGABYTE);
        private static final int MAX_TRANSPORT_ERROR_RETRIES = 5;
        private final PrestoSparkHttpTaskClient client;
        private final LinkedBlockingDeque<SerializedPage> pageBuffer;
        private final AtomicLong bufferMemoryBytes;
        private final CompletableFuture<Void> future;
        private int transportErrorRetries;
        // Token sent with the next getResults request; advanced to the response's next token.
        private long token;

        public HttpNativeExecutionTaskResultFetcherRunner(PrestoSparkHttpTaskClient client, CompletableFuture<Void> future, LinkedBlockingDeque<SerializedPage> pageBuffer, AtomicLong bufferMemoryBytes) {
            this.client = Objects.requireNonNull(client, "client is null");
            this.future = Objects.requireNonNull(future, "future is null");
            this.pageBuffer = Objects.requireNonNull(pageBuffer, "pageBuffer is null");
            this.bufferMemoryBytes = Objects.requireNonNull(bufferMemoryBytes, "bufferMemoryBytes is null");
        }

        @Override
        public void run() {
            // A run may still be triggered between completion of the future and cancellation of
            // the schedule; results were already aborted by then, so do not fetch again.
            if (this.future.isDone()) {
                return;
            }
            try {
                if (this.bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) {
                    return;
                }
                PageBufferClient.PagesResponse pagesResponse = (PageBufferClient.PagesResponse) this.client.getResults(this.token, MAX_RESPONSE_SIZE).get();
                List<SerializedPage> pages = pagesResponse.getPages();
                long bytes = 0L;
                for (SerializedPage page : pages) {
                    if (!PagesSerdeUtil.isChecksumValid(page)) {
                        throw new PrestoException(StandardErrorCode.SERIALIZED_PAGE_CHECKSUM_ERROR, String.format("Received corrupted serialized page from host %s", HostAddress.fromUri((URI) this.client.getLocation())));
                    }
                    bytes += page.getSizeInBytes();
                }
                // Account for the bytes before publishing the pages so that a concurrent
                // pollPage() can never drive the counter below zero.
                this.bufferMemoryBytes.addAndGet(bytes);
                this.pageBuffer.addAll(pages);
                long nextToken = pagesResponse.getNextToken();
                if (!pages.isEmpty()) {
                    this.client.acknowledgeResultsAsync(nextToken);
                }
                this.token = nextToken;
                if (pagesResponse.isClientComplete()) {
                    this.client.abortResults();
                    this.future.complete(null);
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                fail(e);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof PageTransportErrorException && ++this.transportErrorRetries < MAX_TRANSPORT_ERROR_RETRIES) {
                    // Transient transport problem: keep the token and retry on the next run.
                    return;
                }
                fail(cause != null ? cause : e);
            }
            catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task silently stops all further
                // runs, which would leave the future incomplete forever; surface it instead.
                fail(e);
            }
        }

        /**
         * Aborts the results on the worker and completes the future exceptionally, unless the
         * future is already done. A failure of the abort call itself is attached as a suppressed
         * exception so that it cannot prevent the future from completing.
         */
        private void fail(Throwable failure) {
            if (this.future.isDone()) {
                return;
            }
            try {
                this.client.abortResults();
            }
            catch (RuntimeException abortFailure) {
                failure.addSuppressed(abortFailure);
            }
            this.future.completeExceptionally(failure);
        }
    }
}

