/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.remotetask;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.Request;
import io.airlift.http.client.ResponseHandler;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.FutureStateChange;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.PartitionedSplitsInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PipelinedBufferInfo;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.RetryPolicy;
import io.trino.operator.TaskStats;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
import io.trino.server.TaskUpdateRequest;
import io.trino.server.remotetask.Backoff;
import io.trino.server.remotetask.ContinuousTaskStatusFetcher;
import io.trino.server.remotetask.DynamicFiltersFetcher;
import io.trino.server.remotetask.RemoteTaskStats;
import io.trino.server.remotetask.RequestErrorTracker;
import io.trino.server.remotetask.SimpleHttpResponseCallback;
import io.trino.server.remotetask.SimpleHttpResponseHandler;
import io.trino.server.remotetask.TaskInfoFetcher;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.TrinoTransportException;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import jakarta.ws.rs.ServiceUnavailableException;
import java.net.URI;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.joda.time.DateTime;

public final class HttpRemoteTask
implements RemoteTask {
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final Session session;
    private final Span stageSpan;
    private final String nodeId;
    private final AtomicBoolean speculative;
    private final PlanFragment planFragment;
    // Source of the sequence ids handed to every ScheduledSplit created by this task.
    private final AtomicLong nextSplitId = new AtomicLong();
    private final RemoteTaskStats stats;
    private final Tracer tracer;
    private final Span span;
    private final TaskInfoFetcher taskInfoFetcher;
    private final ContinuousTaskStatusFetcher taskStatusFetcher;
    private final DynamicFiltersFetcher dynamicFiltersFetcher;
    private final DynamicFiltersCollector outboundDynamicFiltersCollector;
    // Version passed to acknowledgeAndGetNewDomains() when an update request is built.
    private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(0L);
    private final AtomicLong terminationStartedNanos = new AtomicLong();
    // The update request currently in flight; sendUpdateInternal() requires it to be null before installing a new one.
    private final AtomicReference<Future<?>> currentRequest = new AtomicReference();
    // Splits queued per source; entries are removed again by processTaskUpdate().
    @GuardedBy(value="this")
    private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = LinkedHashMultimap.create();
    private final int maxUnacknowledgedSplits;
    // Number of entries in pendingSplits that belong to partitioned sources.
    @GuardedBy(value="this")
    private volatile int pendingSourceSplitCount;
    // Sum of the raw split weights of the pending partitioned-source splits.
    @GuardedBy(value="this")
    private volatile long pendingSourceSplitsWeight;
    // Present key: no-more-splits was declared for that source. TRUE is set by noMoreSplits(),
    // FALSE by processTaskUpdate() once an assignment carrying the flag has been processed.
    @GuardedBy(value="this")
    private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<PlanNodeId, Boolean>();
    private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference();
    private final FutureStateChange<Void> whenSplitQueueHasSpace = new FutureStateChange();
    // Recomputed by updateSplitQueueSpace().
    @GuardedBy(value="this")
    private boolean splitQueueHasSpace = true;
    // Weight threshold registered through whenSplitQueueHasSpace(long); empty until the first call.
    @GuardedBy(value="this")
    private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty();
    // Upper bound on partitioned-source splits per update request; tuned by adjustSplitBatchSize().
    @VisibleForTesting
    final AtomicInteger splitBatchSize;
    private final boolean summarizeTaskInfo;
    private final HttpClient httpClient;
    private final Executor executor;
    private final ScheduledExecutorService errorScheduledExecutor;
    private final Duration maxErrorDuration;
    private final Duration taskTerminationTimeout;
    private final JsonCodec<TaskInfo> taskInfoCodec;
    private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;
    private final JsonCodec<FailTaskRequest> failTaskRequestCodec;
    private final RequestErrorTracker updateErrorTracker;
    // Incremented by triggerUpdate(); an update is scheduled only when the previous value was 0.
    private final AtomicInteger pendingRequestsCounter = new AtomicInteger(0);
    // While true, the plan fragment is included in the update request.
    private final AtomicBoolean sendPlan = new AtomicBoolean(true);
    private final NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker;
    // Set by start(); triggerUpdate() is a no-op until then.
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Flipped to true by the first abort()/cancel() call.
    private final AtomicBoolean terminating = new AtomicBoolean(false);
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);
    private final int guaranteedSplitsPerRequest;
    private final long maxRequestSizeInBytes;
    private final long requestSizeHeadroomInBytes;
    private final boolean adaptiveUpdateRequestSizeEnabled;

    public HttpRemoteTask(Session session, Span stageSpan, TaskId taskId, InternalNode node, boolean speculative, URI location, PlanFragment planFragment, Multimap<PlanNodeId, Split> initialSplits, OutputBuffers outputBuffers, HttpClient httpClient, Executor executor, ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorScheduledExecutor, Duration maxErrorDuration, Duration taskStatusRefreshMaxWait, Duration taskInfoUpdateInterval, Duration taskTerminationTimeout, boolean summarizeTaskInfo, JsonCodec<TaskStatus> taskStatusCodec, JsonCodec<DynamicFiltersCollector.VersionedDynamicFilterDomains> dynamicFilterDomainsCodec, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec, JsonCodec<FailTaskRequest> failTaskRequestCodec, NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker, Tracer tracer, RemoteTaskStats stats, DynamicFilterService dynamicFilterService, Set<DynamicFilterId> outboundDynamicFilterIds, Optional<DataSize> estimatedMemory) {
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(stageSpan, "stageSpan is null");
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(node, "node is null");
        Objects.requireNonNull(location, "location is null");
        Objects.requireNonNull(planFragment, "planFragment is null");
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        Objects.requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        Objects.requireNonNull(stats, "stats is null");
        Objects.requireNonNull(outboundDynamicFilterIds, "outboundDynamicFilterIds is null");
        Objects.requireNonNull(estimatedMemory, "estimatedMemory is null");
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-" + String.valueOf(taskId));){
            this.taskId = taskId;
            this.session = session;
            this.stageSpan = stageSpan;
            this.nodeId = node.getNodeIdentifier();
            this.speculative = new AtomicBoolean(speculative);
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.maxErrorDuration = Objects.requireNonNull(maxErrorDuration, "maxErrorDuration is null");
            this.taskTerminationTimeout = Objects.requireNonNull(taskTerminationTimeout, "taskTerminationTimeout is null");
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.failTaskRequestCodec = failTaskRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.tracer = Objects.requireNonNull(tracer, "tracer is null");
            this.span = this.createSpanBuilder("remote-task", stageSpan).startSpan();
            this.stats = stats;
            for (Map.Entry entry : initialSplits.entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(this.nextSplitId.getAndIncrement(), (PlanNodeId)entry.getKey(), (Split)entry.getValue());
                this.pendingSplits.put((Object)((PlanNodeId)entry.getKey()), (Object)scheduledSplit);
            }
            this.maxUnacknowledgedSplits = SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask(session);
            this.guaranteedSplitsPerRequest = SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest(session);
            this.maxRequestSizeInBytes = SystemSessionProperties.getMaxRemoteTaskRequestSize(session).toBytes();
            this.requestSizeHeadroomInBytes = SystemSessionProperties.getRemoteTaskRequestSizeHeadroom(session).toBytes();
            this.splitBatchSize = new AtomicInteger(this.maxUnacknowledgedSplits);
            long numOfPartitionedSources = planFragment.getPartitionedSources().size();
            boolean bl = this.adaptiveUpdateRequestSizeEnabled = numOfPartitionedSources == 1L && SystemSessionProperties.isRemoteTaskAdaptiveUpdateRequestSizeEnabled(session);
            if (numOfPartitionedSources > 1L) {
                log.debug("%s - There are more than one partitioned sources: numOfPartitionedSources=%s", new Object[]{taskId, planFragment.getPartitionedSources().size()});
            }
            int pendingSourceSplitCount = 0;
            long pendingSourceSplitsWeight = 0L;
            for (PlanNodeId planNodeId : planFragment.getPartitionedSources()) {
                Collection tableScanSplits = initialSplits.get((Object)planNodeId);
                if (tableScanSplits.isEmpty()) continue;
                pendingSourceSplitCount += tableScanSplits.size();
                pendingSourceSplitsWeight = Math.addExact(pendingSourceSplitsWeight, SplitWeight.rawValueSum((Collection)tableScanSplits, Split::getSplitWeight));
            }
            this.pendingSourceSplitCount = pendingSourceSplitCount;
            this.pendingSourceSplitsWeight = pendingSourceSplitsWeight;
            Optional<List<PipelinedBufferInfo>> pipelinedBufferStates = Optional.empty();
            if (outputBuffers instanceof PipelinedOutputBuffers) {
                PipelinedOutputBuffers buffers = (PipelinedOutputBuffers)outputBuffers;
                pipelinedBufferStates = Optional.of((List)buffers.getBuffers().keySet().stream().map(outputId -> new PipelinedBufferInfo((PipelinedOutputBuffers.OutputBufferId)outputId, 0L, 0L, 0, 0L, 0L, false)).collect(ImmutableList.toImmutableList()));
            }
            TaskInfo initialTask = TaskInfo.createInitialTask(taskId, location, this.nodeId, this.speculative.get(), pipelinedBufferStates, new TaskStats(DateTime.now(), null));
            this.dynamicFiltersFetcher = new DynamicFiltersFetcher(this::fatalUnacknowledgedFailure, taskId, location, taskStatusRefreshMaxWait, dynamicFilterDomainsCodec, executor, httpClient, () -> this.createSpanBuilder("task-dynamic-filters", this.span), maxErrorDuration, errorScheduledExecutor, stats, dynamicFilterService);
            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(this::fatalUnacknowledgedFailure, initialTask.taskStatus(), taskStatusRefreshMaxWait, taskStatusCodec, this.dynamicFiltersFetcher, executor, httpClient, () -> this.createSpanBuilder("task-status", this.span), maxErrorDuration, errorScheduledExecutor, stats);
            RetryPolicy retryPolicy = SystemSessionProperties.getRetryPolicy(session);
            this.taskInfoFetcher = new TaskInfoFetcher(this::fatalUnacknowledgedFailure, this.taskStatusFetcher, initialTask, httpClient, () -> this.createSpanBuilder("task-info", this.span), taskInfoUpdateInterval, taskInfoCodec, maxErrorDuration, summarizeTaskInfo, executor, updateScheduledExecutor, errorScheduledExecutor, stats, estimatedMemory, retryPolicy);
            this.taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isTerminatingOrDone()) {
                    this.cleanUpTask(state);
                } else {
                    partitionedSplitCountTracker.setPartitionedSplits(this.getPartitionedSplitsInfo());
                    this.updateSplitQueueSpace();
                }
                if (state.isDone()) {
                    this.span.end();
                }
            });
            this.outboundDynamicFiltersCollector = new DynamicFiltersCollector(this::triggerUpdate);
            dynamicFilterService.registerDynamicFilterConsumer(taskId.getQueryId(), taskId.getAttemptId(), outboundDynamicFilterIds, this.outboundDynamicFiltersCollector::updateDomains);
            partitionedSplitCountTracker.setPartitionedSplits(this.getPartitionedSplitsInfo());
            this.updateSplitQueueSpace();
        }
    }

    /** Returns the id of the task this handle represents. */
    @Override
    public TaskId getTaskId() {
        return taskId;
    }

    /** Returns the node identifier captured from the {@code InternalNode} at construction. */
    @Override
    public String getNodeId() {
        return nodeId;
    }

    /** Returns the task info currently held by the task-info fetcher. */
    @Override
    public TaskInfo getTaskInfo() {
        return taskInfoFetcher.getTaskInfo();
    }

    /** Returns the task status currently held by the task-status fetcher. */
    @Override
    public TaskStatus getTaskStatus() {
        return taskStatusFetcher.getTaskStatus();
    }

    /**
     * Marks the task as started, requests the first update and then starts the
     * dynamic-filter, task-status and task-info fetchers, in that order.
     */
    @Override
    public void start() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + taskId)) {
            // triggerUpdate() returns immediately until the flag is set, so it must go first.
            started.set(true);
            triggerUpdate();

            dynamicFiltersFetcher.start();
            taskStatusFetcher.start();
            taskInfoFetcher.start();
        }
    }

    /**
     * Queues additional splits for this task and requests an update.
     *
     * <p>The call is ignored when the task state is terminating or done, or once
     * {@code abort()}/{@code cancel()} has set the terminating flag. For partitioned sources
     * the pending count and weight are increased only for splits that were actually
     * inserted into the pending set.
     *
     * @param splitsBySource splits to add, keyed by source plan node
     * @throws NullPointerException if {@code splitsBySource} is {@code null}
     * @throws IllegalStateException if no-more-splits was already declared for a source
     * @throws ArithmeticException if the accumulated split weight overflows a {@code long}
     */
    @Override
    public synchronized void addSplits(Multimap<PlanNodeId, Split> splitsBySource) {
        Objects.requireNonNull(splitsBySource, "splitsBySource is null");
        if (getTaskStatus().getState().isTerminatingOrDone() || terminating.get()) {
            return;
        }

        boolean needsUpdate = false;
        for (Map.Entry<PlanNodeId, Collection<Split>> entry : splitsBySource.asMap().entrySet()) {
            PlanNodeId sourceId = entry.getKey();
            Collection<Split> splits = entry.getValue();
            boolean isPartitionedSource = planFragment.isPartitionedSources(sourceId);
            Preconditions.checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);

            int added = 0;
            long addedWeight = 0L;
            for (Split split : splits) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), sourceId, split);
                // put() returns false when the multimap already held the pair; such splits are not counted.
                if (pendingSplits.put(sourceId, scheduledSplit) && isPartitionedSource) {
                    added++;
                    addedWeight = Math.addExact(addedWeight, split.getSplitWeight().getRawValue());
                }
            }
            if (isPartitionedSource) {
                pendingSourceSplitCount += added;
                pendingSourceSplitsWeight = Math.addExact(pendingSourceSplitsWeight, addedWeight);
                partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
            }
            needsUpdate = true;
        }
        updateSplitQueueSpace();
        if (needsUpdate) {
            triggerUpdate();
        }
    }

    /**
     * Declares that {@code sourceId} will receive no further splits and requests an update.
     * Does nothing if that was already declared for the source or the task is terminating.
     */
    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId) {
        boolean firstDeclaration = !noMoreSplits.containsKey(sourceId);
        if (firstDeclaration && !terminating.get()) {
            // TRUE means "declared but not yet processed by processTaskUpdate()".
            noMoreSplits.put(sourceId, Boolean.TRUE);
            triggerUpdate();
        }
    }

    /**
     * Installs {@code newOutputBuffers} if its version is greater than the current one and,
     * in that case, requests an update. Ignored when the task is terminating or done.
     */
    @Override
    public void setOutputBuffers(OutputBuffers newOutputBuffers) {
        boolean finishing = getTaskStatus().getState().isTerminatingOrDone();
        if (finishing || terminating.get()) {
            return;
        }

        // Atomically keep whichever of the two has the higher version; getAndUpdate hands back the old value.
        OutputBuffers replaced = outputBuffers.getAndUpdate(current ->
                newOutputBuffers.getVersion() > current.getVersion() ? newOutputBuffers : current);

        if (newOutputBuffers.getVersion() > replaced.getVersion()) {
            triggerUpdate();
        }
    }

    /**
     * Moves the task from speculative to non-speculative and requests an update when the
     * flag actually changed.
     *
     * @throws IllegalArgumentException if {@code speculative} is {@code true}
     */
    @Override
    public void setSpeculative(boolean speculative) {
        Preconditions.checkArgument(!speculative, "we can only move task from speculative to non-speculative");
        boolean flipped = this.speculative.compareAndSet(true, speculative);
        if (flipped) {
            triggerUpdate();
        }
    }

    /**
     * Returns the partitioned split count and weight attributed to this task: zero when
     * done, only the running drivers while terminating, otherwise pending plus queued plus
     * running.
     */
    @Override
    public PartitionedSplitsInfo getPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        TaskState state = status.getState();
        if (state.isDone()) {
            return PartitionedSplitsInfo.forZeroSplits();
        }
        if (state.isTerminating()) {
            return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                    status.getRunningPartitionedDrivers(),
                    status.getRunningPartitionedSplitsWeight());
        }

        PartitionedSplitsInfo pending = getUnacknowledgedPartitionedSplitsInfo();
        int totalCount = pending.getCount()
                + status.getQueuedPartitionedDrivers()
                + status.getRunningPartitionedDrivers();
        long totalWeight = pending.getWeightSum()
                + status.getQueuedPartitionedSplitsWeight()
                + status.getRunningPartitionedSplitsWeight();
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(totalCount, totalWeight);
    }

    /** Returns the count and weight of the pending partitioned-source splits. */
    public PartitionedSplitsInfo getUnacknowledgedPartitionedSplitsInfo() {
        // Read without the monitor; both fields are volatile.
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(pendingSourceSplitCount, pendingSourceSplitsWeight);
    }

    /**
     * Returns the count and weight of partitioned splits that are pending here or queued
     * according to the task status; zero once the task is terminating or done.
     */
    @Override
    public PartitionedSplitsInfo getQueuedPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        if (status.getState().isTerminatingOrDone()) {
            return PartitionedSplitsInfo.forZeroSplits();
        }

        PartitionedSplitsInfo pending = getUnacknowledgedPartitionedSplitsInfo();
        int queuedCount = pending.getCount() + status.getQueuedPartitionedDrivers();
        long queuedWeight = pending.getWeightSum() + status.getQueuedPartitionedSplitsWeight();
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(queuedCount, queuedWeight);
    }

    /** Returns the number of pending partitioned-source splits. */
    @Override
    public int getUnacknowledgedPartitionedSplitCount() {
        return pendingSourceSplitCount;
    }

    /** Delegates to the task-info fetcher's {@code retrieveAndDropSpoolingOutputStats()}. */
    @Override
    public Optional<SpoolingOutputStats.Snapshot> retrieveAndDropSpoolingOutputStats() {
        return taskInfoFetcher.retrieveAndDropSpoolingOutputStats();
    }

    /** Returns the current value of the volatile pending partitioned-source split counter. */
    private int getPendingSourceSplitCount() {
        return pendingSourceSplitCount;
    }

    /**
     * Returns the pending split weight plus the queued weight from the task status, or
     * {@code 0} once the task is terminating or done.
     */
    private long getQueuedPartitionedSplitsWeight() {
        TaskStatus status = getTaskStatus();
        return status.getState().isTerminatingOrDone()
                ? 0L
                : getPendingSourceSplitsWeight() + status.getQueuedPartitionedSplitsWeight();
    }

    /** Returns the current value of the volatile pending partitioned-source split weight. */
    private long getPendingSourceSplitsWeight() {
        return pendingSourceSplitsWeight;
    }

    /** Registers {@code stateChangeListener} with the task-status fetcher. */
    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus> stateChangeListener) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + taskId)) {
            taskStatusFetcher.addStateChangeListener(stateChangeListener);
        }
    }

    /** Registers {@code stateChangeListener} with the task-info fetcher as a final-task-info listener. */
    @Override
    public void addFinalTaskInfoListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        taskInfoFetcher.addFinalTaskInfoListener(stateChangeListener);
    }

    /**
     * Returns a future that is already complete when the split queue has space, or a new
     * listener future otherwise. The first call fixes the weight threshold; later calls
     * must pass the same value.
     *
     * @throws IllegalArgumentException if a different threshold was registered before
     */
    @Override
    public synchronized ListenableFuture<Void> whenSplitQueueHasSpace(long weightThreshold) {
        if (whenSplitQueueHasSpaceThreshold.isEmpty()) {
            // First registration: remember the threshold and re-evaluate the space flag against it.
            whenSplitQueueHasSpaceThreshold = OptionalLong.of(weightThreshold);
            updateSplitQueueSpace();
        }
        else {
            Preconditions.checkArgument(
                    weightThreshold == whenSplitQueueHasSpaceThreshold.getAsLong(),
                    "Multiple split queue space notification thresholds not supported");
        }
        return splitQueueHasSpace
                ? Futures.immediateVoidFuture()
                : whenSplitQueueHasSpace.createNewListener();
    }

    /** Exposes the dynamic-filters fetcher to tests. */
    @VisibleForTesting
    DynamicFiltersFetcher getDynamicFiltersFetcher() {
        return dynamicFiltersFetcher;
    }

    /**
     * Recomputes {@code splitQueueHasSpace} and, when there is space and a threshold has
     * been registered, completes the outstanding space listeners on the executor.
     */
    private synchronized void updateSplitQueueSpace() {
        // The split-count limit applies even before any threshold has been registered.
        boolean belowSplitLimit = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits;
        splitQueueHasSpace = belowSplitLimit
                && (whenSplitQueueHasSpaceThreshold.isEmpty()
                        || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong());

        if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) {
            whenSplitQueueHasSpace.complete(null, executor);
        }
    }

    /**
     * Applies a task update: publishes {@code newValue}, removes the splits listed in
     * {@code splitAssignments} from the pending set, adjusts the pending counters for
     * partitioned sources, and refreshes the split tracker and the queue-space flag.
     */
    private synchronized void processTaskUpdate(TaskInfo newValue, List<SplitAssignment> splitAssignments) {
        updateTaskInfo(newValue);

        for (SplitAssignment assignment : splitAssignments) {
            PlanNodeId sourceId = assignment.getPlanNodeId();
            boolean partitioned = planFragment.isPartitionedSources(sourceId);

            int removedCount = 0;
            long removedWeight = 0L;
            for (ScheduledSplit scheduledSplit : assignment.getSplits()) {
                boolean wasPending = pendingSplits.remove(sourceId, scheduledSplit);
                if (wasPending && partitioned) {
                    removedCount++;
                    removedWeight = Math.addExact(removedWeight, scheduledSplit.getSplit().getSplitWeight().getRawValue());
                }
            }

            if (assignment.isNoMoreSplits()) {
                // FALSE marks the no-more-splits flag as no longer pending for this source.
                noMoreSplits.put(sourceId, false);
            }
            if (partitioned) {
                pendingSourceSplitCount -= removedCount;
                pendingSourceSplitsWeight -= removedWeight;
            }
        }

        // Bump the request counter while partitioned-source splits are still pending.
        if (pendingSourceSplitCount > 0) {
            pendingRequestsCounter.incrementAndGet();
        }
        partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
        updateSplitQueueSpace();
    }

    /** Pushes {@code taskInfo} to the status fetcher first and then to the info fetcher. */
    private void updateTaskInfo(TaskInfo taskInfo) {
        TaskStatus latestStatus = taskInfo.taskStatus();
        taskStatusFetcher.updateTaskStatus(latestStatus);
        taskInfoFetcher.updateTaskInfo(taskInfo);
    }

    /** Submits {@code sendUpdate()} to the executor. */
    private void scheduleUpdate() {
        executor.execute(() -> sendUpdate());
    }

    /**
     * Records that an update is wanted. Once the task has been started, the counter is
     * incremented and an update is scheduled only if it was previously zero.
     */
    private void triggerUpdate() {
        // Short-circuit keeps the counter untouched while the task has not been started.
        if (started.get() && pendingRequestsCounter.getAndIncrement() == 0) {
            scheduleUpdate();
        }
    }

    /**
     * Re-derives the split batch size from the size of the request that was just serialized.
     *
     * <p>An adjustment is attempted when the request is over the limit and the batch is
     * above the guaranteed minimum, or under the limit and the batch is below the
     * unacknowledged-split maximum. The new size scales the split count of the first
     * partitioned-source assignment by {@code (maxRequestSize - headroom) / requestSize},
     * clamped to {@code [guaranteedSplitsPerRequest, maxUnacknowledgedSplits]}.
     *
     * @return {@code true} if the request exceeded the limit and carried more splits than
     *         the new batch size
     */
    @VisibleForTesting
    boolean adjustSplitBatchSize(List<SplitAssignment> splitAssignments, long requestSize, int currentSplitBatchSize) {
        boolean overLimitAndShrinkable = requestSize > maxRequestSizeInBytes && currentSplitBatchSize > guaranteedSplitsPerRequest;
        boolean underLimitAndGrowable = requestSize < maxRequestSizeInBytes && currentSplitBatchSize < maxUnacknowledgedSplits;
        if (!overLimitAndShrinkable && !underLimitAndGrowable) {
            return false;
        }

        // Only the first assignment that belongs to a partitioned source is considered.
        int numSplits = splitAssignments.stream()
                .filter(assignment -> planFragment.isPartitionedSources(assignment.getPlanNodeId()))
                .mapToInt(assignment -> assignment.getSplits().size())
                .findFirst()
                .orElse(0);

        int newSplitBatchSize = currentSplitBatchSize;
        if (numSplits != 0) {
            long splitsThatFit = (long) numSplits * (maxRequestSizeInBytes - requestSizeHeadroomInBytes) / requestSize;
            newSplitBatchSize = Math.clamp(splitsThatFit, guaranteedSplitsPerRequest, maxUnacknowledgedSplits);
        }

        if (newSplitBatchSize != currentSplitBatchSize) {
            log.debug("%s - Split batch size changed: prevSize=%s, newSize=%s", taskId, currentSplitBatchSize, newSplitBatchSize);
            splitBatchSize.set(newSplitBatchSize);
        }

        boolean mustResend = numSplits > newSplitBatchSize && requestSize > maxRequestSizeInBytes;
        if (mustResend) {
            log.debug("%s - current taskUpdateRequestJson exceeded limit: %d, currentSplitBatchSize: %d, newSplitBatchSize: %d", taskId, requestSize, currentSplitBatchSize, newSplitBatchSize);
        }
        return mustResend;
    }

    /**
     * Runs {@code sendUpdateInternal()} and reports anything it throws through
     * {@code fatalUnacknowledgedFailure} as a {@code GENERIC_INTERNAL_ERROR}.
     */
    private void sendUpdate() {
        try {
            sendUpdateInternal();
        }
        catch (Throwable failure) {
            TrinoException wrapped = new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "unexpected error calling sendUpdate()", failure);
            fatalUnacknowledgedFailure(wrapped);
        }
    }

    /**
     * Builds one task update request from the current pending state and posts it asynchronously.
     *
     * <p>Returns without sending when the task is terminating or done, when the error tracker
     * has not yet granted a request permit (the method is re-run once the permit future
     * completes), or when adaptive sizing decides the serialized request must be rebuilt with
     * a smaller split batch (a new update is scheduled immediately).
     *
     * @throws IllegalStateException if the task has not been started, no update is pending,
     *         or a previous request is still registered in {@code currentRequest}
     */
    private void sendUpdateInternal() {
        TaskStatus taskStatus = this.getTaskStatus();
        // Nothing to send once the task is finishing or a termination command was issued.
        if (taskStatus.getState().isTerminatingOrDone() || this.terminating.get()) {
            return;
        }
        Preconditions.checkState((boolean)this.started.get());
        // Snapshot of the counter at send time; handed to the response handler below.
        int currentPendingRequestsCounter = this.pendingRequestsCounter.get();
        Preconditions.checkState((currentPendingRequestsCounter > 0 ? 1 : 0) != 0, (Object)"sendUpdate shouldn't be called without pending requests");
        // If the permit is not available yet, retry through sendUpdate() when it completes.
        ListenableFuture<Void> errorRateLimit = this.updateErrorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::sendUpdate, this.executor);
            return;
        }
        int currentSplitBatchSize = this.splitBatchSize.get();
        List<SplitAssignment> splitAssignments = this.getSplitAssignments(currentSplitBatchSize);
        DynamicFiltersCollector.VersionedDynamicFilterDomains dynamicFilterDomains = this.outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(this.sentDynamicFiltersVersion.get());
        // The plan fragment is attached only while sendPlan is set, and without its embedded JSON representation.
        Optional<PlanFragment> fragment = this.sendPlan.get() ? Optional.of(this.planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(this.session.toSessionRepresentation(), this.session.getIdentity().getExtraCredentials(), this.stageSpan, fragment, splitAssignments, this.outputBuffers.get(), dynamicFilterDomains.getDynamicFilterDomains(), this.session.getExchangeEncryptionKey(), this.speculative.get());
        byte[] taskUpdateRequestJson = this.taskUpdateRequestCodec.toJsonBytes((Object)updateRequest);
        // Adaptive sizing: drop this request and reschedule when the batch size had to shrink.
        if (this.adaptiveUpdateRequestSizeEnabled && this.adjustSplitBatchSize(splitAssignments, taskUpdateRequestJson.length, currentSplitBatchSize)) {
            this.scheduleUpdate();
            return;
        }
        // Stats record the full request size whenever a plan or dynamic filter domains are included.
        if (fragment.isPresent()) {
            this.stats.updateWithPlanBytes(taskUpdateRequestJson.length);
        }
        if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
            this.stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
        }
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(taskStatus);
        Request request = Request.Builder.preparePost().setUri(uriBuilder.build()).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setBodyGenerator((BodyGenerator)StaticBodyGenerator.createStaticBodyGenerator((byte[])taskUpdateRequestJson)).setSpanBuilder(this.createSpanBuilder("task-update", this.span)).build();
        this.updateErrorTracker.startRequest();
        HttpClient.HttpResponseFuture future = this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec));
        // Only one update request may be outstanding at a time.
        Preconditions.checkState((this.currentRequest.getAndSet((Future<?>)future) == null ? 1 : 0) != 0, (Object)"There should be no previous request running");
        // The response is handled on the executor by an UpdateResponseHandler built from this request's state.
        Futures.addCallback((ListenableFuture)future, new SimpleHttpResponseHandler<TaskInfo>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime(), currentPendingRequestsCounter), request.getUri(), this.stats), (Executor)this.executor);
    }

    /**
     * Collects the split assignments for every partitioned-source and remote-source node of
     * the fragment, skipping sources for which {@code getSplitAssignment} returns {@code null}.
     */
    private synchronized List<SplitAssignment> getSplitAssignments(int currentSplitBatchSize) {
        return Stream.concat(
                        planFragment.getPartitionedSourceNodes().stream(),
                        planFragment.getRemoteSourceNodes().stream())
                .filter(Objects::nonNull)
                .map(PlanNode::getId)
                .map(sourceId -> getSplitAssignment(sourceId, currentSplitBatchSize))
                .filter(Objects::nonNull)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Builds the split assignment to send for one source.
     *
     * <p>For a partitioned source with more pending splits than {@code currentSplitBatchSize},
     * only the splits with the lowest sequence ids are included and the no-more-splits flag
     * is withheld, since further batches follow.
     *
     * @param planNodeId source plan node to build the assignment for
     * @param currentSplitBatchSize maximum number of splits for a partitioned source
     * @return the assignment, or {@code null} when there are no pending splits and no
     *         pending no-more-splits flag for the source
     */
    private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId, int currentSplitBatchSize) {
        Set<ScheduledSplit> splits = pendingSplits.get(planNodeId);
        // TRUE: declared via noMoreSplits() and not yet marked FALSE by processTaskUpdate().
        boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
        boolean noMoreSplits = this.noMoreSplits.containsKey(planNodeId);

        if (planFragment.isPartitionedSources(planNodeId) && currentSplitBatchSize < splits.size()) {
            log.debug("%s - Splits are limited by splitBatchSize: splitBatchSize=%s, splits=%s, planNodeId=%s", taskId, currentSplitBatchSize, splits.size(), planNodeId);
            splits = splits.stream()
                    .sorted(Comparator.comparingLong(ScheduledSplit::getSequenceId))
                    .limit(currentSplitBatchSize)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            // Not the last batch, so the flag must not be sent yet.
            noMoreSplits = false;
        }

        if (splits.isEmpty() && !pendingNoMoreSplits) {
            return null;
        }
        return new SplitAssignment(planNodeId, splits, noMoreSplits);
    }

    /**
     * Requests that the remote task be aborted.
     *
     * <p>Only the first caller to flip the {@code terminating} flag proceeds; later calls return
     * immediately. If the last known task state is not already terminating or done, an asynchronous
     * cleanup request labelled {@code "abort"} is scheduled with the abort flag set.
     */
    @Override
    public void abort() {
        boolean firstTermination = this.terminating.compareAndSet(false, true);
        if (!firstTermination) {
            return;
        }
        synchronized (this) {
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + this.taskId)) {
                TaskState lastKnownState = this.getTaskStatus().getState();
                if (lastKnownState.isTerminatingOrDone()) {
                    return;
                }
                this.scheduleAsyncCleanupRequest("abort", true);
            }
        }
    }

    /**
     * Requests that the remote task be canceled.
     *
     * <p>Only the first caller to flip the {@code terminating} flag proceeds; later calls return
     * immediately. If the last known task state is not already terminating or done, an asynchronous
     * cleanup request labelled {@code "cancel"} is scheduled with the abort flag cleared.
     */
    @Override
    public void cancel() {
        boolean firstTermination = this.terminating.compareAndSet(false, true);
        if (!firstTermination) {
            return;
        }
        synchronized (this) {
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + this.taskId)) {
                TaskState lastKnownState = this.getTaskStatus().getState();
                if (lastKnownState.isTerminatingOrDone()) {
                    return;
                }
                this.scheduleAsyncCleanupRequest("cancel", false);
            }
        }
    }

    /**
     * Releases local state for a task that is terminating or done.
     *
     * <p>Pending splits and their counters are always cleared, the split-queue-has-space future is
     * completed, and all outbound dynamic filters are acknowledged. After that:
     * <ul>
     *   <li>if the task is done, the status fetcher is stopped, any in-flight update request is
     *       cancelled, and a final {@code "cleanup"} delete request is scheduled;</li>
     *   <li>otherwise the time at which termination was first observed is recorded, and on later
     *       calls the task is failed once {@code taskTerminationTimeout} has elapsed.</li>
     * </ul>
     *
     * @param taskState the state that triggered the cleanup; must be terminating or done
     * @throws IllegalStateException if {@code taskState} is neither terminating nor done
     */
    private void cleanUpTask(TaskState taskState) {
        Preconditions.checkState(taskState.isTerminatingOrDone(), "attempt to clean up a task that is not terminating or done: %s", taskState);

        synchronized (this) {
            this.pendingSplits.clear();
            this.pendingSourceSplitCount = 0;
            this.pendingSourceSplitsWeight = 0L;
            this.partitionedSplitCountTracker.setPartitionedSplits(this.getPartitionedSplitsInfo());
            this.splitQueueHasSpace = true;
            this.whenSplitQueueHasSpace.complete(null, this.executor);
        }

        this.outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

        if (taskState.isDone()) {
            this.taskStatusFetcher.stop();
            Future<?> inFlightRequest = this.currentRequest.getAndSet(null);
            if (inFlightRequest != null) {
                inFlightRequest.cancel(true);
            }
            this.scheduleAsyncCleanupRequest("cleanup", true);
            return;
        }

        long startedNanos = this.terminationStartedNanos.get();
        if (startedNanos == 0L) {
            // Zero means "not recorded yet", so a genuine nanoTime() of zero is nudged to one.
            long nowNanos = System.nanoTime();
            this.terminationStartedNanos.compareAndSet(0L, nowNanos == 0L ? 1L : nowNanos);
            return;
        }

        Duration timeTerminating = Duration.nanosSince(startedNanos);
        if (timeTerminating.compareTo(this.taskTerminationTimeout) < 0) {
            return;
        }
        this.fatalUnacknowledgedFailure(new TrinoException(StandardErrorCode.REMOTE_TASK_ERROR, String.format("Task %s failed to terminate after %s, last known state: %s", this.taskId, this.taskTerminationTimeout, taskState)));
    }

    /**
     * Schedules an asynchronous cleanup that issues a task DELETE request.
     *
     * @param action label used in error messages for this cleanup attempt
     * @param abort value of the {@code abort} query parameter on the delete request
     */
    private void scheduleAsyncCleanupRequest(String action, boolean abort) {
        // The request is built lazily, only if the cleanup is actually scheduled.
        Supplier<Request> deleteRequest = () -> this.buildDeleteTaskRequest(abort);
        this.scheduleAsyncCleanupRequest(action, deleteRequest);
    }

    /**
     * Schedules an asynchronous cleanup that posts a fail-task request.
     *
     * @param action label used in error messages for this cleanup attempt
     * @param failTaskRequest payload to send to the task's {@code fail} endpoint
     */
    private void scheduleAsyncCleanupRequest(String action, FailTaskRequest failTaskRequest) {
        // The request is built lazily, only if the cleanup is actually scheduled.
        Supplier<Request> failRequest = () -> this.buildFailTaskRequest(failTaskRequest);
        this.scheduleAsyncCleanupRequest(action, failRequest);
    }

    /**
     * Starts an asynchronous cleanup request with a fresh backoff.
     *
     * <p>When the last known task state is done, the {@code cleanedUp} flag ensures that only one
     * cleanup is ever started from that state; repeated calls return without building a request.
     *
     * @param action label used in error messages for this cleanup attempt
     * @param remoteRequestSupplier produces the HTTP request to send; invoked at most once
     */
    private void scheduleAsyncCleanupRequest(String action, Supplier<Request> remoteRequestSupplier) {
        if (this.getTaskStatus().getState().isDone()) {
            boolean firstCleanup = this.cleanedUp.compareAndSet(false, true);
            if (!firstCleanup) {
                return;
            }
        }
        Request cleanupRequest = remoteRequestSupplier.get();
        Backoff cleanupBackoff = new Backoff(this.maxErrorDuration);
        this.doScheduleAsyncCleanupRequest(cleanupBackoff, cleanupRequest, action);
    }

    /**
     * Builds the HTTP DELETE request for this task.
     *
     * @param abort value rendered into the {@code abort} query parameter
     * @return a delete request against the task URI, carrying a {@code task-delete} span builder
     */
    private Request buildDeleteTaskRequest(boolean abort) {
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(this.getTaskStatus());
        uriBuilder = uriBuilder.addParameter("abort", String.valueOf(abort));
        return Request.Builder.prepareDelete()
                .setUri(uriBuilder.build())
                .setSpanBuilder(this.createSpanBuilder("task-delete", this.span))
                .build();
    }

    /**
     * Builds the HTTP POST request that asks the remote task to fail.
     *
     * @param failTaskRequest payload serialized to JSON with {@code failTaskRequestCodec}
     * @return a post request against the task's {@code fail} path, carrying a {@code task-fail} span builder
     */
    private Request buildFailTaskRequest(FailTaskRequest failTaskRequest) {
        HttpUriBuilder uriBuilder = this.getHttpUriBuilder(this.getTaskStatus()).appendPath("fail");
        return Request.Builder.preparePost()
                .setUri(uriBuilder.build())
                .setHeader("Content-Type", MediaType.JSON_UTF_8.toString())
                .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(this.failTaskRequestCodec.toJsonBytes(failTaskRequest)))
                .setSpanBuilder(this.createSpanBuilder("task-fail", this.span))
                .build();
    }

    private void doScheduleAsyncCleanupRequest(final Backoff cleanupBackoff, final Request request, final String action) {
        Futures.addCallback((ListenableFuture)this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec)), (FutureCallback)new FutureCallback<FullJsonResponseHandler.JsonResponse<TaskInfo>>(this){
            final /* synthetic */ HttpRemoteTask this$0;
            {
                this.this$0 = this$0;
            }

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            public void onSuccess(FullJsonResponseHandler.JsonResponse<TaskInfo> result) {
                TaskState taskState;
                if (result.getStatusCode() != HttpStatus.OK.code()) {
                    this.onFailure(1.exceptionForErrorCode(result));
                    return;
                }
                try {
                    Preconditions.checkArgument((boolean)result.hasValue(), (String)"TaskInfo result did not contain JSON payload; payload=%s", (Object)result.getResponseBody());
                    this.this$0.updateTaskInfo((TaskInfo)result.getValue());
                    taskState = this.this$0.getTaskInfo().taskStatus().getState();
                    if (taskState.isTerminatingOrDone()) return;
                }
                catch (Throwable throwable) {
                    TaskState taskState2 = this.this$0.getTaskInfo().taskStatus().getState();
                    if (taskState2.isTerminatingOrDone()) throw throwable;
                    this.fatalAsyncCleanupFailure(new TrinoTransportException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri((URI)request.getUri()), String.format("Unable to %s task at %s, last known state was: %s", new Object[]{action, request.getUri(), taskState2})));
                    throw throwable;
                }
                this.fatalAsyncCleanupFailure(new TrinoTransportException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri((URI)request.getUri()), String.format("Unable to %s task at %s, last known state was: %s", new Object[]{action, request.getUri(), taskState})));
            }

            private static RuntimeException exceptionForErrorCode(FullJsonResponseHandler.JsonResponse<TaskInfo> result) {
                return switch (result.getStatusCode()) {
                    case 503 -> new ServiceUnavailableException("Service Unavailable");
                    default -> new RuntimeException("Unexpected http status code " + result.getStatusCode());
                };
            }

            public void onFailure(Throwable t) {
                if (this.this$0.getTaskInfo().taskStatus().getState().isDone()) {
                    return;
                }
                if (t instanceof RejectedExecutionException && this.this$0.httpClient.isClosed()) {
                    String message = String.format("Unable to %s task at %s. HTTP client is closed.", action, request.getUri());
                    RequestErrorTracker.logError(t, message);
                    this.fatalAsyncCleanupFailure(new TrinoTransportException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri((URI)request.getUri()), message));
                    return;
                }
                if (cleanupBackoff.failure()) {
                    String message = String.format("Unable to %s task at %s. Back off depleted.", action, request.getUri());
                    RequestErrorTracker.logError(t, message);
                    this.fatalAsyncCleanupFailure(new TrinoTransportException((ErrorCodeSupplier)StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri((URI)request.getUri()), message));
                    return;
                }
                long delayNanos = cleanupBackoff.getBackoffDelayNanos();
                if (delayNanos == 0L) {
                    this.this$0.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action);
                } else {
                    this.this$0.errorScheduledExecutor.schedule(() -> this.this$0.doScheduleAsyncCleanupRequest(cleanupBackoff, request, action), delayNanos, TimeUnit.NANOSECONDS);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void fatalAsyncCleanupFailure(TrinoTransportException cause) {
                HttpRemoteTask httpRemoteTask = this.this$0;
                synchronized (httpRemoteTask) {
                    try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-" + String.valueOf(this.this$0.taskId));){
                        TaskStatus taskStatus = this.this$0.getTaskStatus();
                        if (taskStatus.getState().isDone()) {
                            log.warn("Task %s already in terminal state %s; cannot overwrite with FAILED due to %s", new Object[]{taskStatus.getTaskId(), taskStatus.getState(), cause});
                        } else {
                            ImmutableList failures = ImmutableList.builderWithExpectedSize((int)(taskStatus.getFailures().size() + 1)).add((Object)Failures.toFailure((Throwable)cause)).addAll(taskStatus.getFailures()).build();
                            taskStatus = TaskStatus.failWith(taskStatus, TaskState.FAILED, (List<ExecutionFailureInfo>)failures);
                        }
                        this.this$0.updateTaskInfo(this.this$0.getTaskInfo().withTaskStatus(taskStatus));
                    }
                }
            }
        }, (Executor)this.executor);
    }

    /**
     * Moves the task to {@code FAILED} locally because of {@code cause}, unless it is already done.
     *
     * <p>If the task is already done, the current status is simply re-applied to the task info.
     * Otherwise {@code cause} is prepended to the existing failures, and the failed status is
     * published through {@code updateTaskInfo} when the cause is a {@link TrinoTransportException},
     * or through the status fetcher for any other cause.
     *
     * @param cause the failure to record as the first entry of the task's failure list
     */
    private synchronized void fatalUnacknowledgedFailure(Throwable cause) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + this.taskId)) {
            TaskStatus currentStatus = this.getTaskStatus();
            if (currentStatus.getState().isDone()) {
                this.updateTaskInfo(this.getTaskInfo().withTaskStatus(currentStatus));
                return;
            }

            List<ExecutionFailureInfo> failures = ImmutableList.<ExecutionFailureInfo>builderWithExpectedSize(currentStatus.getFailures().size() + 1)
                    .add(Failures.toFailure(cause))
                    .addAll(currentStatus.getFailures())
                    .build();
            TaskStatus failedStatus = TaskStatus.failWith(currentStatus, TaskState.FAILED, failures);

            if (cause instanceof TrinoTransportException) {
                this.updateTaskInfo(this.getTaskInfo().withTaskStatus(failedStatus));
            }
            else {
                this.taskStatusFetcher.updateTaskStatus(failedStatus);
            }
        }
    }

    /**
     * Asks the remote worker to fail this task with the given cause.
     *
     * <p>Only the first caller to flip the {@code terminating} flag proceeds; later calls return
     * immediately. If the last known task state is not already terminating or done, the cause is
     * logged at debug level and an asynchronous {@code "fail"} cleanup request is scheduled.
     *
     * @param cause the failure to report to the remote task
     */
    @Override
    public void failRemotely(Throwable cause) {
        boolean firstTermination = this.terminating.compareAndSet(false, true);
        if (!firstTermination) {
            return;
        }
        synchronized (this) {
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + this.taskId)) {
                TaskStatus lastKnownStatus = this.getTaskStatus();
                if (lastKnownStatus.getState().isTerminatingOrDone()) {
                    return;
                }
                log.debug(cause, "Remote task %s failed with %s", lastKnownStatus.getSelf(), cause);
                FailTaskRequest failRequest = new FailTaskRequest(Failures.toFailure(cause));
                this.scheduleAsyncCleanupRequest("fail", failRequest);
            }
        }
    }

    /**
     * Marks the task as failed locally without first contacting the remote worker.
     *
     * <p>The {@code terminating} flag is set unconditionally. If the last known state is not done,
     * {@code cause} is prepended to the existing failures and a {@code FAILED} status is pushed
     * through the status fetcher.
     *
     * @param cause the failure to record; must not be null
     * @throws NullPointerException if {@code cause} is null
     */
    @Override
    public void failLocallyImmediately(Throwable cause) {
        Objects.requireNonNull(cause, "cause is null");
        this.terminating.set(true);
        synchronized (this) {
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-" + this.taskId)) {
                TaskStatus currentStatus = this.getTaskStatus();
                if (currentStatus.getState().isDone()) {
                    return;
                }
                List<ExecutionFailureInfo> failures = ImmutableList.<ExecutionFailureInfo>builderWithExpectedSize(currentStatus.getFailures().size() + 1)
                        .add(Failures.toFailure(cause))
                        .addAll(currentStatus.getFailures())
                        .build();
                TaskStatus failedStatus = TaskStatus.failWith(currentStatus, TaskState.FAILED, failures);
                this.taskStatusFetcher.updateTaskStatus(failedStatus);
            }
        }
    }

    /**
     * Creates a URI builder rooted at the task's self URI.
     *
     * @param taskStatus status whose {@code self} URI is used as the base
     * @return the builder, with a value-less {@code summarize} parameter added when
     *         {@code summarizeTaskInfo} is set
     */
    private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus) {
        HttpUriBuilder taskUri = HttpUriBuilder.uriBuilderFrom(taskStatus.getSelf());
        if (!this.summarizeTaskInfo) {
            return taskUri;
        }
        taskUri.addParameter("summarize");
        return taskUri;
    }

    /**
     * Creates a span builder tagged with this task's query, stage and task ids.
     *
     * @param name span name
     * @param parent span installed as the parent via the current context
     * @return the configured span builder
     */
    private SpanBuilder createSpanBuilder(String name, Span parent) {
        SpanBuilder spanBuilder = this.tracer.spanBuilder(name);
        spanBuilder = spanBuilder.setParent(Context.current().with(parent));
        spanBuilder = spanBuilder.setAttribute(TrinoAttributes.QUERY_ID, this.taskId.getQueryId().toString());
        spanBuilder = spanBuilder.setAttribute(TrinoAttributes.STAGE_ID, this.taskId.getStageId().toString());
        return spanBuilder.setAttribute(TrinoAttributes.TASK_ID, this.taskId.toString());
    }

    /** Returns a string form of this task that wraps the current {@code TaskInfo}. */
    @Override
    public String toString() {
        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
        return description.addValue(this.getTaskInfo()).toString();
    }

    /**
     * Callback for a single task-update request.
     *
     * <p>Each instance remembers what was sent with the request (split assignments, dynamic filter
     * version, pending-request count) and when it started, so the response can be reconciled
     * against the enclosing task's state.
     */
    private class UpdateResponseHandler
    implements SimpleHttpResponseCallback<TaskInfo> {
        private final List<SplitAssignment> splitAssignments;
        private final long currentRequestDynamicFiltersVersion;
        private final long currentRequestStartNanos;
        private final int currentPendingRequestsCounter;

        /**
         * @param splitAssignments assignments included in the request; copied defensively
         * @param currentRequestDynamicFiltersVersion dynamic filter version sent with the request
         * @param currentRequestStartNanos {@code System.nanoTime()} reading taken when the request was issued
         * @param currentPendingRequestsCounter pending-request count this request accounts for
         */
        private UpdateResponseHandler(List<SplitAssignment> splitAssignments, long currentRequestDynamicFiltersVersion, long currentRequestStartNanos, int currentPendingRequestsCounter) {
            Objects.requireNonNull(splitAssignments, "splitAssignments is null");
            this.splitAssignments = ImmutableList.copyOf(splitAssignments);
            this.currentRequestDynamicFiltersVersion = currentRequestDynamicFiltersVersion;
            this.currentRequestStartNanos = currentRequestStartNanos;
            this.currentPendingRequestsCounter = currentPendingRequestsCounter;
        }

        /**
         * Records the acknowledged dynamic filter version, clears the in-flight request, applies
         * the returned task info, and schedules another update if more requests arrived meanwhile.
         */
        @Override
        public void success(TaskInfo value) {
            HttpRemoteTask task = HttpRemoteTask.this;
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-" + task.taskId)) {
                task.sentDynamicFiltersVersion.set(this.currentRequestDynamicFiltersVersion);
                task.outboundDynamicFiltersCollector.acknowledge(this.currentRequestDynamicFiltersVersion);
                task.sendPlan.set(value.needsPlan());
                task.currentRequest.set(null);
                this.updateStats();
                task.updateErrorTracker.requestSucceeded();
                task.processTaskUpdate(value, this.splitAssignments);

                // A positive remainder means updates were requested while this one was in flight.
                boolean moreUpdatesRequested = task.pendingRequestsCounter.addAndGet(-this.currentPendingRequestsCounter) > 0;
                if (moreUpdatesRequested) {
                    task.scheduleUpdate();
                }
            }
        }

        /**
         * Clears the in-flight request, records the error unless the task is already done, and
         * schedules a retry. Any exception thrown while doing so fails the task; an {@link Error}
         * is additionally rethrown.
         */
        @Override
        public void failed(Throwable cause) {
            HttpRemoteTask task = HttpRemoteTask.this;
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-" + task.taskId)) {
                try {
                    task.currentRequest.set(null);
                    this.updateStats();
                    boolean taskDone = task.getTaskStatus().getState().isDone();
                    if (!taskDone) {
                        task.updateErrorTracker.requestFailed(cause);
                    }
                    task.scheduleUpdate();
                }
                catch (RuntimeException | Error e) {
                    task.fatalUnacknowledgedFailure(e);
                    if (e instanceof Error) {
                        throw (Error) e;
                    }
                }
            }
        }

        /** Fails the task with the given unrecoverable cause. */
        @Override
        public void fatal(Throwable cause) {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-" + HttpRemoteTask.this.taskId)) {
                HttpRemoteTask.this.fatalUnacknowledgedFailure(cause);
            }
        }

        /** Reports the round-trip time of this request, in milliseconds, to the task stats. */
        private void updateStats() {
            long roundTripMillis = Duration.nanosSince(this.currentRequestStartNanos).toMillis();
            HttpRemoteTask.this.stats.updateRoundTripMillis(roundTripMillis);
        }
    }
}

