/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.channel;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelFullException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.annotations.Recyclable;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Preconditions;

@Recyclable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MemoryChannel
extends BasicChannelSemantics {
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = 100;
    private static final Integer defaultTransCapacity = 100;
    private static final double byteCapacitySlotSize = 100.0;
    private static final Long defaultByteCapacity = (long)((double)Runtime.getRuntime().maxMemory() * 0.8);
    private static final Integer defaultByteCapacityBufferPercentage = 20;
    private static final Integer defaultKeepAlive = 3;
    private Object queueLock = new Object();
    @GuardedBy(value="queueLock")
    private LinkedBlockingDeque<Event> queue;
    private Semaphore queueRemaining;
    private Semaphore queueStored;
    private volatile Integer transCapacity;
    private volatile int keepAlive;
    private volatile int byteCapacity;
    private volatile int lastByteCapacity;
    private volatile int byteCapacityBufferPercentage;
    private Semaphore bytesRemaining;
    private ChannelCounter channelCounter;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void configure(Context context) {
        Integer capacity = null;
        try {
            capacity = context.getInteger("capacity", defaultCapacity);
        }
        catch (NumberFormatException e) {
            capacity = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", (Object)defaultCapacity);
        }
        if (capacity <= 0) {
            capacity = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", (Object)defaultCapacity);
        }
        try {
            this.transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
        }
        catch (NumberFormatException e) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", (Object)defaultTransCapacity);
        }
        if (this.transCapacity <= 0) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", (Object)defaultTransCapacity);
        }
        Preconditions.checkState((this.transCapacity <= capacity ? 1 : 0) != 0, (Object)"Transaction Capacity of Memory Channel cannot be higher than the capacity.");
        try {
            this.byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);
        }
        catch (NumberFormatException e) {
            this.byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
        }
        try {
            this.byteCapacity = (int)((double)context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1.0 - (double)this.byteCapacityBufferPercentage * 0.01) / 100.0);
            if (this.byteCapacity < 1) {
                this.byteCapacity = Integer.MAX_VALUE;
            }
        }
        catch (NumberFormatException e) {
            this.byteCapacity = (int)((double)defaultByteCapacity.longValue() * (1.0 - (double)this.byteCapacityBufferPercentage * 0.01) / 100.0);
        }
        try {
            this.keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
        }
        catch (NumberFormatException e) {
            this.keepAlive = defaultKeepAlive;
        }
        if (this.queue != null) {
            try {
                this.resizeQueue(capacity);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            Object e = this.queueLock;
            synchronized (e) {
                this.queue = new LinkedBlockingDeque(capacity);
                this.queueRemaining = new Semaphore(capacity);
                this.queueStored = new Semaphore(0);
            }
        }
        if (this.bytesRemaining == null) {
            this.bytesRemaining = new Semaphore(this.byteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else if (this.byteCapacity > this.lastByteCapacity) {
            this.bytesRemaining.release(this.byteCapacity - this.lastByteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else {
            try {
                if (!this.bytesRemaining.tryAcquire(this.lastByteCapacity - this.byteCapacity, this.keepAlive, TimeUnit.SECONDS)) {
                    LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");
                } else {
                    this.lastByteCapacity = this.byteCapacity;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.channelCounter == null) {
            this.channelCounter = new ChannelCounter(this.getName());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resizeQueue(int capacity) throws InterruptedException {
        int oldCapacity;
        Object object = this.queueLock;
        synchronized (object) {
            oldCapacity = this.queue.size() + this.queue.remainingCapacity();
        }
        if (oldCapacity == capacity) {
            return;
        }
        if (oldCapacity > capacity) {
            if (!this.queueRemaining.tryAcquire(oldCapacity - capacity, this.keepAlive, TimeUnit.SECONDS)) {
                LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
            } else {
                object = this.queueLock;
                synchronized (object) {
                    LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
                    newQueue.addAll(this.queue);
                    this.queue = newQueue;
                }
            }
        } else {
            object = this.queueLock;
            synchronized (object) {
                LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
                newQueue.addAll(this.queue);
                this.queue = newQueue;
            }
            this.queueRemaining.release(capacity - oldCapacity);
        }
    }

    @Override
    public synchronized void start() {
        this.channelCounter.start();
        this.channelCounter.setChannelSize(this.queue.size());
        this.channelCounter.setChannelCapacity(this.queue.size() + this.queue.remainingCapacity());
        super.start();
    }

    @Override
    public synchronized void stop() {
        this.channelCounter.setChannelSize(this.queue.size());
        this.channelCounter.stop();
        super.stop();
    }

    @Override
    protected BasicTransactionSemantics createTransaction() {
        return new MemoryTransaction(this.transCapacity, this.channelCounter);
    }

    private long estimateEventSize(Event event) {
        byte[] body = event.getBody();
        if (body != null && body.length != 0) {
            return body.length;
        }
        return 1L;
    }

    private class MemoryTransaction
    extends BasicTransactionSemantics {
        private LinkedBlockingDeque<Event> takeList;
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter;
        private int putByteCounter = 0;
        private int takeByteCounter = 0;

        public MemoryTransaction(int transCapacity, ChannelCounter counter) {
            this.putList = new LinkedBlockingDeque(transCapacity);
            this.takeList = new LinkedBlockingDeque(transCapacity);
            this.channelCounter = counter;
        }

        @Override
        protected void doPut(Event event) throws InterruptedException {
            this.channelCounter.incrementEventPutAttemptCount();
            int eventByteSize = (int)Math.ceil((double)MemoryChannel.this.estimateEventSize(event) / 100.0);
            if (!this.putList.offer(event)) {
                throw new ChannelException("Put queue for MemoryTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count");
            }
            this.putByteCounter += eventByteSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected Event doTake() throws InterruptedException {
            Event event;
            this.channelCounter.incrementEventTakeAttemptCount();
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for MemoryTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count");
            }
            if (!MemoryChannel.this.queueStored.tryAcquire(MemoryChannel.this.keepAlive, TimeUnit.SECONDS)) {
                return null;
            }
            Object object = MemoryChannel.this.queueLock;
            synchronized (object) {
                event = (Event)MemoryChannel.this.queue.poll();
            }
            Preconditions.checkNotNull((Object)event, (Object)"Queue.poll returned NULL despite semaphore signalling existence of entry");
            this.takeList.put(event);
            int eventByteSize = (int)Math.ceil((double)MemoryChannel.this.estimateEventSize(event) / 100.0);
            this.takeByteCounter += eventByteSize;
            return event;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void doCommit() throws InterruptedException {
            int remainingChange = this.takeList.size() - this.putList.size();
            if (remainingChange < 0) {
                if (!MemoryChannel.this.bytesRemaining.tryAcquire(this.putByteCounter, MemoryChannel.this.keepAlive, TimeUnit.SECONDS)) {
                    throw new ChannelException("Cannot commit transaction. Byte capacity allocated to store event body " + (double)MemoryChannel.this.byteCapacity * 100.0 + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources");
                }
                if (!MemoryChannel.this.queueRemaining.tryAcquire(-remainingChange, MemoryChannel.this.keepAlive, TimeUnit.SECONDS)) {
                    MemoryChannel.this.bytesRemaining.release(this.putByteCounter);
                    throw new ChannelFullException("Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight");
                }
            }
            int puts = this.putList.size();
            int takes = this.takeList.size();
            Object object = MemoryChannel.this.queueLock;
            synchronized (object) {
                if (puts > 0) {
                    while (!this.putList.isEmpty()) {
                        if (MemoryChannel.this.queue.offer(this.putList.removeFirst())) continue;
                        throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                    }
                }
                this.putList.clear();
                this.takeList.clear();
            }
            MemoryChannel.this.bytesRemaining.release(this.takeByteCounter);
            this.takeByteCounter = 0;
            this.putByteCounter = 0;
            MemoryChannel.this.queueStored.release(puts);
            if (remainingChange > 0) {
                MemoryChannel.this.queueRemaining.release(remainingChange);
            }
            if (puts > 0) {
                this.channelCounter.addToEventPutSuccessCount(puts);
            }
            if (takes > 0) {
                this.channelCounter.addToEventTakeSuccessCount(takes);
            }
            this.channelCounter.setChannelSize(MemoryChannel.this.queue.size());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void doRollback() {
            int takes = this.takeList.size();
            Object object = MemoryChannel.this.queueLock;
            synchronized (object) {
                Preconditions.checkState((MemoryChannel.this.queue.remainingCapacity() >= this.takeList.size() ? 1 : 0) != 0, (Object)"Not enough space in memory channel queue to rollback takes. This should never happen, please report");
                while (!this.takeList.isEmpty()) {
                    MemoryChannel.this.queue.addFirst(this.takeList.removeLast());
                }
                this.putList.clear();
            }
            MemoryChannel.this.bytesRemaining.release(this.putByteCounter);
            this.putByteCounter = 0;
            this.takeByteCounter = 0;
            MemoryChannel.this.queueStored.release(takes);
            this.channelCounter.setChannelSize(MemoryChannel.this.queue.size());
        }
    }
}

