/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CassandraRowStreamImpl
implements CassandraRowStream {
    private static final Object DONE = new Object();
    private final ContextInternal context;
    private final Queue internalQueue;
    private Handler<Row> rowHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private ExecutionInfo executionInfo;
    private ColumnDefinitions columnDefinitions;
    private final Lock lock = new ReentrantLock();
    private final EventExecutor executor = new EventExecutor(){

        public boolean inThread() {
            return true;
        }

        public void execute(Runnable command) {
            CassandraRowStreamImpl.this.lock.lock();
            try {
                command.run();
            }
            finally {
                CassandraRowStreamImpl.this.lock.unlock();
            }
        }
    };

    public CassandraRowStreamImpl(Context context) {
        Queue queue = new Queue((ContextInternal)context);
        queue.pause();
        this.context = (ContextInternal)context;
        this.internalQueue = queue;
    }

    void init(ResultSet resultSet) {
        this.executionInfo = resultSet.getExecutionInfo();
        this.columnDefinitions = resultSet.getColumnDefinitions();
        this.internalQueue.init(resultSet);
    }

    @Override
    public synchronized CassandraRowStream exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CassandraRowStream handler(Handler<Row> handler) {
        CassandraRowStreamImpl cassandraRowStreamImpl = this;
        synchronized (cassandraRowStreamImpl) {
            this.rowHandler = handler;
        }
        if (handler == null) {
            this.pause();
        } else {
            this.resume();
        }
        return this;
    }

    @Override
    public synchronized CassandraRowStream endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    @Override
    public CassandraRowStream pause() {
        this.internalQueue.pause();
        return this;
    }

    @Override
    public CassandraRowStream resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    @Override
    public CassandraRowStream fetch(long l) {
        this.internalQueue.fetch(l);
        return this;
    }

    @Override
    public ExecutionInfo executionInfo() {
        return this.executionInfo;
    }

    @Override
    public ColumnDefinitions columnDefinitions() {
        return this.columnDefinitions;
    }

    private static class StreamItem {
        public final ExecutionInfo executionInfo;
        public final ColumnDefinitions columnDefinitions;
        public final Row row;

        StreamItem(ResultSet resultSet, Row row) {
            this.executionInfo = resultSet.getExecutionInfo();
            this.columnDefinitions = resultSet.getColumnDefinitions();
            this.row = row;
        }
    }

    private class Queue
    extends InboundMessageQueue<Object> {
        private ResultSet resultSet;
        private boolean paused;

        public Queue(ContextInternal context) {
            super(CassandraRowStreamImpl.this.executor, context.executor());
        }

        void init(ResultSet rs) {
            this.transfer(rs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void transfer(ResultSet rs) {
            Iterable<Row> page = rs.currentPage();
            CassandraRowStreamImpl.this.lock.lock();
            try {
                for (Row row : page) {
                    this.write(new StreamItem(rs, row));
                }
            }
            finally {
                CassandraRowStreamImpl.this.lock.unlock();
            }
            if (rs.hasMorePages()) {
                Future<ResultSet> next = rs.fetchNextPage();
                next.onComplete((res, err) -> {
                    if (err == null) {
                        this.resultSet = res;
                        if (!this.paused) {
                            this.transfer((ResultSet)res);
                        }
                    } else {
                        this.write(err);
                    }
                });
            } else {
                this.write(DONE);
            }
        }

        protected void handleResume() {
            this.paused = false;
            ResultSet rs = this.resultSet;
            this.resultSet = null;
            if (rs != null) {
                this.transfer(rs);
            }
        }

        protected void handlePause() {
            this.paused = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void handleMessage(Object msg) {
            if (msg == DONE) {
                Handler<Void> handler;
                CassandraRowStreamImpl cassandraRowStreamImpl = CassandraRowStreamImpl.this;
                synchronized (cassandraRowStreamImpl) {
                    handler = CassandraRowStreamImpl.this.endHandler;
                }
                if (handler != null) {
                    CassandraRowStreamImpl.this.context.emit(null, handler);
                }
            } else if (msg instanceof StreamItem) {
                Handler<Row> handler;
                StreamItem item = (StreamItem)msg;
                CassandraRowStreamImpl cassandraRowStreamImpl = CassandraRowStreamImpl.this;
                synchronized (cassandraRowStreamImpl) {
                    handler = CassandraRowStreamImpl.this.rowHandler;
                }
                CassandraRowStreamImpl.this.executionInfo = item.executionInfo;
                CassandraRowStreamImpl.this.columnDefinitions = item.columnDefinitions;
                if (handler != null) {
                    CassandraRowStreamImpl.this.context.emit((Object)item.row, handler);
                }
            } else if (msg instanceof Throwable) {
                Handler<Throwable> handler;
                Throwable err = (Throwable)msg;
                CassandraRowStreamImpl cassandraRowStreamImpl = CassandraRowStreamImpl.this;
                synchronized (cassandraRowStreamImpl) {
                    handler = CassandraRowStreamImpl.this.exceptionHandler;
                }
                if (handler != null) {
                    CassandraRowStreamImpl.this.context.emit((Object)err, handler);
                }
            }
        }
    }
}

