/*
 * Decompiled with CFR 0.152.
 */
package com.navercorp.spring.data.jdbc.plus.sql.support.template;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowCountCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Runs a blocking JDBC query on a {@link Scheduler} and exposes the mapped rows as a {@link Flux}.
 *
 * <p>A producer task executes the query and pushes every mapped row into a bounded
 * {@link BlockingQueue}; the returned flux polls that queue, one element per generator call.
 * Both sides wait at most {@code bufferTimeout} milliseconds on the queue and share an
 * {@link AtomicBoolean} "closed" flag that either side sets to make the other one stop.
 *
 * <p>The queue and the flag are created once per {@code queryFlux} call, so every subscription
 * to the same returned flux shares them.
 */
public class JdbcReactiveTemplate {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Scheduler scheduler;
    private final int defaultQueueSize;
    private final long defaultBufferTimeout;

    /**
     * Creates a template that uses {@link Schedulers#boundedElastic()}, a queue capacity of 100
     * and a buffer timeout of 10000 milliseconds.
     */
    public JdbcReactiveTemplate() {
        this(Schedulers.boundedElastic(), 100, 10000L);
    }

    /**
     * Creates a template with explicit defaults.
     *
     * @param scheduler            scheduler the blocking query is scheduled on
     * @param defaultQueueSize     default capacity of the row queue
     * @param defaultBufferTimeout default wait, in milliseconds, for queue offer and poll
     */
    public JdbcReactiveTemplate(Scheduler scheduler, int defaultQueueSize, long defaultBufferTimeout) {
        this.scheduler = scheduler;
        this.defaultQueueSize = defaultQueueSize;
        this.defaultBufferTimeout = defaultBufferTimeout;
    }

    /** Returns the shared end-of-stream marker, typed for the caller. */
    @SuppressWarnings("unchecked") // the marker never carries an item, so any R is safe
    private static <R> FluxItem<R> endItem() {
        return (FluxItem<R>) FluxItem.END_ITEM;
    }

    /** Wraps {@code e} in a queue item that marks a producer failure. */
    private static <R> FluxItem<R> errorItem(Exception e) {
        return FluxItem.<R>errorInstance(e);
    }

    /**
     * Queries with this template's default scheduler, queue size and buffer timeout.
     *
     * @see #queryFlux(String, NamedParameterJdbcOperations, SqlParameterSource, RowMapper, Scheduler, int, long)
     */
    public <R> Flux<R> queryFlux(String sql, NamedParameterJdbcOperations jdbcOperations, SqlParameterSource params, RowMapper<R> rowMapper) {
        return this.queryFlux(sql, jdbcOperations, params, rowMapper, this.scheduler, this.defaultQueueSize, this.defaultBufferTimeout);
    }

    /**
     * Queries on the given scheduler with this template's default queue size and buffer timeout.
     *
     * @see #queryFlux(String, NamedParameterJdbcOperations, SqlParameterSource, RowMapper, Scheduler, int, long)
     */
    public <R> Flux<R> queryFlux(String sql, NamedParameterJdbcOperations jdbcOperations, SqlParameterSource params, RowMapper<R> rowMapper, Scheduler scheduler) {
        return this.queryFlux(sql, jdbcOperations, params, rowMapper, scheduler, this.defaultQueueSize, this.defaultBufferTimeout);
    }

    /**
     * Queries on this template's default scheduler with an explicit queue size and buffer timeout.
     *
     * @see #queryFlux(String, NamedParameterJdbcOperations, SqlParameterSource, RowMapper, Scheduler, int, long)
     */
    public <R> Flux<R> queryFlux(String sql, NamedParameterJdbcOperations jdbcOperations, SqlParameterSource params, RowMapper<R> rowMapper, int queueSize, long bufferTimeout) {
        return this.queryFlux(sql, jdbcOperations, params, rowMapper, this.scheduler, queueSize, bufferTimeout);
    }

    /**
     * Builds a flux that, when subscribed, schedules the query on {@code scheduler} and emits
     * each row mapped by {@code rowMapper}.
     *
     * @param sql            SQL passed to {@code jdbcOperations.query}
     * @param jdbcOperations operations object that executes the query
     * @param params         named parameters for the query
     * @param rowMapper      maps each result-set row to an emitted element
     * @param scheduler      scheduler the blocking query is scheduled on
     * @param queueSize      capacity of the queue between the query task and the flux
     * @param bufferTimeout  maximum wait, in milliseconds, for each queue offer and poll
     * @return a flux of the mapped rows
     */
    public <R> Flux<R> queryFlux(String sql, NamedParameterJdbcOperations jdbcOperations, SqlParameterSource params, final RowMapper<R> rowMapper, Scheduler scheduler, int queueSize, final long bufferTimeout) {
        final BlockingQueue<FluxItem<R>> queue = new LinkedBlockingQueue<>(queueSize);
        final AtomicBoolean isClosed = new AtomicBoolean(false);
        return this.generateFluxFromQueue(queue, bufferTimeout, isClosed)
            .doOnCancel(() -> isClosed.set(true))
            .doFirst(() -> scheduler.schedule(
                () -> this.produceRows(sql, jdbcOperations, params, rowMapper, queue, isClosed, bufferTimeout)));
    }

