/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.exchange.filesystem;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.opentelemetry.api.trace.Span;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Execution(value=ExecutionMode.CONCURRENT)
public abstract class AbstractTestExchangeManager {
    private ExchangeManager exchangeManager;

    @BeforeAll
    public void init() throws Exception {
        this.exchangeManager = this.createExchangeManager();
    }

    @AfterAll
    public void destroy() throws Exception {
        if (this.exchangeManager != null) {
            this.exchangeManager = null;
        }
    }

    protected abstract ExchangeManager createExchangeManager();

    @Test
    public void testHappyPath() throws Exception {
        ExchangeId exchangeId = ExchangeId.createRandomExchangeId();
        Exchange exchange = this.exchangeManager.createExchange((ExchangeContext)new TestExchangeContext(exchangeId), 2, false);
        ExchangeSinkHandle sinkHandle0 = exchange.addSink(0);
        ExchangeSinkHandle sinkHandle1 = exchange.addSink(1);
        ExchangeSinkHandle sinkHandle2 = exchange.addSink(2);
        exchange.noMoreSinks();
        ExchangeSinkInstanceHandle sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle0, 0).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"0-0-0", (Object)1, (Object)"0-1-0", (Object)0, (Object)"0-0-1", (Object)1, (Object)"0-1-1"), true);
        exchange.sinkFinished(sinkHandle0, 0);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle0, 1).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"0-0-0", (Object)1, (Object)"0-1-0", (Object)0, (Object)"0-0-1", (Object)1, (Object)"0-1-1"), true);
        exchange.sinkFinished(sinkHandle0, 1);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle0, 2).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"failed", (Object)1, (Object)"another failed"), false);
        exchange.sinkFinished(sinkHandle0, 2);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle1, 0).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"1-0-0", (Object)1, (Object)"1-1-0", (Object)0, (Object)"1-0-1", (Object)1, (Object)"1-1-1"), true);
        exchange.sinkFinished(sinkHandle1, 0);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle1, 1).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"1-0-0", (Object)1, (Object)"1-1-0", (Object)0, (Object)"1-0-1", (Object)1, (Object)"1-1-1"), true);
        exchange.sinkFinished(sinkHandle1, 1);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle1, 2).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"more failed", (Object)1, (Object)"another failed"), false);
        exchange.sinkFinished(sinkHandle1, 2);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle2, 2).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)ImmutableListMultimap.of((Object)0, (Object)"2-0-0", (Object)1, (Object)"2-1-0"), true);
        exchange.sinkFinished(sinkHandle2, 2);
        exchange.allRequiredSinksFinished();
        ExchangeSourceHandleSource.ExchangeSourceHandleBatch sourceHandleBatch = (ExchangeSourceHandleSource.ExchangeSourceHandleBatch)exchange.getSourceHandles().getNextBatch().get();
        Assertions.assertThat((boolean)sourceHandleBatch.lastBatch()).isTrue();
        List partitionHandles = sourceHandleBatch.handles();
        Assertions.assertThat((List)partitionHandles).hasSize(2);
        Map partitions = (Map)partitionHandles.stream().collect(ImmutableMap.toImmutableMap(ExchangeSourceHandle::getPartitionId, Function.identity()));
        ExchangeSourceOutputSelector outputSelector = ExchangeSourceOutputSelector.builder((Set)ImmutableSet.of((Object)exchangeId)).include(exchangeId, 0, 0).include(exchangeId, 1, 0).include(exchangeId, 2, 2).setPartitionCount(exchangeId, 3).setFinal().build();
        Assertions.assertThat(this.readData((ExchangeSourceHandle)partitions.get(0), outputSelector)).containsExactlyInAnyOrder((Object[])new String[]{"0-0-0", "0-0-1", "1-0-0", "1-0-1", "2-0-0"});
        Assertions.assertThat(this.readData((ExchangeSourceHandle)partitions.get(1), outputSelector)).containsExactlyInAnyOrder((Object[])new String[]{"0-1-0", "0-1-1", "1-1-0", "1-1-1", "2-1-0"});
        exchange.close();
    }

    @Test
    public void testLargePages() throws Exception {
        String smallPage = "a".repeat(Math.toIntExact(DataSize.of((long)123L, (DataSize.Unit)DataSize.Unit.BYTE).toBytes()));
        String mediumPage = "b".repeat(Math.toIntExact(DataSize.of((long)66L, (DataSize.Unit)DataSize.Unit.KILOBYTE).toBytes()));
        String largePage = "c".repeat(Math.toIntExact(DataSize.of((long)5L, (DataSize.Unit)DataSize.Unit.MEGABYTE).toBytes()) - 4);
        String maxPage = "d".repeat(Math.toIntExact(DataSize.of((long)16L, (DataSize.Unit)DataSize.Unit.MEGABYTE).toBytes()) - 4);
        ExchangeId exchangeId = ExchangeId.createRandomExchangeId();
        Exchange exchange = this.exchangeManager.createExchange((ExchangeContext)new TestExchangeContext(exchangeId), 3, false);
        ExchangeSinkHandle sinkHandle0 = exchange.addSink(0);
        ExchangeSinkHandle sinkHandle1 = exchange.addSink(1);
        ExchangeSinkHandle sinkHandle2 = exchange.addSink(2);
        exchange.noMoreSinks();
        ExchangeSinkInstanceHandle sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle0, 0).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)new ImmutableListMultimap.Builder().putAll((Object)0, (Iterable)ImmutableList.of((Object)smallPage)).putAll((Object)1, (Iterable)ImmutableList.of((Object)maxPage, (Object)mediumPage)).putAll((Object)2, (Iterable)ImmutableList.of()).build(), true);
        exchange.sinkFinished(sinkHandle0, 0);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle1, 0).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)new ImmutableListMultimap.Builder().putAll((Object)0, (Iterable)ImmutableList.of((Object)mediumPage)).putAll((Object)1, (Iterable)ImmutableList.of((Object)largePage)).putAll((Object)2, (Iterable)ImmutableList.of((Object)smallPage)).build(), true);
        exchange.sinkFinished(sinkHandle1, 0);
        sinkInstanceHandle = (ExchangeSinkInstanceHandle)exchange.instantiateSink(sinkHandle2, 0).get();
        this.writeData(sinkInstanceHandle, (Multimap<Integer, String>)new ImmutableListMultimap.Builder().putAll((Object)0, (Iterable)ImmutableList.of((Object)largePage, (Object)maxPage)).putAll((Object)1, (Iterable)ImmutableList.of((Object)smallPage)).putAll((Object)2, (Iterable)ImmutableList.of((Object)maxPage, (Object)largePage, (Object)mediumPage)).build(), true);
        exchange.sinkFinished(sinkHandle2, 0);
        exchange.allRequiredSinksFinished();
        ExchangeSourceHandleSource.ExchangeSourceHandleBatch sourceHandleBatch = (ExchangeSourceHandleSource.ExchangeSourceHandleBatch)exchange.getSourceHandles().getNextBatch().get();
        Assertions.assertThat((boolean)sourceHandleBatch.lastBatch()).isTrue();
        List partitionHandles = sourceHandleBatch.handles();
        Assertions.assertThat((List)partitionHandles).hasSize(10);
        ListMultimap partitions = (ListMultimap)partitionHandles.stream().collect(ImmutableListMultimap.toImmutableListMultimap(ExchangeSourceHandle::getPartitionId, Function.identity()));
        ExchangeSourceOutputSelector outputSelector = ExchangeSourceOutputSelector.builder((Set)ImmutableSet.of((Object)exchangeId)).include(exchangeId, 0, 0).include(exchangeId, 1, 0).include(exchangeId, 2, 0).setPartitionCount(exchangeId, 3).setFinal().build();
        Assertions.assertThat(this.readData(partitions.get((Object)0), outputSelector)).containsExactlyInAnyOrder((Object[])new String[]{smallPage, mediumPage, largePage, maxPage});
        Assertions.assertThat(this.readData(partitions.get((Object)1), outputSelector)).containsExactlyInAnyOrder((Object[])new String[]{smallPage, mediumPage, largePage, maxPage});
        Assertions.assertThat(this.readData(partitions.get((Object)2), outputSelector)).containsExactlyInAnyOrder((Object[])new String[]{smallPage, mediumPage, largePage, maxPage});
        exchange.close();
    }

    @Test
    public void testMaxOutputPartitionCountCheck() {
        Assertions.assertThatThrownBy(() -> this.exchangeManager.createExchange((ExchangeContext)new TestExchangeContext(ExchangeId.createRandomExchangeId()), 51, false)).hasMessageContaining("Max number of output partitions exceeded for exchange").hasFieldOrPropertyWithValue("errorCode", (Object)FileSystemExchangeErrorCode.MAX_OUTPUT_PARTITION_COUNT_EXCEEDED.toErrorCode());
    }

    private void writeData(ExchangeSinkInstanceHandle handle, Multimap<Integer, String> data, boolean finish) {
        ExchangeSink sink = this.exchangeManager.createSink(handle);
        data.forEach((key, value) -> sink.add(key.intValue(), Slices.utf8Slice((String)value)));
        if (finish) {
            MoreFutures.getFutureValue((Future)sink.finish());
        } else {
            MoreFutures.getFutureValue((Future)sink.abort());
        }
    }

    private List<String> readData(ExchangeSourceHandle handle, ExchangeSourceOutputSelector outputSelector) {
        return this.readData((List<ExchangeSourceHandle>)ImmutableList.of((Object)handle), outputSelector);
    }

    private List<String> readData(List<ExchangeSourceHandle> handles, ExchangeSourceOutputSelector outputSelector) {
        ImmutableList.Builder result = ImmutableList.builder();
        try (ExchangeSource source = this.exchangeManager.createSource();){
            source.setOutputSelector(outputSelector);
            ArrayDeque<ExchangeSourceHandle> remainingHandles = new ArrayDeque<ExchangeSourceHandle>(handles);
            while (!source.isFinished()) {
                ExchangeSourceHandle handle;
                Slice data = source.read();
                if (data != null) {
                    result.add((Object)data.toStringUtf8());
                }
                if ((handle = (ExchangeSourceHandle)remainingHandles.poll()) != null) {
                    source.addSourceHandles((List)ImmutableList.of((Object)handle));
                    continue;
                }
                source.noMoreSourceHandles();
            }
        }
        return result.build();
    }

    private record TestExchangeContext(ExchangeId exchangeId) implements ExchangeContext
    {
        private TestExchangeContext(ExchangeId exchangeId) {
            this.exchangeId = Objects.requireNonNull(exchangeId, "exchangeId is null");
        }

        public QueryId getQueryId() {
            return new QueryId("query");
        }

        public ExchangeId getExchangeId() {
            return this.exchangeId;
        }

        public Span getParentSpan() {
            return Span.getInvalid();
        }
    }
}

