/*
 * Decompiled with CFR 0.152.
 */
package com.artipie.http.stream;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.ArrayUtils;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class ByteByByteSplit
implements Processor<ByteBuffer, Publisher<ByteBuffer>> {
    private final CircularFifoQueue<Byte> ring;
    private final byte[] delim;
    private final LinkedBlockingQueue<Optional<ByteBuffer>> storage;
    private final AtomicReference<Optional<Subscription>> upstream;
    private final AtomicReference<Optional<Subscriber<? super Publisher<ByteBuffer>>>> downstream;
    private final AtomicReference<Optional<Subscriber<? super ByteBuffer>>> downDownstream;
    private final AtomicBoolean started;
    private final AtomicBoolean upstreamTerminated;
    private final AtomicLong downDemand;
    private final AtomicLong downDownDemand;
    private final Object upSync;
    private final Object downSync;

    public ByteByByteSplit(byte[] delim) {
        this.ring = new CircularFifoQueue(delim.length);
        this.delim = Arrays.copyOf(delim, delim.length);
        this.upstream = new AtomicReference(Optional.empty());
        this.downstream = new AtomicReference(Optional.empty());
        this.started = new AtomicBoolean(false);
        this.downDownstream = new AtomicReference(Optional.empty());
        this.storage = new LinkedBlockingQueue();
        this.upstreamTerminated = new AtomicBoolean(false);
        this.downDemand = new AtomicLong(0L);
        this.downDownDemand = new AtomicLong(0L);
        this.upSync = new Object();
        this.downSync = new Object();
    }

    public void subscribe(Subscriber<? super Publisher<ByteBuffer>> sub) {
        if (this.downstream.get().isPresent()) {
            throw new IllegalStateException("Only one subscription is allowed");
        }
        this.downstream.set(Optional.of(sub));
        sub.onSubscribe(new Subscription(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void request(long ask) {
                Object object = ByteByByteSplit.this.downSync;
                synchronized (object) {
                    ByteByByteSplit.this.downDemand.updateAndGet(operand -> operand + ask);
                    ((Subscription)((Optional)ByteByByteSplit.this.upstream.get()).get()).request(ask);
                }
            }

            public void cancel() {
                Object object = ByteByByteSplit.this.downSync;
                synchronized (object) {
                    throw new IllegalStateException("Cancel is not allowed");
                }
            }
        });
        this.tryToStart();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onSubscribe(Subscription sub) {
        Object object = this.upSync;
        synchronized (object) {
            if (this.downstream.get().isPresent()) {
                throw new IllegalStateException("Only one subscription is allowed");
            }
            this.upstream.set(Optional.of(sub));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(ByteBuffer next) {
        Object object = this.upSync;
        synchronized (object) {
            byte[] bytes = new byte[next.remaining()];
            next.get(bytes);
            ByteBuffer current = ByteByByteSplit.bufWithInitMark(bytes.length);
            for (byte each : bytes) {
                boolean eviction = this.ring.isAtFullCapacity();
                if (eviction) {
                    Byte last = (Byte)this.ring.get(0);
                    current.put(last);
                }
                this.ring.add((Object)each);
                if (!Arrays.equals(this.delim, this.ringBytes())) continue;
                this.ring.clear();
                current.limit(current.position());
                current.reset();
                this.emit(Optional.of(current));
                this.emit(Optional.empty());
                current = ByteByByteSplit.bufWithInitMark(bytes.length);
            }
            current.limit(current.position());
            current.reset();
            this.emit(Optional.of(current));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Throwable throwable) {
        Object object = this.upSync;
        synchronized (object) {
            this.upstreamTerminated.set(true);
            this.downstream.get().ifPresent(value -> value.onError(throwable));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onComplete() {
        Object object = this.upSync;
        synchronized (object) {
            this.upstreamTerminated.set(true);
            this.emit(Optional.of(ByteBuffer.wrap(this.ringBytes())));
        }
    }

    private static ByteBuffer bufWithInitMark(int size) {
        ByteBuffer current = ByteBuffer.allocate(size);
        current.mark();
        return current;
    }

    private byte[] ringBytes() {
        return ArrayUtils.toPrimitive((Byte[])((Byte[])this.ring.stream().toArray(Byte[]::new)));
    }

    private void tryToStart() {
        if (this.downstream.get().isPresent() && this.upstream.get().isPresent() && this.started.compareAndSet(false, true)) {
            this.emitNextSubSub();
        }
    }

    private void emit(Optional<ByteBuffer> buffer) {
        this.storage.add(buffer);
        this.meetDemand();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void meetDemand() {
        Object object = this.downSync;
        synchronized (object) {
            if (this.downDownstream.get().isPresent()) {
                while (this.downDownDemand.get() > 0L && this.storage.size() > 0) {
                    this.downDownDemand.decrementAndGet();
                    Optional<ByteBuffer> poll = this.storage.poll();
                    if (poll.isPresent()) {
                        this.downDownstream.get().get().onNext((Object)poll.get());
                        continue;
                    }
                    this.downDownstream.get().get().onComplete();
                    this.emitNextSubSub();
                }
                if (this.upstreamTerminated.get()) {
                    this.downDownstream.get().get().onComplete();
                    this.downstream.get().get().onComplete();
                }
            }
        }
    }

    private void emitNextSubSub() {
        this.downstream.get().get().onNext(sub -> {
            this.downDownstream.set(Optional.of(sub));
            sub.onSubscribe(new Subscription(){

                public void request(long requested) {
                    ByteByByteSplit.this.downDownDemand.updateAndGet(operand -> operand + requested);
                    ByteByByteSplit.this.meetDemand();
                }

                public void cancel() {
                }
            });
        });
    }
}

