/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.cassandra.schema;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Query;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;
import org.rhq.cassandra.schema.DBConnectionFactory;
import org.rhq.cassandra.schema.SchemaUpdateThreadFactory;
import org.rhq.cassandra.schema.Step;
import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.core.util.jdbc.JDBCUtil;

/**
 * Schema upgrade step that copies schedule ids found in the {@code rhq.metrics_index} table into
 * the {@code rhq.metrics_cache_index} table and finally drops {@code rhq.metrics_index}.
 *
 * <p>The copy only happens when a relational database connection factory has been supplied through
 * {@link #bind(Properties)}; without it the step just drops the old index table. Writes are issued
 * asynchronously, throttled by a {@link RateLimiter}, and their completion callbacks run on a small
 * thread pool that lives only for the duration of {@link #execute()}.
 */
public class PopulateCacheIndex
implements Step {
    private static final Log log = LogFactory.getLog(PopulateCacheIndex.class);

    /** Value written to the {@code partition} column of every cache index row. */
    private static final int CACHE_INDEX_PARTITION = 0;

    private static final String INDEX_TABLE = "metrics_index";

    private static final String CACHE_INDEX_TABLE = "metrics_cache_index";

    private Session session;

    /** Block size used by {@link #startId(int)}; read from {@code rhq.metrics.cache.block-size} (default 5). */
    private final int cacheBlockSize = Integer.parseInt(System.getProperty("rhq.metrics.cache.block-size", "5"));

    /** Throttles the asynchronous writes and deletes to at most 20000 permits per second. */
    private final RateLimiter permits = RateLimiter.create(20000.0);

    private PreparedStatement updateCacheIndex;

    private PreparedStatement findIndexTimeSlice;

    private PreparedStatement findIndexEntries;

    private PreparedStatement deleteIndexEntry;

    /** Number of cache index updates whose asynchronous write failed. */
    private final AtomicInteger failedUpdates = new AtomicInteger();

    /** Executor on which the asynchronous callbacks run; created and shut down inside {@link #execute()}. */
    private ListeningExecutorService tasks;

    private DBConnectionFactory dbConnectionFactory;

    @Override
    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * Picks up the relational database connection factory from the
     * {@code relational_db_connection_factory} property, if present.
     *
     * @param properties the step properties
     */
    @Override
    public void bind(Properties properties) {
        this.dbConnectionFactory = (DBConnectionFactory) properties.get("relational_db_connection_factory");
    }

    /**
     * Runs the step: migrates index entries when a connection factory is available and then drops
     * the old index table.
     *
     * @throws RuntimeException if any cache index update failed, if the thread was interrupted while
     *         waiting for the updates, or if updating the relational database failed
     */
    @Override
    public void execute() {
        if (dbConnectionFactory == null) {
            log.info("The relational database connection factory is not set. No data migration necessary");
        } else {
            tasks = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(3, new SchemaUpdateThreadFactory()));
            try {
                initPreparedStatements();
                migrateIndexEntries();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("The metrics_cache_index updates have not completed due to an interrupt. The schema upgrade will have to be run again to complete the updates.", e);
            } finally {
                // The pool is only needed for this run; without a shutdown its worker threads would linger.
                tasks.shutdown();
            }
        }
        dropIndex();
    }

    /**
     * Finds the most recent hour that has index entries and copies the entries of the three index
     * buckets into the cache index, then adjusts the cache activation time if needed.
     *
     * @throws InterruptedException if interrupted while waiting for asynchronous updates
     * @throws RuntimeException if at least one cache index update failed
     */
    private void migrateIndexEntries() throws InterruptedException {
        Date mostRecent1HourTimeSlice = findMostRecentRawDataSinceLastShutdown();
        if (mostRecent1HourTimeSlice == null) {
            log.info("The " + CACHE_INDEX_TABLE + " table will not be updated. No raw data was found.");
            return;
        }
        log.debug("The most recent hour with raw data is " + mostRecent1HourTimeSlice);

        Date mostRecent6HourTimeSlice = get6HourTimeSlice(mostRecent1HourTimeSlice).toDate();
        Date mostRecent24HourTimeSlice = get24HourTimeSlice(mostRecent1HourTimeSlice).toDate();
        Date day = mostRecent24HourTimeSlice;

        // Entries read from index bucket X are written to the cache index under the next finer bucket.
        updateCacheIndex(fetchRawIndexEntries(mostRecent1HourTimeSlice), Bucket.RAW, day,
            current1HourTimeSlice().toDate());
        updateCacheIndex(fetch1HourIndexEntries(mostRecent6HourTimeSlice), Bucket.ONE_HOUR, day,
            mostRecent6HourTimeSlice);
        updateCacheIndex(fetch6HourIndexEntries(mostRecent24HourTimeSlice), Bucket.SIX_HOUR, day,
            mostRecent24HourTimeSlice);

        int failures = failedUpdates.get();
        if (failures > 0) {
            throw new RuntimeException("Cannot complete upgrade step due to previous errors. There were " + failures + " failed updates.");
        }
        deactivateCacheIfNecessary(mostRecent24HourTimeSlice);
    }

    /** Prepares the CQL statements used by this step. */
    private void initPreparedStatements() {
        findIndexEntries = session.prepare("SELECT schedule_id FROM rhq.metrics_index WHERE bucket = ? AND time = ?");
        updateCacheIndex = session.prepare("UPDATE rhq.metrics_cache_index SET schedule_ids = schedule_ids + ? WHERE bucket = ? AND day = ? AND partition = ? AND collection_time_slice = ? AND       start_schedule_id = ? AND insert_time_slice = ?");
        findIndexTimeSlice = session.prepare("SELECT time FROM rhq.metrics_index WHERE bucket = ? AND time = ?");
        deleteIndexEntry = session.prepare("DELETE FROM rhq.metrics_index WHERE bucket = ? AND time = ? AND schedule_id = ?");
    }

    /**
     * Adds every schedule id in {@code resultSet} to the cache index and blocks until each update
     * (and the follow-up delete from the old index) has completed or failed.
     *
     * @param resultSet rows whose first column is the schedule id
     * @param bucket the cache index bucket to write to
     * @param day value for the {@code day} column
     * @param timeSlice value for the {@code collection_time_slice} column
     * @throws InterruptedException if interrupted while waiting for the updates to finish
     */
    private void updateCacheIndex(ResultSet resultSet, Bucket bucket, Date day, Date timeSlice)
        throws InterruptedException {
        List<Row> rows = resultSet.all();
        CountDownLatch updatesFinished = new CountDownLatch(rows.size());
        log.info("Preparing to update " + CACHE_INDEX_TABLE + " for " + rows.size() + " schedules from the " + bucket.text() + " bucket");

        // The insert time slice is offset by 100 ms from the collection time slice.
        Date insertTimeSlice = new Date(timeSlice.getTime() + 100L);
        for (Row row : rows) {
            permits.acquire();
            int scheduleId = row.getInt(0);
            BoundStatement statement = updateCacheIndex.bind(ImmutableSet.of(scheduleId), bucket.text(), day,
                CACHE_INDEX_PARTITION, timeSlice, startId(scheduleId), insertTimeSlice);
            ResultSetFuture future = session.executeAsync(statement);
            Futures.addCallback(future,
                new CacheIndexUpdatedCallback(bucket, scheduleId, timeSlice, updatesFinished), tasks);
        }
        updatesFinished.await();
        log.info("Finished updating " + CACHE_INDEX_TABLE + " for " + bucket.text() + " bucket");
    }

    /**
     * Walks backwards hour by hour, starting at the current hour and going back at most seven days,
     * until an entry is found in the index under the {@code one_hour_metrics} bucket.
     *
     * @return the time of the most recent matching index entry, or {@code null} if none was found
     */
    private Date findMostRecentRawDataSinceLastShutdown() {
        log.info("Searching for most recent hour having raw data");
        DateTime hour = current1HourTimeSlice();
        DateTime oldestRawTime = hour.minus(Days.days(7));

        Row row = getIndexTimeSlice(Bucket.ONE_HOUR, hour).one();
        while (row == null && hour.compareTo(oldestRawTime) > 0) {
            hour = hour.minusHours(1);
            row = getIndexTimeSlice(Bucket.ONE_HOUR, hour).one();
        }
        if (row == null) {
            log.info("No data found in " + INDEX_TABLE + " table");
            return null;
        }
        Date date = row.getDate(0);
        log.info("The latest hour with raw data is " + date);
        return date;
    }

    /** Queries the index for the {@code time} column of the given bucket and time slice. */
    private ResultSet getIndexTimeSlice(Bucket bucket, DateTime time) {
        BoundStatement statement = findIndexTimeSlice.bind(bucket.text(), time.toDate());
        return session.execute(statement);
    }

    /** Reads the schedule ids stored in the index under the {@code one_hour_metrics} bucket. */
    private ResultSet fetchRawIndexEntries(Date timeSlice) {
        return queryMetricsIndex(Bucket.ONE_HOUR, timeSlice);
    }

    /** Reads the schedule ids stored in the index under the {@code six_hour_metrics} bucket. */
    private ResultSet fetch1HourIndexEntries(Date timeSlice) {
        return queryMetricsIndex(Bucket.SIX_HOUR, timeSlice);
    }

    /** Reads the schedule ids stored in the index under the {@code twenty_four_hour_metrics} bucket. */
    private ResultSet fetch6HourIndexEntries(Date timeSlice) {
        return queryMetricsIndex(Bucket.TWENTY_FOUR_HOUR, timeSlice);
    }

    /** Queries the index for the schedule ids of the given bucket and time slice. */
    private ResultSet queryMetricsIndex(Bucket bucket, Date timeSlice) {
        BoundStatement statement = findIndexEntries.bind(bucket.text(), timeSlice);
        return session.execute(statement);
    }

    private DateTime current1HourTimeSlice() {
        return getTimeSlice(DateTime.now(), Duration.standardHours(1L));
    }

    private DateTime get6HourTimeSlice(Date date) {
        return getTimeSlice(new DateTime(date.getTime()), Duration.standardHours(6L));
    }

    private DateTime get24HourTimeSlice(Date date) {
        return getTimeSlice(new DateTime(date.getTime()), Duration.standardHours(24L));
    }

    /**
     * Aligns {@code dt} to a boundary determined by the largest non-zero field of the period
     * obtained from {@code duration}.
     *
     * @param dt the instant to align
     * @param duration the slice length
     * @return the aligned instant
     */
    private DateTime getTimeSlice(DateTime dt, Duration duration) {
        Period p = duration.toPeriod();
        if (p.getYears() != 0) {
            return dt.yearOfEra().roundFloorCopy().minusYears(dt.getYearOfEra() % p.getYears());
        }
        if (p.getMonths() != 0) {
            return dt.monthOfYear().roundFloorCopy().minusMonths((dt.getMonthOfYear() - 1) % p.getMonths());
        }
        if (p.getWeeks() != 0) {
            return dt.weekOfWeekyear().roundFloorCopy().minusWeeks((dt.getWeekOfWeekyear() - 1) % p.getWeeks());
        }
        if (p.getDays() != 0) {
            return dt.dayOfMonth().roundFloorCopy().minusDays((dt.getDayOfMonth() - 1) % p.getDays());
        }
        if (p.getHours() != 0) {
            return dt.hourOfDay().roundFloorCopy().minusHours(dt.getHourOfDay() % p.getHours());
        }
        if (p.getMinutes() != 0) {
            return dt.minuteOfHour().roundFloorCopy().minusMinutes(dt.getMinuteOfHour() % p.getMinutes());
        }
        if (p.getSeconds() != 0) {
            return dt.secondOfMinute().roundFloorCopy().minusSeconds(dt.getSecondOfMinute() % p.getSeconds());
        }
        // NOTE(review): a zero-length duration reaches this line with p.getMillis() == 0 and would
        // fail on the modulo; every caller in this class passes a whole number of hours.
        return dt.millisOfSecond().roundCeilingCopy().minusMillis(dt.getMillisOfSecond() % p.getMillis());
    }

    /** Rounds {@code scheduleId} down to a multiple of the cache block size (integer division). */
    private int startId(int scheduleId) {
        return scheduleId / cacheBlockSize * cacheBlockSize;
    }

    /**
     * Sets {@code METRICS_CACHE_ACTIVATION_TIME} in {@code rhq_system_config} to the start of the
     * next 24 hour time slice, unless the current 24 hour time slice is after
     * {@code mostRecent24HourTimeSlice}.
     *
     * @param mostRecent24HourTimeSlice the 24 hour time slice of the most recent data found
     * @throws RuntimeException if the relational database update fails
     */
    private void deactivateCacheIfNecessary(Date mostRecent24HourTimeSlice) {
        Connection connection = null;
        Statement statement = null;
        try {
            DateTime current24HourTimeSlice = get24HourTimeSlice(new Date());
            if (current24HourTimeSlice.isAfter(mostRecent24HourTimeSlice.getTime())) {
                log.info("The metrics cache will not be deactivated since the most recent raw data is from before today - " + mostRecent24HourTimeSlice);
            } else {
                DateTime next24HourTimeSlice = current24HourTimeSlice.plusDays(1);
                log.info("The metrics cache will be come active at " + next24HourTimeSlice);
                connection = dbConnectionFactory.newConnection();
                statement = connection.createStatement();
                // The only interpolated value is a long computed above, so no untrusted text reaches the SQL.
                statement.executeUpdate("UPDATE rhq_system_config SET property_value = '" + next24HourTimeSlice.getMillis() + "' WHERE property_key = 'METRICS_CACHE_ACTIVATION_TIME'");
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to deactivate metrics cache", e);
        } finally {
            JDBCUtil.safeClose(statement);
            JDBCUtil.safeClose(connection);
        }
    }

    /** Drops the old {@code rhq.metrics_index} table. */
    private void dropIndex() {
        session.execute("DROP TABLE rhq.metrics_index");
    }

    /**
     * Callback for the delete issued against the old index. Both outcomes release the latch; a
     * failed delete is only logged.
     */
    private static class IndexUpdatedCallback
    implements FutureCallback<ResultSet> {
        private final Bucket bucket;
        private final int scheduleId;
        private final CountDownLatch updatesFinished;

        public IndexUpdatedCallback(Bucket bucket, int scheduleId, CountDownLatch updatesFinished) {
            this.bucket = bucket;
            this.scheduleId = scheduleId;
            this.updatesFinished = updatesFinished;
        }

        @Override
        public void onSuccess(ResultSet result) {
            updatesFinished.countDown();
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("Failed to delete {bucket: " + bucket.text() + ", scheduleId: " + scheduleId + "} from " + INDEX_TABLE + ": " + ThrowableUtil.getRootMessage(t));
            updatesFinished.countDown();
        }
    }

    /**
     * Callback for a cache index update. On success it issues a delete against the old index and
     * hands the latch to {@link IndexUpdatedCallback}; on failure it records the failed update and
     * releases the latch itself.
     */
    private class CacheIndexUpdatedCallback
    implements FutureCallback<ResultSet> {
        private final Bucket bucket;
        private final int scheduleId;
        private final Date time;
        private final CountDownLatch updatesFinished;

        public CacheIndexUpdatedCallback(Bucket bucket, int scheduleId, Date time, CountDownLatch updatesFinished) {
            this.bucket = bucket;
            this.scheduleId = scheduleId;
            this.time = time;
            this.updatesFinished = updatesFinished;
        }

        @Override
        public void onSuccess(ResultSet result) {
            // NOTE(review): the delete is keyed by the cache index bucket and time slice, which differ
            // from the bucket the entry was read under; confirm whether that is intended. The old
            // table is dropped at the end of the step either way.
            ResultSetFuture future;
            try {
                PopulateCacheIndex.this.permits.acquire();
                BoundStatement statement =
                    PopulateCacheIndex.this.deleteIndexEntry.bind(bucket.text(), time, scheduleId);
                future = PopulateCacheIndex.this.session.executeAsync(statement);
            } catch (RuntimeException e) {
                // Without this the latch would never reach zero and updateCacheIndex() would wait
                // forever. A delete that cannot be issued is treated like a delete that failed.
                log.info("Failed to delete {bucket: " + bucket.text() + ", scheduleId: " + scheduleId + "} from " + INDEX_TABLE + ": " + ThrowableUtil.getRootMessage(e));
                updatesFinished.countDown();
                return;
            }
            Futures.addCallback(future, new IndexUpdatedCallback(bucket, scheduleId, updatesFinished),
                PopulateCacheIndex.this.tasks);
        }

        @Override
        public void onFailure(Throwable t) {
            log.warn("Failed to update cache index for {bucket: " + bucket.text() + ", scheduleId: " + scheduleId + "}: ", ThrowableUtil.getRootCause(t));
            PopulateCacheIndex.this.failedUpdates.incrementAndGet();
            updatesFinished.countDown();
        }
    }

    /** Metric buckets together with the text stored in the {@code bucket} column. */
    private static enum Bucket {
        RAW("raw_metrics"),
        ONE_HOUR("one_hour_metrics"),
        SIX_HOUR("six_hour_metrics"),
        TWENTY_FOUR_HOUR("twenty_four_hour_metrics");

        private final String text;

        private Bucket(String text) {
            this.text = text;
        }

        public String text() {
            return text;
        }
    }
}

