/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.channel;

import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.channel.Envelope;
import org.apache.iceberg.connect.events.DataComplete;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the bookkeeping for a commit cycle: the buffered response envelopes, the buffered
 * {@link DataComplete} ("ready") payloads, the ID of the commit currently in progress (if any),
 * and the wall-clock time used for interval and timeout checks.
 */
class CommitState {
    private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);

    private final List<Envelope> commitBuffer = Lists.newArrayList();
    private final List<DataComplete> readyBuffer = Lists.newArrayList();
    private final IcebergSinkConfig config;
    private long startTime;
    private UUID currentCommitId;

    /**
     * @param config sink configuration; read for the commit interval and commit timeout
     */
    CommitState(IcebergSinkConfig config) {
        this.config = config;
    }

    /**
     * Buffers a response envelope. The envelope is always buffered; a warning is logged when it
     * arrives while no commit is in progress.
     *
     * @param envelope envelope whose event payload is a {@link DataWritten}
     */
    void addResponse(Envelope envelope) {
        commitBuffer.add(envelope);
        if (isCommitInProgress()) {
            return;
        }
        DataWritten written = (DataWritten) envelope.event().payload();
        LOG.warn("Received commit response when no commit in progress, this can happen during recovery. Commit ID: {}", written.commitId());
    }

    /**
     * Buffers the {@link DataComplete} payload of the given envelope. The payload is always
     * buffered; a warning is logged when it arrives while no commit is in progress.
     *
     * @param envelope envelope whose event payload is a {@link DataComplete}
     */
    void addReady(Envelope envelope) {
        DataComplete ready = (DataComplete) envelope.event().payload();
        readyBuffer.add(ready);
        if (isCommitInProgress()) {
            return;
        }
        LOG.warn("Received commit ready when no commit in progress, this can happen during recovery. Commit ID: {}", ready.commitId());
    }

    /**
     * @return the ID of the commit in progress, or {@code null} when there is none
     */
    UUID currentCommitId() {
        return currentCommitId;
    }

    /**
     * @return {@code true} when a commit ID is currently set
     */
    boolean isCommitInProgress() {
        return currentCommitId != null;
    }

    /**
     * Checks whether the configured commit interval has elapsed since the recorded start time.
     * When the start time is still zero it is first initialized to the current time.
     *
     * @return {@code true} only when no commit is in progress and the elapsed time is at least
     *     the configured commit interval
     */
    boolean isCommitIntervalReached() {
        if (startTime == 0L) {
            startTime = System.currentTimeMillis();
        }
        if (isCommitInProgress()) {
            return false;
        }
        long elapsedMs = System.currentTimeMillis() - startTime;
        long intervalMs = config.commitIntervalMs();
        return elapsedMs >= intervalMs;
    }

    /**
     * Begins a new commit: assigns a fresh random commit ID and resets the start time to now.
     */
    void startNewCommit() {
        currentCommitId = UUID.randomUUID();
        startTime = System.currentTimeMillis();
    }

    /**
     * Ends the current commit: discards the buffered ready payloads and clears the commit ID.
     */
    void endCurrentCommit() {
        readyBuffer.clear();
        currentCommitId = null;
    }

    /**
     * Discards all buffered response envelopes.
     */
    void clearResponses() {
        commitBuffer.clear();
    }

    /**
     * Checks whether the commit in progress has run longer than the configured commit timeout.
     *
     * @return {@code true} when a commit is in progress and the time since it started strictly
     *     exceeds the configured timeout; {@code false} otherwise
     */
    boolean isCommitTimedOut() {
        if (!isCommitInProgress()) {
            return false;
        }
        long elapsedMs = System.currentTimeMillis() - startTime;
        long timeoutMs = config.commitTimeoutMs();
        boolean timedOut = elapsedMs > timeoutMs;
        if (timedOut) {
            LOG.info("Commit timeout reached. Commit ID: {}", currentCommitId);
        }
        return timedOut;
    }

    /**
     * Checks whether ready payloads for the current commit cover the expected number of
     * partitions. Only buffered payloads whose commit ID equals the current commit ID are
     * counted; each contributes the size of its assignments.
     *
     * @param expectedPartitionCount number of partitions that must be covered
     * @return {@code true} when a commit is in progress and the counted assignments are at least
     *     {@code expectedPartitionCount}
     */
    boolean isCommitReady(int expectedPartitionCount) {
        if (!isCommitInProgress()) {
            return false;
        }

        int receivedPartitionCount = 0;
        for (DataComplete ready : readyBuffer) {
            if (ready.commitId().equals(currentCommitId)) {
                receivedPartitionCount += ready.assignments().size();
            }
        }

        boolean allReceived = receivedPartitionCount >= expectedPartitionCount;
        if (allReceived) {
            LOG.info("Commit {} ready, received responses for all {} partitions", currentCommitId, receivedPartitionCount);
        } else {
            LOG.info("Commit {} not ready, received responses for {} of {} partitions, waiting for more", currentCommitId, receivedPartitionCount, expectedPartitionCount);
        }
        return allReceived;
    }

    /**
     * Groups the buffered response envelopes by the table reference of their
     * {@link DataWritten} payload.
     *
     * @return a map from table reference to the envelopes buffered for that table
     */
    Map<TableReference, List<Envelope>> tableCommitMap() {
        return commitBuffer.stream().collect(Collectors.groupingBy(CommitState::tableOf));
    }

    /**
     * Computes the earliest timestamp across all assignments of the buffered ready payloads.
     *
     * @param partialCommit when {@code true}, no timestamp is computed
     * @return the minimum assignment timestamp, or {@code null} when {@code partialCommit} is
     *     set, when any assignment has a {@code null} timestamp, or when there are no assignments
     */
    OffsetDateTime validThroughTs(boolean partialCommit) {
        if (partialCommit) {
            return null;
        }

        // A single missing timestamp invalidates the result for the whole buffer.
        boolean anyTimestampMissing = readyBuffer.stream()
                .flatMap(ready -> ready.assignments().stream())
                .anyMatch(offset -> offset.timestamp() == null);
        if (anyTimestampMissing) {
            return null;
        }

        return readyBuffer.stream()
                .flatMap(ready -> ready.assignments().stream())
                .map(TopicPartitionOffset::timestamp)
                .min(Comparator.naturalOrder())
                .orElse(null);
    }

    /** Extracts the table reference from an envelope's {@link DataWritten} payload. */
    private static TableReference tableOf(Envelope envelope) {
        DataWritten written = (DataWritten) envelope.event().payload();
        return written.tableReference();
    }
}

