/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.core.service;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.hawkular.metrics.core.service.AvailabilityDataPointCollector;
import org.hawkular.metrics.core.service.DataAccess;
import org.hawkular.metrics.core.service.DataRetentionsMapper;
import org.hawkular.metrics.core.service.DateTimeService;
import org.hawkular.metrics.core.service.Functions;
import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.core.service.MetricsThreadFactory;
import org.hawkular.metrics.core.service.NumericDataPointCollector;
import org.hawkular.metrics.core.service.Order;
import org.hawkular.metrics.core.service.SumNumericBucketPointCollector;
import org.hawkular.metrics.core.service.VoidSubscriber;
import org.hawkular.metrics.core.service.log.CoreLogger;
import org.hawkular.metrics.core.service.log.CoreLogging;
import org.hawkular.metrics.core.service.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.service.transformers.MetricsIndexRowTransformer;
import org.hawkular.metrics.core.service.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.model.AvailabilityBucketPoint;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.BucketPoint;
import org.hawkular.metrics.model.Buckets;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.model.NumericBucketPoint;
import org.hawkular.metrics.model.Retention;
import org.hawkular.metrics.model.Tenant;
import org.hawkular.metrics.model.Utils;
import org.hawkular.metrics.model.exception.MetricAlreadyExistsException;
import org.hawkular.metrics.model.exception.TenantAlreadyExistsException;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.joda.time.Duration;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func5;
import rx.observable.ListenableFutureObservable;
import rx.subjects.PublishSubject;

