/*
 * Decompiled with CFR 0.152.
 */
package com.artipie.vertx;

import com.artipie.http.Connection;
import com.artipie.http.Headers;
import com.artipie.http.rs.RsStatus;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher;

final class VertxConnection
implements Connection {
    private final HttpServerResponse rsp;

    VertxConnection(HttpServerResponse rsp) {
        this.rsp = rsp;
    }

    public CompletionStage<Void> accept(RsStatus status, Headers headers, Publisher<ByteBuffer> body) {
        int code = Integer.parseInt(status.code());
        this.rsp.setStatusCode(code);
        for (Map.Entry header : headers) {
            this.rsp.putHeader((String)header.getKey(), (String)header.getValue());
        }
        CompletableFuture promise = new CompletableFuture();
        Flowable vpb = Flowable.fromPublisher(body).map(VertxConnection::mapBuffer).doOnError(promise::completeExceptionally);
        if (this.rsp.headers().contains("Content-Length")) {
            this.rsp.setChunked(false);
            vpb.doOnComplete(() -> {
                this.rsp.end();
                promise.complete(this.rsp);
            }).forEach(arg_0 -> ((HttpServerResponse)this.rsp).write(arg_0));
        } else {
            this.rsp.setChunked(true);
            vpb.doOnComplete(() -> promise.complete(this.rsp)).subscribe((FlowableSubscriber)this.rsp.toSubscriber());
        }
        return promise.thenCompose(ignored -> CompletableFuture.allOf(new CompletableFuture[0]));
    }

    private static Buffer mapBuffer(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return Buffer.buffer((byte[])bytes);
    }
}

