/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.ChannelUri;
import io.aeron.Counter;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.cluster.ConsensusModuleAgent;
import io.aeron.cluster.MultipleRecordingReplication;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.client.ClusterException;
import io.aeron.exceptions.AeronException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.agrona.CloseHelper;
import org.agrona.collections.Object2ObjectHashMap;

class StandbySnapshotReplicator
implements AutoCloseable {
    private final int memberId;
    private final AeronArchive archive;
    private final RecordingLog recordingLog;
    private final int serviceCount;
    private final String archiveControlChannel;
    private final int archiveControlStreamId;
    private final String replicationChannel;
    private final int fileSyncLevel;
    private final Counter snapshotCounter;
    private final Object2ObjectHashMap<String, String> errorsByEndpoint = new Object2ObjectHashMap();
    private MultipleRecordingReplication recordingReplication;
    private ArrayList<SnapshotReplicationEntry> snapshotsToReplicate;
    private SnapshotReplicationEntry currentSnapshotToReplicate;
    private boolean isComplete = false;

    StandbySnapshotReplicator(int memberId, AeronArchive archive, RecordingLog recordingLog, int serviceCount, String archiveControlChannel, int archiveControlStreamId, String replicationChannel, int fileSyncLevel, Counter snapshotCounter) {
        this.memberId = memberId;
        this.archive = archive;
        this.recordingLog = recordingLog;
        this.serviceCount = serviceCount;
        this.archiveControlChannel = archiveControlChannel;
        this.archiveControlStreamId = archiveControlStreamId;
        this.replicationChannel = replicationChannel;
        this.fileSyncLevel = fileSyncLevel;
        this.snapshotCounter = snapshotCounter;
    }

    static StandbySnapshotReplicator newInstance(int memberId, AeronArchive.Context archiveCtx, RecordingLog recordingLog, int serviceCount, String archiveControlChannel, int archiveControlStreamId, String replicationChannel, int fileSyncLevel, Counter snapshotCounter) {
        AeronArchive archive = AeronArchive.connect((AeronArchive.Context)archiveCtx.clone().errorHandler(null));
        StandbySnapshotReplicator standbySnapshotReplicator = new StandbySnapshotReplicator(memberId, archive, recordingLog, serviceCount, archiveControlChannel, archiveControlStreamId, replicationChannel, fileSyncLevel, snapshotCounter);
        archive.context().recordingSignalConsumer(standbySnapshotReplicator::onSignal);
        return standbySnapshotReplicator;
    }

    int poll(long nowNs) {
        int workCount = 0;
        if (null == this.recordingReplication) {
            ++workCount;
            if (null == this.snapshotsToReplicate) {
                this.snapshotsToReplicate = this.computeSnapshotsToReplicate();
                if (null == this.snapshotsToReplicate) {
                    this.isComplete = true;
                    return workCount;
                }
            }
            if (this.snapshotsToReplicate.isEmpty()) {
                throw new ClusterException("failed to replicate any standby snapshots, errors: " + String.valueOf(this.errorsByEndpoint), AeronException.Category.WARN);
            }
            this.currentSnapshotToReplicate = this.snapshotsToReplicate.remove(0);
            String srcChannel = ChannelUri.createDestinationUri((String)this.archiveControlChannel, (String)this.currentSnapshotToReplicate.endpoint);
            long progressTimeoutNs = this.archive.context().messageTimeoutNs() * 2L;
            long progressIntervalNs = progressTimeoutNs / 10L;
            this.recordingReplication = MultipleRecordingReplication.newInstance(this.archive, this.archiveControlStreamId, srcChannel, this.replicationChannel, progressTimeoutNs, progressIntervalNs);
            this.recordingReplication.setEventListener(this::logReplicationEnded);
            int n = this.currentSnapshotToReplicate.recordingLogEntries.size();
            for (int i = 0; i < n; ++i) {
                RecordingLog.Entry entry = this.currentSnapshotToReplicate.recordingLogEntries.get(i);
                this.recordingReplication.addRecording(entry.recordingId, -1L, -1L);
            }
            ++workCount;
        }
        try {
            workCount += this.recordingReplication.poll(nowNs);
            this.archive.pollForRecordingSignals();
        }
        catch (ArchiveException | ClusterException ex) {
            this.errorsByEndpoint.put((Object)this.currentSnapshotToReplicate.endpoint, (Object)ex.getMessage());
            CloseHelper.quietClose((AutoCloseable)this.recordingReplication);
            this.recordingReplication = null;
        }
        if (null != this.recordingReplication && this.recordingReplication.isComplete()) {
            int n = this.currentSnapshotToReplicate.recordingLogEntries.size();
            for (int i = 0; i < n; ++i) {
                RecordingLog.Entry entry = this.currentSnapshotToReplicate.recordingLogEntries.get(i);
                long dstRecordingId = this.recordingReplication.completedDstRecordingId(entry.recordingId);
                this.recordingLog.appendSnapshot(dstRecordingId, entry.leadershipTermId, entry.termBaseLogPosition, entry.logPosition, entry.timestamp, entry.serviceId);
            }
            this.recordingLog.force(this.fileSyncLevel);
            this.snapshotCounter.incrementRelease();
            CloseHelper.quietClose((AutoCloseable)this.recordingReplication);
            this.recordingReplication = null;
            this.isComplete = true;
        }
        return workCount;
    }

    private ArrayList<SnapshotReplicationEntry> computeSnapshotsToReplicate() {
        ArrayList<SnapshotReplicationEntry> orderedSnapshotToReplicate;
        Map<String, List<RecordingLog.Entry>> snapshotsByEndpoint = this.filterByExistingRecordingLogEntries(this.recordingLog.latestStandbySnapshots(this.serviceCount));
        if (snapshotsByEndpoint.isEmpty()) {
            orderedSnapshotToReplicate = null;
        } else {
            orderedSnapshotToReplicate = new ArrayList<SnapshotReplicationEntry>();
            snapshotsByEndpoint.forEach((k, v) -> {
                long logPosition = ((RecordingLog.Entry)v.get((int)0)).logPosition;
                orderedSnapshotToReplicate.add(new SnapshotReplicationEntry((String)k, logPosition, (List<RecordingLog.Entry>)v));
            });
            orderedSnapshotToReplicate.sort(StandbySnapshotReplicator::compareTo);
        }
        return orderedSnapshotToReplicate;
    }

    private static int compareTo(SnapshotReplicationEntry a, SnapshotReplicationEntry b) {
        int descendingOrderCompare = -Long.compare(a.logPosition, b.logPosition);
        if (0 != descendingOrderCompare) {
            return descendingOrderCompare;
        }
        return a.endpoint.compareTo(b.endpoint);
    }

    boolean isComplete() {
        return this.isComplete;
    }

    void onSignal(long controlSessionId, long correlationId, long recordingId, long subscriptionId, long position, RecordingSignal signal) {
        if (null != this.recordingReplication) {
            this.recordingReplication.onSignal(correlationId, recordingId, position, signal);
        }
    }

    @Override
    public void close() {
        CloseHelper.quietClose((AutoCloseable)this.archive);
    }

    private void logReplicationEnded(String controlUri, long srcRecordingId, long dstRecordingId, long position, boolean hasSynced) {
        ConsensusModuleAgent.logReplicationEnded(this.memberId, "STANDBY_SNAPSHOT", controlUri, srcRecordingId, dstRecordingId, position, hasSynced);
    }

    private Map<String, List<RecordingLog.Entry>> filterByExistingRecordingLogEntries(Map<String, List<RecordingLog.Entry>> standbySnapshotsByEndpoint) {
        Object2ObjectHashMap filteredSnapshotsByEndpoint = new Object2ObjectHashMap();
        for (Map.Entry<String, List<RecordingLog.Entry>> entry : standbySnapshotsByEndpoint.entrySet()) {
            int i = entry.getValue().size();
            while (--i > -1) {
                RecordingLog.Entry standbySnapshotEntry = entry.getValue().get(i);
                RecordingLog.Entry snapshotEntry = this.recordingLog.getLatestSnapshot(standbySnapshotEntry.serviceId);
                if (null == snapshotEntry || standbySnapshotEntry.logPosition > snapshotEntry.logPosition) continue;
                entry.getValue().remove(i);
            }
            if (entry.getValue().isEmpty()) continue;
            filteredSnapshotsByEndpoint.put(entry.getKey(), entry.getValue());
        }
        return filteredSnapshotsByEndpoint;
    }

    private static final class SnapshotReplicationEntry {
        private final String endpoint;
        private final long logPosition;
        private final List<RecordingLog.Entry> recordingLogEntries = new ArrayList<RecordingLog.Entry>();

        private SnapshotReplicationEntry(String endpoint, long logPosition, List<RecordingLog.Entry> entries) {
            this.endpoint = endpoint;
            this.logPosition = logPosition;
            this.recordingLogEntries.addAll(entries);
        }
    }
}

