/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryChangeStreamAction {
    private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
    private static final Tracer TRACER = Tracing.getTracer();
    private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes((long)5L);
    private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
    private final ChangeStreamDao changeStreamDao;
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final PartitionMetadataMapper partitionMetadataMapper;
    private final DataChangeRecordAction dataChangeRecordAction;
    private final HeartbeatRecordAction heartbeatRecordAction;
    private final ChildPartitionsRecordAction childPartitionsRecordAction;
    private final ChangeStreamMetrics metrics;

    QueryChangeStreamAction(ChangeStreamDao changeStreamDao, PartitionMetadataDao partitionMetadataDao, ChangeStreamRecordMapper changeStreamRecordMapper, PartitionMetadataMapper partitionMetadataMapper, DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction, ChangeStreamMetrics metrics) {
        this.changeStreamDao = changeStreamDao;
        this.partitionMetadataDao = partitionMetadataDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.partitionMetadataMapper = partitionMetadataMapper;
        this.dataChangeRecordAction = dataChangeRecordAction;
        this.heartbeatRecordAction = heartbeatRecordAction;
        this.childPartitionsRecordAction = childPartitionsRecordAction;
        this.metrics = metrics;
    }

    /*
     * Exception decompiling
     */
    @VisibleForTesting
    public DoFn.ProcessContinuation run(PartitionMetadata partition, RestrictionTracker<TimestampRange, Timestamp> tracker, DoFn.OutputReceiver<DataChangeRecord> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[TRYBLOCK]], but top level block is 14[WHILELOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private DoFn.BundleFinalizer.Callback updateWatermarkCallback(String token, WatermarkEstimator<Instant> watermarkEstimator) {
        return () -> {
            Instant watermark = watermarkEstimator.currentWatermark();
            LOG.debug("[" + token + "] Updating current watermark to " + watermark);
            try {
                this.partitionMetadataDao.updateWatermark(token, Timestamp.ofTimeMicroseconds((long)(watermark.getMillis() * 1000L)));
            }
            catch (SpannerException e) {
                if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
                    LOG.debug("[" + token + "] Unable to update the current watermark, partition NOT FOUND");
                }
                LOG.error("[" + token + "] Error updating the current watermark: " + e.getMessage(), (Throwable)e);
            }
        };
    }

    private boolean isTimestampOutOfRange(SpannerException e) {
        return (e.getErrorCode() == ErrorCode.INVALID_ARGUMENT || e.getErrorCode() == ErrorCode.OUT_OF_RANGE) && e.getMessage() != null && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
    }

    private static /* synthetic */ IllegalStateException lambda$run$0(String token) {
        return new IllegalStateException("Partition " + token + " not found in metadata table");
    }

    private static /* synthetic */ /* end resource */ void $closeResource(Throwable x0, AutoCloseable x1) {
        if (x0 != null) {
            try {
                x1.close();
            }
            catch (Throwable throwable) {
                x0.addSuppressed(throwable);
            }
        } else {
            x1.close();
        }
    }
}

