/*
 * Decompiled with CFR 0.152.
 */
package io.trino.server.testing.exchange;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import io.trino.server.testing.exchange.LocalFileSystemExchangeSinkHandle;
import io.trino.server.testing.exchange.LocalFileSystemExchangeSinkInstanceHandle;
import io.trino.server.testing.exchange.LocalFileSystemExchangeSourceHandle;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceSplitter;
import io.trino.spi.exchange.ExchangeSourceStatistics;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;

public class LocalFileSystemExchange
implements Exchange {
    private static final Pattern PARTITION_FILE_NAME_PATTERN = Pattern.compile("(\\d+)\\.data");
    private final Path baseDirectory;
    private final ExchangeContext exchangeContext;
    private final int outputPartitionCount;
    @GuardedBy(value="this")
    private final Set<LocalFileSystemExchangeSinkHandle> allSinks = new HashSet<LocalFileSystemExchangeSinkHandle>();
    @GuardedBy(value="this")
    private final Set<LocalFileSystemExchangeSinkHandle> finishedSinks = new HashSet<LocalFileSystemExchangeSinkHandle>();
    @GuardedBy(value="this")
    private boolean noMoreSinks;
    private final CompletableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesFuture = new CompletableFuture();
    @GuardedBy(value="this")
    private boolean exchangeSourceHandlesCreated;

    public LocalFileSystemExchange(Path baseDirectory, ExchangeContext exchangeContext, int outputPartitionCount) {
        this.baseDirectory = Objects.requireNonNull(baseDirectory, "baseDirectory is null");
        this.exchangeContext = Objects.requireNonNull(exchangeContext, "exchangeContext is null");
        this.outputPartitionCount = outputPartitionCount;
    }

    public void initialize() {
        try {
            Files.createDirectories(this.getExchangeDirectory(), new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public synchronized ExchangeSinkHandle addSink(int taskPartitionId) {
        LocalFileSystemExchangeSinkHandle sinkHandle = new LocalFileSystemExchangeSinkHandle(this.exchangeContext.getQueryId(), this.exchangeContext.getExchangeId(), taskPartitionId);
        this.allSinks.add(sinkHandle);
        return sinkHandle;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void noMoreSinks() {
        LocalFileSystemExchange localFileSystemExchange = this;
        synchronized (localFileSystemExchange) {
            this.noMoreSinks = true;
        }
        this.checkInputReady();
    }

    public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) {
        LocalFileSystemExchangeSinkHandle localFileSystemSinkHandle = (LocalFileSystemExchangeSinkHandle)sinkHandle;
        Path outputDirectory = this.getExchangeDirectory().resolve(Integer.toString(localFileSystemSinkHandle.getTaskPartitionId())).resolve(Integer.toString(taskAttemptId));
        return new LocalFileSystemExchangeSinkInstanceHandle(localFileSystemSinkHandle, outputDirectory, this.outputPartitionCount);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sinkFinished(ExchangeSinkInstanceHandle handle) {
        LocalFileSystemExchange localFileSystemExchange = this;
        synchronized (localFileSystemExchange) {
            LocalFileSystemExchangeSinkInstanceHandle localHandle = (LocalFileSystemExchangeSinkInstanceHandle)handle;
            this.finishedSinks.add(localHandle.getSinkHandle());
        }
        this.checkInputReady();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkInputReady() {
        Verify.verify((!Thread.holdsLock(this) ? 1 : 0) != 0);
        List<ExchangeSourceHandle> exchangeSourceHandles = null;
        LocalFileSystemExchange localFileSystemExchange = this;
        synchronized (localFileSystemExchange) {
            if (this.exchangeSourceHandlesCreated) {
                return;
            }
            if (this.noMoreSinks && this.finishedSinks.containsAll(this.allSinks)) {
                exchangeSourceHandles = this.createExchangeSourceHandles();
                this.exchangeSourceHandlesCreated = true;
            }
        }
        if (exchangeSourceHandles != null) {
            this.exchangeSourceHandlesFuture.complete(exchangeSourceHandles);
        }
    }

    private synchronized List<ExchangeSourceHandle> createExchangeSourceHandles() {
        ArrayListMultimap partitionFiles = ArrayListMultimap.create();
        for (LocalFileSystemExchangeSinkHandle sinkHandle : this.finishedSinks) {
            Path committedAttemptPath = this.getCommittedAttemptPath(sinkHandle);
            Map<Integer, Path> partitions = LocalFileSystemExchange.getCommittedPartitions(committedAttemptPath);
            partitions.forEach((arg_0, arg_1) -> LocalFileSystemExchange.lambda$createExchangeSourceHandles$0((Multimap)partitionFiles, arg_0, arg_1));
        }
        ImmutableList.Builder result = ImmutableList.builder();
        for (Integer partitionId : partitionFiles.keySet()) {
            result.add((Object)new LocalFileSystemExchangeSourceHandle(partitionId, (List<String>)ImmutableList.copyOf((Collection)partitionFiles.get((Object)partitionId))));
        }
        return result.build();
    }

    private Path getCommittedAttemptPath(LocalFileSystemExchangeSinkHandle sinkHandle) {
        Path sinkOutputBasePath = this.getExchangeDirectory().resolve(Integer.toString(sinkHandle.getTaskPartitionId()));
        try {
            List<Path> attemptPaths = LocalFileSystemExchange.listFiles(sinkOutputBasePath, x$0 -> Files.isDirectory(x$0, new LinkOption[0]));
            Preconditions.checkState((!attemptPaths.isEmpty() ? 1 : 0) != 0, (String)"no attempts found for sink %s", (Object)sinkHandle);
            List committedAttemptPaths = (List)attemptPaths.stream().filter(LocalFileSystemExchange::isCommitted).collect(ImmutableList.toImmutableList());
            Preconditions.checkState((!committedAttemptPaths.isEmpty() ? 1 : 0) != 0, (String)"no committed attempts found for %s", (Object)sinkHandle);
            return (Path)committedAttemptPaths.get(0);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static boolean isCommitted(Path attemptPath) {
        Path commitMarkerFilePath = attemptPath.resolve("committed");
        return Files.exists(commitMarkerFilePath, new LinkOption[0]);
    }

    private static Map<Integer, Path> getCommittedPartitions(Path committedAttemptPath) {
        try {
            List<Path> partitionFiles = LocalFileSystemExchange.listFiles(committedAttemptPath, path -> path.toString().endsWith(".data"));
            ImmutableMap.Builder result = ImmutableMap.builder();
            for (Path partitionFile : partitionFiles) {
                Matcher matcher = PARTITION_FILE_NAME_PATTERN.matcher(partitionFile.getFileName().toString());
                Preconditions.checkState((boolean)matcher.matches(), (String)"unexpected partition file: %s", (Object)partitionFile);
                int partitionId = Integer.parseInt(matcher.group(1));
                result.put((Object)partitionId, (Object)partitionFile);
            }
            return result.build();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path getExchangeDirectory() {
        return this.baseDirectory.resolve(this.exchangeContext.getQueryId() + "." + this.exchangeContext.getExchangeId());
    }

    public CompletableFuture<List<ExchangeSourceHandle>> getSourceHandles() {
        return this.exchangeSourceHandlesFuture;
    }

    public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes) {
        final LocalFileSystemExchangeSourceHandle localHandle = (LocalFileSystemExchangeSourceHandle)handle;
        final Iterator<String> filesIterator = localHandle.getFiles().iterator();
        return new ExchangeSourceSplitter(){

            public CompletableFuture<?> isBlocked() {
                return CompletableFuture.completedFuture(null);
            }

            public Optional<ExchangeSourceHandle> getNext() {
                if (filesIterator.hasNext()) {
                    return Optional.of(new LocalFileSystemExchangeSourceHandle(localHandle.getPartitionId(), (List<String>)ImmutableList.of((Object)((String)filesIterator.next()))));
                }
                return Optional.empty();
            }

            public void close() {
            }
        };
    }

    public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle handle) {
        LocalFileSystemExchangeSourceHandle localHandle = (LocalFileSystemExchangeSourceHandle)handle;
        long sizeInBytes = 0L;
        for (String file : localHandle.getFiles()) {
            try {
                sizeInBytes += Files.size(Paths.get(file, new String[0]));
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return new ExchangeSourceStatistics(sizeInBytes);
    }

    public void close() {
        try {
            Path exchangeDirectory = this.getExchangeDirectory();
            if (Files.exists(exchangeDirectory, new LinkOption[0])) {
                MoreFiles.deleteRecursively((Path)exchangeDirectory, (RecursiveDeleteOption[])new RecursiveDeleteOption[]{RecursiveDeleteOption.ALLOW_INSECURE});
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static List<Path> listFiles(Path directory, Predicate<Path> predicate) throws IOException {
        ImmutableList.Builder builder = ImmutableList.builder();
        try (Stream<Path> dir = Files.list(directory);){
            dir.filter(predicate).forEach(arg_0 -> ((ImmutableList.Builder)builder).add(arg_0));
        }
        return builder.build();
    }

    private static /* synthetic */ void lambda$createExchangeSourceHandles$0(Multimap partitionFiles, Integer partition, Path file) {
        partitionFiles.put((Object)partition, (Object)file.toAbsolutePath().toString());
    }
}

