/*
 * Decompiled with CFR 0.152.
 */
package com.gc.iotools.stream.os;

import com.gc.iotools.stream.base.ExecutionModel;
import com.gc.iotools.stream.base.ExecutorServiceFactory;
import com.gc.iotools.stream.utils.LogUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.input.CloseShieldInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges an {@link java.io.OutputStream} to an {@link InputStream}.
 *
 * <p>Bytes written to this stream travel through an internal pipe and are
 * handed, as an {@link InputStream}, to {@link #doRead(InputStream)}. That
 * method runs as a task submitted to the {@link ExecutorService} supplied
 * to (or selected by) the constructor. The value it returns is available
 * through {@link #getResult()} once {@link #close()} has been called.
 *
 * <p>If {@code doRead} throws, subsequent {@code write}/{@code flush} calls
 * close the stream and report the failure as an {@link IOException}.
 *
 * @param <T> type of the value produced by {@link #doRead(InputStream)}
 */
public abstract class OutputStreamToInputStream<T> extends PipedOutputStream {
    /** Time, in milliseconds, that {@link #close()} waits for the consumer task (15 minutes). */
    private static final int DEFAULT_TIMEOUT = 900000;
    /** Pipe buffer size, in bytes, used by constructors that do not take an explicit size. */
    private static int defaultPipeSize = 4096;
    private static final Logger LOG = LoggerFactory.getLogger(OutputStreamToInputStream.class);

    /**
     * Set by the consumer task when {@code doRead} throws, and read by the
     * writing thread; volatile so the writer reliably observes the update.
     */
    private volatile boolean abort = false;
    /** {@code true} once the first close has been performed; later closes are no-ops. */
    private boolean closeCalled = false;
    private final ExecutorService executorService;
    /** Read side of the pipe connected to this output stream. */
    private final InputStream inputstream;
    /** Whether {@link #close()} waits for the consumer task to finish. */
    private final boolean joinOnClose;
    /** Handle of the submitted consumer task; {@code null} until the task is started. */
    private Future<T> writingResult = null;

    /**
     * Changes the pipe buffer size used by instances created afterwards
     * through constructors that do not take an explicit size.
     *
     * @param defaultPipeSize the new default size in bytes
     */
    public static void setDefaultPipeSize(int defaultPipeSize) {
        OutputStreamToInputStream.defaultPipeSize = defaultPipeSize;
    }

    /**
     * Creates an instance that joins the consumer on {@link #close()}, uses the
     * {@link ExecutionModel#THREAD_PER_INSTANCE} executor and starts the
     * consumer task lazily (on first write, flush, close or getResult).
     */
    public OutputStreamToInputStream() {
        this(true, ExecutionModel.THREAD_PER_INSTANCE);
    }

    /**
     * Creates an instance that joins the consumer on {@link #close()} and uses
     * the {@link ExecutionModel#THREAD_PER_INSTANCE} executor with the default
     * pipe size.
     *
     * @param startImmediately if {@code true} the consumer task is submitted
     *                         from within the constructor
     */
    public OutputStreamToInputStream(boolean startImmediately) {
        this(startImmediately, true,
                ExecutorServiceFactory.getExecutor(ExecutionModel.THREAD_PER_INSTANCE), defaultPipeSize);
    }

    /**
     * Creates an instance with every option specified.
     *
     * <p>Note: with {@code startImmediately == true} the consumer task is
     * submitted before a subclass constructor body has run, so
     * {@code doRead} may execute against a partially constructed subclass.
     *
     * @param startImmediately if {@code true} the consumer task is submitted
     *                         from within the constructor
     * @param joinOnClose      whether {@link #close()} waits for the consumer task
     * @param executorService  executor that runs the consumer task; must not be {@code null}
     * @param pipeBufferSize   size in bytes of the internal pipe buffer
     * @throws IllegalArgumentException if {@code executorService} is {@code null}
     *                                  or {@code pipeBufferSize} is not positive
     * @throws IllegalStateException    if the pipe cannot be connected
     */
    public OutputStreamToInputStream(boolean startImmediately, boolean joinOnClose,
            ExecutorService executorService, int pipeBufferSize) {
        if (executorService == null) {
            throw new IllegalArgumentException("executor service can't be null");
        }
        String callerId = LogUtils.getCaller(this.getClass());
        MyPipedInputStream pipedIS = new MyPipedInputStream(pipeBufferSize);
        try {
            pipedIS.connect(this);
        } catch (IOException e) {
            throw new IllegalStateException("Error during pipe creaton", e);
        }
        this.joinOnClose = joinOnClose;
        this.inputstream = pipedIS;
        this.executorService = executorService;
        LOG.debug("invoked by[{}] queued for start.", callerId);
        if (startImmediately) {
            initializeIfNecessary();
        }
    }

    /**
     * Creates a lazily started instance using the executor associated with
     * the given execution model and the default pipe size.
     *
     * @param joinOnClose    whether {@link #close()} waits for the consumer task
     * @param executionModel model used to obtain the executor
     */
    public OutputStreamToInputStream(boolean joinOnClose, ExecutionModel executionModel) {
        this(joinOnClose, ExecutorServiceFactory.getExecutor(executionModel));
    }

    /**
     * Creates a lazily started instance using the given executor and the
     * default pipe size.
     *
     * @param joinOnClose     whether {@link #close()} waits for the consumer task
     * @param executorService executor that runs the consumer task; must not be {@code null}
     */
    public OutputStreamToInputStream(boolean joinOnClose, ExecutorService executorService) {
        this(joinOnClose, executorService, defaultPipeSize);
    }

    /**
     * Creates a lazily started instance using the given executor and pipe size.
     *
     * @param joinOnClose     whether {@link #close()} waits for the consumer task
     * @param executorService executor that runs the consumer task; must not be {@code null}
     * @param pipeBufferSize  size in bytes of the internal pipe buffer
     */
    public OutputStreamToInputStream(boolean joinOnClose, ExecutorService executorService,
            int pipeBufferSize) {
        this(false, joinOnClose, executorService, pipeBufferSize);
    }

    /**
     * Hook invoked at the end of the first successful close, after the
     * optional join on the consumer task. The default implementation does
     * nothing.
     *
     * @throws IOException if the subclass fails while finalizing
     */
    protected void afterClose() throws IOException {
    }

    /**
     * Closes the write side of the pipe. When this instance was created with
     * {@code joinOnClose == true}, waits up to 15 minutes for the consumer
     * task to finish. Calls after the first one have no effect.
     *
     * @throws IOException if {@code doRead} threw, the wait was interrupted
     *                     or the wait timed out
     */
    @Override
    public final void close() throws IOException {
        internalClose(this.joinOnClose, TimeUnit.MILLISECONDS, DEFAULT_TIMEOUT);
    }

    /**
     * Closes the write side of the pipe and always waits, up to the given
     * timeout, for the consumer task to finish. Has no effect if the stream
     * was already closed.
     *
     * @param timeout maximum time to wait
     * @param tu      unit of {@code timeout}
     * @throws IOException if {@code doRead} threw, the wait was interrupted
     *                     or the wait timed out
     */
    public final void close(long timeout, TimeUnit tu) throws IOException {
        internalClose(true, tu, timeout);
    }

    /**
     * Consumes the data written to this stream. Invoked once, on a thread of
     * the configured executor.
     *
     * @param var1 stream exposing the bytes written to this output stream
     * @return the result later returned by {@link #getResult()}
     * @throws Exception on any failure; it is reported to the writer as an
     *                   {@link IOException}
     */
    protected abstract T doRead(InputStream var1) throws Exception;

    /**
     * Flushes the pipe, starting the consumer task first if necessary. If the
     * consumer has already failed, the stream is closed instead and the
     * failure is reported.
     *
     * @throws IOException if flushing fails or the consumer task has failed
     */
    @Override
    public final void flush() throws IOException {
        initializeIfNecessary();
        if (this.abort) {
            closeAfterAbort();
        } else {
            super.flush();
        }
    }

    /**
     * Returns the value produced by {@link #doRead(InputStream)}, blocking
     * until the consumer task has completed.
     *
     * @return the result of {@code doRead}
     * @throws IllegalStateException if the stream has not been closed yet
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws ExecutionException    if {@code doRead} threw an exception
     */
    public final T getResult() throws InterruptedException, ExecutionException {
        initializeIfNecessary();
        if (!this.closeCalled) {
            throw new IllegalStateException("Method close() must be called before getResults");
        }
        return this.writingResult.get();
    }

    /**
     * Returns the value produced by {@link #doRead(InputStream)}.
     *
     * @return the result of {@code doRead}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if {@code doRead} threw an exception
     * @deprecated use {@link #getResult()} instead
     */
    @Deprecated
    public final T getResults() throws InterruptedException, ExecutionException {
        return getResult();
    }

    /** Submits the consumer task to the executor unless that has already been done. */
    private void initializeIfNecessary() {
        if (this.writingResult == null) {
            this.writingResult = this.executorService.submit(new DataConsumer());
        }
    }

    /**
     * Reaction of write/flush to a failed consumer: close the stream and wait
     * at most one second for the task, so its exception surfaces as an
     * {@link IOException}.
     */
    private void closeAfterAbort() throws IOException {
        internalClose(true, TimeUnit.SECONDS, 1L);
    }

    /**
     * Performs the actual close; only the first invocation does any work.
     *
     * @param join     whether to wait for the consumer task
     * @param timeUnit unit of {@code timeout}
     * @param timeout  maximum time to wait when {@code join} is {@code true}
     * @throws IOException if {@code doRead} threw, the wait was interrupted
     *                     or the wait timed out
     */
    private void internalClose(boolean join, TimeUnit timeUnit, long timeout) throws IOException {
        if (this.closeCalled) {
            return;
        }
        // Make sure a task exists so that there is something to join on and
        // getResult() has a Future to query.
        initializeIfNecessary();
        this.closeCalled = true;
        super.close();
        if (join) {
            try {
                this.writingResult.get(timeout, timeUnit);
            } catch (ExecutionException e) {
                throw new IOException("The doRead() threw exception. Use getCause() for details.",
                        e.getCause());
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IOException("Waiting of the thread has been interrupted", e);
            } catch (TimeoutException e) {
                if (!this.writingResult.isDone()) {
                    this.writingResult.cancel(true);
                }
                throw new IOException("Waiting for the internal thread to finish took more than ["
                        + timeout + "] " + timeUnit, e);
            }
        }
        afterClose();
    }

    /**
     * Writes the whole array to the pipe, starting the consumer task first if
     * necessary. If the consumer has already failed, the stream is closed
     * instead and the failure is reported.
     *
     * @param bytes data to write
     * @throws IOException if writing fails or the consumer task has failed
     */
    @Override
    public final void write(byte[] bytes) throws IOException {
        initializeIfNecessary();
        if (this.abort) {
            closeAfterAbort();
        } else {
            super.write(bytes);
        }
    }

    /**
     * Writes part of an array to the pipe, starting the consumer task first
     * if necessary. If the consumer has already failed, the stream is closed
     * instead and the failure is reported.
     *
     * @param bytes  data to write
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws IOException if writing fails or the consumer task has failed
     */
    @Override
    public final void write(byte[] bytes, int offset, int length) throws IOException {
        initializeIfNecessary();
        if (this.abort) {
            closeAfterAbort();
        } else {
            super.write(bytes, offset, length);
        }
    }

    /**
     * Writes a single byte to the pipe, starting the consumer task first if
     * necessary. If the consumer has already failed, the stream is closed
     * instead and the failure is reported.
     *
     * @param bytetowr the byte to write
     * @throws IOException if writing fails or the consumer task has failed
     */
    @Override
    public final void write(int bytetowr) throws IOException {
        initializeIfNecessary();
        if (this.abort) {
            closeAfterAbort();
        } else {
            super.write(bytetowr);
        }
    }

    /** {@link PipedInputStream} with a caller-chosen buffer size. */
    private static final class MyPipedInputStream extends PipedInputStream {
        /**
         * @param bufferSize size in bytes of the pipe buffer; must be positive
         * @throws IllegalArgumentException if {@code bufferSize} is not positive
         */
        MyPipedInputStream(int bufferSize) {
            super(bufferSize);
        }
    }

    /** Task that feeds the read side of the pipe to {@link #doRead(InputStream)}. */
    private final class DataConsumer implements Callable<T> {
        private DataConsumer() {
        }

        /**
         * Runs {@code doRead}, flags the enclosing stream as aborted if it
         * throws, and in every case drains and closes the read side of the pipe.
         *
         * @return the value returned by {@code doRead}
         * @throws Exception whatever {@code doRead} threw
         */
        @Override
        public synchronized T call() throws Exception {
            try {
                // Hand doRead a shielded stream (Commons IO CloseShieldInputStream)
                // so the pipe itself is closed only by the finally block below.
                InputStream shielded =
                        new CloseShieldInputStream(OutputStreamToInputStream.this.inputstream);
                return OutputStreamToInputStream.this.doRead(shielded);
            } catch (Exception e) {
                OutputStreamToInputStream.this.abort = true;
                throw e;
            } finally {
                emptyInputStream();
                OutputStreamToInputStream.this.inputstream.close();
            }
        }

        /**
         * Reads and discards whatever is left in the pipe. Presumably this keeps
         * a writer from staying blocked on a full pipe once {@code doRead} has
         * stopped reading. Failures are logged, never propagated.
         */
        private void emptyInputStream() {
            try {
                byte[] scratch = new byte[8192];
                while (OutputStreamToInputStream.this.inputstream.read(scratch) >= 0) {
                    // discard
                }
            } catch (IOException e) {
                String message = e.getMessage();
                if (message != null && message.contains("closed")) {
                    LOG.debug("Stream already closed");
                } else {
                    LOG.error("IOException while empty InputStream a thread can be locked", e);
                }
            } catch (Throwable e) {
                LOG.error("IOException while empty InputStream a thread can be locked", e);
            }
        }
    }
}

