/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream;
import java.net.URL;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.Test;

public class ChangeRequestHttpSyncerTest {
    @Test(timeout=60000L)
    public void testSimple() throws Exception {
        ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
        TypeReference<ChangeRequestsSnapshot<String>> typeRef = new TypeReference<ChangeRequestsSnapshot<String>>(){};
        TestHttpClient httpClient = new TestHttpClient((List<ListenableFuture>)ImmutableList.of((Object)Futures.immediateFuture((Object)new ByteArrayInputStream(jsonMapper.writerWithType((TypeReference)typeRef).writeValueAsBytes((Object)new ChangeRequestsSnapshot(false, null, ChangeRequestHistory.Counter.ZERO, (List)ImmutableList.of((Object)"s1"))))), (Object)Futures.immediateFuture((Object)new ByteArrayInputStream(jsonMapper.writerWithType((TypeReference)typeRef).writeValueAsBytes((Object)new ChangeRequestsSnapshot(false, null, ChangeRequestHistory.Counter.ZERO, (List)ImmutableList.of((Object)"s2"))))), (Object)Futures.immediateFuture((Object)new ByteArrayInputStream(jsonMapper.writerWithType((TypeReference)typeRef).writeValueAsBytes((Object)new ChangeRequestsSnapshot(true, "reset the counter", ChangeRequestHistory.Counter.ZERO, (List)ImmutableList.of())))), (Object)Futures.immediateFuture((Object)new ByteArrayInputStream(jsonMapper.writerWithType((TypeReference)typeRef).writeValueAsBytes((Object)new ChangeRequestsSnapshot(false, null, ChangeRequestHistory.Counter.ZERO, (List)ImmutableList.of((Object)"s3"))))), (Object)Futures.immediateFuture((Object)new ByteArrayInputStream(jsonMapper.writerWithType((TypeReference)typeRef).writeValueAsBytes((Object)new ChangeRequestsSnapshot(false, null, ChangeRequestHistory.Counter.ZERO, (List)ImmutableList.of((Object)"s4")))))));
        ChangeRequestHttpSyncer.Listener listener = (ChangeRequestHttpSyncer.Listener)EasyMock.mock(ChangeRequestHttpSyncer.Listener.class);
        listener.fullSync((List)ImmutableList.of((Object)"s1"));
        listener.deltaSync((List)ImmutableList.of((Object)"s2"));
        listener.fullSync((List)ImmutableList.of((Object)"s3"));
        listener.deltaSync((List)ImmutableList.of((Object)"s4"));
        EasyMock.replay((Object[])new Object[]{listener});
        ChangeRequestHttpSyncer syncer = new ChangeRequestHttpSyncer(jsonMapper, (HttpClient)httpClient, Execs.scheduledSingleThreaded((String)"ChangeRequestHttpSyncerTest"), new URL("http://localhost:8080/"), "/xx", (TypeReference)typeRef, 50000L, 10000L, listener);
        syncer.start();
        while (httpClient.results.size() != 0) {
            Thread.sleep(100L);
        }
        syncer.stop();
        EasyMock.verify((Object[])new Object[]{listener});
    }

    private static class TestHttpClient
    implements HttpClient {
        BlockingQueue<ListenableFuture> results;
        AtomicInteger requestNum = new AtomicInteger(0);

        TestHttpClient(List<ListenableFuture> resultsList) {
            this.results = new LinkedBlockingQueue<ListenableFuture>();
            this.results.addAll(resultsList);
        }

        public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler) {
            throw new UnsupportedOperationException("Not Implemented.");
        }

        public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration) {
            if (this.requestNum.getAndIncrement() == 0) {
                throw new RuntimeException("simulating couldn't send request to server for some reason.");
            }
            if (this.requestNum.get() == 2) {
                DefaultHttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                httpResponse.setContent(ChannelBuffers.buffer((int)0));
                httpResponseHandler.handleResponse((HttpResponse)httpResponse, null);
                return Futures.immediateFailedFuture((Throwable)new RuntimeException("server error"));
            }
            DefaultHttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            httpResponse.setContent(ChannelBuffers.buffer((int)0));
            httpResponseHandler.handleResponse((HttpResponse)httpResponse, null);
            try {
                return this.results.take();
            }
            catch (InterruptedException ex) {
                throw new RE((Throwable)ex, "Interrupted.", new Object[0]);
            }
        }
    }
}

