/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.dynamodb.bootstrap;

import com.amazonaws.dynamodb.bootstrap.AbstractLogConsumer;
import com.amazonaws.dynamodb.bootstrap.DynamoDBConsumerWorker;
import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * Log consumer that writes scanned items into a DynamoDB table.
 *
 * <p>Each {@link SegmentedScanResult} is split into {@link BatchWriteItemRequest}s and every
 * batch is handed to a {@link DynamoDBConsumerWorker} submitted on the completion service
 * inherited from {@link AbstractLogConsumer}.
 */
public class DynamoDBConsumer
extends AbstractLogConsumer {
    /** Maximum number of write requests placed into a single {@link BatchWriteItemRequest}. */
    private static final int MAX_BATCH_WRITE_ITEMS = 25;

    private final AmazonDynamoDBClient client;
    private final String tableName;
    private final RateLimiter rateLimiter;

    /**
     * Creates a consumer that writes to {@code tableName} through {@code client}.
     *
     * @param client    DynamoDB client passed to every worker
     * @param tableName name of the destination table
     * @param rateLimit rate used to create the {@link RateLimiter} shared by all workers
     * @param exec      executor on which the workers run; wrapped in an
     *                  {@link ExecutorCompletionService}
     */
    public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName, double rateLimit, ExecutorService exec) {
        this.client = client;
        this.tableName = tableName;
        this.rateLimiter = RateLimiter.create(rateLimit);
        this.threadPool = exec;
        this.exec = new ExecutorCompletionService<Void>(this.threadPool);
    }

    /**
     * Splits {@code result} into batches and submits one {@link DynamoDBConsumerWorker} per batch.
     *
     * @param result the scan result whose items should be written
     * @return the {@link Future} of the last submitted batch, or {@code null} when the scan
     *         result contained no items and therefore nothing was submitted
     * @throws NullPointerException if there are batches to submit but the completion service
     *                              is not initialized
     */
    @Override
    public Future<Void> writeResult(SegmentedScanResult result) {
        Future<Void> lastSubmission = null;
        List<BatchWriteItemRequest> batches = DynamoDBConsumer.splitResultIntoBatches(result.getScanResult(), this.tableName);
        for (BatchWriteItemRequest batch : batches) {
            // Check the precondition explicitly instead of catching NullPointerException:
            // a blanket catch would also swallow unrelated NPEs (and their stack traces)
            // raised while constructing or submitting the worker.
            // The message is kept verbatim for callers that may match on it.
            if (this.exec == null) {
                throw new NullPointerException("Thread pool not initialized for LogStashExecutor");
            }
            lastSubmission = this.exec.submit(new DynamoDBConsumerWorker(batch, this.client, this.rateLimiter, this.tableName));
        }
        return lastSubmission;
    }

    /**
     * Groups the items of {@code result} into put-only batch write requests for
     * {@code tableName}, each holding at most {@value #MAX_BATCH_WRITE_ITEMS} items.
     *
     * <p>Every request is configured with {@link ReturnConsumedCapacity#TOTAL}. Items keep
     * their original order; the final batch may be smaller than the others.
     *
     * @param result    scan result providing the items to write
     * @param tableName table the write requests are keyed under
     * @return the batches in item order; empty when {@code result} has no items
     */
    public static List<BatchWriteItemRequest> splitResultIntoBatches(ScanResult result, String tableName) {
        List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
        List<WriteRequest> pending = new LinkedList<WriteRequest>();
        // Raw Map mirrors the original cast: the item value type is not imported by this file,
        // so the element type cannot be spelled out here without adding a new import.
        for (Map item : result.getItems()) {
            pending.add(new WriteRequest(new PutRequest(item)));
            if (pending.size() == MAX_BATCH_WRITE_ITEMS) {
                batches.add(DynamoDBConsumer.newBatch(tableName, pending));
                // Start a fresh list; the previous one is now owned by the emitted request.
                pending = new LinkedList<WriteRequest>();
            }
        }
        if (!pending.isEmpty()) {
            batches.add(DynamoDBConsumer.newBatch(tableName, pending));
        }
        return batches;
    }

    /** Builds one batch write request carrying {@code writeRequests} for {@code tableName}. */
    private static BatchWriteItemRequest newBatch(String tableName, List<WriteRequest> writeRequests) {
        BatchWriteItemRequest request = new BatchWriteItemRequest().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        request.addRequestItemsEntry(tableName, writeRequests);
        return request;
    }
}

