/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.dynamodb.bootstrap;

import com.amazonaws.dynamodb.bootstrap.AbstractLogConsumer;
import com.amazonaws.dynamodb.bootstrap.BlockingQueueWorker;
import com.amazonaws.dynamodb.bootstrap.DynamoDBEntryWithSize;
import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingQueueConsumer
extends AbstractLogConsumer {
    private BlockingQueue<DynamoDBEntryWithSize> queue = new ArrayBlockingQueue<DynamoDBEntryWithSize>(20);

    public BlockingQueueConsumer(int numThreads) {
        int numProcessors = Runtime.getRuntime().availableProcessors();
        if (numProcessors > numThreads) {
            numThreads = numProcessors;
        }
        this.threadPool = Executors.newFixedThreadPool(numThreads);
        this.exec = new ExecutorCompletionService(this.threadPool);
    }

    @Override
    public Future<Void> writeResult(SegmentedScanResult result) {
        Future<Void> jobSubmission = null;
        try {
            jobSubmission = this.exec.submit(new BlockingQueueWorker(this.queue, result));
        }
        catch (NullPointerException npe) {
            throw new NullPointerException("Thread pool not initialized for LogStashExecutor");
        }
        return jobSubmission;
    }

    public BlockingQueue<DynamoDBEntryWithSize> getQueue() {
        return this.queue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown(boolean awaitTermination) {
        super.shutdown(awaitTermination);
        boolean added = false;
        boolean interrupted = false;
        try {
            do {
                try {
                    this.queue.put(new DynamoDBEntryWithSize(null, -1));
                    added = true;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            } while (!added);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

