/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.csp;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.exception.UnexpectedDataException;
import io.activej.common.exception.UnknownFormatException;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelSupplier;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.csp.WithChannelToStream;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.CorruptedDataException;

public final class ChannelDeserializer<T>
extends AbstractStreamSupplier<T>
implements WithChannelToStream<ChannelDeserializer<T>, ByteBuf, T>,
WithInitializer<ChannelDeserializer<T>> {
    private ChannelSupplier<ByteBuf> input;
    private final BinarySerializer<T> valueSerializer;
    private final ByteBufs bufs = new ByteBufs();
    private boolean explicitEndOfStream = false;

    private ChannelDeserializer(BinarySerializer<T> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    public static <T> ChannelDeserializer<T> create(BinarySerializer<T> valueSerializer) {
        return new ChannelDeserializer<T>(valueSerializer);
    }

    public ChannelDeserializer<T> withExplicitEndOfStream() {
        return this.withExplicitEndOfStream(true);
    }

    public ChannelDeserializer<T> withExplicitEndOfStream(boolean explicitEndOfStream) {
        this.explicitEndOfStream = explicitEndOfStream;
        return this;
    }

    public ChannelInput<ByteBuf> getInput() {
        return input -> {
            this.input = input;
            return this.getAcknowledgement();
        };
    }

    @Override
    protected void onResumed() {
        boolean endOfStream;
        this.asyncBegin();
        try {
            endOfStream = this.process();
        }
        catch (CorruptedDataException e) {
            this.closeEx((Exception)new MalformedDataException("Data is corrupted", (Throwable)e));
            return;
        }
        catch (Exception e) {
            this.closeEx((Exception)new UnknownFormatException(String.format("Parse exception, %s : %s", this, this.bufs), (Throwable)e));
            return;
        }
        if (endOfStream) {
            assert (this.bufs.hasRemainingBytes(1));
            this.bufs.skip(1);
            if (!this.explicitEndOfStream) {
                this.closeEx((Exception)new TruncatedDataException(String.format("Unexpected end-of-stream, %s : %s", this, this.bufs)));
                return;
            }
            if (this.bufs.hasRemaining()) {
                this.closeEx((Exception)new UnexpectedDataException(String.format("Unexpected data after end-of-stream, %s : %s", this, this.bufs)));
                return;
            }
        }
        if (this.isReady()) {
            this.input.get().whenResult(buf -> {
                if (buf != null) {
                    if (endOfStream) {
                        buf.recycle();
                        this.closeEx((Exception)new UnexpectedDataException(String.format("Unexpected data after end-of-stream, %s : %s", this, this.bufs)));
                        return;
                    }
                    this.bufs.add(buf);
                    this.asyncResume();
                } else {
                    if (this.explicitEndOfStream && !endOfStream) {
                        this.closeEx((Exception)new UnknownFormatException(String.format("Explicit end-of-stream is missing, %s : %s", this, this.bufs)));
                        return;
                    }
                    if (this.bufs.isEmpty()) {
                        this.sendEndOfStream();
                    } else {
                        this.closeEx((Exception)new TruncatedDataException(String.format("Truncated serialized data stream, %s : %s", this, this.bufs)));
                    }
                }
            }).whenException(this::closeEx);
        } else {
            this.asyncEnd();
        }
    }

    private boolean process() {
        ByteBuf firstBuf;
        while (this.isReady() && (firstBuf = this.bufs.peekBuf()) != null) {
            int r;
            int firstBufRemaining = firstBuf.readRemaining();
            if (firstBufRemaining >= 4) {
                int headerSize;
                int messageSize;
                int pos;
                byte[] array = firstBuf.array();
                byte b = array[pos = firstBuf.head()];
                if (b > 0) {
                    messageSize = b + 1;
                    headerSize = 1;
                } else {
                    int encodedSize = ChannelDeserializer.readEncodedSize(array, pos, b);
                    if (encodedSize == 0) {
                        return true;
                    }
                    headerSize = encodedSize >>> 28;
                    messageSize = (encodedSize & 0xFFFFFFF) + headerSize;
                }
                if (firstBufRemaining >= messageSize) {
                    Object item = this.valueSerializer.decode(array, pos + headerSize);
                    this.send(item);
                    if (firstBufRemaining != messageSize) {
                        firstBuf.moveHead(messageSize);
                        continue;
                    }
                    this.bufs.take().recycle();
                    continue;
                }
            }
            if ((r = this.doProcess()) == 0) {
                return true;
            }
            if (r >= 0) continue;
            break;
        }
        return false;
    }

    private int doProcess() {
        int encodedSize = this.readEncodedSize();
        if (encodedSize == 0) {
            return 0;
        }
        int messageSize = encodedSize & 0xFFFFFFF;
        int headerSize = encodedSize >>> 28;
        if (!this.bufs.hasRemainingBytes(messageSize)) {
            return -1;
        }
        this.bufs.consume(messageSize, buf -> {
            Object item = this.valueSerializer.decode(buf.array(), buf.head() + headerSize);
            this.send(item);
        });
        return 1;
    }

    private static int readEncodedSize(byte[] array, int pos, byte b) {
        if (b < 0) {
            int dataSize = b & 0x7F;
            b = array[pos + 1];
            if (b >= 0) {
                return (dataSize += b << 7) + 0x20000000;
            }
            dataSize += (b & 0x7F) << 7;
            b = array[pos + 2];
            if (b >= 0) {
                return (dataSize += b << 14) + 0x30000000;
            }
            dataSize += (b & 0x7F) << 14;
            b = array[pos + 3];
            if (b >= 0) {
                return (dataSize += b << 21) + 0x40000000;
            }
            throw new CorruptedDataException("Invalid header size");
        }
        return 0;
    }

    private int readEncodedSize() {
        byte b = this.bufs.peekByte();
        if (b > 0) {
            return b + 1 + 0x10000000;
        }
        if (b == 0) {
            return 0;
        }
        if (this.bufs.hasRemainingBytes(2)) {
            int dataSize = b & 0x7F;
            b = this.bufs.peekByte(1);
            if (b >= 0) {
                return (dataSize += b << 7) + 2 + 0x20000000;
            }
            if (this.bufs.hasRemainingBytes(3)) {
                dataSize += (b & 0x7F) << 7;
                b = this.bufs.peekByte(2);
                if (b >= 0) {
                    return (dataSize += b << 14) + 3 + 0x30000000;
                }
                if (this.bufs.hasRemainingBytes(4)) {
                    dataSize += (b & 0x7F) << 14;
                    b = this.bufs.peekByte(3);
                    if (b >= 0) {
                        return (dataSize += b << 21) + 4 + 0x40000000;
                    }
                    throw new CorruptedDataException("Invalid header size");
                }
            }
            return Integer.MAX_VALUE;
        }
        return Integer.MAX_VALUE;
    }

    @Override
    protected void onError(Exception e) {
        this.input.closeEx(e);
    }

    @Override
    protected void onCleanup() {
        this.bufs.recycle();
    }
}

