/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cxf.jaxrs.reactivestreams.server;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.jaxrs.ext.StreamingResponse;
import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
import org.reactivestreams.Subscription;

/**
 * Subscriber that buffers the items it receives in a queue and streams them to
 * the client through a {@link StreamingResponse}, optionally wrapped in an
 * opening and a closing tag and delimited by a separator.
 *
 * <p>When no async timeout is configured the {@link AsyncResponse} is resumed as
 * soon as the subscription is established. When an async timeout (greater than
 * zero) is configured, the response stays suspended until the first item
 * arrives, the stream completes, or the timeout handler observes buffered items.
 *
 * <p>NOTE(review): this class does not override {@code onError}; if the stream
 * fails after the response was resumed, {@code completed} is never set here and
 * the streaming loop may keep polling. Confirm against {@code AbstractSubscriber}.
 *
 * @param <T> type of the streamed items
 */
public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
    /** Items received from the publisher and not yet written to the response. */
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
    private final String openTag;
    private final String closeTag;
    private final String separator;
    /** Maximum wait, in milliseconds, for a single queue poll while streaming. */
    private final long pollTimeout;
    /** Async response timeout in milliseconds; {@code 0} means "resume on subscribe". */
    private final long asyncTimeout;
    /** Set by {@link #onComplete()}; read by the streaming loop, possibly on another thread. */
    private volatile boolean completed;
    /** Becomes {@code true} once the first item was written, so later items get a separator. */
    private final AtomicBoolean firstWriteDone = new AtomicBoolean();

    /**
     * Creates a subscriber with a poll timeout of 1000 ms and no async timeout.
     *
     * @param ar       the async response to resume with the streamed content
     * @param openTag  text written before the first item, or {@code null} for none
     * @param closeTag text written after the last item, or {@code null} for none
     * @param sep      text written between items, or {@code null} for none
     */
    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) {
        this(ar, openTag, closeTag, sep, 1000L);
    }

    /**
     * Creates a subscriber with the given poll timeout and no async timeout.
     *
     * @param ar          the async response to resume with the streamed content
     * @param openTag     text written before the first item, or {@code null} for none
     * @param closeTag    text written after the last item, or {@code null} for none
     * @param sep         text written between items, or {@code null} for none
     * @param pollTimeout maximum wait in milliseconds for each queue poll
     */
    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
                                    long pollTimeout) {
        this(ar, openTag, closeTag, sep, pollTimeout, 0L);
    }

    /**
     * Creates a subscriber with the given poll and async timeouts.
     *
     * @param ar           the async response to resume with the streamed content
     * @param openTag      text written before the first item, or {@code null} for none
     * @param closeTag     text written after the last item, or {@code null} for none
     * @param sep          text written between items, or {@code null} for none
     * @param pollTimeout  maximum wait in milliseconds for each queue poll
     * @param asyncTimeout async response timeout in milliseconds; values not greater
     *                     than zero disable the timeout handling
     */
    public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep,
                                    long pollTimeout, long asyncTimeout) {
        super(ar);
        this.openTag = openTag;
        this.closeTag = closeTag;
        this.separator = sep;
        this.pollTimeout = pollTimeout;
        // Remember the timeout only when it is actually armed, so that every
        // "asyncTimeout > 0" / "asyncTimeout == 0" check below matches the
        // state of the AsyncResponse.
        if (asyncTimeout > 0L) {
            this.asyncTimeout = asyncTimeout;
            ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
            ar.setTimeoutHandler(new TimeoutHandlerImpl());
        } else {
            this.asyncTimeout = 0L;
        }
    }

    /**
     * Resumes the response immediately when no async timeout is configured,
     * then delegates to the superclass.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (asyncTimeout == 0L) {
            resumeAsyncResponse();
        }
        super.onSubscribe(subscription);
    }

    /** Resumes the async response with a fresh streaming entity backed by the queue. */
    private void resumeAsyncResponse() {
        super.resume(new StreamingResponseImpl());
    }

    /**
     * Marks the stream as finished so the streaming loop ends once the queue is
     * drained. With an async timeout configured, also resumes a response that
     * is still suspended (for example when the stream produced no items), so it
     * does not wait for further timeouts.
     */
    @Override
    public void onComplete() {
        completed = true;
        if (asyncTimeout > 0L && getAsyncResponse().isSuspended()) {
            resumeAsyncResponse();
        }
    }

    /**
     * Buffers the item for streaming and requests the next one. With an async
     * timeout configured, the first item resumes the still-suspended response.
     */
    @Override
    public void onNext(T bean) {
        if (asyncTimeout > 0L && getAsyncResponse().isSuspended()) {
            resumeAsyncResponse();
        }
        queue.add(bean);
        super.requestNext();
    }

    /**
     * Timeout handler installed when an async timeout is configured: re-arms
     * the timeout while there is nothing to send yet, otherwise resumes the
     * response.
     */
    public class TimeoutHandlerImpl implements TimeoutHandler {
        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            if (queue.isEmpty() && !completed) {
                // Nothing to stream yet and more items may still arrive: keep waiting.
                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
            } else {
                resumeAsyncResponse();
            }
        }
    }

    /** Streaming entity that drains the queue until the stream is complete and empty. */
    private class StreamingResponseImpl implements StreamingResponse<T> {

        /**
         * Writes the open tag, then every queued item (separated by the
         * separator), then the close tag.
         *
         * @param writer destination for items and raw tag/separator bytes
         * @throws IOException if writing to the underlying stream fails
         */
        @Override
        public void writeTo(StreamingResponse.Writer<T> writer) throws IOException {
            if (openTag != null) {
                writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag));
            }
            boolean interrupted = false;
            try {
                while (!completed || !queue.isEmpty()) {
                    T bean;
                    try {
                        bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException ex) {
                        // Keep draining as before, but remember the interrupt so
                        // it can be restored for the caller once we are done.
                        interrupted = true;
                        continue;
                    }
                    if (bean == null) {
                        // Poll timed out; re-check the completion state.
                        continue;
                    }
                    // getAndSet must run for every item so the flag flips on the first one.
                    if (firstWriteDone.getAndSet(true) && separator != null) {
                        writer.getEntityStream().write(StringUtils.toBytesUTF8(separator));
                    }
                    writer.write(bean);
                }
                if (closeTag != null) {
                    writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag));
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

