/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.reactivex.netty.channel.ChannelOperations;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.channel.FlushSelectorOperator;
import io.reactivex.netty.protocol.http.TrailingHeaders;
import io.reactivex.netty.protocol.http.internal.OperatorTrailer;
import io.reactivex.netty.protocol.http.server.ResponseContentWriter;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;

/**
 * {@link ResponseContentWriter} implementation that accumulates the content sources handed to the
 * various {@code write*} methods and, when subscribed, writes the response headers followed by
 * that content on the underlying {@link Connection}.
 *
 * <p>Every {@code write*} call returns a new writer; instances are never mutated after
 * construction. A writer created via the package-private constructor carries no content, so
 * subscribing to it writes only the headers.
 *
 * @param <C> the type of content items accepted by the generic {@code write} methods
 */
final class ContentWriterImpl<C> extends ResponseContentWriter<C> {

    @SuppressWarnings("rawtypes")
    private final Connection connection;
    /** Emits the single {@link HttpResponse} headers object. */
    @SuppressWarnings("rawtypes")
    private final Observable headersObservable;
    /** All content registered so far, or {@code null} when no content has been written yet. */
    @SuppressWarnings("rawtypes")
    private final Observable contentObservable;
    private final HttpResponse headers;
    /** Flush selector that requests a flush after every item. */
    private final Func1<C, Boolean> flushOnEachSelector = new Func1<C, Boolean>() {
        @Override
        public Boolean call(C w) {
            return true;
        }
    };