    /** Runs the query and feeds mapped rows, then an end or error marker, into {@code queue}. */
    private <R> void produceRows(String sql, NamedParameterJdbcOperations jdbcOperations, SqlParameterSource params, final RowMapper<R> rowMapper, final BlockingQueue<FluxItem<R>> queue, final AtomicBoolean isClosed, final long bufferTimeout) {
        RowCallbackHandler rowHandler = new RowCountCallbackHandler() {

            @Override
            public void processRow(ResultSet resultSet, int rowNum) throws SQLException {
                // Abort the running query as soon as the consuming side has gone away.
                if (isClosed.get()) {
                    throw new DataAccessResourceFailureException("Connection closed by client.");
                }
                FluxItem<R> item = new FluxItem<>(rowMapper.mapRow(resultSet, rowNum));
                JdbcReactiveTemplate.this.insertToBlockingQueue(queue, item, isClosed, bufferTimeout);
            }
        };
        try {
            jdbcOperations.query(sql, params, rowHandler);
        }
        catch (Exception e) {
            // If the flag was already set, the flux was cancelled or already failed; nobody is
            // waiting for the error item, so do not block on the queue for it.
            boolean alreadyClosed = isClosed.getAndSet(true);
            if (!alreadyClosed) {
                try {
                    this.insertToBlockingQueue(queue, JdbcReactiveTemplate.<R>errorItem(e), isClosed, bufferTimeout);
                }
                catch (RuntimeException insertFailure) {
                    // Keep the query failure as the primary exception.
                    e.addSuppressed(insertFailure);
                }
            }
            this.logger.error("Failed to generate flux.", (Throwable)e);
            throw e;
        }
        if (!isClosed.get()) {
            this.insertToBlockingQueue(queue, JdbcReactiveTemplate.<R>endItem(), isClosed, bufferTimeout);
        }
    }

    /**
     * Hook invoked when the flux reads an error item produced by a failed query.
     * This implementation only logs; if it returns normally the flux completes, and if it
     * throws, the thrown exception is emitted as the flux's error signal.
     *
     * @param e the producer failure, possibly {@code null}
     * @throws Exception to fail the flux with that exception
     */
    protected void handleError(Exception e) throws Exception {
        if (e == null) {
            return;
        }
        this.logger.error("Exception occured while reading flux", (Throwable)e);
    }

    /**
     * Offers {@code item} to {@code queue}, waiting at most {@code bufferTimeout} milliseconds.
     * On timeout or interruption the closed flag is set before the exception is thrown.
     *
     * @throws DataAccessResourceFailureException if the item could not be inserted in time or
     *                                            the thread was interrupted while waiting
     */
    private <R> void insertToBlockingQueue(BlockingQueue<FluxItem<R>> queue, FluxItem<R> item, AtomicBoolean isClosed, long bufferTimeout) {
        boolean accepted;
        try {
            accepted = queue.offer(item, bufferTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The item was not queued; fail loudly instead of silently dropping it.
            isClosed.set(true);
            throw new DataAccessResourceFailureException("Interrupted while inserting into blocking queue.", (Throwable)e);
        }
        if (!accepted) {
            isClosed.set(true);
            throw new DataAccessResourceFailureException("Timeout to get item from queue.", (Throwable)new TimeoutException("Cannot insert into blocking queue."));
        }
    }

    /**
     * Creates the consuming flux: every generator call polls one item from {@code queue} and
     * issues exactly one signal on the sink (next, complete or error).
     */
    private <R> Flux<R> generateFluxFromQueue(BlockingQueue<FluxItem<R>> queue, long bufferTimeout, AtomicBoolean isClosed) {
        return Flux.generate(sink -> {
            if (isClosed.get()) {
                sink.error((Throwable)new DataAccessResourceFailureException("Database Connection is closed."));
                return;
            }
            final FluxItem<R> row;
            try {
                row = queue.poll(bufferTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.error("InterruptedException occurred", (Throwable)e);
                // The generator must emit a signal; also tell the producer to stop.
                isClosed.set(true);
                sink.error((Throwable)new DataAccessResourceFailureException("Interrupted while taking element from blocking queue.", (Throwable)e));
                return;
            }
            if (row == null) {
                // Poll timed out: this flux fails, so let the producer stop as well.
                isClosed.set(true);
                sink.error((Throwable)new DataAccessResourceFailureException("Cannot take element from blocking queue."));
                return;
            }
            if (row.isError()) {
                try {
                    this.handleError(row.getError());
                }
                catch (Exception e) {
                    sink.error((Throwable)e);
                    return;
                }
            }
            // Error items also report isEnd(), so a handled error completes the flux.
            if (row.isEnd()) {
                sink.complete();
                return;
            }
            sink.next(row.getItem());
        });
    }

    /** Queue item carrying the exception that made the producer fail; also ends the stream. */
    private static class ErrorFluxItem<R>
    extends FluxItem<R> {
        private final Exception error;

        private ErrorFluxItem(Exception error) {
            super(null);
            this.error = error;
        }

        @Override
        protected boolean isError() {
            return true;
        }

        @Override
        protected boolean isEnd() {
            return true;
        }

        @Override
        public Exception getError() {
            return this.error;
        }
    }

    /** Queue item marking normal end of the result set. */
    private static class EndOfFluxItem<R>
    extends FluxItem<R> {
        private EndOfFluxItem() {
            super(null);
        }

        @Override
        protected boolean isEnd() {
            return true;
        }
    }

    /** Element of the producer/consumer queue: a mapped row, or an end/error marker subclass. */
    private static class FluxItem<R> {
        private static final FluxItem<?> END_ITEM = new EndOfFluxItem<Object>();
        private final R item;

        private FluxItem(R item) {
            this.item = item;
        }

        protected boolean isEnd() {
            return false;
        }

        protected boolean isError() {
            return false;
        }

        private R getItem() {
            return this.item;
        }

        @Nullable
        protected Exception getError() {
            return null;
        }

        private static <R> FluxItem<R> errorInstance(Exception e) {
            return new ErrorFluxItem<R>(e);
        }
    }
}

