/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.metrics.MetricsContext;

/**
 * {@link MetricsContext} implementation that keeps a fixed set of reader counters in
 * {@link AtomicLong} fields and registers each of them as a gauge on a Flink {@link MetricGroup}.
 *
 * <p>Only the counter names declared as constants on this class are supported, and every one of
 * them must be requested with {@code Long} as the value type.
 */
@Internal
public class ReaderMetricsContext implements MetricsContext {
    public static final String ASSIGNED_SPLITS = "assignedSplits";
    public static final String ASSIGNED_BYTES = "assignedBytes";
    public static final String FINISHED_SPLITS = "finishedSplits";
    public static final String FINISHED_BYTES = "finishedBytes";
    public static final String SPLIT_READER_FETCH_CALLS = "splitReaderFetchCalls";

    private final AtomicLong assignedSplits;
    private final AtomicLong assignedBytes;
    private final AtomicLong finishedSplits;
    private final AtomicLong finishedBytes;
    private final AtomicLong splitReaderFetchCalls;

    /**
     * Creates the backing values and registers one gauge per value under the
     * {@code "IcebergSourceReader"} sub-group of the given metric group.
     *
     * @param metricGroup parent group to which the {@code "IcebergSourceReader"} sub-group is added
     */
    public ReaderMetricsContext(MetricGroup metricGroup) {
        MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader");
        this.assignedSplits = new AtomicLong();
        this.assignedBytes = new AtomicLong();
        this.finishedSplits = new AtomicLong();
        this.finishedBytes = new AtomicLong();
        this.splitReaderFetchCalls = new AtomicLong();
        readerMetricGroup.gauge(ASSIGNED_SPLITS, this.assignedSplits::get);
        readerMetricGroup.gauge(ASSIGNED_BYTES, this.assignedBytes::get);
        readerMetricGroup.gauge(FINISHED_SPLITS, this.finishedSplits::get);
        readerMetricGroup.gauge(FINISHED_BYTES, this.finishedBytes::get);
        readerMetricGroup.gauge(SPLIT_READER_FETCH_CALLS, this.splitReaderFetchCalls::get);
    }

    /**
     * Returns a counter that adds to the value registered under {@code name}.
     *
     * @param name one of the counter-name constants declared on this class
     * @param type value type of the counter; must be {@code Long.class}
     * @param unit not used by this implementation
     * @return a counter whose increments are added to the matching backing value
     * @throws IllegalArgumentException if {@code name} is not a supported counter name
     * @throws ValidationException if {@code type} is not {@code Long.class}
     */
    @Override
    public <T extends Number> MetricsContext.Counter<T> counter(String name, Class<T> type, MetricsContext.Unit unit) {
        // Resolve the name first so an unknown counter is reported before the type is validated.
        AtomicLong value = this.backingValue(name);
        ValidationException.check(type == Long.class, "'%s' requires Long type", name);
        // Safe: the check above guarantees that T is Long.
        @SuppressWarnings("unchecked")
        MetricsContext.Counter<T> counter = (MetricsContext.Counter<T>) ReaderMetricsContext.longCounter(value::addAndGet);
        return counter;
    }

    /** Maps a supported counter name to its backing value, rejecting any other name. */
    private AtomicLong backingValue(String name) {
        switch (name) {
            case ASSIGNED_SPLITS:
                return this.assignedSplits;
            case ASSIGNED_BYTES:
                return this.assignedBytes;
            case FINISHED_SPLITS:
                return this.finishedSplits;
            case FINISHED_BYTES:
                return this.finishedBytes;
            case SPLIT_READER_FETCH_CALLS:
                return this.splitReaderFetchCalls;
            default:
                throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name));
        }
    }

    /** Wraps a consumer of increments as a {@code Long} counter; a bare increment adds one. */
    private static MetricsContext.Counter<Long> longCounter(final Consumer<Long> consumer) {
        return new MetricsContext.Counter<Long>() {

            @Override
            public void increment() {
                this.increment(1L);
            }

            @Override
            public void increment(Long amount) {
                consumer.accept(amount);
            }
        };
    }
}

