/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.csp;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelOutput;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.csp.WithStreamToChannel;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A stream consumer that serializes incoming items with a {@link BinarySerializer}
 * and forwards the resulting bytes to a {@link ChannelConsumer} of {@link ByteBuf}s.
 *
 * <p>Every item is written as a variable-length size prefix (1 to 4 bytes, 7 payload bits per
 * byte, continuation flag in the high bit) followed by the serialized data. Items are packed
 * into a working buffer which is handed to the output when it runs out of space, when an
 * auto-flush fires, or when the stream ends.
 *
 * @param <T> type of the items being serialized
 */
public final class ChannelSerializer<T>
extends AbstractStreamConsumer<T>
implements WithStreamToChannel<ChannelSerializer<T>, T, ByteBuf>,
WithInitializer<ChannelSerializer<T>> {
    private static final Logger logger = LoggerFactory.getLogger(ChannelSerializer.class);
    private static final boolean CHECK = Checks.isEnabled(ChannelSerializer.class);
    /** Exclusive upper bound of a single serialized item: the largest size a 4-byte prefix can hold, plus one. */
    private static final int MAX_SIZE_INT = 0x10000000;
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = MemSize.kilobytes(16);
    private final BinarySerializer<T> serializer;
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
    /** Bytes sent right before end-of-stream is signalled to the output, or {@code null} for none. */
    private byte @Nullable [] explicitEndOfStream;
    @Nullable
    private Duration autoFlushInterval;
    /** By default a serialization failure closes this consumer with the thrown exception. */
    private BiConsumer<T, Exception> serializationErrorHandler = ($, e) -> closeEx(e);
    private Input input;
    private ChannelConsumer<ByteBuf> output;
    /** Filled buffers waiting to be accepted by the output. */
    private final ArrayDeque<ByteBuf> bufs = new ArrayDeque<>();
    /** {@code true} while an asynchronous {@code accept} on the output is still pending. */
    private boolean sending;

    private ChannelSerializer(BinarySerializer<T> serializer) {
        this.serializer = serializer;
    }

    /**
     * Creates a serializer stage that encodes items with the given serializer.
     *
     * @param serializer serializer used to encode every item
     * @return a new, not yet configured instance
     */
    public static <T> ChannelSerializer<T> create(BinarySerializer<T> serializer) {
        return new ChannelSerializer<>(serializer);
    }

    /**
     * Sets the minimum size of every working buffer that gets allocated.
     *
     * @param bufferSize minimum buffer size
     * @return this instance
     */
    public ChannelSerializer<T> withInitialBufferSize(MemSize bufferSize) {
        this.initialBufferSize = bufferSize;
        return this;
    }

    /**
     * Sets how long written data may stay in the working buffer before it is flushed.
     * {@code null} schedules no automatic flush; a zero or negative interval flushes
     * through {@code eventloop.postLast}.
     *
     * @param autoFlushInterval flush delay, or {@code null}
     * @return this instance
     */
    public ChannelSerializer<T> withAutoFlushInterval(@Nullable Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        return this;
    }

    /**
     * Installs an error handler that only logs a warning for an item that failed to serialize.
     *
     * @return this instance
     */
    public ChannelSerializer<T> withSkipSerializationErrors() {
        return withSerializationErrorHandler((item, e) ->
                logger.warn("Skipping serialization error for {} in {}", item, this, e));
    }

    /**
     * Replaces the handler invoked when the serializer throws for an item.
     *
     * @param handler receives the offending item and the exception
     * @return this instance
     */
    public ChannelSerializer<T> withSerializationErrorHandler(BiConsumer<T, Exception> handler) {
        this.serializationErrorHandler = handler;
        return this;
    }

    /**
     * Enables the default explicit end-of-stream marker (a single zero byte).
     *
     * @return this instance
     */
    public ChannelSerializer<T> withExplicitEndOfStream() {
        return withExplicitEndOfStream(true);
    }

    /**
     * Enables or disables the default explicit end-of-stream marker (a single zero byte).
     *
     * @param explicitEndOfStream {@code true} to send the marker, {@code false} to send none
     * @return this instance
     */
    public ChannelSerializer<T> withExplicitEndOfStream(boolean explicitEndOfStream) {
        return withExplicitEndOfStream(explicitEndOfStream ? new byte[]{0} : null);
    }

    /**
     * Sets the exact bytes sent to the output right before end-of-stream is signalled.
     *
     * @param explicitEndOfStream marker bytes, or {@code null} to send no marker
     * @return this instance
     */
    public ChannelSerializer<T> withExplicitEndOfStream(byte @Nullable [] explicitEndOfStream) {
        this.explicitEndOfStream = explicitEndOfStream;
        return this;
    }

    /**
     * Returns the output port; binding a consumer to it stores the consumer and resumes this stream.
     */
    public ChannelOutput<ByteBuf> getOutput() {
        return output -> {
            this.output = output;
            resume(input);
        };
    }

    @Override
    protected void onInit() {
        input = new Input(serializer, initialBufferSize.toInt(), serializationErrorHandler);
    }

    @Override
    protected void onStarted() {
        // Without an output there is nowhere to send data yet; getOutput() resumes once one is bound.
        if (output != null) {
            resume(input);
        }
    }

    @Override
    protected void onEndOfStream() {
        input.flush();
        send();
    }

    @Override
    protected void onError(Exception e) {
        // The output may not be bound yet (onStarted treats it as optional too), so guard against null.
        if (output != null) {
            output.closeEx(e);
        }
    }

    @Override
    protected void onCleanup() {
        bufs.forEach(ByteBuf::recycle);
        bufs.clear();
        input.buf = Utils.nullify(input.buf, ByteBuf::recycle);
    }

    /**
     * Drains queued buffers into the output. Once the queue is empty, either finishes the stream
     * (optional explicit marker, then end-of-stream, then acknowledgement) or resumes the input.
     */
    private void send() {
        if (sending) {
            return;
        }
        if (!bufs.isEmpty()) {
            sending = true;
            while (!bufs.isEmpty()) {
                Promise<Void> acceptPromise = output.accept(bufs.poll());
                if (acceptPromise.isResult()) {
                    continue;
                }
                // Not completed successfully yet: continue from the callback, or close on failure.
                acceptPromise.run(($, e) -> {
                    if (e == null) {
                        sending = false;
                        send();
                    } else {
                        closeEx(e);
                    }
                });
                return;
            }
            sending = false;
            send();
        } else if (isEndOfStream()) {
            sending = true;
            Promise.complete()
                    .then(() -> explicitEndOfStream != null ?
                            output.accept(ByteBuf.wrapForReading(explicitEndOfStream)) :
                            Promise.complete())
                    .then(() -> output.acceptEndOfStream())
                    // A failure while finishing the output must close this consumer, exactly like a
                    // failure while sending data buffers does above.
                    .run(($, e) -> {
                        if (e == null) {
                            acknowledge();
                        } else {
                            closeEx(e);
                        }
                    });
        } else {
            resume(input);
        }
    }

    /**
     * Returns how many 7-bit groups are needed to represent {@code value};
     * yields 1 for a value of 0.
     */
    private static int varIntSize(int value) {
        return 1 + (31 - Integer.numberOfLeadingZeros(value)) / 7;
    }

    /**
     * Data acceptor that writes size-prefixed items into the working buffer and adapts its size
     * estimates to the largest item seen so far.
     */
    private final class Input
    implements StreamDataAcceptor<T> {
        private final BinarySerializer<T> serializer;
        private ByteBuf buf = null;
        /** Largest serialized item size seen so far. */
        private int estimatedDataSize;
        /**
         * Bytes reserved for the size prefix of the next item. Starts at 1 so that a prefix is
         * always reserved and written, even when the first items serialize to zero bytes.
         */
        private int estimatedHeaderSize = 1;
        /** Free space the working buffer must offer before an item is written into it. */
        private int requiredRemainingSize = 1;
        private final int initialBufferSize;
        private final int autoFlushIntervalMillis;
        private boolean flushPosted;
        private final BiConsumer<T, Exception> serializationErrorHandler;

        public Input(BinarySerializer<T> serializer, int initialBufferSize, BiConsumer<T, Exception> serializationErrorHandler) {
            this.serializer = serializer;
            this.initialBufferSize = initialBufferSize;
            Duration interval = ChannelSerializer.this.autoFlushInterval;
            // Integer.MAX_VALUE means "no automatic flush"; clamp before narrowing so that very long
            // intervals do not wrap around into small or negative values.
            this.autoFlushIntervalMillis = interval == null ?
                    Integer.MAX_VALUE :
                    (int) Math.min(interval.toMillis(), Integer.MAX_VALUE);
            this.serializationErrorHandler = serializationErrorHandler;
        }

        /**
         * Serializes one item behind a reserved size prefix. Retries with a larger buffer when the
         * serializer runs past the end of the array, and delegates any other serialization failure
         * to the error handler.
         */
        @Override
        public void accept(T item) {
            int positionEnd;
            int positionData;
            int positionBegin;
            while (true) {
                if (buf == null || buf.writeRemaining() < requiredRemainingSize) {
                    ensureBuffer();
                }
                positionBegin = buf.tail();
                positionData = positionBegin + estimatedHeaderSize;
                try {
                    positionEnd = serializer.encode(buf.array(), positionData, item);
                }
                catch (ArrayIndexOutOfBoundsException e) {
                    // The item did not fit: flush what is complete and retry in a bigger buffer.
                    enlargeBuffer();
                    continue;
                }
                catch (Exception e) {
                    onSerializationError(item, e);
                    return;
                }
                break;
            }
            buf.tail(positionEnd);
            int dataSize = positionEnd - positionData;
            if (dataSize > estimatedDataSize) {
                // May widen the prefix, which shifts the data and can replace the buffer.
                reestimate(positionBegin, positionData, dataSize);
            }
            writeSize(buf.array(), positionBegin, dataSize);
        }

        /**
         * Writes {@code size} into exactly {@code estimatedHeaderSize} bytes starting at {@code pos}:
         * 7 bits per byte, high bit set on every byte except the last.
         */
        private void writeSize(byte[] buf, int pos, int size) {
            if (estimatedHeaderSize == 1) {
                buf[pos] = (byte) size;
                return;
            }
            buf[pos] = (byte) (size & 0x7F | 0x80);
            size >>>= 7;
            if (estimatedHeaderSize == 2) {
                buf[pos + 1] = (byte) size;
                return;
            }
            buf[pos + 1] = (byte) (size & 0x7F | 0x80);
            size >>>= 7;
            if (estimatedHeaderSize == 3) {
                buf[pos + 2] = (byte) size;
                return;
            }
            assert estimatedHeaderSize == 4;
            buf[pos + 2] = (byte) (size & 0x7F | 0x80);
            size >>>= 7;
            buf[pos + 3] = (byte) size;
        }

        /** Flushes the current buffer, allocates a fresh one and makes sure a flush is scheduled. */
        private void ensureBuffer() {
            flush();
            buf = ByteBufPool.allocate(Math.max(initialBufferSize, requiredRemainingSize));
            if (!flushPosted) {
                postFlush();
            }
        }

        /** Flushes the current buffer and allocates one with more than 1.5x the space that was left. */
        private void enlargeBuffer() {
            int writeRemaining = buf.writeRemaining();
            flush();
            buf = ByteBufPool.allocate(Math.max(initialBufferSize, writeRemaining + (writeRemaining >>> 1) + 1));
        }

        /** Raises the size estimates to {@code dataSize} and widens the prefix of the current item if needed. */
        private void reestimate(int positionBegin, int positionData, int dataSize) {
            if (CHECK) {
                Checks.checkArgument(dataSize < MAX_SIZE_INT, "Serialized data size exceeds 256MB");
            }
            estimatedDataSize = dataSize;
            estimatedHeaderSize = varIntSize(estimatedDataSize);
            // Header plus data plus a quarter of the data size as headroom.
            requiredRemainingSize = estimatedHeaderSize + estimatedDataSize + (estimatedDataSize >>> 2);
            ensureHeaderSize(positionBegin, positionData, dataSize);
        }

        /**
         * Makes room for a wider prefix in front of data that has already been written, shifting the
         * data in place or copying everything into a larger buffer.
         */
        private void ensureHeaderSize(int positionBegin, int positionData, int dataSize) {
            int previousHeaderSize = positionData - positionBegin;
            if (previousHeaderSize == estimatedHeaderSize) {
                return;
            }
            int headerDelta = estimatedHeaderSize - previousHeaderSize;
            assert headerDelta > 0;
            int newPositionData = positionData + headerDelta;
            int newPositionEnd = newPositionData + dataSize;
            if (newPositionEnd < buf.array().length) {
                System.arraycopy(buf.array(), positionData, buf.array(), newPositionData, dataSize);
            } else {
                ByteBuf old = buf;
                buf = ByteBufPool.allocate(Math.max(initialBufferSize, newPositionEnd));
                // Carry over the previously written items, then the shifted data of the current one.
                System.arraycopy(old.array(), 0, buf.array(), 0, positionBegin);
                System.arraycopy(old.array(), positionData, buf.array(), newPositionData, dataSize);
                old.recycle();
            }
            buf.tail(newPositionEnd);
        }

        /** Schedules a single deferred flush according to the configured auto-flush interval. */
        private void postFlush() {
            flushPosted = true;
            if (autoFlushIntervalMillis <= 0) {
                ChannelSerializer.this.eventloop.postLast(() -> {
                    flushPosted = false;
                    flush();
                });
            } else if (autoFlushIntervalMillis < Integer.MAX_VALUE) {
                ChannelSerializer.this.eventloop.delayBackground((long) autoFlushIntervalMillis, () -> {
                    flushPosted = false;
                    flush();
                });
            }
        }

        /** Queues the working buffer for sending if it holds data; recycles it otherwise. */
        private void flush() {
            if (buf == null) {
                return;
            }
            if (buf.canRead()) {
                // Earlier buffers are still queued: suspend the stream until the output catches up.
                if (!ChannelSerializer.this.bufs.isEmpty()) {
                    ChannelSerializer.this.suspend();
                }
                ChannelSerializer.this.bufs.add(buf);
                buf = null;
                ChannelSerializer.this.send();
            } else {
                buf.recycle();
                buf = null;
            }
        }

        private void onSerializationError(T item, Exception e) {
            serializationErrorHandler.accept(item, e);
        }
    }
}

