/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals.metrics;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

public class KafkaMetricRecorder {
    public static final String SOURCE_METRIC_GROUP = "source";
    public static final String SINK_METRIC_GROUP = "sink";
    public static final long INVALID = -1L;
    private final OperatorMetricGroup operatorMetricGroup;
    private Counter numBytesIn;
    private Counter numRecordsIn;
    private final SimpleGauge<Long> currentFetchEventTimeLag = new SimpleGauge<Long>(-1L);
    private final SimpleGauge<Long> currentEmitEventTimeLag = new SimpleGauge<Long>(-1L);
    private final CurrentTimeLagGauge watermarkLag = new CurrentTimeLagGauge();
    private final CurrentTimeLagGauge sourceIdleTime = new CurrentTimeLagGauge();
    private Gauge<Long> pendingRecords;
    private Counter numBytesOut;
    private Counter numRecordsOut;
    private Counter numRecordsOutErrors;
    private final SimpleGauge<Long> currentSendTime = new SimpleGauge<Long>(-1L);

    public KafkaMetricRecorder(OperatorMetricGroup operatorMetricGroup) {
        this.operatorMetricGroup = operatorMetricGroup;
    }

    public void registerSourceMetrics() {
        this.numBytesIn = this.operatorMetricGroup.getIOMetricGroup().getNumBytesInCounter();
        MetricGroup sourceMetricGroup = this.operatorMetricGroup.addGroup(SOURCE_METRIC_GROUP);
        sourceMetricGroup.counter("numBytesIn", this.numBytesIn);
        sourceMetricGroup.meter("numBytesInPerSecond", (Meter)new MeterView(this.numBytesIn));
        this.numRecordsIn = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        sourceMetricGroup.counter("numRecordsIn", this.numRecordsIn);
        sourceMetricGroup.meter("numRecordsInPerSecond", (Meter)new MeterView(this.numRecordsIn));
        this.operatorMetricGroup.gauge("currentFetchEventTimeLag", this.currentFetchEventTimeLag);
        this.operatorMetricGroup.gauge("currentEmitEventTimeLag", this.currentEmitEventTimeLag);
        this.operatorMetricGroup.gauge("watermarkLag", (Gauge)this.watermarkLag);
        this.operatorMetricGroup.gauge("sourceIdleTime", (Gauge)this.sourceIdleTime);
    }

    public void registerSourcePendingRecords(Gauge<Long> pendingRecords) {
        this.pendingRecords = pendingRecords;
        this.operatorMetricGroup.gauge("pendingRecords", pendingRecords);
    }

    public Counter getNumBytesInCounter() {
        return this.numBytesIn;
    }

    public Counter getNumRecordsInCounter() {
        return this.numRecordsIn;
    }

    public SimpleGauge<Long> getCurrentFetchEventTimeLag() {
        return this.currentFetchEventTimeLag;
    }

    public SimpleGauge<Long> getCurrentEmitEventTimeLag() {
        return this.currentEmitEventTimeLag;
    }

    public CurrentTimeLagGauge getWatermarkLag() {
        return this.watermarkLag;
    }

    public CurrentTimeLagGauge getSourceIdleTime() {
        return this.sourceIdleTime;
    }

    public Gauge<Long> getPendingRecords() {
        return this.pendingRecords;
    }

    public void registerSinkMetrics() {
        this.numBytesOut = this.operatorMetricGroup.getIOMetricGroup().getNumBytesOutCounter();
        MetricGroup sinkMetricGroup = this.operatorMetricGroup.addGroup(SINK_METRIC_GROUP);
        sinkMetricGroup.counter("numBytesOut", this.numBytesOut);
        sinkMetricGroup.meter("numBytesOutPerSecond", (Meter)new MeterView(this.numBytesOut));
        this.numRecordsOut = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
        sinkMetricGroup.counter("numRecordsOut", this.numRecordsOut);
        sinkMetricGroup.meter("numRecordsOutPerSecond", (Meter)new MeterView(this.numRecordsOut));
        this.numRecordsOutErrors = this.operatorMetricGroup.counter("numRecordsOutErrors");
        this.operatorMetricGroup.gauge("currentSendTime", this.currentSendTime);
    }

    public Counter getNumBytesOutCounter() {
        return this.numBytesOut;
    }

    public Counter getNumRecordsOutCounter() {
        return this.numRecordsOut;
    }

    public Counter getNumRecordsOutErrors() {
        return this.numRecordsOutErrors;
    }

    public SimpleGauge<Long> getCurrentSendTime() {
        return this.currentSendTime;
    }

    public static class SimpleGauge<T>
    implements Gauge<T> {
        private T value;

        public SimpleGauge(T initialValue) {
            this.value = initialValue;
        }

        public void update(T value) {
            this.value = value;
        }

        public T getValue() {
            return this.value;
        }
    }

    public static class CurrentTimeLagGauge
    implements Gauge<Long> {
        public Long timestamp = -1L;

        public void update(Long value) {
            this.timestamp = value;
        }

        public Long getValue() {
            if (this.timestamp == -1L) {
                return -1L;
            }
            return System.currentTimeMillis() - this.timestamp;
        }
    }
}

