/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.source.split.assigner;

import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoverer;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoveryScheduler;
import com.google.cloud.flink.bigquery.source.split.assigner.BigQuerySourceSplitAssigner;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class UnboundedSplitAssigner
extends BigQuerySourceSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSplitAssigner.class);
    private final SplitDiscoveryScheduler observer;

    UnboundedSplitAssigner(SplitDiscoveryScheduler observer, BigQueryReadOptions readOptions, BigQuerySourceEnumState sourceEnumState) {
        super(readOptions, sourceEnumState);
        this.observer = observer;
    }

    @VisibleForTesting
    DiscoveryResult discoverNewSplits() {
        LOG.info("Searching for new data to read.");
        BigQueryConnectOptions options = this.readOptions.getBigQueryConnectOptions();
        try {
            BigQueryServices.QueryDataClient client = BigQueryServicesFactory.instance((BigQueryConnectOptions)options).queryClient();
            List newPartitions = client.retrievePartitionsStatus(options.getProjectId(), options.getDataset(), options.getTable());
            LOG.info("Table partitions and their status: {}", (Object)newPartitions);
            LOG.info("Already seen partition ids: {}", (Object)this.lastSeenPartitions);
            newPartitions = newPartitions.stream().filter(pIdStatus -> this.readOptions.getOldestPartitionId().map(oldestPartitionId -> pIdStatus.getPartitionId().compareTo((String)oldestPartitionId) >= 0).orElse(true)).filter(pIdStatus -> pIdStatus.isCompleted()).filter(pIdStatus -> !this.lastSeenPartitions.contains(pIdStatus.getPartitionId())).collect(Collectors.toList());
            return new DiscoveryResult(newPartitions.stream().map(p -> p.getPartitionId()).collect(Collectors.toList()), newPartitions.stream().map(pId -> BigQueryPartitionUtils.formatPartitionRestrictionBasedOnInfo(Optional.of(pId.getInfo()), (String)pId.getInfo().getColumnName(), (String)((String)BigQueryPartitionUtils.partitionValuesFromIdAndDataType(Arrays.asList(pId.getPartitionId()), (StandardSQLTypeName)pId.getInfo().getColumnType()).get(0)))).flatMap(restriction -> SplitDiscoverer.discoverSplits((BigQueryConnectOptions)options, (DataFormat)DataFormat.AVRO, (List)this.readOptions.getColumnNames(), (String)this.combineRestrictions(this.readOptions.getRowRestriction(), (String)restriction), (Optional)this.readOptions.getSnapshotTimestampInMillis(), (Integer)this.readOptions.getMaxStreamCount()).stream()).collect(Collectors.toList()));
        }
        catch (Exception ex) {
            throw new RuntimeException("Problems while trying to discover new splits.", ex);
        }
    }

    @VisibleForTesting
    String combineRestrictions(String existing, String newRestriction) {
        if (existing.trim().isEmpty()) {
            return newRestriction;
        }
        return existing + " AND " + newRestriction;
    }

    @VisibleForTesting
    void handlePartitionSplitDiscovery(DiscoveryResult discovery, Throwable t) {
        if (t != null && this.remainingTableStreams.isEmpty()) {
            throw new RuntimeException(t);
        }
        if (t != null) {
            LOG.error("Failed to poll for new read streams, continuing", t);
            return;
        }
        if (discovery.getReadStreams().isEmpty() && discovery.getNewPartitions().isEmpty()) {
            LOG.info("No new partitions for now.");
            return;
        }
        LOG.info("Discovered new partitions: {}", discovery.getNewPartitions());
        LOG.info("Discovered new read streams: {}", discovery.getReadStreams());
        this.lastSeenPartitions.addAll(discovery.getNewPartitions());
        this.remainingTableStreams.addAll(discovery.getReadStreams());
        this.observer.notifySplits();
    }

    @Override
    public void discoverSplits() {
        this.observer.schedule(this::discoverNewSplits, this::handlePartitionSplitDiscovery, 0L, Duration.ofMinutes(this.readOptions.getPartitionDiscoveryRefreshIntervalInMinutes().intValue()).toMillis());
    }

    @Override
    public boolean noMoreSplits() {
        return false;
    }

    static class DiscoveryResult {
        private final List<String> newPartitions;
        private final List<String> readStreams;

        public DiscoveryResult(List<String> newPartitions, List<String> newStreams) {
            this.newPartitions = newPartitions;
            this.readStreams = newStreams;
        }

        public List<String> getNewPartitions() {
            return this.newPartitions;
        }

        public List<String> getReadStreams() {
            return this.readStreams;
        }
    }
}

