/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.shared.protocol.netty;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.io.netty.buffer.ByteBuf;
import io.pravega.shaded.io.netty.buffer.ByteBufOutputStream;
import io.pravega.shaded.io.netty.channel.Channel;
import io.pravega.shaded.io.netty.channel.ChannelHandlerContext;
import io.pravega.shared.NameUtils;
import io.pravega.shared.metrics.ClientMetricKeys;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.FlushingMessageToByteEncoder;
import io.pravega.shared.protocol.netty.InvalidMessageException;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommandType;
import io.pravega.shared.protocol.netty.WireCommands;
import java.beans.ConstructorProperties;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class CommandEncoder
extends FlushingMessageToByteEncoder<Object> {
    @SuppressFBWarnings(justification="generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommandEncoder.class);
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    private final Function<Long, AppendBatchSizeTracker> appendTracker;
    private final MetricNotifier metricNotifier;
    private final Map<Map.Entry<String, UUID>, Session> setupSegments = new HashMap<Map.Entry<String, UUID>, Session>();
    private final AtomicLong tokenCounter = new AtomicLong(0L);
    private String segmentBeingAppendedTo;
    private UUID writerIdPerformingAppends;
    private int currentBlockSize;
    private int bytesLeftInBlock;
    private final Map<UUID, Session> pendingWrites = new HashMap<UUID, Session>();

    public CommandEncoder(Function<Long, AppendBatchSizeTracker> appendTracker, MetricNotifier metricNotifier) {
        this.appendTracker = appendTracker;
        this.metricNotifier = metricNotifier;
    }

    private void flushAll(ByteBuf out) {
        if (!this.pendingWrites.isEmpty()) {
            ArrayList<Session> sessions = new ArrayList<Session>(this.pendingWrites.values());
            sessions.forEach(session -> ((Session)session).flush(out));
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        log.trace("Encoding message to send over the wire {}", msg);
        if (msg instanceof Append) {
            AppendBatchSizeTracker blockSizeSupplier;
            Append append = (Append)msg;
            Session session = this.setupSegments.get(new AbstractMap.SimpleImmutableEntry<String, UUID>(append.segment, append.getWriterId()));
            this.validateAppend(append, session);
            ByteBuf data = append.getData().slice();
            AppendBatchSizeTracker appendBatchSizeTracker = blockSizeSupplier = this.appendTracker == null ? null : this.appendTracker.apply(append.getFlowId());
            if (blockSizeSupplier != null) {
                blockSizeSupplier.recordAppend(append.getEventNumber(), data.readableBytes());
            }
            if (this.isChannelFree()) {
                if (session.isFree()) {
                    session.record(append);
                    this.startAppend(ctx, blockSizeSupplier, append, out);
                    this.continueAppend(data, out);
                    if (this.bytesLeftInBlock == 0) {
                        this.completeAppend(null, out);
                        this.flushRequired();
                    }
                } else {
                    session.record(append);
                    session.write(data, out);
                    session.flush(out);
                }
            } else {
                session.record(append);
                if (this.isChannelOwner(append.getWriterId(), append.getSegment())) {
                    if (this.bytesLeftInBlock > data.readableBytes()) {
                        this.continueAppend(data, out);
                    } else {
                        ByteBuf dataInsideBlock = data.readSlice(this.bytesLeftInBlock);
                        this.completeAppend(dataInsideBlock, data, out);
                        this.flushAll(out);
                        this.flushRequired();
                    }
                } else {
                    session.write(data, out);
                }
            }
        } else if (msg instanceof WireCommands.SetupAppend) {
            this.breakCurrentAppend(out);
            this.flushAll(out);
            CommandEncoder.writeMessage((WireCommands.SetupAppend)msg, out);
            WireCommands.SetupAppend setup = (WireCommands.SetupAppend)msg;
            this.setupSegments.put(new AbstractMap.SimpleImmutableEntry<String, UUID>(setup.getSegment(), setup.getWriterId()), new Session(setup.getWriterId(), setup.getRequestId()));
            this.flushRequired();
        } else if (msg instanceof BlockTimeout) {
            BlockTimeout timeoutMsg = (BlockTimeout)msg;
            if (this.tokenCounter.get() == timeoutMsg.token) {
                this.breakCurrentAppend(out);
                this.flushAll(out);
            }
            this.flushRequired();
        } else if (msg instanceof WireCommands.Hello) {
            Preconditions.checkState(this.isChannelFree());
            Preconditions.checkState(this.pendingWrites.isEmpty());
            CommandEncoder.writeMessage((WireCommand)msg, out);
            this.flushRequired();
        } else if (msg instanceof WireCommand) {
            this.breakCurrentAppend(out);
            this.flushAll(out);
            CommandEncoder.writeMessage((WireCommand)msg, out);
            this.flushRequired();
        } else {
            throw new IllegalArgumentException("Expected a wire command and found: " + msg);
        }
    }

    private void validateAppend(Append append, Session session) {
        if (append.getEventCount() <= 0) {
            throw new InvalidMessageException("Invalid eventCount : " + append.getEventCount() + " in the append for Writer id: " + append.getWriterId());
        }
        if (session == null || !session.id.equals(append.getWriterId())) {
            throw new InvalidMessageException("Sending appends without setting up the append. Append Writer id: " + append.getWriterId());
        }
        if (append.getEventNumber() <= session.lastEventNumber) {
            throw new InvalidMessageException("Events written out of order. Received: " + append.getEventNumber() + " following: " + session.lastEventNumber + " for Writer id: " + append.getWriterId());
        }
        if (append.isConditional()) {
            throw new IllegalArgumentException("Conditional appends should be written via a ConditionalAppend object.");
        }
    }

    private boolean isChannelFree() {
        return this.writerIdPerformingAppends == null;
    }

    private boolean isChannelOwner(UUID writerID, String segment) {
        return writerID.equals(this.writerIdPerformingAppends) && segment.equals(this.segmentBeingAppendedTo);
    }

    private void startAppend(ChannelHandlerContext ctx, AppendBatchSizeTracker blockSizeSupplier, Append append, ByteBuf out) {
        int msgSize = append.getData().readableBytes();
        int blockSize = 0;
        if (blockSizeSupplier != null) {
            blockSize = blockSizeSupplier.getAppendBlockSize();
            if (!this.metricNotifier.equals(MetricNotifier.NO_OP_METRIC_NOTIFIER)) {
                this.metricNotifier.updateSuccessMetric(ClientMetricKeys.CLIENT_APPEND_BLOCK_SIZE, NameUtils.segmentTags(append.getSegment(), append.getWriterId().toString()), blockSize);
            }
        }
        this.segmentBeingAppendedTo = append.segment;
        this.writerIdPerformingAppends = append.writerId;
        if (ctx != null && blockSize > msgSize) {
            this.currentBlockSize = blockSize;
            this.writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize + 8, out);
            ctx.executor().schedule(new BlockTimeouter(ctx.channel(), this.tokenCounter.incrementAndGet()), (long)blockSizeSupplier.getBatchTimeout(), TimeUnit.MILLISECONDS);
        } else {
            this.currentBlockSize = msgSize;
            this.writeMessage(new WireCommands.AppendBlock(this.writerIdPerformingAppends), this.currentBlockSize, out);
        }
        this.bytesLeftInBlock = this.currentBlockSize;
    }

    private void continueAppend(ByteBuf data, ByteBuf out) {
        this.bytesLeftInBlock -= data.readableBytes();
        out.writeBytes(data);
    }

    private void completeAppend(ByteBuf pendingData, ByteBuf out) {
        Session session = this.setupSegments.get(new AbstractMap.SimpleImmutableEntry<String, UUID>(this.segmentBeingAppendedTo, this.writerIdPerformingAppends));
        session.flush(this.currentBlockSize - this.bytesLeftInBlock, pendingData, out);
        this.tokenCounter.incrementAndGet();
        this.bytesLeftInBlock = 0;
        this.currentBlockSize = 0;
        this.segmentBeingAppendedTo = null;
        this.writerIdPerformingAppends = null;
    }

    private void completeAppend(ByteBuf data, ByteBuf pendingData, ByteBuf out) {
        CommandEncoder.writeMessage(new WireCommands.PartialEvent(data), out);
        this.completeAppend(pendingData, out);
    }

    private void breakCurrentAppend(ByteBuf out) {
        if (this.isChannelFree()) {
            return;
        }
        this.writePadding(out);
        this.completeAppend(null, out);
    }

    private void writePadding(ByteBuf out) {
        out.writeInt(WireCommandType.PADDING.getCode());
        out.writeInt(this.bytesLeftInBlock);
        out.writeZero(this.bytesLeftInBlock);
    }

    private void writeMessage(WireCommands.AppendBlock block, int blockSize, ByteBuf out) {
        int startIdx = out.writerIndex();
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        bout.writeInt(block.getType().getCode());
        bout.write(LENGTH_PLACEHOLDER);
        block.writeFields(bout);
        bout.flush();
        bout.close();
        int endIdx = out.writerIndex();
        int fieldsSize = endIdx - startIdx - 8;
        out.setInt(startIdx + 4, fieldsSize + blockSize);
    }

    @VisibleForTesting
    static int writeMessage(WireCommand msg, ByteBuf out) {
        int startIdx = out.writerIndex();
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        bout.writeInt(msg.getType().getCode());
        bout.write(LENGTH_PLACEHOLDER);
        msg.writeFields(bout);
        bout.flush();
        bout.close();
        int endIdx = out.writerIndex();
        int fieldsSize = endIdx - startIdx - 8;
        out.setInt(startIdx + 4, fieldsSize);
        return endIdx - startIdx;
    }

    private final class BlockTimeouter
    implements Runnable {
        private final Channel channel;
        private final long token;

        @Override
        public void run() {
            if (CommandEncoder.this.tokenCounter.get() == this.token) {
                this.channel.writeAndFlush(new BlockTimeout(this.token));
            }
        }

        @ConstructorProperties(value={"channel", "token"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public BlockTimeouter(Channel channel, long token) {
            this.channel = channel;
            this.token = token;
        }
    }

    private static final class BlockTimeout {
        private final long token;

        @ConstructorProperties(value={"token"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public BlockTimeout(long token) {
            this.token = token;
        }
    }

    private final class Session {
        private static final int MAX_EVENTS = 500;
        private static final int MAX_DATA_SIZE = 0x100000;
        private final UUID id;
        private final long requestId;
        private final List<ByteBuf> pendingList = new ArrayList<ByteBuf>();
        private int pendingBytes = 0;
        private long lastEventNumber = -1L;
        private int eventCount = 0;

        private void record(Append append) {
            this.lastEventNumber = append.getEventNumber();
            this.eventCount += append.getEventCount();
        }

        private boolean isFree() {
            return this.eventCount == 0;
        }

        private void write(ByteBuf data, ByteBuf out) {
            CommandEncoder.this.pendingWrites.putIfAbsent(this.id, this);
            if (data.readableBytes() > 0) {
                this.pendingBytes += data.readableBytes();
                this.pendingList.add(data);
            }
            this.conditionalFlush(out);
        }

        private void conditionalFlush(ByteBuf out) {
            if (this.pendingBytes > 0x100000 || this.eventCount > 500) {
                CommandEncoder.this.breakCurrentAppend(out);
                this.flush(out);
            }
        }

        private void flush(ByteBuf out) {
            if (!this.isFree()) {
                CommandEncoder.this.pendingWrites.remove(this.id);
                CommandEncoder.this.writeMessage(new WireCommands.AppendBlock(this.id), this.pendingBytes, out);
                if (this.pendingBytes > 0) {
                    this.pendingList.forEach(out::writeBytes);
                    this.pendingList.clear();
                }
                this.flush(this.pendingBytes, null, out);
                this.pendingBytes = 0;
            }
        }

        private void flush(int sizeOfWholeEvents, ByteBuf data, ByteBuf out) {
            CommandEncoder.writeMessage(new WireCommands.AppendBlockEnd(this.id, sizeOfWholeEvents, data, this.eventCount, this.lastEventNumber, this.requestId), out);
            this.eventCount = 0;
        }

        @ConstructorProperties(value={"id", "requestId"})
        @SuppressFBWarnings(justification="generated code")
        @Generated
        public Session(UUID id, long requestId) {
            this.id = id;
            this.requestId = requestId;
        }
    }
}

