/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.HoodieLazyInsertIterable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import scala.Tuple2;

public class TestBoundedInMemoryQueue
extends HoodieSparkClientTestHarness {
    private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()).withWriteBufferLimitBytes(1024).build(false);

    @BeforeEach
    public void setUp() throws Exception {
        this.initTestDataGenerator();
        this.initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    @Test
    @Timeout(value=60L)
    public void testRecordReading() throws Exception {
        int numRecords = 128;
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(128));
        BoundedInMemoryQueue queue = new BoundedInMemoryQueue(1024L, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig));
        Future<Boolean> resFuture = this.executorService.submit(() -> {
            new IteratorBasedQueueProducer(hoodieRecords.iterator()).produce((HoodieMessageQueue)queue);
            queue.seal();
            return true;
        });
        Iterator originalRecordIterator = hoodieRecords.iterator();
        int recordsRead = 0;
        while (queue.iterator().hasNext()) {
            HoodieAvroRecord originalRecord = (HoodieAvroRecord)originalRecordIterator.next();
            Option originalInsertValue = originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
            HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = (HoodieLazyInsertIterable.HoodieInsertValueGenResult)queue.iterator().next();
            Assertions.assertEquals((Object)originalRecord, (Object)genResult.getResult());
            Assertions.assertEquals((Object)originalInsertValue, (Object)((HoodieAvroRecord)genResult.getResult()).getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA));
            ++recordsRead;
        }
        Assertions.assertFalse((queue.iterator().hasNext() || originalRecordIterator.hasNext() ? 1 : 0) != 0);
        Assertions.assertEquals((int)128, (int)recordsRead);
        resFuture.get();
    }

    @Test
    @Timeout(value=60L)
    public void testCompositeProducerRecordReading() throws Exception {
        int numRecords = 1000;
        int numProducers = 40;
        ArrayList<List> recs = new ArrayList<List>();
        BoundedInMemoryQueue queue = new BoundedInMemoryQueue(1024L, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig));
        HashMap<String, Tuple2> keyToProducerAndIndexMap = new HashMap<String, Tuple2>();
        for (int i = 0; i < 40; ++i) {
            List pRecs = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(1000));
            int j = 0;
            for (HoodieRecord r : pRecs) {
                Assertions.assertFalse((boolean)keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
                keyToProducerAndIndexMap.put(r.getRecordKey(), new Tuple2((Object)i, (Object)j));
                ++j;
            }
            recs.add(pRecs);
        }
        ArrayList<Object> producers = new ArrayList<Object>();
        for (int i = 0; i < recs.size(); ++i) {
            List r = (List)recs.get(i);
            if (i % 2 == 0) {
                producers.add(new IteratorBasedQueueProducer(r.iterator()));
                continue;
            }
            producers.add(new FunctionBasedQueueProducer(buf -> {
                Iterator itr = r.iterator();
                while (itr.hasNext()) {
                    try {
                        buf.insertRecord(itr.next());
                    }
                    catch (Exception e) {
                        throw new HoodieException((Throwable)e);
                    }
                }
                return true;
            }));
        }
        List futureList = producers.stream().map(producer -> this.executorService.submit(() -> {
            producer.produce((HoodieMessageQueue)queue);
            return true;
        })).collect(Collectors.toList());
        Future<Boolean> closeFuture = this.executorService.submit(() -> {
            try {
                for (Future f : futureList) {
                    f.get();
                }
                queue.seal();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return true;
        });
        Map lastSeenMap = IntStream.range(0, 40).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
        Map countMap = IntStream.range(0, 40).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
        while (queue.iterator().hasNext()) {
            HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = (HoodieLazyInsertIterable.HoodieInsertValueGenResult)queue.iterator().next();
            HoodieRecord rec = genResult.getResult();
            Tuple2 producerPos = (Tuple2)keyToProducerAndIndexMap.get(rec.getRecordKey());
            Integer lastSeenPos = lastSeenMap.get(producerPos._1());
            countMap.put(producerPos._1(), countMap.get(producerPos._1()) + 1);
            lastSeenMap.put(producerPos._1(), lastSeenPos + 1);
            Assertions.assertEquals((int)(lastSeenPos + 1), (int)((Integer)producerPos._2()));
        }
        for (int i = 0; i < 40; ++i) {
            Assertions.assertEquals((Integer)1000, (Integer)countMap.get(i));
        }
        closeFuture.get();
    }

    @Test
    @Timeout(value=60L)
    public void testMemoryLimitForBuffering() throws Exception {
        int numRecords = 128;
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(128));
        int recordLimit = 5;
        DefaultSizeEstimator sizeEstimator = new DefaultSizeEstimator();
        HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = (HoodieLazyInsertIterable.HoodieInsertValueGenResult)HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig).apply(hoodieRecords.get(0));
        long objSize = sizeEstimator.sizeEstimate((Object)genResult);
        long memoryLimitInBytes = 5L * objSize;
        BoundedInMemoryQueue queue = new BoundedInMemoryQueue(memoryLimitInBytes, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig));
        this.executorService.submit(() -> {
            new IteratorBasedQueueProducer(hoodieRecords.iterator()).produce((HoodieMessageQueue)queue);
            return true;
        });
        while (!this.isQueueFull(queue.rateLimiter)) {
            Thread.sleep(10L);
        }
        Assertions.assertEquals((int)0, (int)queue.rateLimiter.availablePermits());
        Assertions.assertEquals((int)5, (int)queue.currentRateLimit);
        Assertions.assertEquals((long)5L, (long)queue.size());
        Assertions.assertEquals((long)4L, (long)queue.samplingRecordCounter.get());
        Assertions.assertEquals(hoodieRecords.get(0), (Object)((HoodieLazyInsertIterable.HoodieInsertValueGenResult)queue.iterator().next()).getResult());
        Assertions.assertEquals(hoodieRecords.get(1), (Object)((HoodieLazyInsertIterable.HoodieInsertValueGenResult)queue.iterator().next()).getResult());
        while (!this.isQueueFull(queue.rateLimiter)) {
            Thread.sleep(10L);
        }
        Assertions.assertEquals((int)0, (int)queue.rateLimiter.availablePermits());
        Assertions.assertEquals((int)5, (int)queue.currentRateLimit);
        Assertions.assertEquals((long)5L, (long)queue.size());
        Assertions.assertEquals((long)6L, (long)queue.samplingRecordCounter.get());
    }

    @Test
    @Timeout(value=60L)
    public void testException() throws Exception {
        int numRecords = 256;
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(256));
        DefaultSizeEstimator sizeEstimator = new DefaultSizeEstimator();
        HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = (HoodieLazyInsertIterable.HoodieInsertValueGenResult)HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig).apply(hoodieRecords.get(0));
        long objSize = sizeEstimator.sizeEstimate((Object)new Tuple2((Object)genResult.getResult(), (Object)genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
        long memoryLimitInBytes = 4L * objSize;
        BoundedInMemoryQueue queue1 = new BoundedInMemoryQueue(memoryLimitInBytes, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig));
        Future<Boolean> resFuture = this.executorService.submit(() -> {
            new IteratorBasedQueueProducer(hoodieRecords.iterator()).produce((HoodieMessageQueue)queue1);
            return true;
        });
        while (!this.isQueueFull(queue1.rateLimiter)) {
            Thread.sleep(10L);
        }
        Exception e = new Exception("Failing it :)");
        queue1.markAsFailed((Throwable)e);
        Throwable thrown1 = Assertions.assertThrows(ExecutionException.class, resFuture::get, (String)"exception is expected");
        Assertions.assertEquals(HoodieException.class, thrown1.getCause().getClass());
        Assertions.assertEquals((Object)e, (Object)thrown1.getCause().getCause());
        RuntimeException expectedException = new RuntimeException("failing record reading");
        Iterator mockHoodieRecordsIterator = (Iterator)Mockito.mock(Iterator.class);
        Mockito.when((Object)mockHoodieRecordsIterator.hasNext()).thenReturn((Object)true);
        Mockito.when(mockHoodieRecordsIterator.next()).thenThrow(new Throwable[]{expectedException});
        BoundedInMemoryQueue queue2 = new BoundedInMemoryQueue(memoryLimitInBytes, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig));
        Future<Boolean> res = this.executorService.submit(() -> {
            try {
                new IteratorBasedQueueProducer(mockHoodieRecordsIterator).produce((HoodieMessageQueue)queue2);
            }
            catch (Exception ex) {
                queue2.markAsFailed((Throwable)ex);
                throw ex;
            }
            return true;
        });
        Throwable thrown2 = Assertions.assertThrows(Exception.class, () -> queue2.iterator().hasNext(), (String)"exception is expected");
        Assertions.assertEquals((Object)expectedException, (Object)thrown2.getCause());
        Throwable thrown3 = Assertions.assertThrows(ExecutionException.class, res::get, (String)"exception is expected");
        Assertions.assertEquals((Object)expectedException, (Object)thrown3.getCause());
    }

    private boolean isQueueFull(Semaphore rateLimiter) {
        return rateLimiter.availablePermits() == 0 && rateLimiter.hasQueuedThreads();
    }
}

