/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.remotetask;

import com.google.common.base.Preconditions;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.Request;
import io.airlift.http.client.ResponseHandler;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.SpanBuilder;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.operator.RetryPolicy;
import io.trino.server.remotetask.ContinuousTaskStatusFetcher;
import io.trino.server.remotetask.RemoteTaskStats;
import io.trino.server.remotetask.RequestErrorTracker;
import io.trino.server.remotetask.SimpleHttpResponseCallback;
import io.trino.server.remotetask.SimpleHttpResponseHandler;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class TaskInfoFetcher {
    private static final Logger log = Logger.get(TaskInfoFetcher.class);
    private final TaskId taskId;
    private final Consumer<Throwable> onFail;
    private final ContinuousTaskStatusFetcher taskStatusFetcher;
    private final StateMachine<TaskInfo> taskInfo;
    private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
    private final JsonCodec<TaskInfo> taskInfoCodec;
    private final long updateIntervalMillis;
    private final AtomicLong lastUpdateNanos = new AtomicLong();
    private final ScheduledExecutorService updateScheduledExecutor;
    private final Executor executor;
    private final HttpClient httpClient;
    private final Supplier<SpanBuilder> spanBuilderFactory;
    private final RequestErrorTracker errorTracker;
    private final boolean summarizeTaskInfo;
    private final RemoteTaskStats stats;
    private final Optional<DataSize> estimatedMemory;
    private final AtomicReference<SpoolingOutputStats.Snapshot> spoolingOutputStats = new AtomicReference();
    private final RetryPolicy retryPolicy;
    @GuardedBy(value="this")
    private boolean running;
    @GuardedBy(value="this")
    private ScheduledFuture<?> scheduledFuture;
    @GuardedBy(value="this")
    private ListenableFuture<FullJsonResponseHandler.JsonResponse<TaskInfo>> future;

    public TaskInfoFetcher(Consumer<Throwable> onFail, ContinuousTaskStatusFetcher taskStatusFetcher, TaskInfo initialTask, HttpClient httpClient, Supplier<SpanBuilder> spanBuilderFactory, Duration updateInterval, JsonCodec<TaskInfo> taskInfoCodec, Duration maxErrorDuration, boolean summarizeTaskInfo, Executor executor, ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorScheduledExecutor, RemoteTaskStats stats, Optional<DataSize> estimatedMemory, RetryPolicy retryPolicy) {
        Objects.requireNonNull(initialTask, "initialTask is null");
        Objects.requireNonNull(errorScheduledExecutor, "errorScheduledExecutor is null");
        this.taskId = initialTask.taskStatus().getTaskId();
        this.onFail = Objects.requireNonNull(onFail, "onFail is null");
        this.taskStatusFetcher = Objects.requireNonNull(taskStatusFetcher, "taskStatusFetcher is null");
        this.taskInfo = new StateMachine<TaskInfo>("task " + String.valueOf(this.taskId), executor, initialTask);
        this.finalTaskInfo = new StateMachine("task-" + String.valueOf(this.taskId), executor, Optional.empty());
        this.taskInfoCodec = Objects.requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        this.updateIntervalMillis = updateInterval.toMillis();
        this.updateScheduledExecutor = Objects.requireNonNull(updateScheduledExecutor, "updateScheduledExecutor is null");
        this.errorTracker = new RequestErrorTracker(this.taskId, initialTask.taskStatus().getSelf(), maxErrorDuration, errorScheduledExecutor, "getting info for task");
        this.summarizeTaskInfo = summarizeTaskInfo;
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.spanBuilderFactory = Objects.requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
        this.stats = Objects.requireNonNull(stats, "stats is null");
        this.estimatedMemory = Objects.requireNonNull(estimatedMemory, "estimatedMemory is null");
        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy is null");
    }

    public TaskInfo getTaskInfo() {
        return this.taskInfo.get();
    }

    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.scheduleUpdate();
    }

    private synchronized void stop() {
        this.running = false;
        if (this.future != null) {
            this.future.cancel(true);
            this.future = null;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
    }

    public void addFinalTaskInfoListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        AtomicBoolean done = new AtomicBoolean();
        StateMachine.StateChangeListener<Optional> fireOnceStateChangeListener = finalTaskInfo -> {
            if (finalTaskInfo.isPresent() && done.compareAndSet(false, true)) {
                stateChangeListener.stateChanged((TaskInfo)finalTaskInfo.get());
            }
        };
        this.finalTaskInfo.addStateChangeListener(fireOnceStateChangeListener);
        fireOnceStateChangeListener.stateChanged(this.finalTaskInfo.get());
    }

    public SpoolingOutputStats.Snapshot retrieveAndDropSpoolingOutputStats() {
        Optional<TaskInfo> finalTaskInfo = this.finalTaskInfo.get();
        Preconditions.checkState((boolean)finalTaskInfo.isPresent(), (Object)"finalTaskInfo must be present");
        TaskState taskState = finalTaskInfo.get().taskStatus().getState();
        Preconditions.checkState((taskState == TaskState.FINISHED ? 1 : 0) != 0, (String)"task must be FINISHED, got: %s", (Object)((Object)taskState));
        SpoolingOutputStats.Snapshot result = this.spoolingOutputStats.getAndSet(null);
        Preconditions.checkState((result != null ? 1 : 0) != 0, (Object)"spooling output stats is not available");
        return result;
    }

    private synchronized void scheduleUpdate() {
        this.scheduledFuture = this.updateScheduledExecutor.scheduleWithFixedDelay(() -> {
            try {
                TaskInfoFetcher taskInfoFetcher = this;
                synchronized (taskInfoFetcher) {
                    if (this.future != null && !this.future.isDone()) {
                        return;
                    }
                }
                if (Duration.nanosSince((long)this.lastUpdateNanos.get()).toMillis() >= this.updateIntervalMillis) {
                    this.sendNextRequest();
                }
            }
            catch (Throwable e) {
                log.error(e, "Unexpected error while getting task info");
            }
        }, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    private synchronized void sendNextRequest() {
        TaskStatus taskStatus = this.getTaskInfo().taskStatus();
        if (!this.running) {
            return;
        }
        if (TaskInfoFetcher.isDone(this.getTaskInfo())) {
            this.stop();
            return;
        }
        if (this.future != null && !this.future.isDone()) {
            return;
        }
        ListenableFuture<Void> errorRateLimit = this.errorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::sendNextRequest, this.executor);
            return;
        }
        HttpUriBuilder httpUriBuilder = HttpUriBuilder.uriBuilderFrom((URI)taskStatus.getSelf());
        URI uri = this.summarizeTaskInfo ? httpUriBuilder.addParameter("summarize", new String[0]).build() : httpUriBuilder.build();
        Request request = Request.Builder.prepareGet().setUri(uri).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setSpanBuilder(this.spanBuilderFactory.get()).build();
        this.errorTracker.startRequest();
        this.future = this.httpClient.executeAsync(request, (ResponseHandler)FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec));
        Futures.addCallback(this.future, new SimpleHttpResponseHandler<TaskInfo>(new TaskInfoResponseCallback(), request.getUri(), this.stats), (Executor)this.executor);
    }

    synchronized void updateTaskInfo(TaskInfo newTaskInfo) {
        TaskInfo newValue;
        boolean updated;
        TaskStatus localTaskStatus = this.taskStatusFetcher.getTaskStatus();
        TaskStatus newRemoteTaskStatus = newTaskInfo.taskStatus();
        if (!newRemoteTaskStatus.getTaskId().equals(this.taskId)) {
            log.debug("Task ID mismatch on remote task status. Member task ID is %s, but remote task ID is %s. This will confuse finalTaskInfo listeners.", new Object[]{this.taskId, newRemoteTaskStatus.getTaskId()});
        }
        if (localTaskStatus.getState().isDone() && newRemoteTaskStatus.getState().isDone() && localTaskStatus.getState() != newRemoteTaskStatus.getState()) {
            newTaskInfo = newTaskInfo.withTaskStatus(localTaskStatus);
            if (!localTaskStatus.getTaskId().equals(this.taskId)) {
                log.debug("Task ID mismatch on local task status. Member task ID is %s, but status-fetcher ID is %s. This will confuse finalTaskInfo listeners.", new Object[]{this.taskId, newRemoteTaskStatus.getTaskId()});
            }
        }
        if (this.estimatedMemory.isPresent()) {
            newTaskInfo = newTaskInfo.withEstimatedMemory(this.estimatedMemory.get());
        }
        if (newTaskInfo.taskStatus().getState().isDone()) {
            boolean wasSet = this.spoolingOutputStats.compareAndSet(null, newTaskInfo.outputBuffers().getSpoolingOutputStats().orElse(null));
            if (this.retryPolicy == RetryPolicy.TASK && wasSet && this.spoolingOutputStats.get() == null) {
                log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail.", new Object[]{this.taskId});
            }
            newTaskInfo = newTaskInfo.pruneSpoolingOutputStats();
        }
        if ((updated = this.taskInfo.setIf(newValue = newTaskInfo, oldValue -> {
            TaskStatus oldTaskStatus = oldValue.taskStatus();
            TaskStatus newTaskStatus = newValue.taskStatus();
            if (oldTaskStatus.getState().isDone()) {
                return false;
            }
            return newTaskStatus.getVersion() >= oldTaskStatus.getVersion();
        })) && newValue.taskStatus().getState().isDone()) {
            this.taskStatusFetcher.updateTaskStatus(newTaskInfo.taskStatus());
            this.finalTaskInfo.compareAndSet(Optional.empty(), Optional.of(newValue));
            this.stop();
        }
    }

    private synchronized void cleanupRequest() {
        if (this.future != null && this.future.isDone()) {
            this.future = null;
        }
    }

    private void updateStats(long currentRequestStartNanos) {
        this.stats.infoRoundTripMillis(Duration.nanosSince((long)currentRequestStartNanos).toMillis());
    }

    private static boolean isDone(TaskInfo taskInfo) {
        return taskInfo.taskStatus().getState().isDone();
    }

    private class TaskInfoResponseCallback
    implements SimpleHttpResponseCallback<TaskInfo> {
        private final long requestStartNanos = System.nanoTime();

        private TaskInfoResponseCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void success(TaskInfo newValue) {
            try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", new Object[]{TaskInfoFetcher.this.taskId});){
                TaskInfoFetcher.this.lastUpdateNanos.set(System.nanoTime());
                TaskInfoFetcher.this.updateStats(this.requestStartNanos);
                TaskInfoFetcher.this.errorTracker.requestSucceeded();
                TaskInfoFetcher.this.updateTaskInfo(newValue);
            }
            finally {
                TaskInfoFetcher.this.cleanupRequest();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void failed(Throwable cause) {
            try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", new Object[]{TaskInfoFetcher.this.taskId});){
                TaskInfoFetcher.this.lastUpdateNanos.set(System.nanoTime());
                try {
                    if (!TaskInfoFetcher.isDone(TaskInfoFetcher.this.getTaskInfo())) {
                        TaskInfoFetcher.this.errorTracker.requestFailed(cause);
                    }
                }
                catch (Error e) {
                    TaskInfoFetcher.this.onFail.accept(e);
                    throw e;
                }
                catch (RuntimeException e) {
                    TaskInfoFetcher.this.onFail.accept(e);
                }
            }
            finally {
                TaskInfoFetcher.this.cleanupRequest();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void fatal(Throwable cause) {
            try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", new Object[]{TaskInfoFetcher.this.taskId});){
                TaskInfoFetcher.this.onFail.accept(cause);
            }
            finally {
                TaskInfoFetcher.this.cleanupRequest();
            }
        }
    }
}

