/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.queueing;

import com.google.common.collect.ImmutableMap;
import com.squareup.tape2.QueueFile;
import com.wavefront.agent.data.DataSubmissionTask;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.queueing.ConcurrentQueueFile;
import com.wavefront.agent.queueing.ConcurrentShardedQueueFile;
import com.wavefront.agent.queueing.FileBasedTaskQueue;
import com.wavefront.agent.queueing.InstrumentedTaskQueueDelegate;
import com.wavefront.agent.queueing.QueueFile;
import com.wavefront.agent.queueing.RetryTaskConverter;
import com.wavefront.agent.queueing.TapeQueueFile;
import com.wavefront.agent.queueing.TaskConverter;
import com.wavefront.agent.queueing.TaskQueue;
import com.wavefront.agent.queueing.TaskQueueFactory;
import com.wavefront.agent.queueing.TaskQueueStub;
import com.wavefront.common.Pair;
import com.wavefront.common.TaggedMetricName;
import com.wavefront.metrics.ExpectedAgentMetric;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

public class TaskQueueFactoryImpl
implements TaskQueueFactory {
    private static final Logger logger = Logger.getLogger(TaskQueueFactoryImpl.class.getCanonicalName());
    private final Map<HandlerKey, Map<Integer, TaskQueue<?>>> taskQueues = new ConcurrentHashMap();
    private final List<Pair<FileChannel, FileLock>> taskQueuesLocks = new ArrayList<Pair<FileChannel, FileLock>>();
    private final String bufferFile;
    private final boolean purgeBuffer;
    private final boolean disableSharding;
    private final int shardSize;
    private static final Counter bytesWritten = Metrics.newCounter((MetricName)new TaggedMetricName("buffer", "bytes-written"));
    private static final Counter ioTimeWrites = Metrics.newCounter((MetricName)new TaggedMetricName("buffer", "io-time-writes"));

    public TaskQueueFactoryImpl(final String bufferFile, boolean purgeBuffer, boolean disableSharding, int shardSize) {
        this.bufferFile = bufferFile;
        this.purgeBuffer = purgeBuffer;
        this.disableSharding = disableSharding;
        this.shardSize = shardSize;
        Metrics.newGauge((MetricName)ExpectedAgentMetric.BUFFER_BYTES_LEFT.metricName, (Gauge)new Gauge<Long>(){

            public Long value() {
                try {
                    File bufferDirectory;
                    long availableBytes = TaskQueueFactoryImpl.this.taskQueues.values().stream().flatMap(x -> x.values().stream()).map(TaskQueue::getAvailableBytes).filter(Objects::nonNull).mapToLong(x -> x).sum();
                    for (bufferDirectory = new File(bufferFile).getAbsoluteFile(); bufferDirectory != null && bufferDirectory.getUsableSpace() == 0L; bufferDirectory = bufferDirectory.getParentFile()) {
                    }
                    if (bufferDirectory != null) {
                        return bufferDirectory.getUsableSpace() + availableBytes;
                    }
                }
                catch (Throwable t) {
                    logger.warning("cannot compute remaining space in buffer file partition: " + t);
                }
                return null;
            }
        });
    }

    @Override
    public <T extends DataSubmissionTask<T>> TaskQueue<T> getTaskQueue(@Nonnull HandlerKey key, int threadNum) {
        TaskQueue<T> taskQueue = this.taskQueues.computeIfAbsent(key, x -> new TreeMap()).computeIfAbsent(threadNum, x -> this.createTaskQueue(key, threadNum));
        try {
            taskQueue.peek();
        }
        catch (IllegalStateException e) {
            taskQueue = this.createTaskQueue(key, threadNum);
            this.taskQueues.get(key).put(threadNum, taskQueue);
        }
        return taskQueue;
    }

    private <T extends DataSubmissionTask<T>> TaskQueue<T> createTaskQueue(@Nonnull HandlerKey handlerKey, int threadNum) {
        String fileName = this.bufferFile + "." + handlerKey.getEntityType().toString() + "." + handlerKey.getHandle() + "." + threadNum;
        String lockFileName = fileName + ".lck";
        String spoolFileName = fileName + ".spool";
        try {
            File lockFile = new File(lockFileName);
            FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
            FileLock lock = channel.tryLock();
            logger.fine(() -> "lockFile: " + lockFile);
            if (lock == null) {
                channel.close();
                throw new OverlappingFileLockException();
            }
            logger.fine(() -> "lock isValid: " + lock.isValid() + " - isShared: " + lock.isShared());
            this.taskQueuesLocks.add((Pair<FileChannel, FileLock>)new Pair((Object)channel, (Object)lock));
        }
        catch (SecurityException e) {
            logger.severe("Error writing to the buffer lock file " + lockFileName + " - please make sure write permissions are correct for this file path and restart the proxy: " + e);
            return new TaskQueueStub();
        }
        catch (OverlappingFileLockException e) {
            logger.severe("Error requesting exclusive access to the buffer lock file " + lockFileName + " - please make sure that no other processes access this file and restart the proxy: " + e);
            return new TaskQueueStub();
        }
        catch (IOException e) {
            logger.severe("Error requesting access to buffer lock file " + lockFileName + " Channel is closed or an I/O error has occurred - please restart the proxy: " + e);
            return new TaskQueueStub();
        }
        try {
            File buffer = new File(spoolFileName);
            if (this.purgeBuffer && buffer.delete()) {
                logger.warning("Retry buffer has been purged: " + spoolFileName);
            }
            BiConsumer<Integer, Long> statsUpdater = (bytes, millis) -> {
                bytesWritten.inc((long)bytes.intValue());
                ioTimeWrites.inc(millis.longValue());
            };
            QueueFile queueFile = this.disableSharding ? new ConcurrentQueueFile(new TapeQueueFile(new QueueFile.Builder(new File(spoolFileName)).build(), statsUpdater)) : new ConcurrentShardedQueueFile(spoolFileName, ".spool", this.shardSize * 1024 * 1024, s -> new TapeQueueFile(new QueueFile.Builder(new File(s)).build(), statsUpdater));
            return new InstrumentedTaskQueueDelegate(new FileBasedTaskQueue(queueFile, new RetryTaskConverter(handlerKey.getHandle(), TaskConverter.CompressionType.LZ4)), "buffer", (Map<String, String>)ImmutableMap.of((Object)"port", (Object)handlerKey.getHandle()), handlerKey.getEntityType());
        }
        catch (Exception e) {
            logger.severe("WF-006: Unable to open or create queue file " + spoolFileName + ": " + e.getMessage());
            return new TaskQueueStub();
        }
    }
}

