/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jena.riot.lang;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.lang.PipedQuadsStream;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedTriplesStream;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.util.NodeFactoryExtra;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link PipedRDFIterator} when it is fed through {@link PipedTriplesStream}
 * and {@link PipedQuadsStream}: ordinary producer/consumer streaming, invalid constructor
 * arguments, malformed parser input, and producers that die with or without calling
 * {@code finish()}.
 * <p>
 * Producers and consumers run on a shared thread pool created in {@link #setup()},
 * except where a test needs a specific thread arrangement.
 */
public class TestPipedRDFIterators {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestPipedRDFIterators.class);

    /** Seconds to wait for a streaming consumer and, after a timeout, for its producer. */
    private static final long STREAM_TIMEOUT_SECONDS = 10L;

    /** Worker pool shared by the tests; created in {@link #setup()} and stopped in {@link #teardown()}. */
    private static ExecutorService executor;

    /** Creates the shared worker pool before any test runs. */
    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(10);
    }

    /**
     * Stops the shared worker pool and waits up to ten seconds for its tasks to end.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    @AfterClass
    public static void teardown() throws InterruptedException {
        executor.shutdownNow();
        executor.awaitTermination(10L, TimeUnit.SECONDS);
    }

    /**
     * Builds a task that drains the given iterator and reports how many items it yielded.
     *
     * @param it the iterator to drain
     * @return a task returning the number of items read before {@code hasNext()} returned false
     */
    private static Callable<Integer> countingConsumer(final PipedRDFIterator<?> it) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                int seen = 0;
                while (it.hasNext()) {
                    it.next();
                    ++seen;
                }
                return seen;
            }
        };
    }

    /**
     * Waits for the consumer's item count.
     * <p>
     * If the consumer times out, the producer is given a bounded amount of time to finish so
     * that a producer failure is reported in preference to the bare timeout. Both tasks are
     * then cancelled so that stalled work does not keep occupying threads of the shared pool.
     *
     * @param producer future of the producing task
     * @param consumer future of the counting task
     * @return the number of items the consumer read
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the consumer failed, or the producer failed after a consumer timeout
     * @throws TimeoutException if the consumer (or, subsequently, the producer) did not finish in time
     */
    private static int awaitCount(Future<?> producer, Future<Integer> consumer)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return consumer.get(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException timedOut) {
            try {
                // Bounded wait: an unbounded get() here could hang the whole suite
                producer.get(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } finally {
                producer.cancel(true);
                consumer.cancel(true);
            }
            throw timedOut;
        }
    }

    /**
     * Streams {@code generateSize} generated triples from a producer task to a consumer task
     * and checks that the consumer sees every one of them.
     *
     * @param bufferSize buffer size passed to the {@link PipedRDFIterator} constructor
     * @param generateSize number of triples to generate
     * @param fair fairness flag passed to the {@link PipedRDFIterator} constructor
     */
    private void test_streamed_triples(int bufferSize, final int generateSize, boolean fair) throws InterruptedException, ExecutionException, TimeoutException {
        final PipedRDFIterator<Triple> it = new PipedRDFIterator<>(bufferSize, fair);
        final PipedTriplesStream out = new PipedTriplesStream(it);
        Runnable genTriples = new Runnable() {
            @Override
            public void run() {
                out.start();
                for (int i = 1; i <= generateSize; ++i) {
                    out.triple(new Triple(NodeFactory.createBlankNode(), NodeFactory.createURI("http://predicate"), NodeFactoryExtra.intToNode(i)));
                }
                out.finish();
            }
        };

        Future<?> genResult = executor.submit(genTriples);
        Future<Integer> result = executor.submit(countingConsumer(it));
        Assert.assertEquals(generateSize, awaitCount(genResult, result));
    }

    @Test
    public void streamed_triples_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated triples, fair mode
        this.test_streamed_triples(1, 100, true);
    }

    @Test
    public void streamed_triples_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated triples
        this.test_streamed_triples(10, 1000, false);
    }

    @Test
    public void streamed_triples_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated triples
        this.test_streamed_triples(100, 1000, false);
    }

    @Test
    public void streamed_triples_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size equal to generated triples
        this.test_streamed_triples(1000, 1000, false);
    }

    @Test
    public void streamed_triples_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size larger than generated triples
        this.test_streamed_triples(10000, 1000, false);
    }

    @Test
    public void streamed_triples_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
        // Large volume, buffer size much smaller than generated triples
        this.test_streamed_triples(1000, 100000, false);
    }

    @Test
    public void streamed_triples_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
        // Large volume, buffer size smaller than generated triples
        this.test_streamed_triples(10000, 100000, false);
    }

    /**
     * Streams {@code generateSize} generated quads from a producer task to a consumer task
     * and checks that the consumer sees every one of them.
     *
     * @param bufferSize buffer size passed to the {@link PipedRDFIterator} constructor
     * @param generateSize number of quads to generate
     * @param fair fairness flag passed to the {@link PipedRDFIterator} constructor
     */
    private void test_streamed_quads(int bufferSize, final int generateSize, boolean fair) throws InterruptedException, ExecutionException, TimeoutException {
        final PipedRDFIterator<Quad> it = new PipedRDFIterator<>(bufferSize, fair);
        final PipedQuadsStream out = new PipedQuadsStream(it);
        Runnable genQuads = new Runnable() {
            @Override
            public void run() {
                out.start();
                for (int i = 1; i <= generateSize; ++i) {
                    out.quad(new Quad(NodeFactory.createURI("http://graph"), NodeFactory.createBlankNode(), NodeFactory.createURI("http://predicate"), NodeFactoryExtra.intToNode(i)));
                }
                out.finish();
            }
        };

        Future<?> genResult = executor.submit(genQuads);
        Future<Integer> result = executor.submit(countingConsumer(it));
        Assert.assertEquals(generateSize, awaitCount(genResult, result));
    }

    @Test
    public void streamed_quads_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated quads, fair mode
        this.test_streamed_quads(1, 100, true);
    }

    @Test
    public void streamed_quads_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated quads
        this.test_streamed_quads(10, 1000, false);
    }

    @Test
    public void streamed_quads_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size smaller than generated quads
        this.test_streamed_quads(100, 1000, false);
    }

    @Test
    public void streamed_quads_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size equal to generated quads
        this.test_streamed_quads(1000, 1000, false);
    }

    @Test
    public void streamed_quads_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
        // Buffer size larger than generated quads
        this.test_streamed_quads(10000, 1000, false);
    }

    @Test
    public void streamed_quads_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
        // Large volume, buffer size much smaller than generated quads
        this.test_streamed_quads(1000, 100000, false);
    }

    @Test
    public void streamed_quads_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
        // Large volume, buffer size smaller than generated quads
        this.test_streamed_quads(10000, 100000, false);
    }

    /** A buffer size of zero must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void streamed_instantiation_bad_01() {
        new PipedRDFIterator<Triple>(0);
    }

    /** A negative buffer size must be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void streamed_instantiation_bad_02() {
        new PipedRDFIterator<Triple>(-1);
    }

    /**
     * Parses malformed Turtle on a producer task and checks how many triples the consumer
     * receives before the stream ends.
     *
     * @param data Turtle text to parse (expected to be malformed)
     * @param expected number of triples the consumer should read
     * @throws TimeoutException if the consumer does not finish in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void test_streamed_triples_bad(final String data, int expected) throws TimeoutException, InterruptedException {
        final PipedRDFIterator<Triple> it = new PipedRDFIterator<>();
        final PipedTriplesStream out = new PipedTriplesStream(it);
        Runnable runParser = new Runnable() {
            @Override
            public void run() {
                InputStream input = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
                try {
                    RDFParser.source(input).lang(Lang.TTL).parse(out);
                } catch (Throwable ignored) {
                    // Deliberately ignored: the input is malformed on purpose and this test
                    // only checks what the consuming side observes.
                }
            }
        };

        Future<?> genResult = executor.submit(runParser);
        Future<Integer> result = executor.submit(countingConsumer(it));
        int count;
        try {
            count = result.get(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            try {
                // Bounded wait: an unbounded get() here could hang the whole suite
                genResult.get(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (ExecutionException ex) {
                LOGGER.warn("Errored as expected", ex);
            } finally {
                genResult.cancel(true);
                result.cancel(true);
            }
            throw e;
        } catch (ExecutionException e) {
            // Keep the original exception attached so the failure can be diagnosed
            throw new AssertionError("Consumer failed: " + e.getMessage(), e);
        }
        Assert.assertEquals(expected, count);
    }

    /** Input that is malformed from the start yields no triples. */
    @Test
    public void streamed_triples_bad_01() throws TimeoutException, InterruptedException {
        this.test_streamed_triples_bad("@prefix : <http://unterminated", 0);
    }

    /** Input with one complete triple followed by a truncated one yields exactly one triple. */
    @Test
    public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
        this.test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
    }

    /** Using an iterator that has no stream attached to it is an illegal state. */
    @Test(expected = IllegalStateException.class)
    public void streamed_state_bad_01() {
        PipedRDFIterator<Triple> it = new PipedRDFIterator<>();
        it.hasNext();
    }

    /**
     * The producer thread emits one triple, calls {@code finish()} and then dies with an
     * exception. Because the stream was finished first, the consumer should read the single
     * triple and then see a normal end of stream.
     */
    @Test
    public void streamed_state_bad_02() {
        PipedRDFIterator<Triple> it = new PipedRDFIterator<>();
        final PipedTriplesStream out = new PipedTriplesStream(it);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                out.start();
                out.triple(Triple.create(NodeFactory.createURI("urn:s"), NodeFactory.createURI("urn:p"), NodeFactory.createURI("urn:o")));
                out.finish();
                throw new RuntimeException("die!");
            }
        });
        // The producer's failure is deliberate, so handle it silently
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
            }
        });
        t.start();

        Assert.assertTrue(it.hasNext());
        it.next();
        Assert.assertFalse(it.hasNext());
    }

    /**
     * The producer emits one triple and dies without calling {@code finish()}. Producer and
     * consumer are submitted to the same single-threaded executor, so the consumer only runs
     * after the producer task has already failed. Reading past the one available triple must
     * fail with a {@link RiotException} rather than block.
     */
    @Test
    public void streamed_state_bad_03() {
        final PipedRDFIterator<Triple> it = new PipedRDFIterator<>();
        final PipedTriplesStream out = new PipedTriplesStream(it);
        // Dedicated executor (not the shared pool) so both tasks run on one thread, in order
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        Runnable producer = new Runnable() {
            @Override
            public void run() {
                out.start();
                out.triple(Triple.create(NodeFactory.createURI("urn:s"), NodeFactory.createURI("urn:p"), NodeFactory.createURI("urn:o")));
                throw new RuntimeException("die!");
            }
        };
        singleThread.submit(producer);

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                it.hasNext();
                it.next();
                // No second triple was ever produced
                it.next();
            }
        };
        Future<?> f = singleThread.submit(consumer);
        try {
            f.get(3L, TimeUnit.SECONDS);
            Assert.fail("Expected an error");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof RiotException);
        } catch (TimeoutException e) {
            Assert.fail("Expected an error but a timeout occurred indicating the consumer deadlocked");
        } catch (InterruptedException e) {
            Assert.fail("Expected an execution error but an interrupt occurred");
        } finally {
            singleThread.shutdownNow();
        }
    }

    /**
     * The producer fails right after {@code start()} but still calls {@code finish()} on its
     * way out. A consumer started afterwards must complete without error or deadlock.
     */
    @Test
    public void streamed_state_bad_04() {
        final PipedRDFIterator<Triple> iter = new PipedRDFIterator<>();
        final PipedTriplesStream stream = new PipedTriplesStream(iter);
        Runnable producer = new Runnable() {
            @Override
            public void run() {
                try {
                    stream.start();
                    throw new RuntimeException("die");
                } finally {
                    stream.finish();
                }
            }
        };
        Future<?> f = executor.submit(producer);
        try {
            f.get(3L, TimeUnit.SECONDS);
            Assert.fail("Expected an error");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        } catch (TimeoutException e) {
            Assert.fail("Unexpected timeout");
        } catch (InterruptedException e) {
            Assert.fail("Unexpected interrupt");
        }

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                iter.hasNext();
            }
        };
        f = executor.submit(consumer);
        try {
            Object result = f.get(3L, TimeUnit.SECONDS);
            Assert.assertNull(result);
        } catch (ExecutionException e) {
            Assert.fail("An error was not expected");
        } catch (TimeoutException e) {
            Assert.fail("A timeout occurred indicating the consumer deadlocked");
        } catch (InterruptedException e) {
            Assert.fail("An interrupt occurred");
        }
    }

    /**
     * The producer task fails without ever touching the stream. A consumer started afterwards
     * must give up with a {@link RiotException} instead of waiting forever.
     */
    @Test
    public void streamed_state_bad_05() {
        // NOTE(review): the last two arguments look like poll timeout and max polls - confirm
        final PipedRDFIterator<Triple> iter = new PipedRDFIterator<>(1, false, 1000, 3);
        // Never used by the producer below; constructed only to attach a stream to the iterator
        PipedTriplesStream stream = new PipedTriplesStream(iter);
        Runnable producer = new Runnable() {
            @Override
            public void run() {
                throw new RuntimeException("die");
            }
        };
        Future<?> f = executor.submit(producer);
        try {
            f.get(3L, TimeUnit.SECONDS);
            Assert.fail("Expected an error");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        } catch (TimeoutException e) {
            Assert.fail("Unexpected timeout");
        } catch (InterruptedException e) {
            Assert.fail("Unexpected interrupt");
        }

        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                iter.hasNext();
            }
        };
        f = executor.submit(consumer);
        try {
            f.get(10L, TimeUnit.SECONDS);
            Assert.fail("An error was expected");
        } catch (ExecutionException e) {
            Assert.assertNotNull(e.getCause());
            Assert.assertTrue(e.getCause() instanceof RiotException);
        } catch (TimeoutException e) {
            Assert.fail("A timeout occurred indicating the consumer deadlocked");
        } catch (InterruptedException e) {
            Assert.fail("An interrupt occurred");
        }
    }

    /** Calling {@code hasNext()} repeatedly after the stream has finished keeps returning false. */
    @Test
    public void streamed_iterator_usage_01() {
        PipedRDFIterator<Triple> iter = new PipedRDFIterator<>();
        PipedTriplesStream stream = new PipedTriplesStream(iter);
        stream.start();
        stream.finish();
        Assert.assertFalse(iter.hasNext());
        Assert.assertFalse(iter.hasNext());
    }

    /** Calling {@code hasNext()} after the consumer has closed the iterator is an error. */
    @Test(expected = RiotException.class)
    public void streamed_iterator_usage_02() {
        PipedRDFIterator<Triple> iter = new PipedRDFIterator<>();
        PipedTriplesStream stream = new PipedTriplesStream(iter);
        stream.start();
        stream.finish();
        Assert.assertFalse(iter.hasNext());
        iter.close();
        iter.hasNext();
    }
}