public class MetricsServiceImpl
implements MetricsService {
    private static final CoreLogger log = CoreLogging.getCoreLogger(MetricsServiceImpl.class);
    /** Tenant id produced by passing the literal "system" through {@code Functions.makeSafe}. */
    public static final String SYSTEM_TENANT_ID = Functions.makeSafe("system");
    // In-memory cache of retention values; filled by loadDataRetentions() callbacks and
    // updateRetentionsIndex(Metric), read by getTTL(), cleared by unloadDataRetentions().
    private final Map<DataRetentionKey, Integer> dataRetentions = new ConcurrentHashMap<DataRetentionKey, Integer>();
    // Subject that addDataPoints() pushes each inserted metric onto; exposed via insertedDataEvents().
    private final PublishSubject<Metric<?>> insertedDataPointEvents = PublishSubject.create();
    // Executor created in startUp(); used to run future transformations and future-to-observable bridging.
    private ListeningExecutorService metricsTasks;
    // Collaborators injected through the setters below.
    private DataAccess dataAccess;
    private TaskScheduler taskScheduler;
    private DateTimeService dateTimeService;
    private MetricRegistry metricRegistry;
    // Per-metric-type dispatch tables, built in startUp() and initMetrics().
    private Map<MetricType<?>, Func2<? extends Metric<?>, Integer, Observable<Integer>>> dataPointInserters;
    private Map<MetricType<?>, Meter> dataPointInsertMeters;
    private Map<MetricType<?>, Timer> dataPointReadTimers;
    private Map<MetricType<?>, Func5<? extends MetricId<?>, Long, Long, Integer, Order, Observable<Row>>> dataPointFinders;
    private Map<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> dataPointMappers;
    // Fallback TTL in seconds; initialised to seven days.
    private int defaultTTL = Duration.standardDays(7L).toStandardSeconds().getSeconds();

    /**
     * Starts the service and always creates the schema.
     * Delegates to the five-argument overload with {@code createSchema} set to {@code true}.
     *
     * @param session        session used for schema management and the {@code USE} statement
     * @param keyspace       keyspace to switch to
     * @param resetDb        when {@code true} the keyspace is dropped first
     * @param metricRegistry registry the insert meters and read timers are registered with
     */
    public void startUp(Session session, String keyspace, boolean resetDb, MetricRegistry metricRegistry) {
        final boolean createSchema = true;
        startUp(session, keyspace, resetDb, createSchema, metricRegistry);
    }

    /**
     * Initialises the service: optionally drops and/or creates the schema, switches the session
     * to the keyspace, creates the worker pool, loads the retention cache and builds the
     * per-metric-type dispatch tables.
     * <p>
     * {@code dataAccess} must already be set, because {@link #loadDataRetentions()} dereferences it.
     *
     * @param session        session used for schema management and the {@code USE} statement
     * @param keyspace       keyspace to use
     * @param resetDb        when {@code true} the keyspace is dropped before anything else
     * @param createSchema   when {@code true} the schema is created for the keyspace
     * @param metricRegistry registry the insert meters and read timers are registered with
     */
    public void startUp(Session session, String keyspace, boolean resetDb, boolean createSchema, MetricRegistry metricRegistry) {
        SchemaManager schemaManager = new SchemaManager(session);
        if (resetDb) {
            schemaManager.dropKeyspace(keyspace);
        }
        if (createSchema) {
            schemaManager.createSchema(keyspace);
        }
        // NOTE(review): the keyspace name is concatenated into the statement unquoted — confirm it
        // only ever comes from trusted configuration.
        session.execute("USE " + keyspace);
        log.infoKeyspaceUsed(keyspace);
        this.metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));
        // Blocks until every retention query has completed or failed.
        this.loadDataRetentions();
        this.metricRegistry = metricRegistry;
        // Insert dispatch table; COUNTER_RATE data points are written through insertGaugeData.
        this.dataPointInserters = ImmutableMap.builder().put(MetricType.GAUGE, (metric, ttl) -> {
            Metric gauge = metric;
            return this.dataAccess.insertGaugeData(gauge, (int)ttl);
        }).put(MetricType.AVAILABILITY, (metric, ttl) -> {
            Metric avail = metric;
            return this.dataAccess.insertAvailabilityData(avail, (int)ttl);
        }).put(MetricType.COUNTER, (metric, ttl) -> {
            Metric counter = metric;
            return this.dataAccess.insertCounterData(counter, (int)ttl);
        }).put(MetricType.COUNTER_RATE, (metric, ttl) -> {
            Metric gauge = metric;
            return this.dataAccess.insertGaugeData(gauge, (int)ttl);
        }).build();
        // Read dispatch table; only GAUGE, AVAILABILITY and COUNTER have a finder.
        this.dataPointFinders = ImmutableMap.builder().put(MetricType.GAUGE, (metricId, start, end, limit, order) -> {
            MetricId gaugeId = metricId;
            return this.dataAccess.findGaugeData(gaugeId, (long)start, (long)end, (int)limit, (Order)((Object)order), false);
        }).put(MetricType.AVAILABILITY, (metricId, start, end, limit, order) -> {
            MetricId availabilityId = metricId;
            return this.dataAccess.findAvailabilityData(availabilityId, (long)start, (long)end, (int)limit, (Order)((Object)order), false);
        }).put(MetricType.COUNTER, (metricId, start, end, limit, order) -> {
            MetricId counterId = metricId;
            return this.dataAccess.findCounterData(counterId, (long)start, (long)end, (int)limit, (Order)((Object)order));
        }).build();
        // Row-to-DataPoint mappers for the same three readable types.
        this.dataPointMappers = ImmutableMap.builder().put(MetricType.GAUGE, Functions::getGaugeDataPoint).put(MetricType.AVAILABILITY, Functions::getAvailabilityDataPoint).put(MetricType.COUNTER, Functions::getCounterDataPoint).build();
        this.initMetrics();
    }

    /**
     * Loads the gauge and availability retention settings of every known tenant into the
     * in-memory retention cache and blocks until all lookups have completed or failed.
     * <p>
     * Two asynchronous queries are issued per tenant; each one counts the latch down from its
     * {@link DataRetentionsLoadedCallback}.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the thread's interrupt flag is restored before throwing
     */
    void loadDataRetentions() {
        List<String> tenantIds = this.loadTenantIds();
        CountDownLatch latch = new CountDownLatch(tenantIds.size() * 2);
        for (String tenantId : tenantIds) {
            DataRetentionsMapper gaugeMapper = new DataRetentionsMapper(tenantId, MetricType.GAUGE);
            DataRetentionsMapper availMapper = new DataRetentionsMapper(tenantId, MetricType.AVAILABILITY);
            ResultSetFuture gaugeFuture = this.dataAccess.findDataRetentions(tenantId, MetricType.GAUGE);
            ResultSetFuture availabilityFuture = this.dataAccess.findDataRetentions(tenantId, MetricType.AVAILABILITY);
            ListenableFuture<Set<Retention>> gaugeRetentions = Futures.transform(gaugeFuture, gaugeMapper, (Executor)this.metricsTasks);
            ListenableFuture<Set<Retention>> availabilityRetentions = Futures.transform(availabilityFuture, availMapper, (Executor)this.metricsTasks);
            Futures.addCallback(gaugeRetentions, new DataRetentionsLoadedCallback(tenantId, MetricType.GAUGE, latch));
            Futures.addCallback(availabilityRetentions, new DataRetentionsLoadedCallback(tenantId, MetricType.AVAILABILITY, latch));
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Empties the in-memory retention cache. */
    void unloadDataRetentions() {
        dataRetentions.clear();
    }

    /**
     * Registers the insert meters and read timers with the metric registry and stores them in
     * the per-type lookup tables. COUNTER_RATE is registered under the same
     * {@code "gauge-inserts"} meter name as GAUGE; only GAUGE, AVAILABILITY and COUNTER get a
     * read timer.
     */
    private void initMetrics() {
        dataPointInsertMeters = ImmutableMap.<MetricType<?>, Meter>builder()
                .put(MetricType.GAUGE, metricRegistry.meter("gauge-inserts"))
                .put(MetricType.AVAILABILITY, metricRegistry.meter("availability-inserts"))
                .put(MetricType.COUNTER, metricRegistry.meter("counter-inserts"))
                .put(MetricType.COUNTER_RATE, metricRegistry.meter("gauge-inserts"))
                .build();
        dataPointReadTimers = ImmutableMap.<MetricType<?>, Timer>builder()
                .put(MetricType.GAUGE, metricRegistry.timer("gauge-read-latency"))
                .put(MetricType.AVAILABILITY, metricRegistry.timer("availability-read-latency"))
                .put(MetricType.COUNTER, metricRegistry.timer("counter-read-latency"))
                .build();
    }

    /** @return the data access object currently in use, or {@code null} if none has been set */
    DataAccess getDataAccess() {
        return dataAccess;
    }

    /**
     * Sets the {@link DataAccess} this service delegates its storage calls to.
     * It has to be set before {@code startUp}, which dereferences it while loading retentions.
     */
    public void setDataAccess(DataAccess dataAccess) {
        this.dataAccess = dataAccess;
    }

    /** Stores the task scheduler; nothing else in the visible code of this class reads it. */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
    }

    /** Stores the date/time service; nothing else in the visible code of this class reads it. */
    public void setDateTimeService(DateTimeService dateTimeService) {
        this.dateTimeService = dateTimeService;
    }

    /**
     * Sets the fallback TTL.
     *
     * @param defaultTTL retention expressed in days; it is converted to and stored as seconds
     */
    public void setDefaultTTL(int defaultTTL) {
        Duration retention = Duration.standardDays(defaultTTL);
        this.defaultTTL = retention.toStandardSeconds().getSeconds();
    }

    /**
     * Inserts the tenant and then writes one retentions-index entry per retention setting.
     * <p>
     * The returned observable emits no items: it completes once all updates have finished, or
     * fails with {@link TenantAlreadyExistsException} when the tenant insert was not applied
     * (or with whatever error the underlying writes raise).
     *
     * @param tenant tenant to create, including its retention settings
     */
    @Override
    public Observable<Void> createTenant(Tenant tenant) {
        return Observable.create(subscriber -> {
            Observable updates = this.dataAccess.insertTenant(tenant).flatMap(resultSet -> {
                // The insert reports "not applied" when it did not take effect; treated as "tenant already exists".
                if (!resultSet.wasApplied()) {
                    throw new TenantAlreadyExistsException(tenant.getId());
                }
                // One index update per (metric type -> retention) entry, keyed by the made-safe type text.
                Observable retentionUpdates = Observable.from(tenant.getRetentionSettings().entrySet()).flatMap(entry -> this.dataAccess.updateRetentionsIndex(tenant.getId(), (MetricType)entry.getKey(), ImmutableMap.of(Functions.makeSafe(((MetricType)entry.getKey()).getText()), entry.getValue()))).map(rs -> null);
                return retentionUpdates;
            });
            // Items are discarded; only the terminal events are forwarded to the caller.
            updates.subscribe(resultSet -> {}, arg_0 -> ((Subscriber)subscriber).onError(arg_0), () -> ((Subscriber)subscriber).onCompleted());
        });
    }

    /**
     * Emits one {@link Tenant} per distinct tenant id (column 0 of each id row). When the
     * tenant lookup yields nothing for an id, a {@code Tenant} built from the id alone is
     * emitted instead.
     */
    @Override
    public Observable<Tenant> getTenants() {
        return this.dataAccess.findAllTenantIds().map(row -> row.getString(0)).distinct().flatMap(id -> this.dataAccess.findTenant((String)id).map(Functions::getTenant).switchIfEmpty(Observable.just((Object)new Tenant((String)id))));
    }

    /**
     * Blocks while reading all tenant ids and returns the distinct ids (column 0 of each row)
     * as an immutable list.
     */
    private List<String> loadTenantIds() {
        return ImmutableList.copyOf(
                dataAccess.findAllTenantIds()
                        .map(row -> row.getString(0))
                        .distinct()
                        .toBlocking()
                        .toIterable());
    }

    /**
     * Creates a metric definition: inserts it into the metrics index and, when that insert is
     * applied, stores its data retention and tags (plus the retentions-index entry when the
     * metric carries an explicit retention).
     * <p>
     * The returned observable fails with {@link MetricAlreadyExistsException} when the index
     * insert was not applied. A failure of the index insert itself is now forwarded to the
     * subscriber as well; previously it had no error handler and never reached the caller.
     *
     * @param metric metric to create; its type must be a user type
     * @return an observable that signals completion or failure of the creation
     * @throws IllegalArgumentException if the metric's type is not a user type
     */
    @Override
    public Observable<Void> createMetric(Metric<?> metric) {
        MetricType<?> metricType = metric.getMetricId().getType();
        if (!metricType.isUserType()) {
            throw new IllegalArgumentException(metric + " cannot be created. " + metricType + " metrics are " + "internally generated metrics and cannot be created by clients.");
        }
        ResultSetFuture future = this.dataAccess.insertMetricInMetricsIndex(metric);
        Observable<ResultSet> indexUpdated = ListenableFutureObservable.from(future, this.metricsTasks);
        return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
            if (!resultSet.wasApplied()) {
                subscriber.onError(new MetricAlreadyExistsException(metric));
            } else {
                List<Observable<ResultSet>> updates = new ArrayList<>();
                updates.add(this.dataAccess.addDataRetention(metric));
                updates.add(this.dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));
                if (metric.getDataRetention() != null) {
                    updates.add(this.updateRetentionsIndex(metric));
                }
                Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
            }
        // Without this handler a failed index insert would surface as an
        // OnErrorNotImplementedException instead of reaching the caller's subscriber.
        }, subscriber::onError));
    }

    /**
     * Starts the retentions-index write for the metric and records the metric's retention in
     * the in-memory cache. The cache is updated right after the write is issued, without
     * waiting for it to complete.
     *
     * @return an observable over the result of the index write
     */
    private Observable<ResultSet> updateRetentionsIndex(Metric<?> metric) {
        ResultSetFuture pendingWrite = dataAccess.updateRetentionsIndex(metric);
        Observable<ResultSet> written = ListenableFutureObservable.from(pendingWrite, metricsTasks);
        dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention());
        return written;
    }

    /**
     * Looks up a metric definition by id and converts the resulting index rows to
     * {@link Metric} instances, passing the default TTL to the row transformer.
     */
    @Override
    public <T> Observable<Metric<T>> findMetric(MetricId<T> id) {
        Observable<Row> rows = dataAccess.findMetric(id);
        return rows.compose(new MetricsIndexRowTransformer<T>(id.getTenantId(), id.getType(), defaultTTL));
    }

    /**
     * Finds the metric definitions of a tenant.
     *
     * @param tenantId   tenant whose metrics are read from the metrics index
     * @param metricType type to restrict the lookup to; when {@code null}, every type returned
     *                   by {@code MetricType.userTypes()} is queried and the results are merged
     */
    @Override
    public <T> Observable<Metric<T>> findMetrics(String tenantId, MetricType<T> metricType) {
        if (metricType == null) {
            return Observable.from(MetricType.userTypes()).map(type -> {
                // Decompiler residue: an identity mapping left over from a generic cast.
                MetricType t = type;
                return t;
            }).flatMap(type -> this.dataAccess.findMetricsInMetricsIndex(tenantId, type).compose(new MetricsIndexRowTransformer(tenantId, type, this.defaultTTL)));
        }
        return this.dataAccess.findMetricsInMetricsIndex(tenantId, metricType).compose(new MetricsIndexRowTransformer<T>(tenantId, metricType, this.defaultTTL));
    }

    /**
     * Finds the metrics matching every tag query.
     * <p>
     * For each (tag name, value expression) entry the tag-index rows are filtered with
     * {@link #tagValueFilter(String)} and collapsed into a single set (inner {@code reduce},
     * a union). The per-tag sets are then intersected (outer {@code reduce}) and each remaining
     * element is resolved through {@link #findMetric(MetricId)}.
     * NOTE(review): presumably the sets hold metric ids produced by TagsIndexRowTransformer —
     * confirm against that class.
     */
    private <T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, MetricType<T> metricType, Map<String, String> tagsQueries) {
        return Observable.from(tagsQueries.entrySet()).flatMap(e -> this.dataAccess.findMetricsByTagName(tenantId, (String)e.getKey()).filter(this.tagValueFilter((String)e.getValue())).compose(new TagsIndexRowTransformer(tenantId, metricType)).compose(new ItemsToSetTransformer()).reduce((s1, s2) -> {
            s1.addAll(s2);
            return s1;
        })).reduce((s1, s2) -> {
            s1.retainAll((Collection<?>)s2);
            return s1;
        }).flatMap(Observable::from).flatMap(this::findMetric);
    }

    /**
     * Finds the metrics matching all tag queries and then applies each additional filter in
     * the order given.
     *
     * @param filters predicates a metric must satisfy to be emitted; may be empty
     */
    @Override
    public <T> Observable<Metric<T>> findMetricsWithFilters(String tenantId, MetricType<T> metricType, Map<String, String> tagsQueries, Func1<Metric<T>, Boolean> ... filters) {
        Observable<Metric<T>> matching = findMetricsWithFilters(tenantId, metricType, tagsQueries);
        for (Func1<Metric<T>, Boolean> predicate : filters) {
            matching = matching.filter(predicate);
        }
        return matching;
    }

    /**
     * Builds a row predicate that matches the string in column 2 against the expression.
     * A leading {@code "!"} negates the match.
     */
    private Func1<Row, Boolean> tagValueFilter(String regexp) {
        final boolean negated = regexp.startsWith("!");
        final Pattern pattern = filterPattern(regexp);
        return row -> pattern.matcher(row.getString(2)).matches() != negated;
    }

    /**
     * Builds a metric predicate that matches the metric's id against the expression.
     * A leading {@code "!"} negates the match.
     */
    @Override
    public <T> Func1<Metric<T>, Boolean> idFilter(String regexp) {
        final boolean negated = regexp.startsWith("!");
        final Pattern pattern = filterPattern(regexp);
        return metric -> pattern.matcher(metric.getId()).matches() != negated;
    }

    /**
     * Compiles a filter expression into a {@link Pattern}.
     * <p>
     * A leading {@code "!"} (the negation marker, evaluated by the callers) is stripped first;
     * a remaining bare {@code "*"} is treated as "match anything". Handling the prefix before
     * the wildcard means {@code "!*"} compiles to {@code ".*"} instead of failing on a dangling
     * {@code '*'}.
     *
     * @param inputRegexp filter expression, optionally prefixed with {@code "!"}
     * @return the compiled pattern without the negation marker
     * @throws java.util.regex.PatternSyntaxException if the remaining expression is not a valid regex
     */
    private Pattern filterPattern(String inputRegexp) {
        String expression = inputRegexp.startsWith("!") ? inputRegexp.substring(1) : inputRegexp;
        if (expression.equals("*")) {
            expression = ".*";
        }
        return Pattern.compile(expression);
    }

    /**
     * Reads the tags of a metric.
     *
     * @return an observable emitting the tag map (column 0 of the first row) wrapped in an
     *         {@link Optional}, or {@code Optional.empty()} when no row is found
     */
    @Override
    public Observable<Optional<Map<String, String>>> getMetricTags(MetricId<?> id) {
        return dataAccess.getMetricTags(id)
                .take(1)
                .map(row -> Optional.of(row.getMap(0, String.class, String.class)))
                .defaultIfEmpty(Optional.empty());
    }

    /**
     * Adds tags to a metric and to the tags index.
     * <p>
     * Validation failures (missing tags, or a map rejected by {@code Functions.isValidTagMap})
     * are reported through the returned observable rather than thrown.
     *
     * @return an observable emitting a single {@code null} once both writes have finished
     */
    @Override
    public Observable<Void> addTags(Metric<?> metric, Map<String, String> tags) {
        try {
            Preconditions.checkArgument(tags != null, "Missing tags");
            Preconditions.checkArgument(Functions.isValidTagMap(tags), "Invalid tags; tag key is required");
        } catch (Exception invalid) {
            return Observable.error(invalid);
        }
        Observable<ResultSet> tagsAdded = dataAccess.addTags(metric, tags);
        Observable<ResultSet> indexUpdated = dataAccess.insertIntoMetricsTagsIndex(metric, tags);
        return tagsAdded.mergeWith(indexUpdated).toList().map(results -> null);
    }

    /**
     * Removes the given tags from a metric (by tag name) and from the tags index.
     *
     * @return an observable emitting a single {@code null} once both deletes have finished
     */
    @Override
    public Observable<Void> deleteTags(Metric<?> metric, Map<String, String> tags) {
        return dataAccess.deleteTags(metric, tags.keySet())
                .mergeWith(dataAccess.deleteFromMetricsTagsIndex(metric, tags))
                .toList()
                .map(results -> null);
    }

    /**
     * Inserts the data points of the given metrics and updates the metrics index.
     * <p>
     * Metrics without data points are skipped. After each insert result the metric is
     * published on the inserted-data-point subject and the per-type insert meter is marked.
     * The index update is concatenated after the inserts; the returned observable emits a
     * single {@code null} after the last item of that combined sequence.
     *
     * @param metricType type of all metrics in {@code metrics}; must not be {@code null}
     * @param metrics    metrics carrying the data points to insert
     * @throws IllegalArgumentException      if {@code metricType} is {@code null}
     * @throws UnsupportedOperationException if no meter or inserter is registered for the type
     */
    @Override
    public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics) {
        Preconditions.checkArgument(metricType != null, "metricType is null");
        Meter meter = this.getInsertMeter(metricType);
        Func2 inserter = this.getInserter(metricType);
        // The TTL is resolved per metric from the retention cache, falling back to the default.
        Observable updates = metrics.filter(metric -> !metric.getDataPoints().isEmpty()).flatMap(metric -> ((Observable)inserter.call(metric, (Object)this.getTTL(metric.getMetricId()))).doOnNext(i -> this.insertedDataPointEvents.onNext(metric))).doOnNext(meter::mark);
        // NOTE(review): `metrics` is handed to both pipelines, so it is presumably subscribed to
        // twice — confirm the source observable is safe to replay.
        Observable indexUpdates = this.dataAccess.updateMetricsIndex(metrics).doOnNext(batchSize -> log.tracef("Inserted %d %s metrics into metrics_idx", batchSize, metricType));
        return Observable.concat((Observable)updates, (Observable)indexUpdates).takeLast(1).map(count -> null);
    }

    /**
     * Returns the insert meter registered for the metric type.
     *
     * @throws UnsupportedOperationException if the type has no insert meter
     */
    private <T> Meter getInsertMeter(MetricType<T> metricType) {
        Meter insertMeter = dataPointInsertMeters.get(metricType);
        if (insertMeter != null) {
            return insertMeter;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Returns the insert function registered for the metric type.
     *
     * @throws UnsupportedOperationException if the type has no inserter
     */
    @SuppressWarnings("unchecked") // the table stores wildcard-typed functions keyed by their metric type
    private <T> Func2<Metric<T>, Integer, Observable<Integer>> getInserter(MetricType<T> metricType) {
        Func2<Metric<T>, Integer, Observable<Integer>> registered =
                (Func2<Metric<T>, Integer, Observable<Integer>>) dataPointInserters.get(metricType);
        if (registered != null) {
            return registered;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Finds the data points of a metric within a time range, dispatching on the metric type to
     * the registered finder and row mapper.
     *
     * @param limit forwarded unchanged to the type-specific finder
     * @param order forwarded unchanged to the type-specific finder
     * @throws IllegalArgumentException      if the time range is invalid
     * @throws UnsupportedOperationException if the type has no timer, finder or mapper
     */
    @Override
    public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit, Order order) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        MetricType<T> metricType = metricId.getType();
        Timer timer = this.getDataPointFindTimer(metricType);
        Func5 finder = this.getDataPointFinder(metricType);
        Func1 mapper = this.getDataPointMapper(metricType);
        // NOTE(review): the timed callable only builds the observable; it does not subscribe, so
        // the timer appears to measure assembly time rather than read latency — confirm intent.
        return this.time(timer, () -> ((Observable)finder.call((Object)metricId, (Object)start, (Object)end, (Object)limit, (Object)order)).map(mapper));
    }

    /**
     * Returns the read timer registered for the metric type.
     *
     * @throws UnsupportedOperationException if the type has no read timer
     */
    private <T> Timer getDataPointFindTimer(MetricType<T> metricType) {
        Timer readTimer = dataPointReadTimers.get(metricType);
        if (readTimer != null) {
            return readTimer;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Returns the finder function registered for the metric type.
     *
     * @throws UnsupportedOperationException if the type has no finder
     */
    @SuppressWarnings("unchecked") // the table stores wildcard-typed functions keyed by their metric type
    private <T> Func5<MetricId<T>, Long, Long, Integer, Order, Observable<Row>> getDataPointFinder(MetricType<T> metricType) {
        Func5<MetricId<T>, Long, Long, Integer, Order, Observable<Row>> registered =
                (Func5<MetricId<T>, Long, Long, Integer, Order, Observable<Row>>) dataPointFinders.get(metricType);
        if (registered != null) {
            return registered;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Returns the row-to-data-point mapper registered for the metric type.
     *
     * @throws UnsupportedOperationException if the type has no mapper
     */
    @SuppressWarnings("unchecked") // the table stores wildcard-typed functions keyed by their metric type
    private <T> Func1<Row, DataPoint<T>> getDataPointMapper(MetricType<T> metricType) {
        Func1<Row, DataPoint<T>> registered = (Func1<Row, DataPoint<T>>) dataPointMappers.get(metricType);
        if (registered != null) {
            return registered;
        }
        throw new UnsupportedOperationException(metricType.getText());
    }

    /**
     * Derives rate data points from consecutive counter data points read in ascending order.
     * <p>
     * Each pair of neighbouring points yields one rate point stamped with the later timestamp;
     * pairs in which the value decreased are dropped. The rate is
     * {@code 60000 * valueDelta / timestampDelta}, i.e. a per-minute rate assuming timestamps
     * are in milliseconds.
     *
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<DataPoint<Double>> findRateData(MetricId<Long> id, long start, long end) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        return findDataPoints(id, start, end, 0, Order.ASC)
                .buffer(2, 1)
                .filter(pair -> pair.size() == 2 && pair.get(1).getValue() >= pair.get(0).getValue())
                .map(pair -> {
                    DataPoint<Long> earlier = pair.get(0);
                    DataPoint<Long> later = pair.get(1);
                    long stamp = later.getTimestamp();
                    long valueDelta = later.getValue() - earlier.getValue();
                    double timeDelta = later.getTimestamp() - earlier.getTimestamp();
                    double rate = 60000.0 * (double) valueDelta / timeDelta;
                    return new DataPoint<Double>(stamp, rate);
                });
    }

    /**
     * Computes bucketed statistics over the rate data of a counter.
     *
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findRateStats(MetricId<Long> id, long start, long end, Buckets buckets, List<Double> percentiles) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        Observable<DataPoint<Double>> rates = findRateData(id, start, end);
        return bucketize(rates, buckets, percentiles);
    }

    /**
     * Reads the gauge data points once (descending order, cached) and feeds the same cached
     * sequence to every supplied function, merging their outputs.
     *
     * @param funcs functions applied to the cached data-point observable
     */
    @Override
    public <T> Observable<T> findGaugeData(MetricId<Double> id, long start, long end, Func1<Observable<DataPoint<Double>>, Observable<T>> ... funcs) {
        Observable<DataPoint<Double>> cached = findDataPoints(id, start, end, 0, Order.DESC).cache();
        return Observable.from(funcs).flatMap(fn -> fn.call(cached));
    }

    /**
     * Computes bucketed statistics over the data points of a gauge, read in descending order.
     *
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findGaugeStats(MetricId<Double> metricId, long start, long end, Buckets buckets, List<Double> percentiles) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        Observable<DataPoint<Double>> points = findDataPoints(metricId, start, end, 0, Order.DESC);
        return bucketize(points, buckets, percentiles);
    }

    /**
     * Computes bucketed statistics over all metrics selected by tag filters.
     * <p>
     * For COUNTER and GAUGE the raw data points of the matching metrics are used. For any
     * other {@code metricType} the argument is not used for the lookup: COUNTER metrics are
     * searched and their rate data is used instead.
     *
     * @param stacked when {@code false}, all data points are bucketized together; when
     *                {@code true}, each metric is bucketized on its own and the per-metric
     *                buckets with the same start are combined by SumNumericBucketPointCollector
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public <T extends Number> Observable<List<NumericBucketPoint>> findNumericStats(String tenantId, MetricType<T> metricType, Map<String, String> tagFilters, long start, long end, Buckets buckets, List<Double> percentiles, boolean stacked) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        if (!stacked) {
            if (MetricType.COUNTER.equals(metricType) || MetricType.GAUGE.equals(metricType)) {
                return this.bucketize((Observable<? extends DataPoint<? extends Number>>)this.findMetricsWithFilters(tenantId, metricType, tagFilters).flatMap(metric -> this.findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC)), buckets, percentiles);
            }
            // Neither COUNTER nor GAUGE: fall back to the rates of the matching counters.
            return this.bucketize((Observable<? extends DataPoint<? extends Number>>)this.findMetricsWithFilters(tenantId, MetricType.COUNTER, tagFilters).flatMap(metric -> this.findRateData(metric.getMetricId(), start, end)), buckets, percentiles);
        }
        // Stacked: one bucketized stream per metric, flattened to individual bucket points.
        Observable individualStats = MetricType.COUNTER.equals(metricType) || MetricType.GAUGE.equals(metricType) ? this.findMetricsWithFilters(tenantId, metricType, tagFilters).map(metric -> this.bucketize(this.findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC), buckets, percentiles).flatMap(Observable::from)) : this.findMetricsWithFilters(tenantId, MetricType.COUNTER, tagFilters).map(metric -> this.bucketize(this.findRateData(metric.getMetricId(), start, end), buckets, percentiles).flatMap(Observable::from));
        // Combine the bucket points that share a start time, then lay them out per `buckets`.
        return Observable.merge((Observable)individualStats).groupBy(BucketPoint::getStart).flatMap(group -> group.collect(SumNumericBucketPointCollector::new, SumNumericBucketPointCollector::increment)).map(SumNumericBucketPointCollector::toBucketPoint).toMap(BucketPoint::getStart).map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
    }

    /**
     * Computes bucketed statistics over an explicit list of metric names.
     * <p>
     * Each name is resolved through {@link #findMetric(MetricId)} first, so names without a
     * stored definition contribute nothing. For COUNTER and GAUGE the raw data points are
     * used; for any other {@code metricType} the names are looked up as COUNTER metrics and
     * their rate data is used instead.
     *
     * @param stacked when {@code false}, all data points are bucketized together; when
     *                {@code true}, each metric is bucketized on its own and the per-metric
     *                buckets with the same start are combined by SumNumericBucketPointCollector
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public <T extends Number> Observable<List<NumericBucketPoint>> findNumericStats(String tenantId, MetricType<T> metricType, List<String> metrics, long start, long end, Buckets buckets, List<Double> percentiles, boolean stacked) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        if (!stacked) {
            if (MetricType.COUNTER.equals(metricType) || MetricType.GAUGE.equals(metricType)) {
                return this.bucketize((Observable<? extends DataPoint<? extends Number>>)Observable.from(metrics).flatMap(metricName -> this.findMetric(new MetricId(tenantId, metricType, (String)metricName))).flatMap(metric -> this.findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC)), buckets, percentiles);
            }
            // Neither COUNTER nor GAUGE: fall back to the rates of the named counters.
            return this.bucketize((Observable<? extends DataPoint<? extends Number>>)Observable.from(metrics).flatMap(metricName -> this.findMetric(new MetricId<Long>(tenantId, MetricType.COUNTER, (String)metricName))).flatMap(metric -> this.findRateData(new MetricId<Long>(tenantId, MetricType.COUNTER, metric.getMetricId().getName()), start, end)), buckets, percentiles);
        }
        // Stacked: one bucketized stream per metric, flattened to individual bucket points.
        Observable individualStats = MetricType.COUNTER.equals(metricType) || MetricType.GAUGE.equals(metricType) ? Observable.from(metrics).flatMap(metricName -> this.findMetric(new MetricId(tenantId, metricType, (String)metricName))).map(metric -> this.bucketize(this.findDataPoints(metric.getMetricId(), start, end, 0, Order.DESC), buckets, percentiles).flatMap(Observable::from)) : Observable.from(metrics).flatMap(metricName -> this.findMetric(new MetricId<Long>(tenantId, MetricType.COUNTER, (String)metricName))).map(metric -> this.bucketize(this.findRateData(new MetricId<Long>(tenantId, MetricType.COUNTER, metric.getMetricId().getName()), start, end), buckets, percentiles).flatMap(Observable::from));
        // Combine the bucket points that share a start time, then lay them out per `buckets`.
        return Observable.merge((Observable)individualStats).groupBy(BucketPoint::getStart).flatMap(group -> group.collect(SumNumericBucketPointCollector::new, SumNumericBucketPointCollector::increment)).map(SumNumericBucketPointCollector::toBucketPoint).toMap(BucketPoint::getStart).map(pointMap -> NumericBucketPoint.toList(pointMap, buckets));
    }

    /**
     * Groups data points by the bucket index of their timestamp, folds every group into a
     * {@link NumericBucketPoint} via {@link NumericDataPointCollector}, and returns the bucket
     * points arranged by {@code NumericBucketPoint.toList} for the given bucket configuration.
     */
    private Observable<List<NumericBucketPoint>> bucketize(Observable<? extends DataPoint<? extends Number>> dataPoints, Buckets buckets, List<Double> percentiles) {
        return dataPoints
                .groupBy(point -> buckets.getIndex(point.getTimestamp()))
                .flatMap(bucket -> bucket.collect(
                        () -> new NumericDataPointCollector(buckets, (Integer) bucket.getKey(), percentiles),
                        NumericDataPointCollector::increment))
                .map(NumericDataPointCollector::toBucketPoint)
                .toMap(BucketPoint::getStart)
                .map(byStart -> NumericBucketPoint.toList(byStart, buckets));
    }

    /**
     * Finds availability data points within a time range.
     *
     * @param distinct when {@code true}, consecutive points with the same value are collapsed
     *                 and {@code limit} is applied after collapsing; when {@code false},
     *                 {@code limit} is passed straight to the underlying query
     * @param limit    maximum number of points; values {@code <= 0} mean no limit in the
     *                 distinct case
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start, long end, boolean distinct, int limit, Order order) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        if (!distinct) {
            return findDataPoints(id, start, end, limit, order);
        }
        Observable<DataPoint<AvailabilityType>> changes =
                findDataPoints(id, start, end, 0, order).distinctUntilChanged(DataPoint::getValue);
        return limit > 0 ? changes.limit(limit) : changes;
    }

    /**
     * Computes bucketed availability statistics: data points are read in ascending order,
     * grouped by the bucket index of their timestamp and folded into
     * {@link AvailabilityBucketPoint}s via {@link AvailabilityDataPointCollector}.
     *
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<AvailabilityBucketPoint>> findAvailabilityStats(MetricId<AvailabilityType> metricId, long start, long end, Buckets buckets) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        return findDataPoints(metricId, start, end, 0, Order.ASC)
                .groupBy(point -> buckets.getIndex(point.getTimestamp()))
                .flatMap(bucket -> bucket.collect(
                        () -> new AvailabilityDataPointCollector(buckets, (Integer) bucket.getKey()),
                        AvailabilityDataPointCollector::increment))
                .map(AvailabilityDataPointCollector::toBucketPoint)
                .toMap(BucketPoint::getStart)
                .map(byStart -> AvailabilityBucketPoint.toList(byStart, buckets));
    }

    /**
     * Checks whether a metric with the given name exists among the tenant's metrics of the
     * same type.
     *
     * @return an observable emitting {@code TRUE} on the first name match, otherwise {@code FALSE}
     */
    @Override
    public Observable<Boolean> idExists(MetricId<?> metricId) {
        return findMetrics(metricId.getTenantId(), metricId.getType())
                .filter(candidate -> metricId.getName().equals(candidate.getMetricId().getName()))
                .take(1)
                .map(found -> Boolean.TRUE)
                .defaultIfEmpty(Boolean.FALSE);
    }

    /**
     * Computes bucketed statistics over the data points of a counter, read in ascending order.
     *
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<NumericBucketPoint>> findCounterStats(MetricId<Long> id, long start, long end, Buckets buckets, List<Double> percentiles) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        Observable<DataPoint<Long>> points = findDataPoints(id, start, end, 0, Order.ASC);
        return bucketize(points, buckets, percentiles);
    }

    /**
     * Finds the periods during which consecutive gauge data points (ascending order) satisfy
     * the predicate.
     *
     * @return an observable emitting one list of {@code long[2]} arrays; element 0 is the
     *         timestamp of the first matching point of a run, element 1 the timestamp of the
     *         last matching point of that run
     * @throws IllegalArgumentException if the time range is invalid
     */
    @Override
    public Observable<List<long[]>> getPeriods(MetricId<Double> id, Predicate<Double> predicate, long start, long end) {
        Preconditions.checkArgument(Utils.isValidTimeRange(start, end), "Invalid time range");
        return dataAccess.findGaugeData(id, start, end, 0, Order.ASC, false)
                .map(Functions::getGaugeDataPoint)
                .toList()
                .map(points -> {
                    List<long[]> periods = new ArrayList<>(points.size());
                    long[] open = null;      // run currently being extended, if any
                    long lastMatch = 0L;     // timestamp of the latest matching point in that run
                    for (DataPoint<Double> point : points) {
                        if (predicate.test(point.getValue())) {
                            if (open == null) {
                                open = new long[2];
                                open[0] = point.getTimestamp();
                            }
                            lastMatch = point.getTimestamp();
                        } else if (open != null) {
                            // A non-matching point closes the run at the last matching timestamp.
                            open[1] = lastMatch;
                            periods.add(open);
                            open = null;
                        }
                    }
                    if (open != null) {
                        // The data ended while a run was still open.
                        open[1] = lastMatch;
                        periods.add(open);
                    }
                    return periods;
                });
    }

    /**
     * @return the subject on which {@code addDataPoints} publishes each inserted metric
     */
    @Override
    public Observable<Metric<?>> insertedDataEvents() {
        return insertedDataPointEvents;
    }

    /**
     * Resolves the TTL for a metric: the metric's own cached retention if present, otherwise
     * the cached retention for its tenant and type, otherwise the default TTL.
     */
    private int getTTL(MetricId<?> metricId) {
        Integer metricTtl = dataRetentions.get(new DataRetentionKey(metricId));
        if (metricTtl != null) {
            return metricTtl;
        }
        DataRetentionKey tenantTypeKey = new DataRetentionKey(metricId.getTenantId(), metricId.getType());
        return dataRetentions.getOrDefault(tenantTypeKey, defaultTTL);
    }

    /**
     * Shuts the service down: completes the inserted-data-point subject, shuts down the worker
     * pool and clears the retention cache.
     * <p>
     * The worker pool only exists after {@code startUp}; when it was never created the pool
     * step is skipped, so the cache is still cleared instead of the call failing with a
     * {@link NullPointerException}.
     */
    public void shutdown() {
        this.insertedDataPointEvents.onCompleted();
        if (this.metricsTasks != null) {
            this.metricsTasks.shutdown();
        }
        this.unloadDataRetentions();
    }

    /**
     * Runs the callable under the given timer and returns its result.
     *
     * @throws RuntimeException wrapping any exception raised while timing the callable
     */
    private <T> T time(Timer timer, Callable<T> callable) {
        final T result;
        try {
            result = timer.time(callable);
        } catch (Exception cause) {
            throw new RuntimeException("There was an error during a timed event", cause);
        }
        return result;
    }

    /**
     * Value holder pairing a tenant with a bucket number; equality and hash code are based on
     * both fields.
     */
    private static class TenantBucket {
        String tenant;
        long bucket;

        public TenantBucket(String tenant, long bucket) {
            this.tenant = tenant;
            this.bucket = bucket;
        }

        public String getTenant() {
            return tenant;
        }

        public long getBucket() {
            return bucket;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            TenantBucket other = (TenantBucket) o;
            return bucket == other.bucket && Objects.equals(tenant, other.tenant);
        }

        @Override
        public int hashCode() {
            // Same value as Objects.hash(tenant, bucket), written out to avoid boxing and the varargs array.
            return 31 * (31 + Objects.hashCode(tenant)) + Long.hashCode(bucket);
        }
    }

    /**
     * Callback for one retention lookup started by {@code loadDataRetentions()}.
     * <p>
     * On success every loaded retention is stored in the enclosing service's retention cache;
     * on failure a warning is logged. In both cases the latch is counted down in a
     * {@code finally} block, so an exception inside the callback cannot leave the thread
     * waiting on the latch blocked forever.
     */
    private class DataRetentionsLoadedCallback
    implements FutureCallback<Set<Retention>> {
        private final String tenantId;
        private final MetricType<?> type;
        private final CountDownLatch latch;

        public DataRetentionsLoadedCallback(String tenantId, MetricType<?> type, CountDownLatch latch) {
            this.tenantId = tenantId;
            this.type = type;
            this.latch = latch;
        }

        @Override
        public void onSuccess(Set<Retention> dataRetentionsSet) {
            try {
                for (Retention r : dataRetentionsSet) {
                    MetricsServiceImpl.this.dataRetentions.put(new DataRetentionKey(r.getId()), r.getValue());
                }
            } finally {
                this.latch.countDown();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                log.warnDataRetentionLoadingFailure(this.tenantId, this.type, t);
            } finally {
                this.latch.countDown();
            }
        }
    }

    /**
     * Key of the retention cache. It wraps a {@link MetricId}; equality and hash code delegate
     * to that id.
     */
    private static class DataRetentionKey {
        private final MetricId<?> metricId;

        /**
         * Builds the tenant-and-type level key: a metric id whose name is the made-safe text
         * of the metric type.
         */
        public DataRetentionKey(String tenantId, MetricType<?> type) {
            String typeName = Functions.makeSafe(type.getText());
            this.metricId = new MetricId<>(tenantId, type, typeName);
        }

        public DataRetentionKey(MetricId<?> metricId) {
            this.metricId = metricId;
        }

        public DataRetentionKey(Metric<?> metric) {
            this.metricId = metric.getMetricId();
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return metricId.equals(((DataRetentionKey) o).metricId);
        }

        @Override
        public int hashCode() {
            return metricId.hashCode();
        }
    }
}

