/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.beam;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.beam.InitialOffsetReader;
import com.google.cloud.pubsublite.beam.OffsetByteProgress;
import com.google.cloud.pubsublite.beam.SubscriptionPartition;
import com.google.cloud.pubsublite.beam.SubscriptionPartitionProcessor;
import com.google.cloud.pubsublite.beam.SubscriptionPartitionProcessorFactory;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Splittable {@link DoFn} that processes one {@link SubscriptionPartition} per element and
 * outputs the {@link SequencedMessage}s produced for it.
 *
 * <p>The restriction is an {@link OffsetRange} that starts at the offset returned by an
 * {@link InitialOffsetReader} and is open-ended ({@code Long.MAX_VALUE}). After each
 * {@code processElement} call, the offset following the last claimed one is committed through a
 * freshly created {@link Committer}.
 *
 * <p>All collaborators are obtained from serializable factories supplied to the constructor.
 */
class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
    /** Upper bound passed to {@code SubscriptionPartitionProcessor.waitForCompletion}. */
    private final Duration maxSleepTime;
    /** Creates the processor that drives a single {@code processElement} call. */
    private final SubscriptionPartitionProcessorFactory processorFactory;
    /** Creates the reader used to determine where the initial restriction starts. */
    private final SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
    /** Creates the restriction tracker for a partition and offset range. */
    private final SerializableBiFunction<SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> trackerFactory;
    /** Creates the committer used to commit progress after a {@code processElement} call. */
    private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;

    /**
     * Stores the supplied configuration and factories; no other work is done here.
     *
     * @param maxSleepTime maximum time handed to the processor's {@code waitForCompletion}
     * @param offsetReaderFactory factory for the reader of the initial offset
     * @param trackerFactory factory for restriction trackers
     * @param processorFactory factory for per-call partition processors
     * @param committerFactory factory for offset committers
     */
    PerSubscriptionPartitionSdf(Duration maxSleepTime, SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory, SerializableBiFunction<SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
        this.maxSleepTime = maxSleepTime;
        this.processorFactory = processorFactory;
        this.offsetReaderFactory = offsetReaderFactory;
        this.trackerFactory = trackerFactory;
        this.committerFactory = committerFactory;
    }

    /**
     * Returns the initial watermark estimator state.
     *
     * @return {@link Instant#EPOCH}
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkState() {
        return Instant.EPOCH;
    }

    /**
     * Creates the watermark estimator for this DoFn.
     *
     * @param state the watermark estimator state to start from
     * @return a monotonically increasing estimator initialised with {@code state}
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant state) {
        return new WatermarkEstimators.MonotonicallyIncreasing(state);
    }

    /**
     * Runs a processor for the given partition, waits for it for at most {@code maxSleepTime},
     * and then commits the offset following the last one claimed, if any was claimed.
     *
     * @param tracker restriction tracker for this call
     * @param subscriptionPartition the element being processed
     * @param receiver output receiver handed to the processor
     * @return the continuation reported by the processor
     * @throws Exception if the processor or the commit fails
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, @DoFn.Element SubscriptionPartition subscriptionPartition, DoFn.OutputReceiver<SequencedMessage> receiver) throws Exception {
        try (SubscriptionPartitionProcessor processor = this.processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
            processor.start();
            DoFn.ProcessContinuation result = processor.waitForCompletion(this.maxSleepTime);
            // Commit the next-to-deliver offset, i.e. one past the last claimed offset.
            processor.lastClaimed().ifPresent(lastClaimedOffset ->
                    commitOffset(subscriptionPartition, Offset.of(lastClaimedOffset.value() + 1L)));
            return result;
        }
    }

    /**
     * Starts a new committer for the partition, synchronously commits {@code nextOffset}, and
     * shuts the committer down again.
     *
     * @param subscriptionPartition partition whose progress is committed
     * @param nextOffset offset to commit
     */
    private void commitOffset(SubscriptionPartition subscriptionPartition, Offset nextOffset) {
        Committer committer = this.committerFactory.apply(subscriptionPartition);
        committer.startAsync().awaitRunning();
        try {
            committer.commitOffset(nextOffset).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            // Request shutdown so a failed commit does not leave the started committer behind.
            // Deliberately not awaiting termination here so that the commit failure is what
            // propagates. NOTE(review): assumes stopAsync() only requests shutdown - confirm.
            committer.stopAsync();
            throw ExtractStatus.toCanonical(e).underlying;
        }
        committer.stopAsync().awaitTerminated();
    }

    /**
     * Builds the initial restriction for a partition.
     *
     * @param subscriptionPartition the element to build the restriction for
     * @return a range from the offset read by the {@link InitialOffsetReader} to
     *     {@code Long.MAX_VALUE}
     */
    @DoFn.GetInitialRestriction
    public OffsetRange getInitialRestriction(@DoFn.Element SubscriptionPartition subscriptionPartition) {
        try (InitialOffsetReader reader = this.offsetReaderFactory.apply(subscriptionPartition)) {
            Offset offset = reader.read();
            return new OffsetRange(offset.value(), Long.MAX_VALUE);
        }
    }

    /**
     * Creates the restriction tracker for a partition and range by delegating to the configured
     * tracker factory.
     *
     * @param subscriptionPartition the element being processed
     * @param range the restriction to track
     * @return the tracker produced by the factory
     */
    @DoFn.NewTracker
    public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(@DoFn.Element SubscriptionPartition subscriptionPartition, @DoFn.Restriction OffsetRange range) {
        return this.trackerFactory.apply(subscriptionPartition, range);
    }
}

