/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.InflightMessageList;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.row.RowMap;
import java.util.concurrent.TimeUnit;

public abstract class AbstractAsyncProducer
extends AbstractProducer {
    private InflightMessageList inflightMessages;

    public AbstractAsyncProducer(MaxwellContext context) {
        super(context);
        this.inflightMessages = new InflightMessageList(context);
        Metrics metrics = context.getMetrics();
        String gaugeName = metrics.metricName("inflightmessages", "count");
        metrics.register(gaugeName, () -> this.inflightMessages.size());
    }

    public abstract void sendAsync(RowMap var1, CallbackCompleter var2) throws Exception;

    @Override
    public final void push(RowMap r) throws Exception {
        Position position = r.getNextPosition();
        if (!r.shouldOutput(this.outputConfig)) {
            if (position != null) {
                this.inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);
                InflightMessageList.InflightMessage completed = this.inflightMessages.completeMessage(position);
                if (completed != null) {
                    this.context.setPosition(completed.position);
                }
            }
            return;
        }
        long messageID = this.inflightMessages.waitForSlot();
        if (r.isTXCommit()) {
            this.inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
        }
        CallbackCompleter cc = new CallbackCompleter(this.inflightMessages, position, r.isTXCommit(), this.context, messageID);
        this.sendAsync(r, cc);
    }

    public class CallbackCompleter {
        private InflightMessageList inflightMessages;
        private final MaxwellContext context;
        private final int metricsAgeSloMs;
        private final Position position;
        private final boolean isTXCommit;
        private final long messageID;

        public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
            this.inflightMessages = inflightMessages;
            this.context = context;
            this.metricsAgeSloMs = context.getConfig().metricsAgeSlo * 1000;
            this.position = position;
            this.isTXCommit = isTXCommit;
            this.messageID = messageID;
        }

        public void markCompleted() {
            InflightMessageList.InflightMessage message;
            this.inflightMessages.freeSlot(this.messageID);
            if (this.isTXCommit && (message = this.inflightMessages.completeMessage(this.position)) != null) {
                this.context.setPosition(message.position);
                long currentTime = System.currentTimeMillis();
                long endToEndLatency = currentTime - message.eventTimeMS;
                AbstractAsyncProducer.this.messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
                AbstractAsyncProducer.this.messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);
                if (endToEndLatency > (long)this.metricsAgeSloMs) {
                    AbstractAsyncProducer.this.messageLatencySloViolationCount.inc();
                }
            }
        }
    }
}

