/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.metrics.MetricFiltering;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.metrics.MetricKey;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.DistributionData;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.GaugeData;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.MetricUpdates;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.MetricsMap;
import org.apache.beam.runners.direct.portable.AutoValue_DirectMetrics_DirectMetricQueryResults;
import org.apache.beam.runners.direct.portable.AutoValue_DirectMetrics_DirectMetricResult;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;

class DirectMetrics
extends MetricResults {
    /**
     * Shared cached thread pool used by {@code DirectMetric.commitPhysical} to fold a bundle's
     * final attempted value into the running total off the caller's thread. Worker threads are
     * daemon threads and all carry the name {@code direct-metrics-counter-committer}.
     */
    private static final ExecutorService COUNTER_COMMITTER =
        Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                .setNameFormat("direct-metrics-counter-committer")
                .setDaemon(true)
                .setThreadFactory(MoreExecutors.platformThreadFactory())
                .build());
    /**
     * Aggregation for counter metrics: updates are summed, and the accumulated sum is itself the
     * reported result.
     */
    private static final MetricAggregation<Long, Long> COUNTER = new MetricAggregation<Long, Long>(){

        /** Returns the additive identity, {@code 0}. */
        @Override
        public Long zero() {
            return 0L;
        }

        /**
         * Returns the sum of all {@code updates}. Each element is unboxed, so a {@code null}
         * element results in a {@link NullPointerException}.
         */
        @Override
        public Long combine(Iterable<Long> updates) {
            long total = 0L;
            for (Long delta : updates) {
                total += delta;
            }
            return total;
        }

        /** A counter's accumulated value is already its result; returns {@code data} unchanged. */
        @Override
        public Long extract(Long data) {
            return data;
        }
    };
    /**
     * Aggregation for distribution metrics: updates are merged pairwise with
     * {@code DistributionData.combine}, starting from {@code DistributionData.EMPTY}, and the
     * result is obtained through {@code extractResult()}.
     */
    private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION = new MetricAggregation<DistributionData, DistributionResult>(){

        /** Returns the starting value for a merge, {@code DistributionData.EMPTY}. */
        @Override
        public DistributionData zero() {
            return DistributionData.EMPTY;
        }

        /** Folds every element of {@code updates}, in iteration order, into a single value. */
        @Override
        public DistributionData combine(Iterable<DistributionData> updates) {
            DistributionData merged = this.zero();
            for (DistributionData next : updates) {
                merged = merged.combine(next);
            }
            return merged;
        }

        /** Converts accumulated distribution data into its reportable result. */
        @Override
        public DistributionResult extract(DistributionData data) {
            return data.extractResult();
        }
    };
    /**
     * Aggregation for gauge metrics: updates are merged pairwise with {@code GaugeData.combine},
     * starting from {@code GaugeData.empty()}, and the result is obtained through
     * {@code extractResult()}.
     */
    private static final MetricAggregation<GaugeData, GaugeResult> GAUGE = new MetricAggregation<GaugeData, GaugeResult>(){

        /** Returns the starting value for a merge, {@code GaugeData.empty()}. */
        @Override
        public GaugeData zero() {
            return GaugeData.empty();
        }

        /** Folds every element of {@code updates}, in iteration order, into a single value. */
        @Override
        public GaugeData combine(Iterable<GaugeData> updates) {
            GaugeData merged = this.zero();
            for (GaugeData next : updates) {
                merged = merged.combine(next);
            }
            return merged;
        }

        /** Converts accumulated gauge data into its reportable result. */
        @Override
        public GaugeResult extract(GaugeData data) {
            return data.extractResult();
        }
    };
    // The diamond operator lets the compiler infer the fully parameterized DirectMetric type; the
    // raw DirectMetric type argument emitted by the decompiler is not assignable to these fields.
    // The fields are final because nothing in this class ever reassigns them.

    /** Counter metrics by key; the factory lambda builds each entry with the {@link #COUNTER} aggregation. */
    private final MetricsMap<MetricKey, DirectMetric<Long, Long>> counters = new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER));
    /** Distribution metrics by key; the factory lambda builds each entry with the {@link #DISTRIBUTION} aggregation. */
    private final MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions = new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION));
    /** Gauge metrics by key; the factory lambda builds each entry with the {@link #GAUGE} aggregation. */
    private final MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges = new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE));

    /** No-argument, package-private constructor; all state is set up by the field initializers. */
    DirectMetrics() {}

    /**
     * Collects the counter, distribution and gauge metrics whose key matches {@code filter}.
     *
     * @param filter the filter each metric key is tested against
     * @return the matching results, grouped into counters, distributions and gauges
     */
    @Override
    public MetricQueryResults queryMetrics(MetricsFilter filter) {
        // Typed builders (instead of raw ImmutableList.Builder) keep the element types checked.
        ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
        for (Map.Entry<MetricKey, DirectMetric<Long, Long>> entry : this.counters.entries()) {
            this.maybeExtractResult(filter, counterResults, entry);
        }
        ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults = ImmutableList.builder();
        for (Map.Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> entry : this.distributions.entries()) {
            this.maybeExtractResult(filter, distributionResults, entry);
        }
        ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
        for (Map.Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> entry : this.gauges.entries()) {
            this.maybeExtractResult(filter, gaugeResults, entry);
        }
        return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build(), gaugeResults.build());
    }

    /**
     * Appends a result for {@code entry} to {@code resultsBuilder} if the entry's key matches
     * {@code filter}; otherwise does nothing.
     *
     * @param filter the filter the entry's key is tested against
     * @param resultsBuilder the builder that receives the result on a match
     * @param entry the metric key together with its tracked metric
     */
    private <ResultT> void maybeExtractResult(MetricsFilter filter, ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder, Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
        MetricKey key = entry.getKey();
        if (!MetricFiltering.matches(filter, key)) {
            return;
        }
        DirectMetric<?, ResultT> metric = entry.getValue();
        // No cast to Object: the builder is typed, so the MetricResult<ResultT> is added as-is.
        resultsBuilder.add(DirectMetricResult.create(key.metricName(), key.stepName(), metric.extractCommitted(), metric.extractLatestAttempted()));
    }

    /**
     * Records the tentative cumulative values reported for {@code bundle} against the attempted
     * state of every counter, distribution and gauge named in {@code updates}.
     *
     * @param bundle the bundle the updates belong to
     * @param updates the counter, distribution and gauge updates to apply
     */
    public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
        for (MetricUpdates.MetricUpdate<Long> counterUpdate : updates.counterUpdates()) {
            this.counters.get(counterUpdate.getKey()).updatePhysical(bundle, counterUpdate.getUpdate());
        }
        // Precise element types make the downcasts from Serializable unnecessary.
        for (MetricUpdates.MetricUpdate<DistributionData> distributionUpdate : updates.distributionUpdates()) {
            this.distributions.get(distributionUpdate.getKey()).updatePhysical(bundle, distributionUpdate.getUpdate());
        }
        for (MetricUpdates.MetricUpdate<GaugeData> gaugeUpdate : updates.gaugeUpdates()) {
            this.gauges.get(gaugeUpdate.getKey()).updatePhysical(bundle, gaugeUpdate.getUpdate());
        }
    }

    /**
     * Commits the final cumulative values reported for {@code bundle} to the attempted state of
     * every counter, distribution and gauge named in {@code updates}.
     *
     * @param bundle the bundle whose attempted values are final
     * @param updates the counter, distribution and gauge updates to commit
     */
    public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
        for (MetricUpdates.MetricUpdate<Long> counterUpdate : updates.counterUpdates()) {
            this.counters.get(counterUpdate.getKey()).commitPhysical(bundle, counterUpdate.getUpdate());
        }
        // Precise element types make the downcasts from Serializable unnecessary.
        for (MetricUpdates.MetricUpdate<DistributionData> distributionUpdate : updates.distributionUpdates()) {
            this.distributions.get(distributionUpdate.getKey()).commitPhysical(bundle, distributionUpdate.getUpdate());
        }
        for (MetricUpdates.MetricUpdate<GaugeData> gaugeUpdate : updates.gaugeUpdates()) {
            this.gauges.get(gaugeUpdate.getKey()).commitPhysical(bundle, gaugeUpdate.getUpdate());
        }
    }

    /**
     * Folds the final cumulative values reported for {@code bundle} into the committed state of
     * every counter, distribution and gauge named in {@code updates}.
     *
     * @param bundle the bundle the updates belong to
     * @param updates the counter, distribution and gauge updates to commit
     */
    public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) {
        for (MetricUpdates.MetricUpdate<Long> counterUpdate : updates.counterUpdates()) {
            this.counters.get(counterUpdate.getKey()).commitLogical(bundle, counterUpdate.getUpdate());
        }
        // Precise element types make the downcasts from Serializable unnecessary.
        for (MetricUpdates.MetricUpdate<DistributionData> distributionUpdate : updates.distributionUpdates()) {
            this.distributions.get(distributionUpdate.getKey()).commitLogical(bundle, distributionUpdate.getUpdate());
        }
        for (MetricUpdates.MetricUpdate<GaugeData> gaugeUpdate : updates.gaugeUpdates()) {
            this.gauges.get(gaugeUpdate.getKey()).commitLogical(bundle, gaugeUpdate.getUpdate());
        }
    }

    /**
     * Value type holding one metric's name, step, committed value and attempted value. The
     * concrete implementation is the generated {@code AutoValue_DirectMetrics_DirectMetricResult}.
     */
    @AutoValue
    abstract static class DirectMetricResult<T> implements MetricResult<T> {
        DirectMetricResult() {}

        // The four accessors are declared in the same order as the arguments that create()
        // passes to the generated constructor.

        /** The metric's name. */
        public abstract MetricName getName();

        /** The step the metric was reported under. */
        public abstract String getStep();

        /** The committed value of the metric. */
        public abstract T getCommitted();

        /** The attempted value of the metric. */
        public abstract T getAttempted();

        /**
         * Builds a result from its four components.
         *
         * @param name the metric name
         * @param scope the value returned by {@link #getStep()}
         * @param committed the committed value
         * @param attempted the attempted value
         */
        public static <T> MetricResult<T> create(MetricName name, String scope, T committed, T attempted) {
            MetricResult<T> result = new AutoValue_DirectMetrics_DirectMetricResult<T>(name, scope, committed, attempted);
            return result;
        }
    }

    /**
     * Value type bundling the counter, distribution and gauge results of one query. The concrete
     * implementation is the generated {@code AutoValue_DirectMetrics_DirectMetricQueryResults}.
     */
    @AutoValue
    abstract static class DirectMetricQueryResults implements MetricQueryResults {
        DirectMetricQueryResults() {}

        /**
         * Builds the query results from the three result collections.
         *
         * @param counters the counter results
         * @param distributions the distribution results
         * @param gauges the gauge results
         */
        public static MetricQueryResults create(Iterable<MetricResult<Long>> counters, Iterable<MetricResult<DistributionResult>> distributions, Iterable<MetricResult<GaugeResult>> gauges) {
            MetricQueryResults results = new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions, gauges);
            return results;
        }
    }

    /**
     * Tracks the committed and attempted values of a single metric.
     *
     * <p>The committed value lives in an {@link AtomicReference} and is updated lock-free. The
     * attempted value is split in two: {@code finishedAttempted} accumulates bundles whose
     * physical commit has been processed, while {@code inflightAttempted} holds the latest
     * cumulative value of every bundle that has not been folded in yet.
     *
     * @param <UpdateT> the type of accumulated metric data
     * @param <ResultT> the type of result extracted from the accumulated data
     */
    private static class DirectMetric<UpdateT, ResultT> {
        private final MetricAggregation<UpdateT, ResultT> aggregation;
        private final AtomicReference<UpdateT> finishedCommitted;
        private final Object attemptedLock = new Object();
        @GuardedBy("attemptedLock")
        private volatile UpdateT finishedAttempted;
        private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted = new ConcurrentHashMap<>();

        /**
         * Creates a metric whose committed and attempted values both start at
         * {@code aggregation.zero()}.
         *
         * @param aggregation the strategy used to combine updates and extract results
         */
        public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
            this.aggregation = aggregation;
            this.finishedCommitted = new AtomicReference<>(aggregation.zero());
            this.finishedAttempted = aggregation.zero();
        }

        /**
         * Stores {@code tentativeCumulative} as the in-flight attempted value for {@code bundle},
         * replacing any value previously stored for that bundle.
         */
        public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) {
            this.inflightAttempted.put(bundle, tentativeCumulative);
        }

        /**
         * Records {@code finalCumulative} as the final attempted value for {@code bundle}.
         *
         * <p>The value is first written to the in-flight map without taking the lock, so it is
         * visible to readers immediately. A task submitted to {@code COUNTER_COMMITTER} then,
         * under {@code attemptedLock}, folds the value into {@code finishedAttempted} and removes
         * the in-flight entry.
         */
        public void commitPhysical(CommittedBundle<?> bundle, UpdateT finalCumulative) {
            this.inflightAttempted.put(bundle, finalCumulative);
            COUNTER_COMMITTER.submit(() -> {
                synchronized (this.attemptedLock) {
                    this.finishedAttempted = this.aggregation.combine(Arrays.asList(this.finishedAttempted, finalCumulative));
                    this.inflightAttempted.remove(bundle);
                }
            });
        }

        /**
         * Returns the result extracted from the finished attempted value combined with all
         * in-flight attempted values.
         */
        public ResultT extractLatestAttempted() {
            ArrayList<UpdateT> updates = new ArrayList<>(this.inflightAttempted.size() + 1);
            // Take the snapshot under the lock so a concurrent commit task cannot move a bundle
            // from the in-flight map to finishedAttempted between the two reads.
            synchronized (this.attemptedLock) {
                updates.add(this.finishedAttempted);
                updates.addAll(this.inflightAttempted.values());
            }
            return this.aggregation.extract(this.aggregation.combine(updates));
        }

        /**
         * Atomically folds {@code finalCumulative} into the committed value. The {@code bundle}
         * argument is not used.
         */
        public void commitLogical(CommittedBundle<?> bundle, UpdateT finalCumulative) {
            // updateAndGet retries the compare-and-set until it succeeds.
            this.finishedCommitted.updateAndGet(current -> this.aggregation.combine(Arrays.asList(current, finalCumulative)));
        }

        /** Returns the result extracted from the current committed value. */
        public ResultT extractCommitted() {
            return this.aggregation.extract(this.finishedCommitted.get());
        }
    }

    private static interface MetricAggregation<UpdateT, ResultT> {
        public UpdateT zero();

        public UpdateT combine(Iterable<UpdateT> var1);

        public ResultT extract(UpdateT var1);
    }
}

