/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.http.client.HttpClient;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.stats.TDigest;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.FeaturesConfig;
import io.trino.execution.TaskFailureListener;
import io.trino.execution.TaskId;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.DirectExchangeBuffer;
import io.trino.operator.DirectExchangeClientStatus;
import io.trino.operator.HttpPageBufferClient;
import io.trino.operator.PageBufferClientStatus;
import io.trino.operator.WorkProcessor;
import io.trino.plugin.base.metrics.TDigestHistogram;
import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Coordinates a set of {@link HttpPageBufferClient}s, one per registered
 * remote location, and funnels the pages they deliver into a shared
 * {@link DirectExchangeBuffer} from which consumers poll.
 *
 * <p>Each client is tracked in exactly one of three sets besides
 * {@code allClients}: queued (waiting to be scheduled), running (a request
 * has been scheduled) or completed (finished or failed). State annotated
 * with {@code @GuardedBy("this")} is only mutated while holding the
 * instance monitor; the memory context has its own read/write lock.
 */
@ThreadSafe
public class DirectExchangeClient
implements Closeable {
    private static final Logger log = Logger.get(DirectExchangeClient.class);
    private final String selfAddress;
    private final FeaturesConfig.DataIntegrityVerification dataIntegrityVerification;
    private final DataSize maxResponseSize;
    private final int concurrentRequestMultiplier;
    private final Duration maxErrorDuration;
    private final boolean acknowledgePages;
    private final HttpClient httpClient;
    private final ScheduledExecutorService scheduledExecutor;
    @GuardedBy(value="this")
    private boolean noMoreLocations;
    // Every client ever registered, keyed by its location.
    private final Map<URI, HttpPageBufferClient> allClients = new ConcurrentHashMap<>();
    // Clients waiting for their next request to be scheduled (insertion ordered).
    @GuardedBy(value="this")
    private final Set<HttpPageBufferClient> queuedClients = new LinkedHashSet<>();
    // Clients for which a request has been scheduled and not yet completed.
    @GuardedBy(value="this")
    private final Set<HttpPageBufferClient> runningClients = new LinkedHashSet<>();
    // Clients that have finished or failed; they are never scheduled again.
    private final Set<HttpPageBufferClient> completedClients = Sets.newConcurrentHashSet();
    private final DirectExchangeBuffer buffer;
    @GuardedBy(value="this")
    private long successfulRequests;
    @GuardedBy(value="this")
    private long averageBytesPerRequest;
    @GuardedBy(value="this")
    private boolean closed;
    @GuardedBy(value="this")
    private final TDigest requestDuration = new TDigest();
    // Set to null by releaseMemoryContext() once the client is closed.
    @Nullable
    @GuardedBy(value="memoryContextLock")
    private LocalMemoryContext memoryContext;
    private final ReadWriteLock memoryContextLock = new ReentrantReadWriteLock();
    private final Lock memoryContextReadLock = this.memoryContextLock.readLock();
    private final Lock memoryContextWriteLock = this.memoryContextLock.writeLock();
    private final Executor pageBufferClientCallbackExecutor;
    private final TaskFailureListener taskFailureListener;

    /**
     * Creates a client with no locations registered.
     *
     * @throws NullPointerException if {@code selfAddress},
     *         {@code dataIntegrityVerification}, {@code buffer},
     *         {@code pageBufferClientCallbackExecutor} or
     *         {@code taskFailureListener} is null
     */
    public DirectExchangeClient(String selfAddress, FeaturesConfig.DataIntegrityVerification dataIntegrityVerification, DirectExchangeBuffer buffer, DataSize maxResponseSize, int concurrentRequestMultiplier, Duration maxErrorDuration, boolean acknowledgePages, HttpClient httpClient, ScheduledExecutorService scheduledExecutor, LocalMemoryContext memoryContext, Executor pageBufferClientCallbackExecutor, TaskFailureListener taskFailureListener) {
        this.selfAddress = Objects.requireNonNull(selfAddress, "selfAddress is null");
        this.dataIntegrityVerification = Objects.requireNonNull(dataIntegrityVerification, "dataIntegrityVerification is null");
        this.buffer = Objects.requireNonNull(buffer, "buffer is null");
        this.maxResponseSize = maxResponseSize;
        this.concurrentRequestMultiplier = concurrentRequestMultiplier;
        this.maxErrorDuration = maxErrorDuration;
        this.acknowledgePages = acknowledgePages;
        this.httpClient = httpClient;
        this.scheduledExecutor = scheduledExecutor;
        this.memoryContext = memoryContext;
        this.pageBufferClientCallbackExecutor = Objects.requireNonNull(pageBufferClientCallbackExecutor, "pageBufferClientCallbackExecutor is null");
        this.taskFailureListener = Objects.requireNonNull(taskFailureListener, "taskFailureListener is null");
    }

    /**
     * Builds a snapshot of buffer statistics, request statistics and the
     * status of every registered page buffer client.
     *
     * @return a newly created status object
     */
    public DirectExchangeClientStatus getStatus() {
        // Per-client statuses are gathered before entering the monitor, so the
        // monitor is held only while the fields it guards are read.
        ImmutableList.Builder<PageBufferClientStatus> clientStatuses = ImmutableList.builder();
        for (HttpPageBufferClient client : allClients.values()) {
            clientStatuses.add(client.getStatus());
        }
        List<PageBufferClientStatus> pageBufferClientStatus = clientStatuses.build();
        synchronized (this) {
            return new DirectExchangeClientStatus(
                    buffer.getRetainedSizeInBytes(),
                    buffer.getMaxRetainedSizeInBytes(),
                    averageBytesPerRequest,
                    successfulRequests,
                    buffer.getBufferedPageCount(),
                    buffer.getSpilledPageCount(),
                    buffer.getSpilledBytes(),
                    noMoreLocations,
                    pageBufferClientStatus,
                    new TDigestHistogram(TDigest.copyOf(requestDuration)));
        }
    }

    /**
     * Registers a new remote location, creates its page buffer client and
     * queues it for scheduling. Does nothing if this client is already closed.
     *
     * @param taskId the remote task that produces data at {@code location}
     * @param location the location to fetch pages from
     * @throws NullPointerException if {@code taskId} or {@code location} is null
     * @throws IllegalArgumentException if {@code location} was already added
     * @throws IllegalStateException if {@link #noMoreLocations()} was already called
     */
    public synchronized void addLocation(TaskId taskId, URI location) {
        // Fail fast so a null task id never reaches the buffer or the client.
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(location, "location is null");
        // The closed check intentionally precedes the state checks below:
        // late additions after close are silently ignored rather than rejected.
        if (closed) {
            return;
        }
        Preconditions.checkArgument(!allClients.containsKey(location), "location already exist: %s", location);
        Preconditions.checkState(!noMoreLocations, "No more locations already set");
        buffer.addTask(taskId);
        HttpPageBufferClient client = new HttpPageBufferClient(
                selfAddress,
                httpClient,
                dataIntegrityVerification,
                maxResponseSize,
                maxErrorDuration,
                acknowledgePages,
                taskId,
                location,
                new ExchangeClientCallback(),
                scheduledExecutor,
                pageBufferClientCallbackExecutor);
        allClients.put(location, client);
        queuedClients.add(client);
        scheduleRequestIfNecessary();
    }

    /**
     * Marks the set of locations as complete and informs the buffer that no
     * more tasks will be added.
     */
    public synchronized void noMoreLocations() {
        noMoreLocations = true;
        buffer.noMoreTasks();
        scheduleRequestIfNecessary();
    }

    /**
     * Exposes the polled pages as a {@link WorkProcessor}.
     *
     * <p>Each step yields the next page if one is available; otherwise it
     * reports finished, blocked on {@link #isBlocked()}, or yielded.
     */
    public WorkProcessor<Slice> pages() {
        return WorkProcessor.create(() -> {
            Slice page = pollPage();
            if (page != null) {
                return WorkProcessor.ProcessState.ofResult(page);
            }
            if (isFinished()) {
                return WorkProcessor.ProcessState.finished();
            }
            ListenableFuture<Void> blocked = isBlocked();
            if (!blocked.isDone()) {
                return WorkProcessor.ProcessState.blocked(blocked);
            }
            // Not finished and not blocked, but no page was available on this step.
            return WorkProcessor.ProcessState.yielded();
        });
    }

    private void assertNotHoldsLock() {
        assert (!Thread.holdsLock(this)) : "Cannot get next page while holding a lock on this";
    }

    /**
     * Takes the next page from the buffer without blocking.
     *
     * <p>When a page is returned, the retained-memory accounting is refreshed
     * and further requests are scheduled if capacity allows.
     *
     * @return the next page, or {@code null} if the buffer has none right now
     */
    @Nullable
    public Slice pollPage() {
        assertNotHoldsLock();
        Slice page = buffer.pollPage();
        if (page == null) {
            return null;
        }
        updateRetainedMemory();
        scheduleRequestIfNecessary();
        return page;
    }

    /**
     * @return {@code true} when the buffer reports finished and every
     *         registered client has completed
     */
    public boolean isFinished() {
        return buffer.isFinished() && completedClients.size() == allClients.size();
    }

    /**
     * Closes every page buffer client and the buffer, then releases the
     * memory context. Subsequent calls are no-ops.
     */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (HttpPageBufferClient client : allClients.values()) {
            closeQuietly(client);
        }
        try {
            buffer.close();
        }
        catch (RuntimeException e) {
            log.warn(e, "error closing buffer");
        }
        finally {
            // Zero the reported memory even if closing the buffer failed.
            releaseMemoryContext();
        }
    }

    /**
     * Schedules requests on queued clients while the buffer has remaining
     * capacity.
     *
     * <p>Clients are taken in queue order until the projected bytes to be
     * requested reach
     * {@code remainingCapacity * concurrentRequestMultiplier - reservedBytes},
     * where {@code reservedBytes} is the sum of the average request sizes of
     * the clients that are already running.
     *
     * @return the number of clients scheduled by this call
     */
    @VisibleForTesting
    synchronized int scheduleRequestIfNecessary() {
        boolean bufferDone = buffer.isFinished() || buffer.isFailed();
        if (bufferDone && completedClients.size() == allClients.size()) {
            return 0;
        }
        long neededBytes = buffer.getRemainingCapacityInBytes();
        if (neededBytes <= 0L) {
            return 0;
        }
        long reservedBytesForScheduledClients = runningClients.stream()
                .mapToLong(HttpPageBufferClient::getAverageRequestSizeInBytes)
                .sum();
        long projectedBytesToBeRequested = 0L;
        int clientCount = 0;
        Iterator<HttpPageBufferClient> clientIterator = queuedClients.iterator();
        while (clientIterator.hasNext()) {
            HttpPageBufferClient client = clientIterator.next();
            if (projectedBytesToBeRequested >= neededBytes * (long) concurrentRequestMultiplier - reservedBytesForScheduledClients) {
                break;
            }
            projectedBytesToBeRequested += client.getAverageRequestSizeInBytes();
            client.scheduleRequest();
            // Move the client from queued to running.
            clientIterator.remove();
            runningClients.add(client);
            ++clientCount;
        }
        return clientCount;
    }

    /**
     * @return the buffer's blocked future
     */
    public ListenableFuture<Void> isBlocked() {
        return buffer.isBlocked();
    }

    // NOTE(review): returns the live, monitor-guarded set without copying or
    // synchronizing; callers are presumably tests that handle this themselves.
    @VisibleForTesting
    Set<HttpPageBufferClient> getQueuedClients() {
        return queuedClients;
    }

    // NOTE(review): returns the live, monitor-guarded set without copying or
    // synchronizing; callers are presumably tests that handle this themselves.
    @VisibleForTesting
    Set<HttpPageBufferClient> getRunningClients() {
        return runningClients;
    }

    @VisibleForTesting
    Map<URI, HttpPageBufferClient> getAllClients() {
        return allClients;
    }

    /**
     * Hands pages received by {@code client} to the buffer and updates the
     * request statistics.
     *
     * @return {@code false} if this client is closed or the buffer is
     *         finished or failed (statistics are then left untouched),
     *         {@code true} otherwise
     * @throws IllegalStateException if {@code client} is already completed
     */
    private boolean addPages(HttpPageBufferClient client, List<Slice> pages) {
        Preconditions.checkState(!completedClients.contains(client), "client is already marked as completed");
        // The response size is computed and the pages are handed over before
        // the monitor is taken.
        long responseSize = 0L;
        if (!pages.isEmpty()) {
            for (Slice page : pages) {
                responseSize += page.length();
            }
            buffer.addPages(client.getRemoteTaskId(), pages);
            updateRetainedMemory();
        }
        synchronized (this) {
            if (closed || buffer.isFinished() || buffer.isFailed()) {
                return false;
            }
            ++successfulRequests;
            // Incremental mean: AVG_n = AVG_(n-1) * (n-1) / n + VALUE_n / n
            averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1L) / successfulRequests + (double) responseSize / successfulRequests);
        }
        return true;
    }

    /**
     * Reports the buffer's current retained size to the memory context,
     * unless the context has already been released.
     */
    private void updateRetainedMemory() {
        // Only the read lock is taken here; releaseMemoryContext(), which
        // nulls the field, takes the write lock.
        memoryContextReadLock.lock();
        try {
            if (memoryContext != null) {
                memoryContext.setBytes(buffer.getRetainedSizeInBytes());
            }
        }
        finally {
            memoryContextReadLock.unlock();
        }
    }

    /**
     * Sets the memory context to zero bytes and drops the reference, so later
     * calls to {@link #updateRetainedMemory()} become no-ops.
     */
    private void releaseMemoryContext() {
        memoryContextWriteLock.lock();
        try {
            if (memoryContext != null) {
                memoryContext.setBytes(0L);
                memoryContext = null;
            }
        }
        finally {
            memoryContextWriteLock.unlock();
        }
    }

    /**
     * Records the duration of the client's last request and, unless the
     * client is completed or already queued, moves it from running back to
     * queued.
     */
    private synchronized void requestComplete(HttpPageBufferClient client) {
        requestDuration.add((double) client.getLastRequestDurationMillis());
        if (!completedClients.contains(client) && !queuedClients.contains(client)) {
            queuedClients.add(client);
            runningClients.remove(client);
        }
        scheduleRequestIfNecessary();
    }

    /**
     * Marks {@code client} as completed and, the first time only, notifies
     * the buffer that its task finished.
     */
    private synchronized void clientFinished(HttpPageBufferClient client) {
        Objects.requireNonNull(client, "client is null");
        // Set.add returns false if the client was already completed.
        if (completedClients.add(client)) {
            runningClients.remove(client);
            buffer.taskFinished(client.getRemoteTaskId());
        }
        scheduleRequestIfNecessary();
    }

    /**
     * Marks {@code client} as completed and, the first time only, notifies
     * the buffer of the failure, submits the failure listener call to the
     * scheduled executor and closes the client.
     */
    private synchronized void clientFailed(HttpPageBufferClient client, Throwable cause) {
        Objects.requireNonNull(client, "client is null");
        // Set.add returns false if the client was already completed.
        if (completedClients.add(client)) {
            runningClients.remove(client);
            buffer.taskFailed(client.getRemoteTaskId(), cause);
            // The listener is invoked via the executor rather than inline
            // under this monitor.
            scheduledExecutor.execute(() -> taskFailureListener.onTaskFailed(client.getRemoteTaskId(), cause));
            closeQuietly(client);
        }
        scheduleRequestIfNecessary();
    }

    /**
     * Closes {@code client}, suppressing any {@link RuntimeException}.
     */
    private static void closeQuietly(HttpPageBufferClient client) {
        try {
            client.close();
        }
        catch (RuntimeException ignored) {
            // Best-effort close: a failure here must not stop the caller from
            // closing the remaining clients and the buffer.
        }
    }

    /**
     * Callback given to every {@link HttpPageBufferClient}; validates its
     * arguments and forwards to the enclosing {@link DirectExchangeClient}.
     */
    private class ExchangeClientCallback
    implements HttpPageBufferClient.ClientCallback {
        @Override
        public boolean addPages(HttpPageBufferClient client, List<Slice> pages) {
            Objects.requireNonNull(client, "client is null");
            Objects.requireNonNull(pages, "pages is null");
            return DirectExchangeClient.this.addPages(client, pages);
        }

        @Override
        public void requestComplete(HttpPageBufferClient client) {
            Objects.requireNonNull(client, "client is null");
            DirectExchangeClient.this.requestComplete(client);
        }

        @Override
        public void clientFinished(HttpPageBufferClient client) {
            Objects.requireNonNull(client, "client is null");
            DirectExchangeClient.this.clientFinished(client);
        }

        @Override
        public void clientFailed(HttpPageBufferClient client, Throwable cause) {
            Objects.requireNonNull(client, "client is null");
            Objects.requireNonNull(cause, "cause is null");
            DirectExchangeClient.this.clientFailed(client, cause);
        }
    }
}

