/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient.reactivestreams;

import io.netty.channel.Channel;
import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
import org.asynchttpclient.reactivestreams.ReactiveStreamsTest;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class FailingReactiveStreamsTest
extends AbstractBasicTest {
    @Test
    public void testRetryingOnFailingStream() throws Exception {
        try (AsyncHttpClient client = Dsl.asyncHttpClient();){
            CountDownLatch streamStarted = new CountDownLatch(1);
            CountDownLatch streamOnHold = new CountDownLatch(1);
            CountDownLatch replayingRequest = new CountDownLatch(1);
            final AtomicReference<Object> publisherRef = new AtomicReference<Object>(null);
            ((BoundRequestBuilder)client.preparePost(this.getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_BYTES)).execute((AsyncHandler)new ReplayedSimpleAsyncHandler(replayingRequest, new BlockedStreamSubscriber(streamStarted, streamOnHold)){

                @Override
                public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
                    if (!(publisher instanceof StreamedResponsePublisher)) {
                        throw new IllegalStateException(String.format("publisher %s is expected to be an instance of %s", publisher, StreamedResponsePublisher.class));
                    }
                    if (!publisherRef.compareAndSet(null, (StreamedResponsePublisher)publisher)) {
                        return AsyncHandler.State.ABORT;
                    }
                    return super.onStream(publisher);
                }
            });
            streamStarted.await();
            Assert.assertTrue((publisherRef.get() != null ? 1 : 0) != 0, (String)"Expected a not null publisher.");
            StreamedResponsePublisher publisher = publisherRef.get();
            CountDownLatch channelClosed = new CountDownLatch(1);
            this.getChannel(publisher).close().addListener(future -> channelClosed.countDown());
            streamOnHold.countDown();
            channelClosed.await();
            replayingRequest.await();
            Assert.assertTrue((boolean)true);
        }
    }

    private Channel getChannel(StreamedResponsePublisher publisher) throws Exception {
        Field field = publisher.getClass().getDeclaredField("channel");
        field.setAccessible(true);
        return (Channel)field.get(publisher);
    }

    private static class ReplayedSimpleAsyncHandler
    extends ReactiveStreamsTest.SimpleStreamedAsyncHandler {
        private final CountDownLatch replaying;

        ReplayedSimpleAsyncHandler(CountDownLatch replaying, ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> subscriber) {
            super(subscriber);
            this.replaying = replaying;
        }

        public void onRetry() {
            this.replaying.countDown();
        }
    }

    private static class BlockedStreamSubscriber
    extends ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BlockedStreamSubscriber.class);
        private final CountDownLatch streamStarted;
        private final CountDownLatch streamOnHold;

        BlockedStreamSubscriber(CountDownLatch streamStarted, CountDownLatch streamOnHold) {
            this.streamStarted = streamStarted;
            this.streamOnHold = streamOnHold;
        }

        @Override
        public void onNext(HttpResponseBodyPart t) {
            this.streamStarted.countDown();
            try {
                this.streamOnHold.await();
            }
            catch (InterruptedException e) {
                LOGGER.error("`streamOnHold` latch was interrupted", (Throwable)e);
            }
            super.onNext(t);
        }
    }
}

