/*
 * Decompiled with CFR 0.152.
 */
package com.artipie.http.misc;

import com.artipie.http.misc.BufAccumulator;
import com.artipie.http.misc.ByteBufferTokenizer;
import com.artipie.http.misc.DummySubscription;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class TokenizerFlatProc
implements Processor<ByteBuffer, ByteBuffer>,
ByteBufferTokenizer.Receiver {
    private static final int CAP_BUF = 128;
    private final ByteBufferTokenizer tokenizer;
    private final BufAccumulator accumulator;
    private final AtomicBoolean completed;
    private final Object lock;
    private volatile Subscriber<? super ByteBuffer> downstream;
    private volatile ProxySubscription upstream;

    public TokenizerFlatProc(String delim) {
        this(delim, 128);
    }

    public TokenizerFlatProc(String delim, int cap) {
        this.tokenizer = new ByteBufferTokenizer(this, delim.getBytes(StandardCharsets.US_ASCII));
        this.accumulator = new BufAccumulator(cap);
        this.completed = new AtomicBoolean();
        this.lock = new Object();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(Subscriber<? super ByteBuffer> sub) {
        Object object = this.lock;
        synchronized (object) {
            if (this.downstream != null) {
                sub.onSubscribe((Subscription)DummySubscription.VALUE);
                sub.onError((Throwable)new IllegalStateException("Only one downstream supported"));
                return;
            }
            this.downstream = sub;
            if (this.upstream != null) {
                this.downstream.onSubscribe((Subscription)this.upstream);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onSubscribe(Subscription sub) {
        Object object = this.lock;
        synchronized (object) {
            if (this.upstream != null) {
                throw new IllegalStateException("Already subscribed");
            }
            this.upstream = new ProxySubscription(sub);
            if (this.downstream != null) {
                this.downstream.onSubscribe((Subscription)this.upstream);
            }
        }
    }

    public void onNext(ByteBuffer buffer) {
        this.tokenizer.push(buffer);
    }

    public void onError(Throwable err) {
        this.downstream.onError(err);
    }

    public void onComplete() {
        if (this.completed.compareAndSet(false, true)) {
            this.tokenizer.close();
        }
    }

    @Override
    public void receive(ByteBuffer next, boolean end) {
        this.upstream.receive();
        this.accumulator.write(next);
        if (end) {
            ByteBuffer dst = ByteBuffer.allocate(this.accumulator.size());
            this.accumulator.read(dst);
            dst.flip();
            this.downstream.onNext((Object)dst);
            if (this.completed.get()) {
                this.downstream.onComplete();
                this.accumulator.close();
            }
        }
    }

    private static final class ProxySubscription
    implements Subscription {
        private final Subscription upstream;
        private final AtomicBoolean requested;

        ProxySubscription(Subscription upstream) {
            this.upstream = upstream;
            this.requested = new AtomicBoolean();
        }

        public void request(long amount) {
            if (this.requested.compareAndSet(false, true)) {
                this.upstream.request(Long.MAX_VALUE);
            }
        }

        public void cancel() {
            this.upstream.cancel();
        }

        public void receive() {
        }
    }
}

