/*
 * Decompiled with CFR 0.152.
 */
package com.artipie.http.rq.multipart;

import com.artipie.ArtipieException;
import com.artipie.http.misc.ByteBufferTokenizer;
import com.artipie.http.misc.Pipeline;
import com.artipie.http.rq.multipart.Completion;
import com.artipie.http.rq.multipart.MultiPart;
import com.artipie.http.rq.multipart.RqMultipart;
import com.artipie.http.rq.multipart.State;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Reactive Streams processor that consumes {@link ByteBuffer} chunks and
 * publishes {@link RqMultipart.Part} items.
 *
 * <p>Upstream chunks are pushed into a {@link ByteBufferTokenizer} that was
 * created with the boundary bytes and with this object registered as its
 * {@link ByteBufferTokenizer.Receiver}. Tokens arriving in
 * {@link #receive(ByteBuffer, boolean)} are routed into the current
 * {@link MultiPart}; parts are handed to the downstream pipeline through a
 * single-thread executor.</p>
 *
 * <p>{@link #receive(ByteBuffer, boolean)} is serialized with an internal
 * lock.</p>
 */
final class MultiParts
    implements Processor<ByteBuffer, RqMultipart.Part>, ByteBufferTokenizer.Receiver {

    /** Downstream pipeline: receives parts and errors, relays demand. */
    private final Pipeline<RqMultipart.Part> pipeline;

    /** Tokenizer splitting the incoming byte stream by the boundary bytes. */
    private final ByteBufferTokenizer tokenizer;

    /** Single-thread executor used for async subscription and part emission. */
    private final ExecutorService exec;

    /** Monitor guarding {@link #receive(ByteBuffer, boolean)}. */
    private final Object lock;

    /** Part currently being filled; replaced whenever the state reports a start. */
    private volatile MultiPart current;

    /** Token state, patched on every received token. */
    private final State state;

    /** Completion tracker bound to the downstream pipeline. */
    private final Completion<?> completion;

    /**
     * Creates a processor for the given multipart boundary.
     *
     * @param boundary Boundary string; encoded as US-ASCII for the tokenizer
     */
    MultiParts(final String boundary) {
        this.tokenizer = new ByteBufferTokenizer(
            this, boundary.getBytes(StandardCharsets.US_ASCII)
        );
        this.exec = Executors.newSingleThreadExecutor();
        this.pipeline = new Pipeline<>();
        this.completion = new Completion<RqMultipart.Part>(this.pipeline);
        this.state = new State();
        this.lock = new Object();
    }

    /**
     * Subscribes this processor to the publisher from the executor thread
     * instead of the calling thread.
     *
     * @param pub Upstream publisher of body chunks
     */
    public void subscribeAsync(final Publisher<ByteBuffer> pub) {
        this.exec.submit(() -> pub.subscribe(this));
    }

    /**
     * Connects the downstream subscriber to the internal pipeline.
     *
     * @param sub Downstream subscriber of parts
     */
    @Override
    public void subscribe(final Subscriber<? super RqMultipart.Part> sub) {
        this.pipeline.connect(sub);
    }

    /**
     * Hands the upstream subscription over to the pipeline.
     *
     * @param sub Upstream subscription
     */
    @Override
    public void onSubscribe(final Subscription sub) {
        this.pipeline.onSubscribe(sub);
    }

    /**
     * Feeds the chunk into the tokenizer, then asks the pipeline for one more
     * item.
     *
     * @param chunk Next chunk of the multipart body
     */
    @Override
    public void onNext(final ByteBuffer chunk) {
        this.tokenizer.push(chunk);
        // NOTE(review): presumably this relays demand for the next upstream
        // chunk - confirm against Pipeline#request.
        this.pipeline.request(1L);
    }

    /**
     * Reports the upstream failure downstream, wrapped into an
     * {@link ArtipieException}, and shuts the executor down.
     *
     * @param err Upstream error, preserved as the cause
     */
    @Override
    public void onError(final Throwable err) {
        try {
            this.pipeline.onError(new ArtipieException("Upstream failed", err));
        } finally {
            // Release the worker thread even if the downstream handler throws.
            this.exec.shutdown();
        }
    }

    /**
     * Notifies the completion tracker that upstream has finished.
     */
    @Override
    public void onComplete() {
        // NOTE(review): unlike onError, the executor is not shut down here.
        // Verify whether it is released elsewhere before adding a shutdown:
        // part emission tasks may still need to be submitted.
        this.completion.upstreamCompleted();
    }

    /**
     * Handles a token produced by the tokenizer.
     *
     * <p>Under the lock: patches the state with the token, returns early if
     * the state says to ignore it, starts a new {@link MultiPart} if the state
     * reports a start, pushes the token into the current part and flushes the
     * part if the state reports an end.</p>
     *
     * @param next Token bytes
     * @param end End flag supplied by the tokenizer for this token
     */
    @Override
    public void receive(final ByteBuffer next, final boolean end) {
        synchronized (this.lock) {
            this.state.patch(next, end);
            if (this.state.shouldIgnore()) {
                return;
            }
            if (this.state.started()) {
                this.completion.itemStarted();
                // The callback given to the part emits it downstream from the
                // executor thread rather than from the caller of the callback.
                this.current = new MultiPart(
                    this.completion,
                    part -> this.exec.submit(() -> this.pipeline.onNext(part))
                );
            }
            this.current.push(next);
            if (this.state.ended()) {
                this.current.flush();
            }
        }
    }
}

