/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RecordSupplierInputSourceTest
extends InitializedNullHandlingTest {
    private static final int NUM_COLS = 16;
    private static final int NUM_ROWS = 128;
    private static final String TIMESTAMP_STRING = "2019-01-01";
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testRead() throws IOException {
        int read;
        RandomCsvSupplier supplier = new RandomCsvSupplier();
        RecordSupplierInputSource inputSource = new RecordSupplierInputSource("topic", (RecordSupplier)supplier, false, null);
        List colNames = IntStream.range(0, 16).mapToObj(i -> StringUtils.format((String)"col_%d", (Object[])new Object[]{i})).collect(Collectors.toList());
        CsvInputFormat inputFormat = new CsvInputFormat(colNames, null, null, Boolean.valueOf(false), 0);
        InputSourceReader reader = inputSource.reader(new InputRowSchema(new TimestampSpec("col_0", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))), ColumnsFilter.all()), (InputFormat)inputFormat, this.temporaryFolder.newFolder());
        try (CloseableIterator iterator = reader.read();){
            for (read = 0; read < 128 && iterator.hasNext(); ++read) {
                InputRow inputRow = (InputRow)iterator.next();
                Assert.assertEquals((Object)DateTimes.of((String)TIMESTAMP_STRING), (Object)inputRow.getTimestamp());
                Assert.assertEquals((long)15L, (long)inputRow.getDimensions().size());
            }
        }
        Assert.assertEquals((long)128L, (long)read);
        Assert.assertTrue((boolean)supplier.isClosed());
    }

    @Test
    public void testReadTimeout() throws IOException {
        int read;
        RandomCsvSupplier supplier = new RandomCsvSupplier();
        RecordSupplierInputSource inputSource = new RecordSupplierInputSource("topic", (RecordSupplier)supplier, false, Integer.valueOf(-1000));
        List colNames = IntStream.range(0, 16).mapToObj(i -> StringUtils.format((String)"col_%d", (Object[])new Object[]{i})).collect(Collectors.toList());
        CsvInputFormat inputFormat = new CsvInputFormat(colNames, null, null, Boolean.valueOf(false), 0);
        InputSourceReader reader = inputSource.reader(new InputRowSchema(new TimestampSpec("col_0", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))), ColumnsFilter.all()), (InputFormat)inputFormat, this.temporaryFolder.newFolder());
        try (CloseableIterator iterator = reader.read();){
            for (read = 0; read < 128 && iterator.hasNext(); ++read) {
                iterator.next();
            }
        }
        Assert.assertEquals((long)0L, (long)read);
        Assert.assertTrue((boolean)supplier.isClosed());
    }

    private static class RandomCsvSupplier
    implements RecordSupplier<Integer, Integer, ByteEntity> {
        private static final int STR_LEN = 8;
        private final Random random = ThreadLocalRandom.current();
        private final Map<Integer, Integer> partitionToOffset = Maps.newHashMapWithExpectedSize((int)3);
        private volatile boolean closed = false;

        private RandomCsvSupplier() {
            for (int i = 0; i < 3; ++i) {
                this.partitionToOffset.put(i, 0);
            }
        }

        public void assign(Set<StreamPartition<Integer>> streamPartitions) {
        }

        public void seekToEarliest(Set<StreamPartition<Integer>> streamPartitions) {
        }

        public void seekToLatest(Set<StreamPartition<Integer>> streamPartitions) {
        }

        @NotNull
        public List<OrderedPartitionableRecord<Integer, Integer, ByteEntity>> poll(long timeout) {
            long sleepTime = this.random.nextInt((int)timeout);
            try {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (sleepTime == timeout) {
                return Collections.emptyList();
            }
            int numRecords = this.random.nextInt(8);
            ArrayList<OrderedPartitionableRecord<Integer, Integer, ByteEntity>> records = new ArrayList<OrderedPartitionableRecord<Integer, Integer, ByteEntity>>(numRecords);
            for (int i = 0; i < numRecords; ++i) {
                int partitionId = this.random.nextInt(this.partitionToOffset.size());
                int offset = this.partitionToOffset.get(partitionId);
                int numBytes = this.random.nextInt(3);
                List bytes = IntStream.range(0, numBytes).mapToObj(j -> {
                    ArrayList<String> columns = new ArrayList<String>(16);
                    columns.add(RecordSupplierInputSourceTest.TIMESTAMP_STRING);
                    for (int k = 0; k < 15; ++k) {
                        columns.add(RandomStringUtils.random((int)8, (boolean)true, (boolean)false));
                    }
                    return new ByteEntity(StringUtils.toUtf8((String)String.join((CharSequence)",", columns)));
                }).collect(Collectors.toList());
                records.add((OrderedPartitionableRecord<Integer, Integer, ByteEntity>)new OrderedPartitionableRecord("topic", (Object)partitionId, (Object)offset, bytes));
            }
            return records;
        }

        public Set<Integer> getPartitionIds(String stream) {
            return this.partitionToOffset.keySet();
        }

        public void close() {
            this.closed = true;
        }

        boolean isClosed() {
            return this.closed;
        }

        public void seek(StreamPartition<Integer> partition, Integer sequenceNumber) {
            throw new UnsupportedOperationException();
        }

        public Collection<StreamPartition<Integer>> getAssignment() {
            throw new UnsupportedOperationException();
        }

        @Nullable
        public Integer getLatestSequenceNumber(StreamPartition<Integer> partition) {
            throw new UnsupportedOperationException();
        }

        @Nullable
        public Integer getEarliestSequenceNumber(StreamPartition<Integer> partition) {
            throw new UnsupportedOperationException();
        }

        public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Integer> offset) {
            return true;
        }

        public Integer getPosition(StreamPartition<Integer> partition) {
            throw new UnsupportedOperationException();
        }
    }
}

