/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.server.metrics.aggregation;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.rhq.server.metrics.AbortedException;
import org.rhq.server.metrics.ArithmeticMeanCalculator;
import org.rhq.server.metrics.CacheMapper;
import org.rhq.server.metrics.DateTimeService;
import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.aggregation.AggregationType;
import org.rhq.server.metrics.aggregation.CacheIndexQueryException;
import org.rhq.server.metrics.aggregation.CacheIterator;
import org.rhq.server.metrics.aggregation.IndexAggregatesPair;
import org.rhq.server.metrics.aggregation.TaskTracker;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.CacheIndexEntry;
import org.rhq.server.metrics.domain.NumericMetric;
import org.rhq.server.metrics.domain.ResultSetMapper;

/**
 * Base class for aggregators that are driven by cache index entries.
 *
 * <p>{@link #execute()} asks the subclass for index entries, submits one {@link AggregationTask}
 * per (reduced) index entry to {@link #aggregationTasks}, and then waits on {@link #taskTracker}
 * until the tasks are reported as finished or the run is aborted. The number of tasks in flight
 * is bounded by {@link #permits}: a permit is acquired before each submission and is released by
 * {@link AggregationTaskFinishedCallback} when the task's future completes.
 *
 * <p>Collaborators are injected through the package-private setters before {@code execute()} is
 * called. The class also offers a set of Guava function factories that subclasses can chain
 * together when building their asynchronous aggregation pipelines.
 */
abstract class BaseAggregator {
    /** Per-instance logger; the log category is the runtime (sub)class. */
    private final Log LOG = LogFactory.getLog(getClass());

    /** Initial capacity used for the list of aggregates built by {@link #computeAggregates}. */
    protected static final int BATCH_SIZE = 5;

    protected MetricsDAO dao;
    protected AggregationType aggregationType;
    protected AsyncFunction<IndexAggregatesPair, List<ResultSet>> persistMetrics;
    /** Bounds the number of concurrently outstanding aggregation tasks. */
    protected Semaphore permits;
    protected ListeningExecutorService aggregationTasks;
    protected DateTime startTime;
    protected DateTimeService dateTimeService;
    protected TaskTracker taskTracker = new TaskTracker();
    protected boolean cacheActive = true;
    protected int indexPageSize;

    BaseAggregator() {
    }

    void setDao(MetricsDAO dao) {
        this.dao = dao;
    }

    void setAggregationType(AggregationType aggregationType) {
        this.aggregationType = aggregationType;
    }

    void setPersistMetrics(AsyncFunction<IndexAggregatesPair, List<ResultSet>> persistMetrics) {
        this.persistMetrics = persistMetrics;
    }

    void setPermits(Semaphore permits) {
        this.permits = permits;
    }

    void setAggregationTasks(ListeningExecutorService aggregationTasks) {
        this.aggregationTasks = aggregationTasks;
    }

    void setStartTime(DateTime startTime) {
        this.startTime = startTime;
    }

    void setDateTimeService(DateTimeService dateTimeService) {
        this.dateTimeService = dateTimeService;
    }

    void setCacheActive(boolean cacheActive) {
        this.cacheActive = cacheActive;
    }

    public void setIndexPageSize(int indexPageSize) {
        this.indexPageSize = indexPageSize;
    }

