/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Watches the response futures of asynchronous RPC requests and, once a future
 * reports completion, hands its outcome to the request's callback.
 *
 * <p>Requests are handed over through {@link #addToAsyncResponseFutureQueue} and
 * are examined on a dedicated single-thread executor that is started by
 * {@link #start()} and stopped by {@link #shutdownNow()}.
 */
public class AsyncResponseHandler {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class);

    /** Set by {@link #shutdownNow()}; checked by the worker at the top of every pass. */
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AsyncPbRpcProxy.RequestManager requestManager;
    private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor();
    /** Hand-off queue between threads submitting requests and the worker thread. */
    private final LinkedBlockingDeque<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> incomingResponseFutures =
            new LinkedBlockingDeque<>();

    /**
     * @param requestManager notified via {@code requestFinished(nodeId)} after each
     *                       completed request has been reported to its callback
     */
    public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) {
        this.requestManager = requestManager;
    }

    /** Submits the response-polling task to the single-thread executor. */
    public void start() {
        this.responseWaitingService.submit(new AsyncResponseHandlerRunnable());
    }

    /**
     * Queues a request so that its response future is watched by the worker.
     *
     * @param request the in-flight request whose response future should be tracked
     */
    public void addToAsyncResponseFutureQueue(AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> request) {
        this.incomingResponseFutures.add(request);
    }

    /**
     * Marks the handler as shut down and interrupts the worker thread.
     * The flag is set before the interrupt so the worker sees it when it wakes up.
     */
    public void shutdownNow() {
        this.isShutdown.set(true);
        this.responseWaitingService.shutdownNow();
    }

    /**
     * Worker loop: blocks for new requests while nothing is pending, dispatches
     * every completed response, then picks up requests that arrived meanwhile.
     */
    private final class AsyncResponseHandlerRunnable implements Runnable {
        /** Requests whose futures are not yet done; touched only by the worker thread. */
        private final List<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> responseFuturesQueue =
                new ArrayList<>();

        @Override
        public void run() {
            // NOTE(review): while requests are pending but none is done, this loop
            // re-polls without blocking or sleeping; consider a back-off if CPU use matters.
            while (!AsyncResponseHandler.this.isShutdown.get()) {
                if (!awaitIncomingIfIdle()) {
                    break;
                }
                dispatchCompleted();
                drainIncoming();
            }
            LOG.info("Async response handler exiting");
        }

        /**
         * Blocks for the next incoming request when nothing is pending.
         *
         * @return {@code false} if the thread was interrupted while waiting
         */
        private boolean awaitIncomingIfIdle() {
            if (!this.responseFuturesQueue.isEmpty()) {
                return true;
            }
            try {
                this.responseFuturesQueue.add(AsyncResponseHandler.this.incomingResponseFutures.take());
                return true;
            } catch (InterruptedException e) {
                LOG.warn("Async response handler was interrupted", (Throwable) e);
                // Preserve the interrupt status instead of swallowing it; the only
                // interrupter of this private executor thread is shutdownNow().
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Reports every pending request whose future is done and removes it from the pending list. */
        private void dispatchCompleted() {
            Iterator<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> iterator =
                    this.responseFuturesQueue.iterator();
            while (iterator.hasNext()) {
                AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> request = iterator.next();
                AsyncGet<Message, Exception> responseFuture = request.getResponseFuture();
                if (responseFuture == null || !responseFuture.isDone()) {
                    continue;
                }
                try {
                    iterator.remove();
                    LlapNodeId nodeId = request.getNodeId();
                    try {
                        // Keep the value as Object: a Throwable result must be detected
                        // BEFORE any cast to Message, otherwise the cast itself throws
                        // ClassCastException and hides the real remote error.
                        Object remoteValue = responseFuture.get(-1L, TimeUnit.MILLISECONDS);
                        if (remoteValue instanceof Throwable) {
                            request.getCallback().indicateError((Throwable) remoteValue);
                        } else {
                            request.getCallback().setResponse((Message) remoteValue);
                        }
                    } catch (Exception e) {
                        request.getCallback().indicateError(e);
                    } finally {
                        // Always release the request, whether the callback saw success or failure.
                        AsyncResponseHandler.this.requestManager.requestFinished(nodeId);
                    }
                } catch (Throwable e) {
                    // A failing callback or request manager must not kill the worker loop.
                    LOG.warn("ResponseDispatcher caught", e);
                }
            }
        }

        /** Moves all requests currently waiting in the hand-off queue into the pending list. */
        private void drainIncoming() {
            AsyncResponseHandler.this.incomingResponseFutures.drainTo(this.responseFuturesQueue);
        }
    }
}

