/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.codecs;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog2.inputs.codecs.gelf.GELFMessage;
import org.graylog2.inputs.codecs.gelf.GELFMessageChunk;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregates the chunks of chunked GELF messages until a message is complete.
 *
 * <p>Chunks are collected per message id in {@link #chunks}. Every entry is additionally tracked in
 * {@link #sortedEvictionSet}, ordered by the arrival time of its first chunk, so that a periodically
 * scheduled {@link ChunkEvictionTask} can drop messages whose chunks did not all arrive within
 * {@link #VALIDITY_PERIOD} milliseconds.
 *
 * <p>Non-chunked GELF payloads (zlib, gzip, uncompressed) are passed through unchanged.
 */
public class GelfChunkAggregator implements CodecAggregator {
    private static final Logger log = LoggerFactory.getLogger(GelfChunkAggregator.class);

    /** Upper bound for the number of chunks accepted for a single message. */
    private static final int MAX_CHUNKS = 128;

    /** Result for a valid chunk that did not (yet) complete a message. */
    public static final CodecAggregator.Result VALID_EMPTY_RESULT = new CodecAggregator.Result(null, true);

    /** Result for an invalid or unsupported payload. */
    public static final CodecAggregator.Result INVALID_RESULT = new CodecAggregator.Result(null, false);

    /** Time in milliseconds after which an incomplete message is considered outdated. */
    public static final int VALIDITY_PERIOD = 5000;

    /** Interval in milliseconds between two runs of the eviction task. */
    private static final long CHECK_PERIOD = 1000L;

    public static final String CHUNK_COUNTER = MetricRegistry.name(GelfChunkAggregator.class, "total-chunks");
    public static final String WAITING_MESSAGES = MetricRegistry.name(GelfChunkAggregator.class, "waiting-messages");
    public static final String COMPLETE_MESSAGES = MetricRegistry.name(GelfChunkAggregator.class, "complete-messages");
    public static final String EXPIRED_MESSAGES = MetricRegistry.name(GelfChunkAggregator.class, "expired-messages");
    public static final String EXPIRED_CHUNKS = MetricRegistry.name(GelfChunkAggregator.class, "expired-chunks");
    public static final String DUPLICATE_CHUNKS = MetricRegistry.name(GelfChunkAggregator.class, "duplicate-chunks");

    /** Incomplete messages, keyed by GELF message id. */
    private final ConcurrentMap<String, ChunkEntry> chunks = Maps.newConcurrentMap();

    /** The same entries as in {@link #chunks}, ordered oldest-first for eviction. */
    private final ConcurrentSkipListSet<ChunkEntry> sortedEvictionSet = new ConcurrentSkipListSet<>();

    private final Counter chunkCounter;
    private final Counter waitingMessages;
    private final Counter expiredMessages;
    private final Counter expiredChunks;
    private final Counter duplicateChunks;
    private final Counter completeMessages;

    /**
     * Creates the aggregator, schedules the periodic eviction task and registers the metrics.
     *
     * @param scheduler      executor the {@link ChunkEvictionTask} is scheduled on; the first run happens
     *                       after {@link #VALIDITY_PERIOD} ms, subsequent runs every {@link #CHECK_PERIOD} ms
     * @param metricRegistry registry the counters are obtained from
     */
    @Inject
    public GelfChunkAggregator(@Named(value = "daemonScheduler") ScheduledExecutorService scheduler,
                               MetricRegistry metricRegistry) {
        scheduler.scheduleAtFixedRate(new ChunkEvictionTask(), VALIDITY_PERIOD, CHECK_PERIOD, TimeUnit.MILLISECONDS);
        this.chunkCounter = metricRegistry.counter(CHUNK_COUNTER);
        this.waitingMessages = metricRegistry.counter(WAITING_MESSAGES);
        this.completeMessages = metricRegistry.counter(COMPLETE_MESSAGES);
        this.expiredMessages = metricRegistry.counter(EXPIRED_MESSAGES);
        this.expiredChunks = metricRegistry.counter(EXPIRED_CHUNKS);
        this.duplicateChunks = metricRegistry.counter(DUPLICATE_CHUNKS);
    }

    /**
     * Consumes all readable bytes of {@code buffer} and interprets them as one GELF datagram.
     *
     * @param buffer the received payload; its readable bytes are consumed
     * @return the complete message payload with {@code valid == true}; {@link #VALID_EMPTY_RESULT} if the
     *         payload was a chunk that did not complete a message; {@link #INVALID_RESULT} if the payload
     *         is unsupported or the chunk was rejected
     */
    @Override
    @Nonnull
    public CodecAggregator.Result addChunk(ByteBuf buffer) {
        final byte[] readable = new byte[buffer.readableBytes()];
        // Copy into the destination starting at index 0. Using the buffer's reader index as the
        // destination index would overflow the array whenever the reader index is not 0.
        buffer.readBytes(readable);

        final GELFMessage msg = new GELFMessage(readable);
        ByteBuf aggregatedBuffer;
        switch (msg.getGELFType()) {
            case CHUNKED:
                try {
                    chunkCounter.inc();
                    aggregatedBuffer = checkForCompletion(msg);
                    if (aggregatedBuffer == null) {
                        return VALID_EMPTY_RESULT;
                    }
                } catch (IllegalArgumentException | IllegalStateException | IndexOutOfBoundsException e) {
                    log.debug("Invalid gelf message chunk, dropping message.", e);
                    return INVALID_RESULT;
                }
                break;
            case ZLIB:
            case GZIP:
            case UNCOMPRESSED:
                aggregatedBuffer = Unpooled.wrappedBuffer(readable);
                break;
            case UNSUPPORTED:
                return INVALID_RESULT;
            default:
                return INVALID_RESULT;
        }
        return new CodecAggregator.Result(aggregatedBuffer, true);
    }

    /**
     * Stores the chunk contained in {@code gelfMessage} and checks whether its message is now complete.
     *
     * @param gelfMessage a GELF message of the chunked type
     * @return a buffer wrapping the payloads of all chunks in sequence order once the last chunk
     *         arrived, otherwise {@code null} (chunk stored, duplicate chunk, or message expired)
     * @throws IllegalStateException if more than {@link #MAX_CHUNKS} chunks were stored for the message
     */
    @Nullable
    private ByteBuf checkForCompletion(GELFMessage gelfMessage) {
        if (log.isDebugEnabled() && !chunks.isEmpty()) {
            log.debug("Dumping GELF chunk map [chunks for {} messages]:\n{}", chunks.size(), humanReadableChunkMap());
        }

        final GELFMessageChunk chunk = new GELFMessageChunk(gelfMessage, null);
        final int sequenceCount = chunk.getSequenceCount();
        final String messageId = chunk.getId();

        // Register a fresh entry unless another chunk of the same message already created one.
        ChunkEntry entry = new ChunkEntry(sequenceCount, chunk.getArrival(), messageId);
        final ChunkEntry existing = chunks.putIfAbsent(messageId, entry);
        if (existing == null) {
            waitingMessages.inc();
            sortedEvictionSet.add(entry);
        } else {
            entry = existing;
        }

        // Only the first chunk for a given slot wins; anything else is a duplicate.
        final int sequenceNumber = chunk.getSequenceNumber();
        if (!entry.payloadArray.compareAndSet(sequenceNumber, null, chunk)) {
            log.error("Received duplicate chunk {} for message {} from {}",
                    sequenceNumber, messageId, gelfMessage.getSourceAddress());
            duplicateChunks.inc();
            return null;
        }

        final int chunkWatermark = entry.chunkSlotsWritten.incrementAndGet();
        if (chunkWatermark > MAX_CHUNKS) {
            getAndCleanupEntry(messageId);
            throw new IllegalStateException("Maximum number of chunks reached, discarding message");
        }

        if (chunkWatermark == sequenceCount) {
            final ChunkEntry completed = getAndCleanupEntry(messageId);
            if (completed == null) {
                // Another thread (e.g. the eviction task) removed the entry in the meantime.
                log.debug("Chunk entry for message <{}> was removed concurrently, dropping chunks.", messageId);
                return null;
            }

            final byte[][] allChunks = new byte[sequenceCount][];
            for (int i = 0; i < completed.payloadArray.length(); i++) {
                final GELFMessageChunk messageChunk = completed.payloadArray.get(i);
                if (messageChunk == null) {
                    log.debug("Couldn't read chunk {} of message {}, skipping this chunk.", i, messageId);
                    continue;
                }
                allChunks[i] = messageChunk.getData();
            }
            completeMessages.inc();
            return Unpooled.wrappedBuffer(allChunks);
        }

        if (isOutdated(entry)) {
            log.debug("Not all chunks of <{}> arrived within {}ms. Dropping chunks.", messageId, VALIDITY_PERIOD);
            expireEntry(messageId);
        }
        return null;
    }

    /**
     * Removes the entry for {@code messageId} and accounts for it in the expiry metrics.
     * Does nothing if the entry has already been removed.
     */
    private void expireEntry(String messageId) {
        final ChunkEntry cleanupEntry = getAndCleanupEntry(messageId);
        if (cleanupEntry == null) {
            return;
        }
        expiredMessages.inc();
        expiredChunks.inc(cleanupEntry.chunkSlotsWritten.get());
    }

    /** Returns whether the first chunk of {@code entry} arrived more than {@link #VALIDITY_PERIOD} ms ago. */
    private boolean isOutdated(ChunkEntry entry) {
        return Tools.nowUTC().getMillis() - entry.firstTimestamp > VALIDITY_PERIOD;
    }

    /**
     * Removes the entry for {@code id} from the chunk map and the eviction set.
     *
     * @return the removed entry, or {@code null} if no entry was registered for {@code id} (for example
     *         because another thread removed it first); in that case no state or metric is touched
     */
    @Nullable
    private ChunkEntry getAndCleanupEntry(String id) {
        final ChunkEntry entry = chunks.remove(id);
        if (entry == null) {
            // ConcurrentSkipListSet rejects null elements, and the waiting counter must only be
            // decremented once per registered message.
            return null;
        }
        sortedEvictionSet.remove(entry);
        waitingMessages.dec();
        return entry;
    }

    /** Renders the current chunk map as a multi-line string for debug logging. */
    private String humanReadableChunkMap() {
        final StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, ChunkEntry> mapEntry : chunks.entrySet()) {
            sb.append("Message <").append(mapEntry.getKey()).append("> ");
            sb.append("\tChunks:\n");
            final AtomicReferenceArray<GELFMessageChunk> payload = mapEntry.getValue().payloadArray;
            for (int i = 0; i < payload.length(); i++) {
                final GELFMessageChunk chunk = payload.get(i);
                sb.append("\t\t").append(chunk == null ? "<not arrived yet>" : chunk).append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * Periodic task that expires outdated chunk entries, oldest first, and stops at the first entry
     * that is still within the validity period.
     */
    @VisibleForTesting
    class ChunkEvictionTask implements Runnable {
        @Override
        public void run() {
            try {
                // The set iterates in ascending order (oldest first) and tolerates concurrent
                // modification, so entries can be removed while walking it.
                for (ChunkEntry oldestChunkEntry : sortedEvictionSet) {
                    if (!isOutdated(oldestChunkEntry)) {
                        log.debug("No more outdated chunk entries found to evict, leaving cleanup loop.");
                        break;
                    }
                    expireEntry(oldestChunkEntry.id);
                    // Make sure the outdated entry leaves the eviction set even if the chunk map
                    // no longer referenced it; this is a no-op when it was already removed.
                    sortedEvictionSet.remove(oldestChunkEntry);
                }
            } catch (Exception e) {
                // Swallow deliberately: an exception escaping a scheduled task cancels all future runs.
                log.warn("Error while expiring GELF chunk entries", e);
            }
        }
    }

    /**
     * Bookkeeping for one incomplete message: the chunk slots, the number of slots written so far and
     * the arrival time of the first chunk. Identity and ordering are defined by
     * {@code (firstTimestamp, id)}.
     */
    @VisibleForTesting
    static class ChunkEntry implements Comparable<ChunkEntry> {
        protected final AtomicInteger chunkSlotsWritten = new AtomicInteger(0);
        protected final long firstTimestamp;
        protected final AtomicReferenceArray<GELFMessageChunk> payloadArray;
        protected final String id;

        /**
         * @param chunkCount     number of chunk slots to allocate
         * @param firstTimestamp arrival time of the first chunk
         * @param id             message id, must not be {@code null}
         */
        public ChunkEntry(int chunkCount, long firstTimestamp, String id) {
            this.payloadArray = new AtomicReferenceArray<>(chunkCount);
            this.firstTimestamp = firstTimestamp;
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final ChunkEntry that = (ChunkEntry) o;
            return firstTimestamp == that.firstTimestamp && id.equals(that.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, firstTimestamp);
        }

        /** Orders by first arrival time, then by id; consistent with {@link #equals(Object)}. */
        @Override
        public int compareTo(@Nonnull ChunkEntry o) {
            final int byTimestamp = Long.compare(firstTimestamp, o.firstTimestamp);
            return byTimestamp != 0 ? byTimestamp : id.compareTo(o.id);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("id", id)
                    .add("firstTimestamp", firstTimestamp)
                    .add("chunkSlotsWritten", chunkSlotsWritten)
                    .toString();
        }
    }
}

