/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.hive.HiveDelimitedTextSerializer;
import org.apache.flume.sink.hive.HiveEventSerializer;
import org.apache.flume.sink.hive.HiveJsonSerializer;
import org.apache.flume.sink.hive.HiveWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveSink
extends AbstractSink
implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
    private static final int DEFAULT_MAXOPENCONNECTIONS = 500;
    private static final int DEFAULT_TXNSPERBATCH = 100;
    private static final int DEFAULT_BATCHSIZE = 15000;
    private static final int DEFAULT_CALLTIMEOUT = 10000;
    private static final int DEFAULT_IDLETIMEOUT = 0;
    private static final int DEFAULT_HEARTBEATINTERVAL = 240;
    private Map<HiveEndPoint, HiveWriter> allWriters;
    private SinkCounter sinkCounter;
    private volatile int idleTimeout;
    private String metaStoreUri;
    private String proxyUser;
    private String database;
    private String table;
    private List<String> partitionVals;
    private Integer txnsPerBatchAsk;
    private Integer batchSize;
    private Integer maxOpenConnections;
    private boolean autoCreatePartitions;
    private String serializerType;
    private HiveEventSerializer serializer;
    private Integer callTimeout;
    private Integer heartBeatInterval;
    private ExecutorService callTimeoutPool;
    private boolean useLocalTime;
    private TimeZone timeZone;
    private boolean needRounding;
    private int roundUnit;
    private Integer roundValue;
    private Timer heartBeatTimer = new Timer();
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);

    @VisibleForTesting
    Map<HiveEndPoint, HiveWriter> getAllWriters() {
        return this.allWriters;
    }

    public void configure(Context context) {
        this.metaStoreUri = context.getString("hive.metastore");
        if (this.metaStoreUri == null) {
            throw new IllegalArgumentException("hive.metastore config setting is not specified for sink " + this.getName());
        }
        if (this.metaStoreUri.equalsIgnoreCase("null")) {
            this.metaStoreUri = null;
        }
        this.proxyUser = null;
        this.database = context.getString("hive.database");
        if (this.database == null) {
            throw new IllegalArgumentException("hive.database config setting is not specified for sink " + this.getName());
        }
        this.table = context.getString("hive.table");
        if (this.table == null) {
            throw new IllegalArgumentException("hive.table config setting is not specified for sink " + this.getName());
        }
        String partitions = context.getString("hive.partition");
        if (partitions != null) {
            this.partitionVals = Arrays.asList(partitions.split(","));
        }
        this.txnsPerBatchAsk = context.getInteger("hive.txnsPerBatchAsk", Integer.valueOf(100));
        if (this.txnsPerBatchAsk < 0) {
            LOG.warn(this.getName() + ". hive.txnsPerBatchAsk must be  positive number. Defaulting to " + 100);
            this.txnsPerBatchAsk = 100;
        }
        this.batchSize = context.getInteger("batchSize", Integer.valueOf(15000));
        if (this.batchSize < 0) {
            LOG.warn(this.getName() + ". batchSize must be  positive number. Defaulting to " + 15000);
            this.batchSize = 15000;
        }
        this.idleTimeout = context.getInteger("idleTimeout", Integer.valueOf(0));
        if (this.idleTimeout < 0) {
            LOG.warn(this.getName() + ". idleTimeout must be  positive number. Defaulting to " + 0);
            this.idleTimeout = 0;
        }
        this.callTimeout = context.getInteger("callTimeout", Integer.valueOf(10000));
        if (this.callTimeout < 0) {
            LOG.warn(this.getName() + ". callTimeout must be  positive number. Defaulting to " + 10000);
            this.callTimeout = 10000;
        }
        this.heartBeatInterval = context.getInteger("heartBeatInterval", Integer.valueOf(240));
        if (this.heartBeatInterval < 0) {
            LOG.warn(this.getName() + ". heartBeatInterval must be  positive number. Defaulting to " + 240);
            this.heartBeatInterval = 240;
        }
        this.maxOpenConnections = context.getInteger("maxOpenConnections", Integer.valueOf(500));
        this.autoCreatePartitions = context.getBoolean("autoCreatePartitions", Boolean.valueOf(true));
        this.useLocalTime = context.getBoolean("useLocalTimeStamp", Boolean.valueOf(false));
        String tzName = context.getString("timeZone");
        this.timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
        this.needRounding = context.getBoolean("round", Boolean.valueOf(false));
        String unit = context.getString("roundUnit", "minute");
        if (unit.equalsIgnoreCase("hour")) {
            this.roundUnit = 11;
        } else if (unit.equalsIgnoreCase("minute")) {
            this.roundUnit = 12;
        } else if (unit.equalsIgnoreCase("second")) {
            this.roundUnit = 13;
        } else {
            LOG.warn(this.getName() + ". Rounding unit is not valid, please set one of " + "minute, hour or second. Rounding will be disabled");
            this.needRounding = false;
        }
        this.roundValue = context.getInteger("roundValue", Integer.valueOf(1));
        if (this.roundUnit == 13 || this.roundUnit == 12) {
            Preconditions.checkArgument((this.roundValue > 0 && this.roundValue <= 60 ? 1 : 0) != 0, (Object)"Round value must be > 0 and <= 60");
        } else if (this.roundUnit == 11) {
            Preconditions.checkArgument((this.roundValue > 0 && this.roundValue <= 24 ? 1 : 0) != 0, (Object)"Round value must be > 0 and <= 24");
        }
        this.serializerType = context.getString("serializer", "");
        if (this.serializerType.isEmpty()) {
            throw new IllegalArgumentException("serializer config setting is not specified for sink " + this.getName());
        }
        this.serializer = this.createSerializer(this.serializerType);
        this.serializer.configure(context);
        Preconditions.checkArgument((this.batchSize > 0 ? 1 : 0) != 0, (Object)"batchSize must be greater than 0");
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }
    }

    @VisibleForTesting
    protected SinkCounter getCounter() {
        return this.sinkCounter;
    }

    private HiveEventSerializer createSerializer(String serializerName) {
        if (serializerName.compareToIgnoreCase("DELIMITED") == 0 || serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
            return new HiveDelimitedTextSerializer();
        }
        if (serializerName.compareToIgnoreCase("JSON") == 0 || serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) {
            return new HiveJsonSerializer();
        }
        try {
            return (HiveEventSerializer)Class.forName(serializerName).newInstance();
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Unable to instantiate serializer: " + serializerName + " on sink: " + this.getName(), e);
        }
    }

    public Sink.Status process() throws EventDeliveryException {
        Channel channel = this.getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        boolean success = false;
        try {
            if (this.timeToSendHeartBeat.compareAndSet(true, false)) {
                this.enableHeartBeatOnAllWriters();
            }
            int txnEventCount = this.drainOneBatch(channel);
            transaction.commit();
            success = true;
            if (txnEventCount < 1) {
                Sink.Status status = Sink.Status.BACKOFF;
                return status;
            }
            Sink.Status status = Sink.Status.READY;
            return status;
        }
        catch (InterruptedException err) {
            LOG.warn(this.getName() + ": Thread was interrupted.", (Throwable)err);
            Sink.Status status = Sink.Status.BACKOFF;
            return status;
        }
        catch (Exception e) {
            throw new EventDeliveryException((Throwable)e);
        }
        finally {
            if (!success) {
                transaction.rollback();
            }
            transaction.close();
        }
    }

    private int drainOneBatch(Channel channel) throws HiveWriter.Failure, InterruptedException {
        try {
            Event event;
            int txnEventCount;
            HashMap activeWriters = Maps.newHashMap();
            for (txnEventCount = 0; txnEventCount < this.batchSize && (event = channel.take()) != null; ++txnEventCount) {
                HiveEndPoint endPoint = this.makeEndPoint(this.metaStoreUri, this.database, this.table, this.partitionVals, event.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
                HiveWriter writer = this.getOrCreateWriter(activeWriters, endPoint);
                LOG.debug("{} : Writing event to {}", (Object)this.getName(), (Object)endPoint);
                writer.write(event);
            }
            if (txnEventCount == 0) {
                this.sinkCounter.incrementBatchEmptyCount();
            } else if (txnEventCount == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            } else {
                this.sinkCounter.incrementBatchUnderflowCount();
            }
            this.sinkCounter.addToEventDrainAttemptCount((long)txnEventCount);
            for (HiveWriter writer : activeWriters.values()) {
                writer.flush(true);
            }
            this.sinkCounter.addToEventDrainSuccessCount((long)txnEventCount);
            return txnEventCount;
        }
        catch (HiveWriter.Failure e) {
            LOG.warn(this.getName() + " : " + e.getMessage(), (Throwable)e);
            this.abortAllWriters();
            this.closeAllWriters();
            throw e;
        }
    }

    private void enableHeartBeatOnAllWriters() {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.setHearbeatNeeded();
        }
    }

    private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters, HiveEndPoint endPoint) throws HiveWriter.ConnectException, InterruptedException {
        try {
            HiveWriter writer = this.allWriters.get(endPoint);
            if (writer == null) {
                int retired;
                LOG.info(this.getName() + ": Creating Writer to Hive end point : " + endPoint);
                writer = new HiveWriter(endPoint, this.txnsPerBatchAsk, this.autoCreatePartitions, this.callTimeout.intValue(), this.callTimeoutPool, this.proxyUser, this.serializer, this.sinkCounter);
                this.sinkCounter.incrementConnectionCreatedCount();
                if (this.allWriters.size() > this.maxOpenConnections && (retired = this.closeIdleWriters()) == 0) {
                    this.closeEldestWriter();
                }
                this.allWriters.put(endPoint, writer);
                activeWriters.put(endPoint, writer);
            } else if (activeWriters.get(endPoint) == null) {
                activeWriters.put(endPoint, writer);
            }
            return writer;
        }
        catch (HiveWriter.ConnectException e) {
            this.sinkCounter.incrementConnectionFailedCount();
            throw e;
        }
    }

    private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table, List<String> partVals, Map<String, String> headers, TimeZone timeZone, boolean needRounding, int roundUnit, Integer roundValue, boolean useLocalTime) {
        if (partVals == null) {
            return new HiveEndPoint(metaStoreUri, database, table, null);
        }
        ArrayList realPartVals = Lists.newArrayList();
        for (String partVal : partVals) {
            realPartVals.add(BucketPath.escapeString((String)partVal, headers, (TimeZone)timeZone, (boolean)needRounding, (int)roundUnit, (int)roundValue, (boolean)useLocalTime));
        }
        return new HiveEndPoint(metaStoreUri, database, table, (List)realPartVals);
    }

    private void closeEldestWriter() throws InterruptedException {
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() >= oldestTimeStamp) continue;
            eldest = entry.getKey();
            oldestTimeStamp = entry.getValue().getLastUsed();
        }
        try {
            this.sinkCounter.incrementConnectionCreatedCount();
            LOG.info(this.getName() + ": Closing least used Writer to Hive EndPoint : " + eldest);
            this.allWriters.remove(eldest).close();
        }
        catch (InterruptedException e) {
            LOG.warn(this.getName() + ": Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
            throw e;
        }
    }

    private int closeIdleWriters() throws InterruptedException {
        int count = 0;
        long now = System.currentTimeMillis();
        ArrayList retirees = Lists.newArrayList();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() <= (long)this.idleTimeout) continue;
            ++count;
            retirees.add(entry.getKey());
        }
        for (HiveEndPoint ep : retirees) {
            this.sinkCounter.incrementConnectionClosedCount();
            LOG.info(this.getName() + ": Closing idle Writer to Hive end point : {}", (Object)ep);
            this.allWriters.remove(ep).close();
        }
        return count;
    }

    private void closeAllWriters() throws InterruptedException {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            entry.getValue().close();
        }
        this.allWriters.clear();
    }

    private void abortAllWriters() throws InterruptedException {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            entry.getValue().abort();
        }
    }

    public void stop() {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                HiveWriter w = entry.getValue();
                LOG.info("Closing connection to {}", (Object)w);
                w.closeConnection();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        this.callTimeoutPool.shutdown();
        try {
            while (!this.callTimeoutPool.isTerminated()) {
                this.callTimeoutPool.awaitTermination(Math.max(10000, this.callTimeout), TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException ex) {
            LOG.warn(this.getName() + ":Shutdown interrupted on " + this.callTimeoutPool, (Throwable)ex);
        }
        this.callTimeoutPool = null;
        this.allWriters.clear();
        this.allWriters = null;
        this.sinkCounter.stop();
        super.stop();
        LOG.info("Hive Sink {} stopped", (Object)this.getName());
    }

    public void start() {
        String timeoutName = "hive-" + this.getName() + "-call-runner-%d";
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
        this.allWriters = Maps.newHashMap();
        this.sinkCounter.start();
        super.start();
        this.setupHeartBeatTimer();
        LOG.info(this.getName() + ": Hive Sink {} started", (Object)this.getName());
    }

    private void setupHeartBeatTimer() {
        if (this.heartBeatInterval > 0) {
            this.heartBeatTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    HiveSink.this.timeToSendHeartBeat.set(true);
                    HiveSink.this.setupHeartBeatTimer();
                }
            }, this.heartBeatInterval * 1000);
        }
    }

    public String toString() {
        return "{ Sink type:" + ((Object)((Object)this)).getClass().getSimpleName() + ", name:" + this.getName() + " }";
    }
}

