/*
 * Decompiled with CFR 0.152.
 */
package io.fluo.recipes.accumulo.export;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.commons.configuration.Configuration;

public class SharedBatchWriter {
    private static LinkedBlockingQueue<Mutations> exportQueue = null;
    private static Map<String, SharedBatchWriter> exporters = new HashMap<String, SharedBatchWriter>();

    public SharedBatchWriter(String instanceName, String zookeepers, String user, String password, String table) throws Exception {
        exportQueue = new LinkedBlockingQueue(10000);
        Thread queueProcessingTask = new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
        queueProcessingTask.setDaemon(true);
        queueProcessingTask.start();
    }

    public static synchronized SharedBatchWriter getInstance(String instanceName, String zookeepers, String user, String password, String table) throws Exception {
        String key = instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
        SharedBatchWriter ret = exporters.get(key);
        if (ret == null) {
            ret = new SharedBatchWriter(instanceName, zookeepers, user, password, table);
            exporters.put(key, ret);
        }
        return ret;
    }

    public void write(Collection<Mutation> mutations) {
        Mutations work = new Mutations(mutations);
        exportQueue.add(work);
        try {
            work.cdl.await();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static class ExportTask
    implements Runnable {
        private BatchWriter bw;

        public ExportTask(String instanceName, String zookeepers, String user, String password, String table) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
            ZooKeeperInstance zki = new ZooKeeperInstance((Configuration)new ClientConfiguration(new Configuration[0]).withInstance(instanceName).withZkHosts(zookeepers));
            Connector conn = zki.getConnector(user, (AuthenticationToken)new PasswordToken((CharSequence)password));
            try {
                this.bw = conn.createBatchWriter(table, new BatchWriterConfig());
            }
            catch (TableNotFoundException tnfe) {
                try {
                    conn.tableOperations().create(table);
                }
                catch (TableExistsException tableExistsException) {
                    // empty catch block
                }
                this.bw = conn.createBatchWriter(table, new BatchWriterConfig());
            }
        }

        @Override
        public void run() {
            ArrayList exports = new ArrayList();
            try {
                block2: while (true) {
                    exports.clear();
                    exports.add(exportQueue.take());
                    exportQueue.drainTo(exports);
                    for (Mutations ml : exports) {
                        this.bw.addMutations(ml.mutations);
                    }
                    this.bw.flush();
                    Iterator iterator = exports.iterator();
                    while (true) {
                        Mutations ml;
                        if (!iterator.hasNext()) continue block2;
                        ml = (Mutations)iterator.next();
                        ml.cdl.countDown();
                    }
                    break;
                }
            }
            catch (InterruptedException | MutationsRejectedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class Mutations {
        List<Mutation> mutations;
        CountDownLatch cdl = new CountDownLatch(1);

        public Mutations(Collection<Mutation> mutations) {
            this.mutations = new ArrayList<Mutation>(mutations);
        }
    }
}

