/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.protobuf.Message;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.hive.llap.AsyncResponseHandler;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link AsyncResponseHandler}.
 *
 * <p>Each test enqueues one or more {@link DummyAsyncRequest}s whose response future reports
 * "not done" until {@link DummyAsyncRequest#finish()} is called, and then checks which methods
 * of the mocked {@link AsyncPbRpcProxy.ExecuteRequestCallback} were invoked.
 */
public class AsyncResponseHandlerTest {
    /** Default time budget, in milliseconds, for {@link #assertTrueEventually(AssertTask)}. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 100000;

    /** Pause, in milliseconds, between two attempts of an eventually-true assertion. */
    private static final long RETRY_INTERVAL_MILLIS = 50L;

    private AsyncResponseHandler responseHandler;

    /** Creates a handler backed by a mocked request manager and starts it. */
    @Before
    public void setup() {
        AsyncPbRpcProxy.RequestManager requestManager = Mockito.mock(AsyncPbRpcProxy.RequestManager.class);
        this.responseHandler = new AsyncResponseHandler(requestManager);
        this.responseHandler.start();
    }

    /** Shuts down the handler started in {@link #setup()}. */
    @After
    public void teardown() {
        this.responseHandler.shutdownNow();
    }

    /**
     * A request that finishes with a return value must trigger {@code setResponse} exactly once
     * with that value and must never trigger {@code indicateError}. Neither method may be called
     * before the request is marked finished.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testAck() throws InterruptedException {
        AsyncPbRpcProxy.ExecuteRequestCallback<Message> callback = Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
        Message returnMessage = Mockito.mock(Message.class);
        DummyAsyncRequest asyncRequest = this.createAsyncRequest(returnMessage, null, callback);

        this.responseHandler.addToAsyncResponseFutureQueue(asyncRequest);

        // The future is not done yet, so no callback method may have been invoked.
        Mockito.verify(callback, Mockito.times(0)).setResponse((Message) Matchers.any());
        Mockito.verify(callback, Mockito.times(0)).indicateError((Throwable) Matchers.any());

        asyncRequest.finish();

        this.assertTrueEventually(() -> {
            Mockito.verify(callback, Mockito.times(1)).setResponse(returnMessage);
            Mockito.verify(callback, Mockito.times(0)).indicateError((Throwable) Matchers.any());
        });
    }

    /**
     * A request whose future throws must trigger {@code indicateError} exactly once with the
     * thrown exception and must never trigger {@code setResponse}. Neither method may be called
     * before the request is marked finished.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testRemoteFail() throws InterruptedException {
        AsyncPbRpcProxy.ExecuteRequestCallback<Message> callback = Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
        Exception returnException = new Exception();
        DummyAsyncRequest asyncRequest = this.createAsyncRequest(null, returnException, callback);

        this.responseHandler.addToAsyncResponseFutureQueue(asyncRequest);

        // The future is not done yet, so no callback method may have been invoked.
        Mockito.verify(callback, Mockito.times(0)).setResponse((Message) Matchers.any());
        Mockito.verify(callback, Mockito.times(0)).indicateError((Throwable) Matchers.any());

        asyncRequest.finish();

        this.assertTrueEventually(() -> {
            Mockito.verify(callback, Mockito.times(1)).indicateError(returnException);
            Mockito.verify(callback, Mockito.times(0)).setResponse((Message) Matchers.any());
        });
    }

    /**
     * Enqueues 200000 requests from 10 threads (each thread owns a disjoint, contiguous slice)
     * while a separate thread marks the requests finished in a shuffled order. Afterwards every
     * callback must have received {@code setResponse(null)} exactly once.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testStress() throws InterruptedException {
        final int numCommunicators = 10;
        final int totalCallbacks = 200000;
        final int callbacksPerCommunicator = totalCallbacks / numCommunicators;

        final AsyncPbRpcProxy.ExecuteRequestCallback[] callbacks = new AsyncPbRpcProxy.ExecuteRequestCallback[totalCallbacks];
        final DummyAsyncRequest[] asyncRequests = new DummyAsyncRequest[totalCallbacks];
        for (int i = 0; i < totalCallbacks; ++i) {
            callbacks[i] = Mockito.mock(AsyncPbRpcProxy.ExecuteRequestCallback.class);
            asyncRequests[i] = this.createAsyncRequest(null, null, (AsyncPbRpcProxy.ExecuteRequestCallback<Message>) callbacks[i]);
        }

        // Each communicator enqueues its own slice [communicatorStart, communicatorEnd).
        Thread[] communicators = new Thread[numCommunicators];
        for (int c = 0; c < numCommunicators; ++c) {
            final int communicatorStart = c * callbacksPerCommunicator;
            final int communicatorEnd = (c + 1) * callbacksPerCommunicator;
            communicators[c] = new Thread(() -> {
                for (int j = communicatorStart; j < communicatorEnd; ++j) {
                    this.responseHandler.addToAsyncResponseFutureQueue(asyncRequests[j]);
                }
            });
        }

        Thread ackerThread = new Thread(() -> {
            Random random = new Random();
            int[] ackOrder = new int[totalCallbacks];
            for (int i = 0; i < totalCallbacks; ++i) {
                ackOrder[i] = i;
            }
            // Shuffle by swapping random pairs of positions.
            for (int i = 0; i < totalCallbacks; ++i) {
                int swapx = random.nextInt(totalCallbacks);
                int swapy = random.nextInt(totalCallbacks);
                int temp = ackOrder[swapx];
                ackOrder[swapx] = ackOrder[swapy];
                ackOrder[swapy] = temp;
            }
            // Finish in the shuffled order so completion order differs from enqueue order.
            for (int i = 0; i < totalCallbacks; ++i) {
                asyncRequests[ackOrder[i]].finish();
            }
        });

        for (Thread communicator : communicators) {
            communicator.start();
        }
        ackerThread.start();
        for (Thread communicator : communicators) {
            communicator.join();
        }
        ackerThread.join();

        this.assertTrueEventually(() -> {
            for (int i = 0; i < totalCallbacks; ++i) {
                Mockito.verify(callbacks[i], Mockito.times(1)).setResponse(null);
            }
        });
    }

    /**
     * Builds a {@link DummyAsyncRequest}.
     *
     * @param returnValue value the request's future returns from {@code get}; may be null
     * @param returnException exception the request's future throws from {@code get}; when
     *     non-null it takes precedence over {@code returnValue}
     * @param callback callback handed to the request's superclass constructor
     * @return the new, not yet finished request
     */
    private DummyAsyncRequest createAsyncRequest(Message returnValue, Exception returnException, AsyncPbRpcProxy.ExecuteRequestCallback<Message> callback) {
        return new DummyAsyncRequest(returnValue, returnException, callback);
    }

    /**
     * Retries {@code assertTask} until it passes, using the default timeout of
     * {@value #DEFAULT_TIMEOUT_MILLIS} milliseconds.
     *
     * @param assertTask assertions to retry
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     * @throws AssertionError the most recent failure, if the task still fails after the timeout
     */
    private void assertTrueEventually(AssertTask assertTask) throws InterruptedException {
        this.assertTrueEventually(assertTask, DEFAULT_TIMEOUT_MILLIS);
    }

    /**
     * Runs {@code assertTask} repeatedly, sleeping {@value #RETRY_INTERVAL_MILLIS} milliseconds
     * between attempts, until it completes without throwing an {@link AssertionError} or the
     * timeout has elapsed.
     *
     * <p>The task is always attempted at least once, even for a zero or negative timeout, so a
     * failure is always reported as the task's own {@link AssertionError}.
     *
     * @param assertTask assertions to retry
     * @param timeoutMillis time budget in milliseconds, measured from the start of this call
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     * @throws AssertionError the most recent failure, if the task still fails after the timeout
     */
    private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + (long) timeoutMillis;
        while (true) {
            try {
                assertTask.call();
                return;
            } catch (AssertionError e) {
                // Out of time: surface the latest failure rather than a stale (or missing) one.
                if (System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                Thread.sleep(RETRY_INTERVAL_MILLIS);
            }
        }
    }

    /** A block of assertions that signals failure by throwing {@link AssertionError}. */
    @FunctionalInterface
    private static interface AssertTask {
        public void call() throws AssertionError;
    }

    /**
     * Request stub whose response future is controlled by the test: {@code isDone()} reports
     * false until {@link #finish()} is called, and {@code get} either throws the configured
     * exception or returns the configured value.
     */
    private static final class DummyAsyncRequest
    extends AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> {
        /** Written by the test thread and read via the future's {@code isDone()}; hence volatile. */
        private volatile boolean isFinished;
        private final Message returnValue;
        private final Exception remoteException;

        /**
         * @param returnValue value returned by the future's {@code get}; may be null
         * @param remoteException exception thrown by the future's {@code get} when non-null
         * @param callback callback passed through to the superclass constructor
         */
        protected DummyAsyncRequest(Message returnValue, Exception remoteException, AsyncPbRpcProxy.ExecuteRequestCallback<Message> callback) {
            super(Mockito.mock(LlapNodeId.class), Mockito.mock(Message.class), callback);
            this.isFinished = false;
            this.returnValue = returnValue;
            this.remoteException = remoteException;
        }

        /** Intentionally a no-op: this stub sends nothing. */
        public void callInternal() throws Exception {
        }

        /**
         * Returns a new future view over this request's state.
         *
         * @return a future whose {@code isDone()} mirrors {@link #finish()} and whose {@code get}
         *     ignores its timeout arguments
         */
        public AsyncGet<Message, Exception> getResponseFuture() {
            return new AsyncGet<Message, Exception>() {

                public Message get(long timeout, TimeUnit unit) throws Exception {
                    if (DummyAsyncRequest.this.remoteException != null) {
                        throw DummyAsyncRequest.this.remoteException;
                    }
                    return DummyAsyncRequest.this.returnValue;
                }

                public boolean isDone() {
                    return DummyAsyncRequest.this.isFinished;
                }
            };
        }

        /** Marks the request as completed so that its future reports {@code isDone() == true}. */
        public void finish() {
            this.isFinished = true;
        }
    }
}

