/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AsyncContinuousPagingResult;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.ContinuousPagingOptions;
import com.datastax.driver.core.DefaultAsyncContinuousPagingResult;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.Message;
import com.datastax.driver.core.MultiResponseRequestHandler;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.Responses;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.internal.com_google_common.util.concurrent.Futures;
import com.datastax.internal.com_google_common.util.concurrent.ListenableFuture;
import com.datastax.internal.com_google_common.util.concurrent.SettableFuture;
import io.netty.channel.EventLoop;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers the pages (or the terminal error) of a continuous paging query between the
 * {@link MultiResponseRequestHandler} that receives them and the client that consumes them
 * through {@link #dequeueOrCreatePending()}.
 *
 * <p>Each queue element is either an {@link AsyncContinuousPagingResult} or a {@link Throwable}.
 * When the client asks for a page while the queue is empty, a pending future is installed and is
 * completed directly by the next incoming page or error.
 *
 * <p>The {@code state} field holds the sequence number of the next expected page while the query
 * is running (it starts at 1), or one of the negative constants {@link #STATE_FINISHED} /
 * {@link #STATE_FAILED} once the query is over.
 *
 * <p>Backpressure works in one of two modes, chosen in the constructor: when
 * {@code numPagesRequested} is 0 the channel's auto-read flag is toggled as the queue fills up and
 * drains; otherwise more pages are explicitly requested from the handler as room becomes
 * available.
 */
class ContinuousPagingQueue
implements MultiResponseRequestHandler.Callback {
    private static final Logger logger = LoggerFactory.getLogger(ContinuousPagingQueue.class);
    private final Message.Request request;
    private final ContinuousPagingOptions continuousPagingOptions;
    /** Guards {@code pendingResult} and the enqueue/dequeue operations. */
    private final ReentrantLock lock;
    /** Pages and errors not yet handed to the client. */
    private final Queue<Object> queue;
    /** Future the client is currently waiting on, or null when nobody is waiting. */
    private SettableFuture<AsyncContinuousPagingResult> pendingResult;
    private volatile MultiResponseRequestHandler handler;
    private volatile long timeoutMillis;
    /** Next expected page number when positive; otherwise one of the STATE_* constants. */
    private volatile int state;
    private static final int STATE_FINISHED = -1;
    private static final int STATE_FAILED = -2;
    /** Connection of the most recent response; null until the first response arrives. */
    private volatile Connection connection;
    /** Column definitions captured from the first page and reused for every later page. */
    private volatile ColumnDefinitions columnDefinitions;
    /** Total pages requested so far, or 0 when auto-read toggling is used instead. */
    private volatile int numPagesRequested;

    /**
     * Creates a queue for the given continuous paging request.
     *
     * @param request the request to execute; its options supply the continuous paging options
     * @param protocolVersion the protocol version in use; from {@code DSE_V2} onwards pages are
     *     requested explicitly, starting with {@code getMaxEnqueuedPages()} pages
     * @param firstResult the future to complete with the first page or the first error
     */
    ContinuousPagingQueue(Message.Request request, ProtocolVersion protocolVersion, SettableFuture<AsyncContinuousPagingResult> firstResult) {
        this.request = request;
        this.continuousPagingOptions = request.options().continuousPagingOptions;
        this.lock = new ReentrantLock();
        this.pendingResult = firstResult;
        this.queue = new ConcurrentLinkedQueue<Object>();
        this.state = 1;
        this.numPagesRequested = protocolVersion.compareTo(ProtocolVersion.DSE_V2) >= 0 ? this.continuousPagingOptions.getMaxEnqueuedPages() : 0;
    }

    /**
     * Remembers the handler driving this query and copies its timeout.
     *
     * @param handler the handler this callback is attached to
     */
    @Override
    public void register(MultiResponseRequestHandler handler) {
        this.handler = handler;
        this.timeoutMillis = handler.timeoutMillis;
    }

    /** Returns the request passed to the constructor. */
    @Override
    public Message.Request getRequest() {
        return this.request;
    }

    /** Builds the request that cancels the continuous paging session on the given stream. */
    @Override
    public Message.Request getCancelRequest(int streamId) {
        return Requests.ReviseRequest.continuousPagingCancel(streamId);
    }

    /**
     * Builds the request asking for {@code nextPages} more pages on the given stream. Only
     * expected to be called when pages are requested explicitly (see the assertion).
     */
    @Override
    public Message.Request getBackpressureRequest(int streamId, int nextPages) {
        assert (this.numPagesRequested > 0);
        return Requests.ReviseRequest.continuousPagingBackpressure(streamId, nextPages);
    }

    /**
     * Handles one response for this query. Asserted to run on the connection's event loop.
     *
     * <p>A ROWS result carrying the expected sequence number is turned into a page and either
     * handed to the waiting client or enqueued; an out-of-sequence page, an ERROR response or
     * any other response type fails the query. Responses arriving after the query has finished
     * or failed are discarded.
     */
    @Override
    public void onResponse(Connection connection, Message.Response response, ExecutionInfo info, Statement statement) {
        assert (connection.channel.eventLoop().inEventLoop());
        if (this.state < 0) {
            logger.debug("Discarding {} response because the request has already completed", response.type);
            return;
        }
        this.connection = connection;
        if (response.type == Message.Response.Type.RESULT && ((Responses.Result)response).kind == Responses.Result.Kind.ROWS) {
            Responses.Result.Rows rows = (Responses.Result.Rows)response;
            if (rows.metadata.continuousPage.seqNo != this.state) {
                this.fail(new DriverInternalError(String.format("Received page number %d but was expecting %d", rows.metadata.continuousPage.seqNo, this.state)), false);
            } else {
                if (rows.metadata.continuousPage.last) {
                    logger.debug("Received last page ({})", rows.metadata.continuousPage.seqNo);
                    // Update the state before publishing the page so that consumers woken up
                    // by the page already observe the finished state.
                    this.state = STATE_FINISHED;
                    connection.channel.config().setAutoRead(true);
                    this.handler.release();
                } else {
                    logger.debug("Received page {}", rows.metadata.continuousPage.seqNo);
                    ++this.state;
                }
                this.enqueueOrCompletePending(this.newResult(rows, info));
            }
        } else if (response.type == Message.Response.Type.ERROR) {
            this.fail(((Responses.Error)response).asException(connection.endPoint), true);
        } else {
            this.fail(new DriverInternalError("Unexpected response " + response.type), false);
        }
    }

    /**
     * Handles an exception reported for this query.
     *
     * <p>Without a connection the exception is published as-is, with no state change. With a
     * connection, the call is re-dispatched to that connection's event loop if needed, and the
     * query is failed only if it is still expecting pages.
     *
     * @param connection the connection the failure relates to, or null
     * @param exception the failure
     * @param fromServer whether the failure was reported by the server
     */
    @Override
    public void onException(final Connection connection, final Exception exception, final boolean fromServer) {
        if (connection == null) {
            logger.debug("Fail {} ({})", exception.getClass().getSimpleName(), exception.getMessage());
            this.enqueueOrCompletePending(exception);
            return;
        }
        EventLoop eventLoop = connection.channel.eventLoop();
        if (!eventLoop.inEventLoop()) {
            eventLoop.execute(new Runnable(){

                @Override
                public void run() {
                    ContinuousPagingQueue.this.onException(connection, exception, fromServer);
                }
            });
        } else if (this.state > 0) {
            this.fail(exception, fromServer);
        }
    }

    /**
     * Moves the query to the failed state and publishes the exception to the client.
     *
     * @param exception the failure to publish
     * @param fromServer when true the handler is released, otherwise it is cancelled
     */
    private void fail(Exception exception, boolean fromServer) {
        logger.debug("Got failure {} ({})", exception.getClass().getSimpleName(), exception.getMessage());
        if (this.state >= 0) {
            // The failed state must be visible BEFORE the exception is published:
            //  - completing the pending future runs its listeners synchronously, and a listener
            //    calling dequeueOrCreatePending() must see STATE_FAILED, otherwise it would
            //    install a new pending future that nothing ever completes;
            //  - enqueue() checks "state > 0" before disabling auto-read, so a stale positive
            //    state could switch auto-read back off right after it is re-enabled below.
            this.state = STATE_FAILED;
            if (fromServer) {
                this.handler.release();
            } else {
                this.handler.cancel();
            }
            if (this.connection != null) {
                this.connection.channel.config().setAutoRead(true);
            }
            this.enqueueOrCompletePending(exception);
        }
    }

    /**
     * Hands a page or error to the waiting client if there is one, otherwise enqueues it.
     *
     * @param pageOrError an {@link AsyncContinuousPagingResult} or a {@link Throwable}
     */
    private void enqueueOrCompletePending(Object pageOrError) {
        this.lock.lock();
        try {
            if (this.pendingResult != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Client was waiting on empty queue, completing with {}", this.asDebugString(pageOrError));
                }
                // Clear the field before completing: completion may re-enter this object.
                SettableFuture<AsyncContinuousPagingResult> tmp = this.pendingResult;
                this.pendingResult = null;
                this.complete(tmp, pageOrError);
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Enqueuing {}", this.asDebugString(pageOrError));
                }
                this.enqueue(pageOrError);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns a future for the next page.
     *
     * <p>If the queue is non-empty the head is returned as an already-completed future. If the
     * queue is empty and the query has failed, a failed future is returned. Otherwise a new
     * pending future is installed, a timeout is started for it, and cancelling that future
     * cancels the whole query.
     *
     * @return a future completed with the next page, or failed with the queued error
     */
    ListenableFuture<AsyncContinuousPagingResult> dequeueOrCreatePending() {
        this.lock.lock();
        try {
            assert (this.pendingResult == null);
            Object head = this.dequeue();
            this.maybeRequestMore();
            if (head != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Client queries on non-empty queue, returning immediate future of {}", this.asDebugString(head));
                }
                return this.immediateFuture(head);
            }
            if (this.state == STATE_FAILED) {
                logger.debug("Client queries on failed empty queue, returning failed future");
                return this.immediateFuture(new IllegalStateException("Can't get more results because the continuous query has failed already. Most likely this is because the query was cancelled"));
            }
            logger.debug("Client queries on empty queue, installing future");
            final SettableFuture<AsyncContinuousPagingResult> future = SettableFuture.create();
            future.addListener(new Runnable(){

                @Override
                public void run() {
                    if (future.isCancelled()) {
                        ContinuousPagingQueue.this.cancel();
                    }
                }
            }, GuavaCompatibility.INSTANCE.sameThreadExecutor());
            this.pendingResult = future;
            this.startTimeout();
            return future;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends an element to the queue; in auto-read mode, disables auto-read once the queue
     * reaches {@code getMaxEnqueuedPages()} while the query is still running. Must be called
     * with the lock held.
     */
    private void enqueue(Object pageOrError) {
        assert (this.lock.isHeldByCurrentThread());
        this.queue.add(pageOrError);
        if (this.numPagesRequested == 0 && this.queue.size() == this.continuousPagingOptions.getMaxEnqueuedPages() && this.state > 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("Exceeded {} queued response pages, disabling auto-read", this.queue.size());
            }
            this.connection.channel.config().setAutoRead(false);
        }
    }

    /**
     * Removes and returns the head of the queue, or null if it is empty; in auto-read mode,
     * re-enables auto-read when the queue drops just below {@code getMaxEnqueuedPages()}. Must
     * be called with the lock held.
     */
    private Object dequeue() {
        assert (this.lock.isHeldByCurrentThread());
        Object head = this.queue.poll();
        if (this.numPagesRequested == 0 && head != null && this.queue.size() == this.continuousPagingOptions.getMaxEnqueuedPages() - 1) {
            if (logger.isDebugEnabled()) {
                logger.debug("Back to {} queued response pages, re-enabling auto-read", this.queue.size());
            }
            this.connection.channel.config().setAutoRead(true);
        }
        return head;
    }

    /**
     * In explicit-request mode, asks the handler for more pages when at least half of the
     * maximum queue size is free, counting pages already requested but not yet received as
     * occupied. Does nothing once the query is over or once {@code getMaxPages()} (when
     * positive) has been reached.
     */
    private void maybeRequestMore() {
        if (this.state < 0 || this.numPagesRequested == 0) {
            return;
        }
        assert (this.lock.isHeldByCurrentThread());
        // state is the next expected page number, so state - 1 pages have arrived so far.
        int numPagesReceived = this.state - 1;
        int maxEnqueuedPages = this.continuousPagingOptions.getMaxEnqueuedPages();
        int numPagesFittingInQueue = maxEnqueuedPages - this.queue.size() - (this.numPagesRequested - numPagesReceived);
        if (numPagesFittingInQueue >= maxEnqueuedPages / 2) {
            if (this.continuousPagingOptions.getMaxPages() > 0 && this.numPagesRequested >= this.continuousPagingOptions.getMaxPages()) {
                return;
            }
            this.numPagesRequested += numPagesFittingInQueue;
            logger.debug("Requesting pages ({}/{})", this.numPagesRequested, numPagesReceived);
            this.handler.requestMore(numPagesFittingInQueue);
        }
    }

    /** Completes {@code future} with the page, or fails it with the error. */
    private void complete(SettableFuture<AsyncContinuousPagingResult> future, Object pageOrError) {
        if (pageOrError instanceof AsyncContinuousPagingResult) {
            future.set((AsyncContinuousPagingResult)pageOrError);
        } else {
            future.setException((Throwable)pageOrError);
        }
    }

    /** Wraps a page in an already-successful future, or an error in an already-failed one. */
    private ListenableFuture<AsyncContinuousPagingResult> immediateFuture(Object pageOrError) {
        return pageOrError instanceof AsyncContinuousPagingResult ? Futures.immediateFuture((AsyncContinuousPagingResult)pageOrError) : Futures.<AsyncContinuousPagingResult>immediateFailedFuture((Throwable)pageOrError);
    }

    /**
     * Builds the client-facing page for a ROWS response, capturing the column definitions from
     * the first page seen.
     */
    private AsyncContinuousPagingResult newResult(Responses.Result.Rows rows, ExecutionInfo info) {
        if (this.columnDefinitions == null) {
            this.columnDefinitions = rows.metadata.columns;
        }
        Token.Factory tokenFactory = this.handler.manager.cluster.getMetadata().tokenFactory();
        ProtocolVersion protocolVersion = this.handler.manager.cluster.manager.protocolVersion();
        CodecRegistry codecRegistry = this.handler.manager.configuration().getCodecRegistry();
        info = info.with(null, rows.warnings, rows.metadata.pagingState, this.handler.statement, protocolVersion, codecRegistry);
        return new DefaultAsyncContinuousPagingResult(rows.data, this.columnDefinitions, rows.metadata.continuousPage.seqNo, rows.metadata.continuousPage.last, info, tokenFactory, protocolVersion, this);
    }

    /**
     * Cancels the query if it is still running: marks it failed, cancels the handler and the
     * pending future (if any), and re-enables auto-read on the connection.
     */
    void cancel() {
        if (logger.isTraceEnabled()) {
            logger.trace("Cancelling cont. paging session with state {} and connection {}", this.state, this.connection);
        }
        if (this.state >= 0) {
            // Set first: cancelling the pending future re-enters cancel() through its listener.
            this.state = STATE_FAILED;
            this.handler.cancel();
            this.cancelPendingResult();
            if (this.connection != null) {
                this.connection.channel.config().setAutoRead(true);
            }
        }
    }

    /** Cancels the future the client is waiting on, if there is one. */
    private void cancelPendingResult() {
        this.lock.lock();
        try {
            if (this.pendingResult != null) {
                this.pendingResult.cancel(true);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Schedules, on the connection's event loop, a task that fails the query with an
     * {@link OperationTimedOutException} if the currently expected page has still not arrived
     * after {@code timeoutMillis}. No-op when the query is already over or the timeout is not
     * positive.
     */
    private void startTimeout() {
        assert (this.connection != null);
        final int expectedPage = this.state;
        if (expectedPage < 0) {
            return;
        }
        assert (expectedPage > 1) : expectedPage;
        if (this.timeoutMillis > 0L) {
            this.connection.channel.eventLoop().schedule(new Runnable(){

                @Override
                public void run() {
                    // An unchanged state means no page arrived since the timeout was armed.
                    if (ContinuousPagingQueue.this.state == expectedPage) {
                        ContinuousPagingQueue.this.fail(new OperationTimedOutException(ContinuousPagingQueue.this.connection.endPoint, String.format("Timed out waiting for page %d", expectedPage)), false);
                    } else {
                        logger.trace("Timeout fired for page {} but query already at state {}, skipping", expectedPage, ContinuousPagingQueue.this.state);
                    }
                }
            }, this.timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Short description of a queue element for log messages: page number or error class. */
    private String asDebugString(Object pageOrError) {
        return pageOrError instanceof AsyncContinuousPagingResult ? "page " + ((AsyncContinuousPagingResult)pageOrError).pageNumber() : pageOrError.getClass().getSimpleName();
    }
}

