/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.integrations.ipfix.codecs;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.graylog.integrations.ipfix.InformationElementDefinitions;
import org.graylog.integrations.ipfix.IpfixJournal;
import org.graylog.integrations.ipfix.IpfixParser;
import org.graylog.integrations.ipfix.ShallowDataSet;
import org.graylog.integrations.ipfix.ShallowTemplateSet;
import org.graylog.integrations.ipfix.codecs.TemplateKey;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IpfixAggregator
implements RemoteAddressCodecAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(IpfixAggregator.class);
    private final Cache<TemplateKey, ShallowTemplateSet.Record> templateCache;
    private final Cache<TemplateKey, Queue<ShallowDataSet>> packetCache;
    private final IpfixParser shallowParser = new IpfixParser(InformationElementDefinitions.empty());

    public IpfixAggregator() {
        this.templateCache = CacheBuilder.newBuilder().maximumSize(5000L).removalListener(notification -> LOG.debug("Removed [{}] from template cache for reason [{}]", notification.getKey(), (Object)notification.getCause())).recordStats().build();
        this.packetCache = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).maximumWeight(Size.megabytes((long)1L).toBytes()).removalListener(notification -> LOG.debug("Removed [{}] from packet cache for reason [{}]", notification.getKey(), (Object)notification.getCause())).weigher((key, value) -> value.stream().map(shallowDataSet -> shallowDataSet.content().length).reduce(0, Integer::sum)).recordStats().build();
    }

    @Override
    @Nonnull
    public CodecAggregator.Result addChunk(ByteBuf buf, @Nullable SocketAddress remoteAddress) {
        if (!buf.isReadable(2)) {
            return new CodecAggregator.Result(null, false);
        }
        try {
            IpfixParser.MessageDescription messageDescription = this.shallowParser.shallowParseMessage(buf);
            long observationDomainId = messageDescription.getHeader().observationDomainId();
            this.addTemplateKeyInCache(remoteAddress, messageDescription, observationDomainId);
            HashSet<ShallowDataSet> packetsToSendCollection = new HashSet<ShallowDataSet>();
            HashSet<Integer> bufferedTemplateIdList = new HashSet<Integer>();
            if (!messageDescription.declaredTemplateIds().isEmpty()) {
                HashSet<Integer> knownTemplateIdsList = new HashSet<Integer>();
                this.collectAllTemplateIds(remoteAddress, observationDomainId, knownTemplateIdsList);
                Queue bufferedPackets = (Queue)this.packetCache.getIfPresent((Object)TemplateKey.idForExporter(remoteAddress, observationDomainId));
                this.handleBufferedPackets(packetsToSendCollection, bufferedTemplateIdList, knownTemplateIdsList, bufferedPackets);
            }
            boolean packetBuffered = false;
            HashSet<TemplateKey> templatesList = new HashSet<TemplateKey>(this.templateCache.asMap().keySet());
            bufferedTemplateIdList.addAll(messageDescription.referencedTemplateIds());
            LOG.debug("Finding the needed templates for the buffered and current packets");
            Iterator iterator = bufferedTemplateIdList.iterator();
            while (iterator.hasNext()) {
                int templateId = (Integer)iterator.next();
                TemplateKey templateKey = new TemplateKey(remoteAddress, observationDomainId, templateId);
                Object template = this.templateCache.getIfPresent((Object)templateKey);
                if (template == null) {
                    LOG.debug("Template is null, packet needs to be buffered until templates have been received.");
                    try {
                        TemplateKey newTemplateKey = TemplateKey.idForExporter(remoteAddress, observationDomainId);
                        Queue bufferedPackets = (Queue)this.packetCache.get((Object)newTemplateKey, ConcurrentLinkedQueue::new);
                        byte[] bytes = ByteBufUtil.getBytes((ByteBuf)buf);
                        bufferedPackets.addAll(messageDescription.dataSets());
                        packetBuffered = true;
                    }
                    catch (ExecutionException executionException) {}
                    continue;
                }
                LOG.debug("Template [{}] has been added to template list.", (Object)templateKey);
                templatesList.add(templateKey);
                packetsToSendCollection.addAll(messageDescription.dataSets());
            }
            if (packetBuffered) {
                LOG.debug("Packet has been buffered and will not be processed now, returning result.");
                return new CodecAggregator.Result(null, true);
            }
            if (packetsToSendCollection.isEmpty()) {
                LOG.debug("Packet has not been buffered and no packet is queued.");
                return new CodecAggregator.Result(null, true);
            }
            IpfixJournal.RawIpfix.Builder journalBuilder = IpfixJournal.RawIpfix.newBuilder();
            this.buildJournalObject(packetsToSendCollection, templatesList, journalBuilder);
            IpfixJournal.RawIpfix rawIpfix = journalBuilder.build();
            return this.getCompleteResult(rawIpfix);
        }
        catch (Exception e) {
            LOG.error("Unable to aggregate IPFIX message due to the following error ", (Throwable)e);
            return new CodecAggregator.Result(null, false);
        }
    }

    public void buildJournalObject(Set<ShallowDataSet> packetsToSendCollection, Set<TemplateKey> templatesList, IpfixJournal.RawIpfix.Builder journalBuilder) {
        LOG.debug("Assembling the packet with necessary templates and data records which include the templates needed.");
        for (TemplateKey templateKey : templatesList) {
            ShallowTemplateSet.Record record = (ShallowTemplateSet.Record)this.templateCache.getIfPresent((Object)templateKey);
            journalBuilder.putTemplates(templateKey.getTemplateId(), ByteString.copyFrom((byte[])record.getRecordBytes()));
        }
        LOG.debug("IPFIX data set has been processed for the same template id, adding data set to IPFIX journal.");
        for (ShallowDataSet dataSet : packetsToSendCollection) {
            journalBuilder.addDataSets(IpfixJournal.DataSet.newBuilder().setTemplateId(dataSet.templateId()).setTimestampEpochSeconds(dataSet.epochSeconds()).setDataRecords(ByteString.copyFrom((byte[])dataSet.content())).build());
        }
    }

    public CodecAggregator.Result getCompleteResult(IpfixJournal.RawIpfix rawIpfix) {
        LOG.debug("Raw ipfix object complete, returning result.");
        return new CodecAggregator.Result(Unpooled.wrappedBuffer((byte[])rawIpfix.toByteArray()), true);
    }

    public void handleBufferedPackets(Set<ShallowDataSet> packetsToSendCollection, Set<Integer> bufferedTemplateIdList, Set<Integer> knownTemplateIdsList, Queue<ShallowDataSet> bufferedPackets) {
        if (bufferedPackets != null) {
            ShallowDataSet previousPacket;
            LOG.debug("Buffered packets detected in the packet cache.");
            ArrayList<ShallowDataSet> tempQueue = new ArrayList<ShallowDataSet>(bufferedPackets.size());
            int addedPackets = 0;
            while (null != (previousPacket = bufferedPackets.poll())) {
                if (knownTemplateIdsList.contains(previousPacket.templateId())) {
                    LOG.debug("Packet contains template id from a known template, adding to packets to send set.");
                    packetsToSendCollection.add(previousPacket);
                    bufferedTemplateIdList.add(previousPacket.templateId());
                    ++addedPackets;
                    continue;
                }
                LOG.debug("Packet contains unknown template id, adding to temporary queue.");
                tempQueue.add(previousPacket);
            }
            LOG.debug("Processing [{}] previously buffered packets, [{}] packets require more templates.", (Object)addedPackets, (Object)tempQueue.size());
            if (!tempQueue.isEmpty()) {
                LOG.debug("Buffered packets could not be processed, adding to temporary queue to wait for more templates.");
                bufferedPackets.addAll(tempQueue);
            }
        }
    }

    public void collectAllTemplateIds(@Nullable SocketAddress remoteAddress, long observationDomainId, Set<Integer> knownTemplateIdsList) {
        LOG.debug("Collecting all templateIds from templateKeys stored in templateCache.");
        for (TemplateKey templateKey : this.templateCache.asMap().keySet()) {
            if (templateKey.getRemoteAddress() != remoteAddress || templateKey.getObservationDomainId() != observationDomainId) continue;
            Integer templateId = templateKey.getTemplateId();
            knownTemplateIdsList.add(templateId);
        }
    }

    public void addTemplateKeyInCache(@Nullable SocketAddress remoteAddress, IpfixParser.MessageDescription messageDescription, long observationDomainId) {
        for (Integer templateId : messageDescription.declaredTemplateIds()) {
            TemplateKey templateKey = new TemplateKey(remoteAddress, observationDomainId, templateId);
            LOG.debug("Created template key with remote address [{}], observation domain ID [{}] and template ID [{}].", new Object[]{templateKey.getRemoteAddress(), templateKey.getObservationDomainId(), templateKey.getTemplateId()});
            this.templateCache.put((Object)templateKey, (Object)messageDescription.getTemplateRecord(templateId));
            LOG.debug("Saving templates key (raw bytes) in template cache to combine in new message later.");
        }
    }
}

