/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.remotetask;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.Request;
import io.airlift.http.client.ResponseHandler;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.FutureStateChange;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.PartitionedSplitsInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PipelinedBufferInfo;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.metadata.Split;
import io.trino.operator.TaskStats;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
import io.trino.server.TaskUpdateRequest;
import io.trino.server.remotetask.Backoff;
import io.trino.server.remotetask.ContinuousTaskStatusFetcher;
import io.trino.server.remotetask.DynamicFiltersFetcher;
import io.trino.server.remotetask.RemoteTaskStats;
import io.trino.server.remotetask.RequestErrorTracker;
import io.trino.server.remotetask.SimpleHttpResponseCallback;
import io.trino.server.remotetask.SimpleHttpResponseHandler;
import io.trino.server.remotetask.TaskInfoFetcher;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoTransportException;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.util.Failures;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import org.joda.time.DateTime;

public final class HttpRemoteTask
implements RemoteTask {
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    // Identity and plan of the remote task, fixed at construction.
    private final TaskId taskId;
    private final Session session;
    private final String nodeId;
    private final PlanFragment planFragment;
    // Source of the ids given to every ScheduledSplit created by this task.
    private final AtomicLong nextSplitId = new AtomicLong();
    private final RemoteTaskStats stats;
    private final TaskInfoFetcher taskInfoFetcher;
    private final ContinuousTaskStatusFetcher taskStatusFetcher;
    private final DynamicFiltersFetcher dynamicFiltersFetcher;
    private final DynamicFiltersCollector outboundDynamicFiltersCollector;
    // Read by sendUpdate() when asking the collector for new dynamic filter domains.
    private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(0L);
    // The in-flight update request, if any: set by sendUpdate(), cleared and cancelled by cleanUpTask().
    private final AtomicReference<Future<?>> currentRequest = new AtomicReference();
    // Splits added by the constructor/addSplits() and removed again by processTaskUpdate()/cleanUpTask().
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();
    private final int maxUnacknowledgedSplits;
    // Count and weight of the pending splits that belong to partitioned sources.
    // Written while holding the lock; volatile, presumably so the unsynchronized getters can read them.
    @GuardedBy(value="this")
    private volatile int pendingSourceSplitCount;
    @GuardedBy(value="this")
    private volatile long pendingSourceSplitsWeight;
    // Key present => noMoreSplits was requested for that source.
    // Value TRUE => still has to be sent (see getSplitAssignment); FALSE => set by processTaskUpdate().
    @GuardedBy(value="this")
    private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<PlanNodeId, Boolean>();
    private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference();
    // Listeners handed out by whenSplitQueueHasSpace(long); completed by updateSplitQueueSpace()/cleanUpTask().
    private final FutureStateChange<Void> whenSplitQueueHasSpace = new FutureStateChange();
    @GuardedBy(value="this")
    private boolean splitQueueHasSpace = true;
    // Weight threshold supplied by the first whenSplitQueueHasSpace(long) call; empty until then.
    @GuardedBy(value="this")
    private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty();
    private final boolean summarizeTaskInfo;
    private final HttpClient httpClient;
    private final Executor executor;
    private final ScheduledExecutorService errorScheduledExecutor;
    private final Duration maxErrorDuration;
    private final JsonCodec<TaskInfo> taskInfoCodec;
    private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;
    private final JsonCodec<FailTaskRequest> failTaskRequestCodec;
    private final RequestErrorTracker updateErrorTracker;
    // Incremented by triggerUpdate(); an update is scheduled only when it was zero beforehand.
    private final AtomicInteger pendingRequestsCounter = new AtomicInteger(0);
    // While true, sendUpdate() includes the plan fragment in the update request.
    private final AtomicBoolean sendPlan = new AtomicBoolean(true);
    private final NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker;
    // Set by start(); triggerUpdate() does nothing before that.
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Flipped once by scheduleAsyncCleanupRequest() so that only one cleanup request chain is started.
    private final AtomicBoolean aborting = new AtomicBoolean(false);

    /**
     * Creates the coordinator-side handle for a task located at {@code location}.
     *
     * <p>The constructor stores its collaborators, registers {@code initialSplits} as pending
     * splits, builds the status / info / dynamic filter fetchers, and publishes the initial
     * partitioned split counts. No update request is sent from here: {@code started} is still
     * false, so {@link #triggerUpdate()} is a no-op until {@link #start()} is called.
     *
     * <p>NOTE(review): {@code initialSplits}, {@code updateScheduledExecutor},
     * {@code errorScheduledExecutor}, {@code taskStatusRefreshMaxWait},
     * {@code taskInfoUpdateInterval}, {@code dynamicFilterDomainsCodec},
     * {@code failTaskRequestCodec} and {@code dynamicFilterService} are not null-checked here.
     *
     * @throws NullPointerException if any of the explicitly checked arguments is null
     */
    public HttpRemoteTask(Session session, TaskId taskId, String nodeId, URI location, PlanFragment planFragment, Multimap<PlanNodeId, Split> initialSplits, OutputBuffers outputBuffers, HttpClient httpClient, Executor executor, ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorScheduledExecutor, Duration maxErrorDuration, Duration taskStatusRefreshMaxWait, Duration taskInfoUpdateInterval, boolean summarizeTaskInfo, JsonCodec<TaskStatus> taskStatusCodec, JsonCodec<DynamicFiltersCollector.VersionedDynamicFilterDomains> dynamicFilterDomainsCodec, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec, JsonCodec<FailTaskRequest> failTaskRequestCodec, NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker, RemoteTaskStats stats, DynamicFilterService dynamicFilterService, Set<DynamicFilterId> outboundDynamicFilterIds, Optional<DataSize> estimatedMemory) {
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(nodeId, "nodeId is null");
        Objects.requireNonNull(location, "location is null");
        Objects.requireNonNull(planFragment, "planFragment is null");
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        Objects.requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        Objects.requireNonNull(stats, "stats is null");
        Objects.requireNonNull(outboundDynamicFilterIds, "outboundDynamicFilterIds is null");
        Objects.requireNonNull(estimatedMemory, "estimatedMemory is null");
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{taskId});){
            this.taskId = taskId;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.maxErrorDuration = Objects.requireNonNull(maxErrorDuration, "maxErrorDuration is null");
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.failTaskRequestCodec = failTaskRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
            // partitionedSplitCountTracker was already null-checked above; this second check is redundant.
            this.partitionedSplitCountTracker = Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.stats = stats;
            // Wrap every initial split in a ScheduledSplit with a fresh id and queue it as pending.
            for (Map.Entry entry : initialSplits.entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(this.nextSplitId.getAndIncrement(), (PlanNodeId)entry.getKey(), (Split)entry.getValue());
                this.pendingSplits.put((Object)((PlanNodeId)entry.getKey()), (Object)scheduledSplit);
            }
            this.maxUnacknowledgedSplits = SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask(session);
            // Only splits of partitioned sources contribute to the pending count and weight.
            int pendingSourceSplitCount = 0;
            long pendingSourceSplitsWeight = 0L;
            for (PlanNodeId planNodeId : planFragment.getPartitionedSources()) {
                Collection tableScanSplits = initialSplits.get((Object)planNodeId);
                if (tableScanSplits.isEmpty()) continue;
                pendingSourceSplitCount += tableScanSplits.size();
                pendingSourceSplitsWeight = Math.addExact(pendingSourceSplitsWeight, SplitWeight.rawValueSum((Collection)tableScanSplits, Split::getSplitWeight));
            }
            this.pendingSourceSplitCount = pendingSourceSplitCount;
            this.pendingSourceSplitsWeight = pendingSourceSplitsWeight;
            // For pipelined output buffers, seed the initial task info with one zeroed buffer info per buffer id.
            Optional<List<PipelinedBufferInfo>> pipelinedBufferStates = Optional.empty();
            if (outputBuffers instanceof PipelinedOutputBuffers) {
                PipelinedOutputBuffers buffers = (PipelinedOutputBuffers)outputBuffers;
                pipelinedBufferStates = Optional.of((List)buffers.getBuffers().keySet().stream().map(outputId -> new PipelinedBufferInfo((PipelinedOutputBuffers.OutputBufferId)outputId, 0L, 0L, 0, 0L, 0L, false)).collect(ImmutableList.toImmutableList()));
            }
            TaskInfo initialTask = TaskInfo.createInitialTask(taskId, location, nodeId, pipelinedBufferStates, new TaskStats(DateTime.now(), null));
            // All three fetchers report their failures through this::fail.
            this.dynamicFiltersFetcher = new DynamicFiltersFetcher(this::fail, taskId, location, taskStatusRefreshMaxWait, dynamicFilterDomainsCodec, executor, httpClient, maxErrorDuration, errorScheduledExecutor, stats, dynamicFilterService);
            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(this::fail, initialTask.getTaskStatus(), taskStatusRefreshMaxWait, taskStatusCodec, this.dynamicFiltersFetcher, executor, httpClient, maxErrorDuration, errorScheduledExecutor, stats);
            this.taskInfoFetcher = new TaskInfoFetcher(this::fail, this.taskStatusFetcher, initialTask, httpClient, taskInfoUpdateInterval, taskInfoCodec, maxErrorDuration, summarizeTaskInfo, executor, updateScheduledExecutor, errorScheduledExecutor, stats, estimatedMemory);
            // On every status change: clean up once the task is done, otherwise refresh split accounting.
            this.taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    this.cleanUpTask();
                } else {
                    partitionedSplitCountTracker.setPartitionedSplits(this.getPartitionedSplitsInfo());
                    this.updateSplitQueueSpace();
                }
            });
            // The collector is given this::triggerUpdate as its callback.
            this.outboundDynamicFiltersCollector = new DynamicFiltersCollector(this::triggerUpdate);
            dynamicFilterService.registerDynamicFilterConsumer(taskId.getQueryId(), taskId.getAttemptId(), outboundDynamicFilterIds, this.outboundDynamicFiltersCollector::updateDomains);
            partitionedSplitCountTracker.setPartitionedSplits(this.getPartitionedSplitsInfo());
            this.updateSplitQueueSpace();
        }
    }

    /**
     * Returns the id of the task, as supplied to the constructor.
     */
    @Override
    public TaskId getTaskId() {
        return taskId;
    }

    /**
     * Returns the node id supplied to the constructor.
     */
    @Override
    public String getNodeId() {
        return nodeId;
    }

    /**
     * Returns the task info currently held by the task info fetcher.
     */
    @Override
    public TaskInfo getTaskInfo() {
        return taskInfoFetcher.getTaskInfo();
    }

    /**
     * Returns the task status currently held by the task status fetcher.
     */
    @Override
    public TaskStatus getTaskStatus() {
        return taskStatusFetcher.getTaskStatus();
    }

    /**
     * Marks the task as started, triggers the first update and starts the three fetchers
     * (dynamic filters, task status, task info) in that order.
     */
    @Override
    public void start() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            // Must be set first: triggerUpdate() is a no-op while the flag is false.
            started.set(true);
            triggerUpdate();
            dynamicFiltersFetcher.start();
            taskStatusFetcher.start();
            taskInfoFetcher.start();
        }
    }

    /**
     * Queues additional splits for the given sources and triggers an update.
     *
     * <p>Does nothing when the task is already done. Splits of partitioned sources also
     * increase the pending split count and weight.
     *
     * @param splitsBySource splits to add, keyed by the plan node that consumes them
     * @throws NullPointerException if {@code splitsBySource} is null
     * @throws IllegalStateException if noMoreSplits was already recorded for one of the sources
     */
    @Override
    public synchronized void addSplits(Multimap<PlanNodeId, Split> splitsBySource) {
        Objects.requireNonNull(splitsBySource, "splitsBySource is null");
        if (getTaskStatus().getState().isDone()) {
            return;
        }

        boolean sawAnySource = false;
        for (Map.Entry<PlanNodeId, Collection<Split>> sourceEntry : splitsBySource.asMap().entrySet()) {
            PlanNodeId sourceId = sourceEntry.getKey();
            boolean partitioned = planFragment.isPartitionedSources(sourceId);
            Preconditions.checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);

            int newCount = 0;
            long newWeight = 0L;
            for (Split split : sourceEntry.getValue()) {
                ScheduledSplit scheduled = new ScheduledSplit(nextSplitId.getAndIncrement(), sourceId, split);
                // Only count splits the multimap actually accepted, and only for partitioned sources.
                if (pendingSplits.put(sourceId, scheduled)) {
                    if (partitioned) {
                        newCount++;
                        newWeight = Math.addExact(newWeight, split.getSplitWeight().getRawValue());
                    }
                }
            }

            if (partitioned) {
                pendingSourceSplitCount += newCount;
                pendingSourceSplitsWeight = Math.addExact(pendingSourceSplitsWeight, newWeight);
                partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
            }
            sawAnySource = true;
        }

        updateSplitQueueSpace();
        if (sawAnySource) {
            triggerUpdate();
        }
    }

    /**
     * Records that {@code sourceId} will receive no further splits and triggers an update.
     * Repeated calls for the same source are ignored.
     */
    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId) {
        if (!noMoreSplits.containsKey(sourceId)) {
            // TRUE marks the flag as not yet processed by processTaskUpdate().
            noMoreSplits.put(sourceId, true);
            triggerUpdate();
        }
    }

    /**
     * Replaces the stored output buffers when {@code newOutputBuffers} has a strictly higher
     * version than the stored one, and triggers an update in that case. Does nothing when the
     * task is already done.
     */
    @Override
    public void setOutputBuffers(OutputBuffers newOutputBuffers) {
        if (getTaskStatus().getState().isDone()) {
            return;
        }
        // getAndUpdate hands back the value that was stored before the (possible) replacement.
        OutputBuffers replaced = outputBuffers.getAndUpdate(current ->
                newOutputBuffers.getVersion() > current.getVersion() ? newOutputBuffers : current);
        if (newOutputBuffers.getVersion() > replaced.getVersion()) {
            triggerUpdate();
        }
    }

    /**
     * Returns count and weight of the partitioned splits attributed to this task: the pending
     * (unacknowledged) ones plus the queued and running ones reported in the task status.
     * A done task reports zero splits.
     */
    @Override
    public PartitionedSplitsInfo getPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        if (status.getState().isDone()) {
            return PartitionedSplitsInfo.forZeroSplits();
        }
        PartitionedSplitsInfo unacknowledged = getUnacknowledgedPartitionedSplitsInfo();
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                unacknowledged.getCount() + status.getQueuedPartitionedDrivers() + status.getRunningPartitionedDrivers(),
                unacknowledged.getWeightSum() + status.getQueuedPartitionedSplitsWeight() + status.getRunningPartitionedSplitsWeight());
    }

    /**
     * Returns the current pending partitioned split count and weight. Reads the two volatile
     * fields without taking the lock.
     */
    public PartitionedSplitsInfo getUnacknowledgedPartitionedSplitsInfo() {
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(pendingSourceSplitCount, pendingSourceSplitsWeight);
    }

    /**
     * Returns count and weight of the partitioned splits that are not running yet: the pending
     * (unacknowledged) ones plus the queued ones reported in the task status. A done task
     * reports zero splits.
     */
    @Override
    public PartitionedSplitsInfo getQueuedPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        if (status.getState().isDone()) {
            return PartitionedSplitsInfo.forZeroSplits();
        }
        PartitionedSplitsInfo unacknowledged = getUnacknowledgedPartitionedSplitsInfo();
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                unacknowledged.getCount() + status.getQueuedPartitionedDrivers(),
                unacknowledged.getWeightSum() + status.getQueuedPartitionedSplitsWeight());
    }

    /**
     * Returns the number of pending splits that belong to partitioned sources.
     */
    @Override
    public int getUnacknowledgedPartitionedSplitCount() {
        return getPendingSourceSplitCount();
    }

    /**
     * Delegates to the task info fetcher's {@code retrieveAndDropSpoolingOutputStats()}.
     */
    @Override
    public SpoolingOutputStats.Snapshot retrieveAndDropSpoolingOutputStats() {
        return taskInfoFetcher.retrieveAndDropSpoolingOutputStats();
    }

    /** Returns the current value of the volatile pending partitioned split counter. */
    private int getPendingSourceSplitCount() {
        return pendingSourceSplitCount;
    }

    /**
     * Returns the pending split weight plus the queued partitioned split weight from the task
     * status, or zero when the task is done.
     */
    private long getQueuedPartitionedSplitsWeight() {
        TaskStatus status = getTaskStatus();
        return status.getState().isDone()
                ? 0L
                : getPendingSourceSplitsWeight() + status.getQueuedPartitionedSplitsWeight();
    }

    /** Returns the current value of the volatile pending partitioned split weight. */
    private long getPendingSourceSplitsWeight() {
        return pendingSourceSplitsWeight;
    }

    /**
     * Registers a listener for task status changes with the task status fetcher.
     */
    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus> stateChangeListener) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            taskStatusFetcher.addStateChangeListener(stateChangeListener);
        }
    }

    /**
     * Registers a final-task-info listener with the task info fetcher.
     */
    @Override
    public void addFinalTaskInfoListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        taskInfoFetcher.addFinalTaskInfoListener(stateChangeListener);
    }

    /**
     * Returns a future that is already complete when the split queue currently has space, and
     * otherwise a new listener on the split-queue-space state change.
     *
     * <p>The first call fixes the weight threshold; later calls must pass the same value.
     *
     * @param weightThreshold queued split weight below which the queue counts as having space
     * @throws IllegalArgumentException if a different threshold was registered earlier
     */
    @Override
    public synchronized ListenableFuture<Void> whenSplitQueueHasSpace(long weightThreshold) {
        if (whenSplitQueueHasSpaceThreshold.isEmpty()) {
            whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold);
            // Re-evaluate now that a threshold is known.
            updateSplitQueueSpace();
        } else {
            Preconditions.checkArgument(weightThreshold == whenSplitQueueHasSpaceThreshold.getAsLong(), "Multiple split queue space notification thresholds not supported");
        }
        return splitQueueHasSpace
                ? Futures.immediateVoidFuture()
                : whenSplitQueueHasSpace.createNewListener();
    }

    /**
     * Recomputes {@code splitQueueHasSpace} and, when there is space and a threshold has been
     * registered, completes the outstanding split-queue-space listeners on {@code executor}.
     *
     * <p>The queue has space when the unacknowledged partitioned split count is below
     * {@code maxUnacknowledgedSplits} and, if a weight threshold is registered, the queued
     * partitioned split weight is below that threshold.
     */
    private synchronized void updateSplitQueueSpace() {
        boolean belowSplitLimit = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits;
        // Short-circuit order matches the original expression: the weight is only read when
        // the count check passed and a threshold exists.
        splitQueueHasSpace = belowSplitLimit
                && (whenSplitQueueHasSpaceThreshold.isEmpty()
                        || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong());
        if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) {
            whenSplitQueueHasSpace.complete(null, executor);
        }
    }

    /**
     * Applies a task update response: stores the new task info, removes the given split
     * assignments from the pending state, and refreshes split accounting.
     *
     * @param newValue task info to store
     * @param splitAssignments assignments whose splits are removed from the pending set
     */
    private synchronized void processTaskUpdate(TaskInfo newValue, List<SplitAssignment> splitAssignments) {
        updateTaskInfo(newValue);

        for (SplitAssignment acknowledged : splitAssignments) {
            PlanNodeId sourceId = acknowledged.getPlanNodeId();
            boolean partitioned = planFragment.isPartitionedSources(sourceId);

            int removedCount = 0;
            long removedWeight = 0L;
            for (ScheduledSplit scheduledSplit : acknowledged.getSplits()) {
                boolean wasPending = pendingSplits.remove(sourceId, scheduledSplit);
                if (wasPending && partitioned) {
                    removedCount++;
                    removedWeight = Math.addExact(removedWeight, scheduledSplit.getSplit().getSplitWeight().getRawValue());
                }
            }

            if (acknowledged.isNoMoreSplits()) {
                // FALSE keeps the key (noMoreSplits stays set) but stops getSplitAssignment()
                // from emitting an assignment solely for this flag.
                noMoreSplits.put(sourceId, false);
            }
            if (partitioned) {
                pendingSourceSplitCount -= removedCount;
                pendingSourceSplitsWeight -= removedWeight;
            }
        }

        // Tracker first, then queue space, same order as before.
        partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
        updateSplitQueueSpace();
    }

    /**
     * Pushes {@code taskInfo} into both fetchers: its status into the status fetcher first,
     * then the whole info into the info fetcher.
     */
    private void updateTaskInfo(TaskInfo taskInfo) {
        TaskStatus embeddedStatus = taskInfo.getTaskStatus();
        taskStatusFetcher.updateTaskStatus(embeddedStatus);
        taskInfoFetcher.updateTaskInfo(taskInfo);
    }

    /** Submits {@link #sendUpdate()} to the executor. */
    private void scheduleUpdate() {
        executor.execute(() -> sendUpdate());
    }

    /**
     * Registers a pending update request. Does nothing before {@link #start()}; otherwise
     * increments the pending counter and schedules an update only when the counter was zero.
     */
    private void triggerUpdate() {
        // Short-circuit: the counter is not touched while the task has not been started.
        if (started.get() && pendingRequestsCounter.getAndIncrement() == 0) {
            scheduleUpdate();
        }
    }

    /**
     * Builds a task update request from the current pending state and posts it asynchronously.
     *
     * <p>Returns without sending when the task is already done. If the error tracker's permit
     * future is not yet complete, this method re-registers itself to run when it completes.
     * The response is handled by an {@code UpdateResponseHandler} on {@code executor}.
     *
     * @throws IllegalStateException if the task was not started, if there are no pending
     *         requests, or if another update request is still registered in {@code currentRequest}
     */
    private void sendUpdate() {
        TaskStatus taskStatus = this.getTaskStatus();
        if (taskStatus.getState().isDone()) {
            return;
        }
        Preconditions.checkState((boolean)this.started.get());
        // Snapshot of the counter at send time; it is handed to the response handler below.
        int currentPendingRequestsCounter = this.pendingRequestsCounter.get();
        Preconditions.checkState((currentPendingRequestsCounter > 0 ? 1 : 0) != 0, (Object)"sendUpdate shouldn't be called without pending requests");
        // If the permit is not available yet, retry this method once the permit future completes.
        ListenableFuture<Void> errorRateLimit = this.updateErrorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::sendUpdate, this.executor);
            return;
        }
        List<SplitAssignment> splitAssignments = this.getSplitAssignments();
        DynamicFiltersCollector.VersionedDynamicFilterDomains dynamicFilterDomains = this.outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(this.sentDynamicFiltersVersion.get());
        // The plan fragment is included only while sendPlan is still true.
        Optional<PlanFragment> fragment = this.sendPlan.get() ? Optional.of(this.planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(this.session.toSessionRepresentation(), this.session.getIdentity().getExtraCredentials(), fragment, splitAssignments, this.outputBuffers.get(), dynamicFilterDomains.getDynamicFilterDomains(), this.session.getExchangeEncryptionKey());
        byte[] taskUpdateRequestJson = this.taskUpdateRequestCodec.toJsonBytes((Object)updateRequest);
        // Both stats are fed the size of the whole serialized request, not just the plan / filter part.
        if (fragment.isPresent()) {
            this.stats.updateWithPlanBytes(taskUpdateRequestJson.length);
        }
        if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
            this.stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
        }
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(taskStatus);
        Request request = Request.Builder.preparePost().setUri(uriBuilder.build()).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setBodyGenerator((BodyGenerator)StaticBodyGenerator.createStaticBodyGenerator((byte[])taskUpdateRequestJson)).build();
        this.updateErrorTracker.startRequest();
        HttpClient.HttpResponseFuture future = this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec));
        // Only one update request may be in flight; the slot must have been empty.
        Preconditions.checkState((this.currentRequest.getAndSet((Future<?>)future) == null ? 1 : 0) != 0, (Object)"There should be no previous request running");
        // The handler gets the assignments that were sent, the dynamic filter version, the start time and the counter snapshot.
        Futures.addCallback((ListenableFuture)future, new SimpleHttpResponseHandler<TaskInfo>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime(), currentPendingRequestsCounter), request.getUri(), this.stats), (Executor)this.executor);
    }

    /**
     * Collects the split assignments to send for every partitioned source node followed by
     * every remote source node of the plan fragment, skipping sources with nothing to send.
     */
    private synchronized List<SplitAssignment> getSplitAssignments() {
        Stream<PlanNode> sourceNodes = Stream.concat(
                planFragment.getPartitionedSourceNodes().stream(),
                planFragment.getRemoteSourceNodes().stream());
        return sourceNodes
                .filter(node -> node != null)
                .map(node -> node.getId())
                .map(sourceId -> getSplitAssignment(sourceId))
                // getSplitAssignment returns null when a source has nothing to send.
                .filter(assignment -> assignment != null)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Builds the split assignment for one source, or returns {@code null} when there are no
     * pending splits for it and no noMoreSplits flag still marked TRUE.
     *
     * <p>The assignment's noMoreSplits flag is true whenever the source has an entry in the
     * {@code noMoreSplits} map, regardless of the entry's value.
     */
    private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId) {
        Set<ScheduledSplit> pendingForSource = pendingSplits.get(planNodeId);
        boolean flagStillToSend = Boolean.TRUE.equals(noMoreSplits.get(planNodeId));
        boolean flagRecorded = noMoreSplits.containsKey(planNodeId);
        if (pendingForSource.isEmpty() && !flagStillToSend) {
            return null;
        }
        return new SplitAssignment(planNodeId, pendingForSource, flagRecorded);
    }

    /**
     * Schedules an asynchronous delete request with {@code abort=false} for the task, unless
     * the task is already done.
     */
    @Override
    public synchronized void cancel() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            if (!getTaskStatus().getState().isDone()) {
                scheduleAsyncCleanupRequest(new Backoff(maxErrorDuration), "cancel", false);
            }
        }
    }

    /**
     * Releases local state once the task is done: clears pending splits and counters, completes
     * split-queue-space listeners, acknowledges all outbound dynamic filters, cancels the
     * in-flight update request, stops the status fetcher and schedules a final delete request.
     *
     * @throws IllegalStateException if the task is not done yet
     */
    private void cleanUpTask() {
        Preconditions.checkState(getTaskStatus().getState().isDone(), "attempt to clean up a task that is not done yet");

        synchronized (this) {
            pendingSplits.clear();
            pendingSourceSplitCount = 0;
            pendingSourceSplitsWeight = 0L;
            partitionedSplitCountTracker.setPartitionedSplits(PartitionedSplitsInfo.forZeroSplits());
            splitQueueHasSpace = true;
            whenSplitQueueHasSpace.complete(null, executor);
        }

        outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

        // Take the in-flight request out of the slot (if there is one) and cancel it.
        Future<?> inFlight = currentRequest.getAndSet(null);
        if (inFlight != null) {
            inFlight.cancel(true);
        }

        taskStatusFetcher.stop();
        scheduleAsyncCleanupRequest(new Backoff(maxErrorDuration), "cleanup", true);
    }

    /**
     * Moves the local task status to ABORTED (with no failures attached) and schedules an
     * asynchronous delete request with {@code abort=true}. Does nothing when the task is
     * already done.
     */
    @Override
    public synchronized void abort() {
        if (getTaskStatus().getState().isDone()) {
            return;
        }
        TaskStatus abortedStatus = TaskStatus.failWith(getTaskStatus(), TaskState.ABORTED, ImmutableList.of());
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            taskStatusFetcher.updateTaskStatus(abortedStatus);
            scheduleAsyncCleanupRequest(new Backoff(maxErrorDuration), "abort", true);
        }
    }

    /**
     * Schedules a cleanup whose request is a task DELETE carrying the given {@code abort} flag.
     * The request is built lazily, only if the cleanup actually gets scheduled.
     */
    private void scheduleAsyncCleanupRequest(Backoff cleanupBackoff, String action, boolean abort) {
        Supplier<Request> deleteRequest = () -> buildDeleteTaskRequest(abort);
        scheduleAsyncCleanupRequest(cleanupBackoff, action, deleteRequest);
    }

    /**
     * Schedules a cleanup whose request is a POST of {@code failTaskRequest} to the task's
     * {@code fail} path. The request is built lazily, only if the cleanup actually gets scheduled.
     */
    private void scheduleAsyncCleanupRequest(Backoff cleanupBackoff, String action, FailTaskRequest failTaskRequest) {
        Supplier<Request> failRequest = () -> buildFailTaskRequest(failTaskRequest);
        scheduleAsyncCleanupRequest(cleanupBackoff, action, failRequest);
    }

    /**
     * Starts the asynchronous cleanup request chain, at most once per task: only the caller
     * that flips {@code aborting} from false to true builds and sends the request.
     */
    private void scheduleAsyncCleanupRequest(Backoff cleanupBackoff, String action, Supplier<Request> remoteRequestSupplier) {
        if (aborting.compareAndSet(false, true)) {
            Request cleanupRequest = remoteRequestSupplier.get();
            doScheduleAsyncCleanupRequest(cleanupBackoff, cleanupRequest, action);
        }
    }

    /**
     * Builds a DELETE request for the task's own URI with an {@code abort} query parameter.
     */
    private Request buildDeleteTaskRequest(boolean abort) {
        HttpUriBuilder taskUri = getHttpUriBuilder(getTaskStatus());
        taskUri = taskUri.addParameter("abort", String.valueOf(abort));
        return Request.Builder.prepareDelete()
                .setUri(taskUri.build())
                .build();
    }

    /**
     * Builds a JSON POST request to the task's {@code fail} path with {@code failTaskRequest}
     * serialized as the body.
     */
    private Request buildFailTaskRequest(FailTaskRequest failTaskRequest) {
        HttpUriBuilder failUri = getHttpUriBuilder(getTaskStatus()).appendPath("fail");
        Request.Builder post = Request.Builder.preparePost()
                .setUri(failUri.build())
                .setHeader("Content-Type", MediaType.JSON_UTF_8.toString());
        byte[] body = failTaskRequestCodec.toJsonBytes(failTaskRequest);
        return post
                .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(body))
                .build();
    }

    /**
     * Sends {@code request} asynchronously and handles the outcome on {@code executor},
     * re-sending the same request after a failure until one of the stop conditions in
     * {@code onFailure} is met.
     *
     * @param cleanupBackoff backoff state shared by all attempts of this cleanup
     * @param request the request to send; reused unchanged on every retry
     * @param action short label used only in the error log messages
     */
    private void doScheduleAsyncCleanupRequest(final Backoff cleanupBackoff, final Request request, final String action) {
        Futures.addCallback((ListenableFuture)this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec)), (FutureCallback)new FutureCallback<FullJsonResponseHandler.JsonResponse<TaskInfo>>(){

            // Stores the task info from the response.
            public void onSuccess(FullJsonResponseHandler.JsonResponse<TaskInfo> result) {
                try {
                    HttpRemoteTask.this.updateTaskInfo((TaskInfo)result.getValue());
                }
                finally {
                    // Runs even if updateTaskInfo threw: if the stored task info is still not done, clean up locally.
                    if (!HttpRemoteTask.this.getTaskInfo().getTaskStatus().getState().isDone()) {
                        HttpRemoteTask.this.cleanUpLocally();
                    }
                }
            }

            // Gives up (local cleanup), stops silently, or retries, depending on the failure and backoff state.
            public void onFailure(Throwable t) {
                // The HTTP client has been closed: retrying cannot succeed.
                if (t instanceof RejectedExecutionException && HttpRemoteTask.this.httpClient.isClosed()) {
                    RequestErrorTracker.logError(t, "Unable to %s task at %s. HTTP client is closed.", action, request.getUri());
                    HttpRemoteTask.this.cleanUpLocally();
                    return;
                }
                // failure() records this failure; a true result is treated as "back off depleted" (see log message).
                if (cleanupBackoff.failure()) {
                    RequestErrorTracker.logError(t, "Unable to %s task at %s. Back off depleted.", action, request.getUri());
                    HttpRemoteTask.this.cleanUpLocally();
                    return;
                }
                // The task info is already done: nothing left to retry.
                if (HttpRemoteTask.this.taskInfoFetcher.getTaskInfo().getTaskStatus().getState().isDone()) {
                    return;
                }
                // Retry immediately when no delay is required, otherwise after the backoff delay.
                long delayNanos = cleanupBackoff.getBackoffDelayNanos();
                if (delayNanos == 0L) {
                    HttpRemoteTask.this.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action);
                } else {
                    HttpRemoteTask.this.errorScheduledExecutor.schedule(() -> HttpRemoteTask.this.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action), delayNanos, TimeUnit.NANOSECONDS);
                }
            }
        }, (Executor)this.executor);
    }

    /**
     * Marks the task as FAILED locally with {@code cause} as its failure, then cleans up.
     *
     * <p>For a {@link TrinoTransportException} only local cleanup is performed; for any other
     * cause an asynchronous delete request with {@code abort=true} is scheduled.
     */
    @Override
    public synchronized void fail(Throwable cause) {
        TaskStatus statusBeforeFailure = getTaskStatus();
        boolean alreadyDone = statusBeforeFailure.getState().isDone();
        if (!alreadyDone) {
            log.debug(cause, "Remote task %s failed with %s", statusBeforeFailure.getSelf(), cause);
        }

        // The status is re-read here rather than reusing the snapshot above, as before.
        TaskStatus failedStatus = TaskStatus.failWith(getTaskStatus(), TaskState.FAILED, ImmutableList.of(Failures.toFailure(cause)));
        taskStatusFetcher.updateTaskStatus(failedStatus);

        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            if (!(cause instanceof TrinoTransportException)) {
                scheduleAsyncCleanupRequest(new Backoff(maxErrorDuration), "abort", true);
            } else {
                cleanUpLocally();
            }
        }
    }

    /**
     * Schedules an asynchronous "fail" cleanup request carrying the failure derived from
     * {@code cause}, unless the current task status is already in a done state, in which
     * case nothing happens.
     *
     * @param cause the reason the task should be failed
     */
    @Override
    public synchronized void failRemotely(Throwable cause) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", this.taskId)) {
            boolean stillRunning = !this.getTaskStatus().getState().isDone();
            if (stillRunning) {
                Backoff cleanupBackoff = new Backoff(this.maxErrorDuration);
                FailTaskRequest failRequest = new FailTaskRequest(Failures.toFailure(cause));
                this.scheduleAsyncCleanupRequest(cleanupBackoff, "fail", failRequest);
            }
        }
    }

    /**
     * Publishes a task info built from the current task info combined with the current
     * task status, without contacting the remote task.
     * <p>
     * Within this class it is invoked when a cleanup request cannot be delivered (HTTP
     * client closed or back off depleted) and when the task fails with a
     * {@code TrinoTransportException}.
     */
    private void cleanUpLocally() {
        TaskInfo currentInfo = this.getTaskInfo();
        TaskStatus currentStatus = this.getTaskStatus();
        TaskInfo mergedInfo = currentInfo.withTaskStatus(currentStatus);
        this.updateTaskInfo(mergedInfo);
    }

    /**
     * Creates a URI builder starting from the self URI of the given task status.
     * When {@code summarizeTaskInfo} is set, a value-less {@code summarize} parameter
     * is added to the builder.
     *
     * @param taskStatus status whose self URI is used as the base
     * @return the URI builder, with the {@code summarize} parameter when applicable
     */
    private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus) {
        HttpUriBuilder builder = HttpUriBuilder.uriBuilderFrom(taskStatus.getSelf());
        if (!this.summarizeTaskInfo) {
            return builder;
        }
        builder.addParameter("summarize");
        return builder;
    }

    /**
     * Returns a string form of this task consisting of the class name and the current
     * task info, as rendered by Guava's {@code ToStringHelper}.
     */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
        helper.addValue(this.getTaskInfo());
        return helper.toString();
    }

    /**
     * Response callback that receives a {@link TaskInfo} on success and feeds the outcome
     * back into the enclosing {@code HttpRemoteTask}.
     * <p>
     * Each instance captures the split assignments, the dynamic filters version, the start
     * time and the pending-requests count associated with one request.
     * NOTE(review): presumably that request is the task update sent elsewhere in this
     * class - confirm at the construction site.
     */
    private class UpdateResponseHandler
    implements SimpleHttpResponseCallback<TaskInfo> {
        private final List<SplitAssignment> splitAssignments;
        private final long currentRequestDynamicFiltersVersion;
        private final long currentRequestStartNanos;
        private final int currentPendingRequestsCounter;

        /**
         * @param splitAssignments split assignments tied to the request; copied into an immutable list
         * @param currentRequestDynamicFiltersVersion dynamic filters version tied to the request
         * @param currentRequestStartNanos start time of the request, in nanoseconds
         * @param currentPendingRequestsCounter pending-requests count captured for the request
         * @throws NullPointerException if {@code splitAssignments} is null
         */
        private UpdateResponseHandler(List<SplitAssignment> splitAssignments, long currentRequestDynamicFiltersVersion, long currentRequestStartNanos, int currentPendingRequestsCounter) {
            Objects.requireNonNull(splitAssignments, "splitAssignments is null");
            this.splitAssignments = ImmutableList.copyOf(splitAssignments);
            this.currentRequestDynamicFiltersVersion = currentRequestDynamicFiltersVersion;
            this.currentRequestStartNanos = currentRequestStartNanos;
            this.currentPendingRequestsCounter = currentPendingRequestsCounter;
        }

        /**
         * Records the successful response, clears the in-flight request, schedules another
         * update if the pending-requests counter is still positive, and finally hands the
         * returned task info to {@code processTaskUpdate}.
         *
         * @param value the task info returned by the request
         */
        @Override
        public void success(TaskInfo value) {
            HttpRemoteTask task = HttpRemoteTask.this;
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", task.taskId)) {
                long filtersVersion = this.currentRequestDynamicFiltersVersion;
                task.sentDynamicFiltersVersion.set(filtersVersion);
                task.outboundDynamicFiltersCollector.acknowledge(filtersVersion);
                task.sendPlan.set(value.isNeedsPlan());
                task.currentRequest.set(null);
                this.updateStats();
                task.updateErrorTracker.requestSucceeded();

                // Subtract the count captured for this request; a positive remainder
                // presumably means more updates were requested in the meantime.
                boolean updatesStillPending = task.pendingRequestsCounter.addAndGet(-this.currentPendingRequestsCounter) > 0;
                if (updatesStillPending) {
                    task.scheduleUpdate();
                }
                task.processTaskUpdate(value, this.splitAssignments);
            }
        }

        /**
         * Clears the in-flight request, reports the failure to the update error tracker
         * unless the task is already done, and schedules another update. Any
         * {@code RuntimeException} or {@code Error} raised while doing so fails the task;
         * an {@code Error} is additionally rethrown.
         *
         * @param cause the failure of the request
         */
        @Override
        public void failed(Throwable cause) {
            HttpRemoteTask task = HttpRemoteTask.this;
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", task.taskId)) {
                // The inner try is kept separate so that failing the task still happens
                // while the thread name set above is in effect.
                try {
                    task.currentRequest.set(null);
                    this.updateStats();
                    boolean taskDone = task.getTaskStatus().getState().isDone();
                    if (!taskDone) {
                        task.updateErrorTracker.requestFailed(cause);
                    }
                    task.scheduleUpdate();
                }
                catch (RuntimeException | Error e) {
                    task.fail(e);
                    if (e instanceof Error) {
                        throw (Error) e;
                    }
                }
            }
        }

        /**
         * Fails the enclosing task with the given cause.
         *
         * @param cause the unrecoverable failure
         */
        @Override
        public void fatal(Throwable cause) {
            HttpRemoteTask task = HttpRemoteTask.this;
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", task.taskId)) {
                task.fail(cause);
            }
        }

        /**
         * Reports the time elapsed since the request start, in milliseconds, to the
         * enclosing task's stats.
         */
        private void updateStats() {
            long roundTripMillis = Duration.nanosSince(this.currentRequestStartNanos).toMillis();
            HttpRemoteTask.this.stats.updateRoundTripMillis(roundTripMillis);
        }
    }
}

