/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.BatchTaskUpdateRequest;
import com.facebook.presto.spark.execution.HttpNativeExecutionTaskInfoFetcher;
import com.facebook.presto.spark.execution.HttpNativeExecutionTaskResultFetcher;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

public class NativeExecutionTask {
    private static final Logger log = Logger.get(NativeExecutionTask.class);
    private final Session session;
    private final PlanFragment planFragment;
    private final OutputBuffers outputBuffers;
    private final PrestoSparkHttpTaskClient workerClient;
    private final TableWriteInfo tableWriteInfo;
    private final Optional<String> shuffleWriteInfo;
    private final List<TaskSource> sources;
    private final Executor executor;
    private final HttpNativeExecutionTaskInfoFetcher taskInfoFetcher;
    private final Optional<HttpNativeExecutionTaskResultFetcher> taskResultFetcher;

    public NativeExecutionTask(Session session, URI location, TaskId taskId, PlanFragment planFragment, List<TaskSource> sources, HttpClient httpClient, TableWriteInfo tableWriteInfo, Optional<String> shuffleWriteInfo, Executor executor, ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorRetryScheduledExecutor, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<PlanFragment> planFragmentCodec, JsonCodec<BatchTaskUpdateRequest> taskUpdateRequestCodec, TaskManagerConfig taskManagerConfig, QueryManagerConfig queryManagerConfig) {
        this.session = Objects.requireNonNull(session, "session is null");
        this.planFragment = Objects.requireNonNull(planFragment, "planFragment is null");
        this.tableWriteInfo = Objects.requireNonNull(tableWriteInfo, "tableWriteInfo is null");
        this.shuffleWriteInfo = Objects.requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null");
        this.sources = Objects.requireNonNull(sources, "sources is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers((OutputBuffers.BufferType)OutputBuffers.BufferType.PARTITIONED);
        Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null");
        this.workerClient = new PrestoSparkHttpTaskClient(Objects.requireNonNull(httpClient, "httpClient is null"), taskId, location, taskInfoCodec, planFragmentCodec, taskUpdateRequestCodec, taskManagerConfig.getInfoRefreshMaxWait());
        Objects.requireNonNull(updateScheduledExecutor, "updateScheduledExecutor is null");
        Objects.requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
        this.taskInfoFetcher = new HttpNativeExecutionTaskInfoFetcher(updateScheduledExecutor, errorRetryScheduledExecutor, this.workerClient, this.executor, taskManagerConfig.getInfoUpdateInterval(), queryManagerConfig.getRemoteTaskMaxErrorDuration());
        this.taskResultFetcher = !shuffleWriteInfo.isPresent() ? Optional.of(new HttpNativeExecutionTaskResultFetcher(updateScheduledExecutor, this.workerClient)) : Optional.empty();
    }

    public Optional<TaskInfo> getTaskInfo() throws RuntimeException {
        return this.taskInfoFetcher.getTaskInfo();
    }

    public Optional<SerializedPage> pollResult() throws InterruptedException {
        if (!this.taskResultFetcher.isPresent()) {
            return Optional.empty();
        }
        return this.taskResultFetcher.get().pollPage();
    }

    public TaskInfo start() {
        TaskInfo taskInfo = this.sendUpdateRequest();
        if (!taskInfo.getTaskStatus().getState().isDone()) {
            log.info("Starting TaskInfoFetcher and TaskResultFetcher.");
            this.taskResultFetcher.ifPresent(fetcher -> fetcher.start());
            this.taskInfoFetcher.start();
        }
        return taskInfo;
    }

    public void stop() {
        this.taskInfoFetcher.stop();
        this.taskResultFetcher.ifPresent(fetcher -> fetcher.stop());
        this.workerClient.abortResults();
    }

    private TaskInfo sendUpdateRequest() {
        try {
            ListenableFuture<BaseResponse<TaskInfo>> future = this.workerClient.updateTask(this.sources, this.planFragment, this.tableWriteInfo, this.shuffleWriteInfo, this.session, this.outputBuffers);
            BaseResponse response = (BaseResponse)future.get();
            if (response.hasValue()) {
                return (TaskInfo)response.getValue();
            }
            String message = String.format("Create-or-update task request didn't return a result. %s: %s", HttpStatus.fromStatusCode((int)response.getStatusCode()), response.getStatusMessage());
            throw new IllegalStateException(message);
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

