/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.plugins.netflow.codecs;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog.plugins.netflow.codecs.TemplateKey;
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
import org.graylog.plugins.netflow.v9.RawNetFlowV9Packet;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregates NetFlow packets per exporter, where an exporter is identified by
 * its remote address and the source id from the packet header.
 *
 * <p>Templates carried by V9 packets are cached. Packets that reference a
 * template which is not cached yet are buffered until a later packet from the
 * same exporter delivers templates. Packets whose templates are available are
 * wrapped, together with those templates, into a serialized
 * {@code RawNetflowV9} journal message prefixed with the marker byte {@code 1}.
 * Packets whose first 16-bit word is not {@code 9} are passed through
 * unchanged, prefixed with the marker byte {@code 0}.
 */
public class NetflowV9CodecAggregator
implements RemoteAddressCodecAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(NetflowV9CodecAggregator.class);

    /** Single zero byte that is prepended to packets which are not NetFlow V9. */
    private static final ByteBuf PASSTHROUGH_MARKER = Unpooled.wrappedBuffer(new byte[]{0});

    /** Raw template bytes keyed by exporter and template id; bounded to 5000 entries. */
    private final Cache<TemplateKey, TemplateBytes> templateCache = CacheBuilder.newBuilder()
            .maximumSize(5000L)
            .removalListener(notification -> LOG.debug("Removed {} from template cache for reason {}", notification.getKey(), notification.getCause()))
            .recordStats()
            .build();

    /**
     * Packets waiting for templates, one queue per exporter. Entries expire one
     * minute after they were written.
     *
     * NOTE(review): Guava documents entry weights as being measured when the entry
     * is created. The queue is created empty and filled afterwards, so the 1 MB
     * weight limit presumably never takes the queued packets into account - confirm.
     */
    private final Cache<TemplateKey, Queue<PacketBytes>> packetCache = CacheBuilder.newBuilder()
            .expireAfterWrite(1L, TimeUnit.MINUTES)
            .maximumWeight(Size.megabytes(1L).toBytes())
            .<TemplateKey, Queue<PacketBytes>>removalListener(notification -> LOG.debug("Removed {} from packet cache for reason {}", notification.getKey(), notification.getCause()))
            .weigher((key, value) -> value.stream().mapToInt(PacketBytes::readableBytes).sum())
            .recordStats()
            .build();

    @Inject
    public NetflowV9CodecAggregator() {
    }

    /**
     * Processes one received packet.
     *
     * @param buf           the received packet; it is read but its reader index is not advanced here
     * @param remoteAddress the address of the sender, used as part of the template and buffer keys
     * @return a result carrying the buffer to process further, or a result with a {@code null}
     *         buffer when the packet is too short, was buffered, carried nothing to send,
     *         or could not be processed
     */
    @Override
    @Nonnull
    public CodecAggregator.Result addChunk(ByteBuf buf, SocketAddress remoteAddress) {
        // The version field needs two bytes; anything shorter cannot be inspected.
        if (buf.readableBytes() < 2) {
            return new CodecAggregator.Result(null, false);
        }
        try {
            final short netFlowVersion = buf.getShort(0);
            if (netFlowVersion != 9) {
                // Not a V9 packet: hand it on untouched behind the passthrough marker.
                return new CodecAggregator.Result(Unpooled.copiedBuffer(PASSTHROUGH_MARKER, buf), true);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received V9 packet:\n{}", ByteBufUtil.prettyHexDump(buf));
            }
            final RawNetFlowV9Packet rawNetFlowV9Packet = NetFlowV9Parser.parsePacketShallow(buf);
            final long sourceId = rawNetFlowV9Packet.header().sourceId();
            LOG.trace("Incoming NetFlow V9 packet contains: {}", rawNetFlowV9Packet);

            cacheTemplates(rawNetFlowV9Packet, remoteAddress, sourceId);

            // Work out which cached templates the current packet needs. If one is missing,
            // the packet is buffered exactly once and processing stops here.
            final Set<Integer> usedTemplates = rawNetFlowV9Packet.usedTemplates();
            final Set<TemplateKey> templates = new HashSet<>();
            for (int templateId : usedTemplates) {
                final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
                if (this.templateCache.getIfPresent(templateKey) != null) {
                    templates.add(templateKey);
                    continue;
                }
                if (bufferPacket(buf, remoteAddress, sourceId, usedTemplates)) {
                    // Previously buffered packets are deliberately left in their queue here,
                    // so that nothing is taken out of the buffer without being sent.
                    return new CodecAggregator.Result(null, true);
                }
            }

            // ByteBuf equality is content based, so identical packets collapse into one entry.
            final Set<ByteBuf> packetsToSend = new HashSet<>();
            if (!rawNetFlowV9Packet.templates().isEmpty() || rawNetFlowV9Packet.optionTemplate() != null) {
                // New templates arrived: some buffered packets may be complete now.
                releaseBufferedPackets(remoteAddress, sourceId, packetsToSend);
            }
            if (!templates.isEmpty()) {
                packetsToSend.add(buf.slice());
            }
            if (packetsToSend.isEmpty()) {
                return new CodecAggregator.Result(null, true);
            }

            final byte[] bytes = buildJournalMessage(templates, packetsToSend);
            // Marker byte 1 precedes the serialized journal message.
            final ByteBuf resultBuffer = Unpooled.buffer(bytes.length + 1).writeByte(1).writeBytes(bytes);
            return new CodecAggregator.Result(resultBuffer, true);
        }
        catch (Exception e) {
            LOG.error("Unexpected failure while aggregating NetFlowV9 packet, discarding packet.", ExceptionUtils.getRootCause(e));
            return new CodecAggregator.Result(null, false);
        }
    }

    /** Stores every template and the option template (if any) of the packet in the template cache. */
    private void cacheTemplates(RawNetFlowV9Packet packet, SocketAddress remoteAddress, long sourceId) {
        for (Map.Entry<Integer, byte[]> template : packet.templates().entrySet()) {
            final int templateId = template.getKey();
            final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
            this.templateCache.put(templateKey, new TemplateBytes(template.getValue(), false));
        }
        final Map.Entry<Integer, byte[]> optionTemplate = packet.optionTemplate();
        if (optionTemplate != null) {
            final int templateId = optionTemplate.getKey();
            final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
            this.templateCache.put(templateKey, new TemplateBytes(optionTemplate.getValue(), true));
        }
    }

    /**
     * Appends a copy of the packet to the exporter's buffer queue.
     *
     * @return {@code true} if the packet was queued, {@code false} if the queue could not be obtained
     */
    private boolean bufferPacket(ByteBuf buf, SocketAddress remoteAddress, long sourceId, Set<Integer> usedTemplates) {
        try {
            final TemplateKey exporterKey = TemplateKey.idForExporter(remoteAddress, sourceId);
            final Queue<PacketBytes> bufferedPackets = this.packetCache.get(exporterKey, ConcurrentLinkedQueue::new);
            bufferedPackets.add(new PacketBytes(ByteBufUtil.getBytes(buf), usedTemplates));
            return true;
        }
        catch (ExecutionException e) {
            // The loader only creates an empty queue, so this is not expected; log it rather than hide it.
            LOG.warn("Unable to buffer NetFlow V9 packet until its templates arrive", e);
            return false;
        }
    }

    /**
     * Moves every buffered packet of the exporter whose templates are all cached into
     * {@code packetsToSend}; the remaining packets are put back into the queue.
     *
     * NOTE(review): the templates of released packets are not added to the journal message
     * unless the current packet uses them too - confirm that the consumer does not need them.
     */
    private void releaseBufferedPackets(SocketAddress remoteAddress, long sourceId, Set<ByteBuf> packetsToSend) {
        final Queue<PacketBytes> bufferedPackets = this.packetCache.getIfPresent(TemplateKey.idForExporter(remoteAddress, sourceId));
        if (bufferedPackets == null) {
            return;
        }
        final Set<Integer> knownTemplateIds = new HashSet<>();
        for (TemplateKey templateKey : this.templateCache.asMap().keySet()) {
            // Addresses must be compared by value: cached keys hold whatever instance an earlier call passed in.
            if (Objects.equals(templateKey.getRemoteAddress(), remoteAddress) && templateKey.getSourceId() == sourceId) {
                knownTemplateIds.add(templateKey.getTemplateId());
            }
        }
        final List<PacketBytes> stillWaiting = new ArrayList<>(bufferedPackets.size());
        int addedPackets = 0;
        PacketBytes previousPacket;
        while ((previousPacket = bufferedPackets.poll()) != null) {
            if (knownTemplateIds.containsAll(previousPacket.getUsedTemplates())) {
                packetsToSend.add(Unpooled.wrappedBuffer(previousPacket.getBytes()));
                ++addedPackets;
            } else {
                stillWaiting.add(previousPacket);
            }
        }
        LOG.debug("Processing {} previously buffered packets, {} packets require more templates.", addedPackets, stillWaiting.size());
        if (!stillWaiting.isEmpty()) {
            bufferedPackets.addAll(stillWaiting);
        }
    }

    /** Serializes the given templates and packets into a {@code RawNetflowV9} message. */
    private byte[] buildJournalMessage(Set<TemplateKey> templates, Set<ByteBuf> packetsToSend) {
        final NetFlowV9Journal.RawNetflowV9.Builder builder = NetFlowV9Journal.RawNetflowV9.newBuilder();
        for (TemplateKey templateKey : templates) {
            // Looked up again because the cache may have evicted the entry in the meantime.
            final TemplateBytes templateBytes = this.templateCache.getIfPresent(templateKey);
            if (templateBytes == null) {
                LOG.warn("Template {} expired while processing, discarding netflow packet", templateKey);
                continue;
            }
            if (templateBytes.isOptionTemplate()) {
                LOG.debug("Writing options template flow {}", templateKey);
                // NOTE(review): the option template is always stored under the fixed key 1 - confirm with the consumer.
                builder.putOptionTemplate(1, ByteString.copyFrom(templateBytes.getBytes()));
                continue;
            }
            LOG.debug("Writing template {}", templateKey);
            builder.putTemplates(templateKey.getTemplateId(), ByteString.copyFrom(templateBytes.getBytes()));
        }
        for (ByteBuf packetBuffer : packetsToSend) {
            builder.addPackets(ByteString.copyFrom(ByteBufUtil.getBytes(packetBuffer)));
        }
        return builder.build().toByteArray();
    }

    /** A buffered packet: its raw bytes plus the ids of the templates it references. */
    public static class PacketBytes {
        private final byte[] bytes;
        private final Set<Integer> usedTemplates;

        /**
         * @param bytes         the raw packet bytes (stored as given, not copied)
         * @param usedTemplates the template ids the packet references (stored as given, not copied)
         */
        public PacketBytes(byte[] bytes, Set<Integer> usedTemplates) {
            this.bytes = bytes;
            this.usedTemplates = usedTemplates;
        }

        /** @return the raw packet bytes (the internal array, not a copy) */
        public byte[] getBytes() {
            return this.bytes;
        }

        /** @return the template ids referenced by the packet */
        public Set<Integer> getUsedTemplates() {
            return this.usedTemplates;
        }

        /** @return the length of the packet in bytes */
        public int readableBytes() {
            return this.bytes.length;
        }
    }

    /** A cached template: its raw bytes plus a flag telling whether it is an option template. */
    private static class TemplateBytes {
        private final byte[] bytes;
        private final boolean optionTemplate;

        /**
         * @param bytes          the raw template bytes (stored as given, not copied)
         * @param optionTemplate {@code true} if the bytes describe an option template
         */
        public TemplateBytes(byte[] bytes, boolean optionTemplate) {
            this.bytes = bytes;
            this.optionTemplate = optionTemplate;
        }

        /** @return the raw template bytes (the internal array, not a copy) */
        public byte[] getBytes() {
            return this.bytes;
        }

        /** @return {@code true} if this is an option template */
        public boolean isOptionTemplate() {
            return this.optionTemplate;
        }
    }
}