    /**
     * Runs one aggregation pass: fetches the index entries, schedules a task for each reduced
     * entry and waits for the task tracker to report completion.
     *
     * <p>Any exception raised while doing so is logged and turned into a
     * {@code taskTracker.abort(...)} call; the method then still returns the counts reported by
     * {@link #getAggregationCounts()}.
     *
     * @return the aggregation counts supplied by the subclass
     * @throws InterruptedException declared for callers; see the review note in the body
     * @throws AbortedException declared for callers; see the review note in the body
     */
    public Map<AggregationType, Integer> execute() throws InterruptedException, AbortedException {
        LOG.debug("Starting " + getDebugType() + " aggregation");
        Stopwatch stopwatch = new Stopwatch().start();
        try {
            List<CacheIndexEntry> indexEntries = getIndexEntries();
            scheduleTasks(indexEntries);
            taskTracker.waitForTasksToFinish();
        } catch (CacheIndexQueryException e) {
            LOG.warn("There was an error querying the cache index", e);
            taskTracker.abort("There was an error querying the cache index: " + e.getMessage());
        } catch (Exception e) {
            // NOTE(review): this handler also catches the InterruptedException/AbortedException
            // declared in the signature, so as written they never appear to reach the caller.
            // Kept as-is because propagating them would change caller-visible behaviour --
            // confirm the intent before tightening this.
            LOG.warn("There was an unexpected error scheduling aggregation tasks", e);
            taskTracker.abort("There was an unexpected error scheduling aggregation tasks: " + e.getMessage());
        } finally {
            stopwatch.stop();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished " + getDebugType() + " aggregation in "
                    + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
            }
        }
        return getAggregationCounts();
    }

    /** Returns the cache index entries that this aggregation pass should process. */
    protected abstract List<CacheIndexEntry> getIndexEntries();

    /** Creates the task that aggregates the data referenced by the given index entry. */
    protected abstract AggregationTask createAggregationTask(CacheIndexEntry indexEntry);

    /** Returns the counts that {@link #execute()} hands back to its caller. */
    protected abstract Map<AggregationType, Integer> getAggregationCounts();

    /** Maps the raw index entries to the entries for which tasks are actually submitted. */
    protected abstract Iterable<CacheIndexEntry> reduceIndexEntries(List<CacheIndexEntry> indexEntries);

    /** Returns a short label for this aggregator that is embedded in log and abort messages. */
    protected abstract String getDebugType();

    /**
     * Submits one aggregation task per reduced index entry and then tells the task tracker that
     * scheduling is complete. Failures are logged and reported through
     * {@code taskTracker.abort(...)}; nothing is thrown to the caller.
     *
     * @param indexEntries the index entries returned by {@link #getIndexEntries()}
     */
    private void scheduleTasks(List<CacheIndexEntry> indexEntries) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduling " + getDebugType() + " aggregation tasks for " + indexEntries.size()
                    + " index entries");
            }
            for (CacheIndexEntry indexEntry : reduceIndexEntries(indexEntries)) {
                submitAggregationTask(indexEntry);
            }
            taskTracker.finishedSchedulingTasks();
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is not restored here. Restoring it may change
            // which exception the subsequent waitForTasksToFinish() call raises -- verify
            // against TaskTracker before adding Thread.currentThread().interrupt().
            LOG.warn("There was an interrupt while scheduling aggregation tasks.", e);
            taskTracker.abort("There was an interrupt while scheduling aggregation tasks.");
        } catch (Exception e) {
            LOG.error("There was an unexpected error while scheduling " + getDebugType() + " aggregation tasks", e);
            taskTracker.abort("Aborting " + getDebugType() + " aggregation due to unexpected error: "
                + e.getMessage());
        } finally {
            LOG.debug("Finished scheduling aggregation tasks");
        }
    }

    /**
     * Acquires a permit, submits the task for the given index entry and registers it with the
     * task tracker.
     *
     * <p>If the task cannot be created or submitted, the permit acquired here is released again
     * before the exception propagates; no task exists in that case that could release it later.
     *
     * @param indexEntry the index entry to aggregate
     * @throws InterruptedException if the thread is interrupted while waiting for a permit
     */
    protected void submitAggregationTask(CacheIndexEntry indexEntry) throws InterruptedException {
        permits.acquire();
        boolean submitted = false;
        try {
            aggregationTasks.submit(createAggregationTask(indexEntry));
            submitted = true;
        } finally {
            if (!submitted) {
                permits.release();
            }
        }
        taskTracker.addTask();
    }

    /**
     * Returns a function that exposes a list of result sets as an iterable of metric lists, where
     * each element is produced by {@code mapper.mapAll(...)} on the next result set.
     *
     * <p>The returned iterable is single-pass: all iterators it hands out share one cursor over
     * the list of result sets.
     *
     * @param mapper converts a result set into a list of metrics
     */
    protected <T extends NumericMetric> Function<List<ResultSet>, Iterable<List<T>>> toIterable(
        final ResultSetMapper<T> mapper) {
        return new Function<List<ResultSet>, Iterable<List<T>>>() {
            @Override
            public Iterable<List<T>> apply(final List<ResultSet> resultSets) {
                return new Iterable<List<T>>() {
                    // Created once per iterable, hence the single-pass behaviour.
                    private final Iterator<ResultSet> resultSetIterator = resultSets.iterator();

                    @Override
                    public Iterator<List<T>> iterator() {
                        return new Iterator<List<T>>() {
                            @Override
                            public boolean hasNext() {
                                return resultSetIterator.hasNext();
                            }

                            @Override
                            public List<T> next() {
                                return mapper.mapAll(resultSetIterator.next());
                            }

                            @Override
                            public void remove() {
                                throw new UnsupportedOperationException();
                            }
                        };
                    }
                };
            }
        };
    }

    /**
     * Returns a function that turns each list of metrics into one {@link AggregateNumericMetric}
     * stamped with the given time slice.
     *
     * @param timeSlice the time slice passed to every computed aggregate
     * @param type the metric type; not read by this method (it only fixes {@code T} for callers)
     */
    protected <T extends NumericMetric> Function<Iterable<List<T>>, List<AggregateNumericMetric>> computeAggregates(
        final long timeSlice, Class<T> type) {
        return new Function<Iterable<List<T>>, List<AggregateNumericMetric>>() {
            @Override
            public List<AggregateNumericMetric> apply(Iterable<List<T>> values) {
                List<AggregateNumericMetric> aggregates = new ArrayList<AggregateNumericMetric>(BATCH_SIZE);
                for (List<T> metricList : values) {
                    aggregates.add(computeAggregate(metricList, timeSlice));
                }
                return aggregates;
            }
        };
    }

    /**
     * Folds a list of metrics into a single aggregate: the arithmetic mean of the averages, the
     * smallest minimum and the largest maximum. The schedule id is taken from the metric that
     * seeds min/max.
     *
     * <p>For an empty list the aggregate has schedule id 0 and NaN for min and max.
     */
    private <T extends NumericMetric> AggregateNumericMetric computeAggregate(List<T> metrics, long timeSlice) {
        // NaN in min doubles as the "not seeded yet" marker.
        double min = Double.NaN;
        double max = Double.NaN;
        int scheduleId = 0;
        ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();

        for (NumericMetric metric : metrics) {
            mean.add(metric.getAvg());
            if (Double.isNaN(min)) {
                scheduleId = metric.getScheduleId();
                min = metric.getMin();
                max = metric.getMax();
            } else {
                if (metric.getMin() < min) {
                    min = metric.getMin();
                }
                if (metric.getMax() > max) {
                    max = metric.getMax();
                }
            }
        }
        return new AggregateNumericMetric(scheduleId, mean.getArithmeticMean(), min, max, timeSlice);
    }

    /**
     * Returns an async function that deletes the cache entries for the given index entry via
     * {@code dao.deleteCacheEntries(...)}. The function's input is ignored.
     */
    protected AsyncFunction<List<ResultSet>, ResultSet> deleteCacheEntry(final CacheIndexEntry indexEntry) {
        return new AsyncFunction<List<ResultSet>, ResultSet>() {
            @Override
            public ListenableFuture<ResultSet> apply(List<ResultSet> resultSets) throws Exception {
                return dao.deleteCacheEntries(aggregationType.getCacheTable(), indexEntry.getCollectionTimeSlice(),
                    indexEntry.getStartScheduleId());
            }
        };
    }

    /**
     * Returns an async function that deletes the given cache index entry via
     * {@code dao.deleteCacheIndexEntry(...)}. The function's input is ignored.
     */
    protected AsyncFunction<ResultSet, ResultSet> deleteCacheIndexEntry(final CacheIndexEntry indexEntry) {
        return new AsyncFunction<ResultSet, ResultSet>() {
            @Override
            public ListenableFuture<ResultSet> apply(ResultSet deleteCacheResultSet) throws Exception {
                return dao.deleteCacheIndexEntry(aggregationType.getCacheTable(), indexEntry.getDay(),
                    indexEntry.getPartition(), indexEntry.getCollectionTimeSlice(), indexEntry.getStartScheduleId(),
                    indexEntry.getInsertTimeSlice());
            }
        };
    }

    /** Returns a function that pairs the given index entry with a list of computed aggregates. */
    protected Function<List<AggregateNumericMetric>, IndexAggregatesPair> indexAggregatesPair(
        final CacheIndexEntry indexEntry) {
        return new Function<List<AggregateNumericMetric>, IndexAggregatesPair>() {
            @Override
            public IndexAggregatesPair apply(List<AggregateNumericMetric> metrics) {
                return new IndexAggregatesPair(indexEntry, metrics);
            }
        };
    }

    /**
     * Returns a function that exposes a single result set as an iterable of metric lists. Each
     * call to {@code iterator()} creates a new {@link CacheIterator} over the same result set.
     *
     * @param mapper passed to the {@link CacheIterator} together with the result set
     */
    protected <T extends NumericMetric> Function<ResultSet, Iterable<List<T>>> toIterable(final CacheMapper<T> mapper) {
        return new Function<ResultSet, Iterable<List<T>>>() {
            @Override
            public Iterable<List<T>> apply(final ResultSet resultSet) {
                return new Iterable<List<T>>() {
                    @Override
                    public Iterator<List<T>> iterator() {
                        return new CacheIterator(mapper, resultSet);
                    }
                };
            }
        };
    }

    /**
     * Future callback that does the per-task bookkeeping: whether the future succeeds or fails,
     * it releases one permit and reports a finished task to the task tracker. Subclasses hook
     * into the success path by overriding {@link #onFinish(Object)}.
     */
    protected class AggregationTaskFinishedCallback<T> implements FutureCallback<T> {
        protected AggregationTaskFinishedCallback() {
        }

        @Override
        public void onSuccess(T args) {
            try {
                onFinish(args);
            } finally {
                taskCompleted();
            }
        }

        /** Hook invoked on success before the bookkeeping runs; does nothing by default. */
        protected void onFinish(T args) {
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                LOG.warn("There was an error aggregating data", t);
            } finally {
                // Run the bookkeeping even if logging itself fails, mirroring onSuccess.
                taskCompleted();
            }
        }

        /** Releases the task's permit, marks the task as finished and logs the remaining work. */
        private void taskCompleted() {
            permits.release();
            taskTracker.finishedTask();
            if (LOG.isDebugEnabled()) {
                LOG.debug("There are " + taskTracker.getRemainingTasks() + " remaining tasks and "
                    + permits.availablePermits() + " available permits");
            }
        }
    }

    /**
     * Runnable wrapper around the aggregation work for one index entry. Any exception thrown by
     * {@link #run(CacheIndexEntry)} is logged and aborts the whole aggregation through the task
     * tracker.
     */
    protected abstract class AggregationTask implements Runnable {
        private final CacheIndexEntry indexEntry;

        public AggregationTask(CacheIndexEntry indexEntry) {
            this.indexEntry = indexEntry;
        }

        @Override
        public void run() {
            try {
                run(indexEntry);
            } catch (Exception e) {
                // NOTE(review): the permit acquired for this task is not released on this path.
                // Releasing it here could double-release if run(...) had already attached a
                // callback -- confirm against the subclasses before changing.
                LOG.error("Aggregation will be aborted due to an unexpected error", e);
                taskTracker.abort("Aborting aggregation due to an unexpected error: " + e.getMessage());
            }
        }

        /** Performs the aggregation work for the given index entry. */
        abstract void run(CacheIndexEntry indexEntry);
    }
}