    /**
     * Creates the root, headers-only writer.
     *
     * @param connection connection the response is written to
     * @param headers    response headers to write
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    ContentWriterImpl(final Connection connection, final HttpResponse headers) {
        super(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                // Subscribing to this writer sends the headers and nothing else, so unless the
                // response is chunked we advertise an empty body.
                if (!HttpUtil.isTransferEncodingChunked(headers)) {
                    headers.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
                }
                connection.write(Observable.just(headers)).unsafeSubscribe(subscriber);
            }
        });
        this.connection = connection;
        this.headers = headers;
        this.headersObservable = Observable.just(headers);
        this.contentObservable = null;
    }

    /**
     * Creates a writer that adds {@code content} to whatever {@code parent} already holds.
     *
     * @param parent        writer this one is derived from
     * @param content       additional content to write
     * @param appendTrailer whether an empty {@link LastHttpContent} is appended after the content
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ContentWriterImpl(final ContentWriterImpl<C> parent, final Observable content,
                              final boolean appendTrailer) {
        super(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                // Must go through 'parent', not the instance under construction: 'this' cannot be
                // referenced inside the super(...) argument, and this instance's
                // contentObservable already contains 'content', which getHttpStream would then
                // merge in a second time.
                parent.connection.write(getHttpStream(parent, content, appendTrailer))
                                 .unsafeSubscribe(subscriber);
            }
        });
        this.connection = parent.connection;
        this.headers = parent.headers;
        this.headersObservable = parent.headersObservable;
        if (null == parent.contentObservable) {
            this.contentObservable = content;
        } else {
            this.contentObservable = parent.contentObservable.mergeWith(content);
        }
    }

    @Override
    public ResponseContentWriter<C> write(Observable<C> msgs) {
        return new ContentWriterImpl<C>(this, msgs, true);
    }

    @Override
    public <T extends TrailingHeaders> Observable<Void> write(Observable<C> contentSource, Func0<T> trailerFactory,
                                                              Func2<T, C, T> trailerMutator) {
        // No empty last-content is appended here (appendTrailer == false); presumably the lifted
        // trailer operator emits the terminating trailer itself -- confirm against OperatorTrailer.
        return new ContentWriterImpl<C>(this, OperatorTrailer.liftFrom(contentSource, trailerFactory, trailerMutator),
                                        false);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T extends TrailingHeaders> Observable<Void> write(Observable<C> contentSource, Func0<T> trailerFactory,
                                                              Func2<T, C, T> trailerMutator,
                                                              Func1<C, Boolean> flushSelector) {
        return this.write(contentSource.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection)),
                          trailerFactory, trailerMutator);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ResponseContentWriter<C> write(Observable<C> msgs, Func1<C, Boolean> flushSelector) {
        return new ContentWriterImpl<C>(this,
                                        msgs.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection)),
                                        true);
    }

    @Override
    public ResponseContentWriter<C> writeAndFlushOnEach(Observable<C> msgs) {
        return this.write(msgs, this.flushOnEachSelector);
    }

    @Override
    public ResponseContentWriter<C> writeString(Observable<String> msgs) {
        return new ContentWriterImpl<C>(this, msgs, true);
    }

    @Override
    public <T extends TrailingHeaders> Observable<Void> writeString(Observable<String> contentSource,
                                                                    Func0<T> trailerFactory,
                                                                    Func2<T, String, T> trailerMutator) {
        return new ContentWriterImpl<C>(this, OperatorTrailer.liftFrom(contentSource, trailerFactory, trailerMutator),
                                        false);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T extends TrailingHeaders> Observable<Void> writeString(Observable<String> contentSource,
                                                                    Func0<T> trailerFactory,
                                                                    Func2<T, String, T> trailerMutator,
                                                                    Func1<String, Boolean> flushSelector) {
        Observable flushAware = contentSource.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection));
        return new ContentWriterImpl<C>(this, OperatorTrailer.liftFrom(flushAware, trailerFactory, trailerMutator),
                                        false);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ResponseContentWriter<C> writeString(Observable<String> msgs, Func1<String, Boolean> flushSelector) {
        return new ContentWriterImpl<C>(this,
                                        msgs.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection)),
                                        true);
    }

    @Override
    public ResponseContentWriter<C> writeStringAndFlushOnEach(Observable<String> msgs) {
        return this.writeString(msgs, (Func1<String, Boolean>) ChannelOperations.FLUSH_ON_EACH_STRING);
    }

    @Override
    public ResponseContentWriter<C> writeBytes(Observable<byte[]> msgs) {
        return new ContentWriterImpl<C>(this, msgs, true);
    }

    @Override
    public <T extends TrailingHeaders> Observable<Void> writeBytes(Observable<byte[]> contentSource,
                                                                   Func0<T> trailerFactory,
                                                                   Func2<T, byte[], T> trailerMutator) {
        return new ContentWriterImpl<C>(this, OperatorTrailer.liftFrom(contentSource, trailerFactory, trailerMutator),
                                        false);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <T extends TrailingHeaders> Observable<Void> writeBytes(Observable<byte[]> contentSource,
                                                                   Func0<T> trailerFactory,
                                                                   Func2<T, byte[], T> trailerMutator,
                                                                   Func1<byte[], Boolean> flushSelector) {
        Observable flushAware = contentSource.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection));
        return new ContentWriterImpl<C>(this, OperatorTrailer.liftFrom(flushAware, trailerFactory, trailerMutator),
                                        false);
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ResponseContentWriter<C> writeBytes(Observable<byte[]> msgs, Func1<byte[], Boolean> flushSelector) {
        return new ContentWriterImpl<C>(this,
                                        msgs.lift((Observable.Operator) new FlushSelectorOperator(flushSelector, (ChannelOperations) this.connection)),
                                        true);
    }

    @Override
    public ResponseContentWriter<C> writeBytesAndFlushOnEach(Observable<byte[]> msgs) {
        return this.writeBytes(msgs, (Func1<byte[], Boolean>) ChannelOperations.FLUSH_ON_EACH_BYTES);
    }

    /**
     * Builds the full stream to write: the parent's headers, then the parent's existing content
     * (if any) merged with {@code content}, optionally terminated by
     * {@link LastHttpContent#EMPTY_LAST_CONTENT}.
     *
     * @param parent        writer supplying the headers and any previously registered content
     * @param content       newly added content
     * @param appendTrailer whether to append the empty last-content marker after the content
     * @return the concatenated stream of headers, content and optional terminator
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Observable getHttpStream(ContentWriterImpl parent, Observable content, boolean appendTrailer) {
        Observable body = content;
        if (null != parent.contentObservable) {
            body = parent.contentObservable.mergeWith(content);
        }
        Observable httpStream = parent.headersObservable.concatWith(body);
        if (appendTrailer) {
            httpStream = httpStream.concatWith(Observable.just(LastHttpContent.EMPTY_LAST_CONTENT));
        }
        return httpStream;
    }
}

