/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.sink.committer;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.connector.sink2.Committer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Committer} implementation that commits {@link BigQueryCommittable}s by calling the
 * {@code flushRows} operation of a BigQuery storage write client.
 *
 * <p>Each call to {@link #commit(Collection)} obtains its own write client from
 * {@link BigQueryServicesFactory} using the connect options supplied at construction time, and
 * closes that client before returning. The committer keeps no other resources, so
 * {@link #close()} has nothing to release.
 */
public class BigQueryCommitter implements Committer<BigQueryCommittable>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryCommitter.class);

    /** Connection settings used to create a storage write client on every commit. */
    private final BigQueryConnectOptions connectOptions;

    /**
     * Creates a committer bound to the given connection settings.
     *
     * @param connectOptions options handed to {@link BigQueryServicesFactory} when a write
     *     client is needed
     */
    public BigQueryCommitter(BigQueryConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }

    /**
     * Flushes every committable in the given requests, in iteration order.
     *
     * <p>For each request the committable's stream name and offset are passed to
     * {@code flushRows}; the offset reported in the response must equal the requested offset.
     * Processing stops at the first failure, so later requests in the collection are not
     * attempted.
     *
     * @param commitRequests requests wrapping the committables to flush; an empty collection
     *     is logged and otherwise ignored
     * @throws BigQueryConnectorException if the response offset differs from the requested
     *     offset, or if an {@link IOException} or {@link ApiException} is raised while creating,
     *     using or closing the write client
     */
    public void commit(Collection<Committer.CommitRequest<BigQueryCommittable>> commitRequests) {
        if (commitRequests.isEmpty()) {
            LOG.info("No committable found. Nothing to commit!");
            return;
        }
        // One client serves the whole batch; try-with-resources closes it on every exit path.
        try (BigQueryServices.StorageWriteClient storageWriteClient =
                BigQueryServicesFactory.instance(connectOptions).storageWrite()) {
            for (Committer.CommitRequest<BigQueryCommittable> request : commitRequests) {
                final BigQueryCommittable pending = request.getCommittable();
                final long producerId = pending.getProducerId();
                final String stream = pending.getStreamName();
                final long requestedOffset = pending.getStreamOffset();

                LOG.info("Committing records appended by producer {}", producerId);
                LOG.debug(
                        "Invoking flushRows API on stream {} till offset {}",
                        stream,
                        requestedOffset);

                final FlushRowsResponse flushResult =
                        storageWriteClient.flushRows(stream, requestedOffset);
                ensureOffsetMatches(producerId, requestedOffset, flushResult.getOffset());
            }
        } catch (IOException | ApiException e) {
            throw new BigQueryConnectorException("Commit operation failed", e);
        }
    }

    /** No-op: this committer holds no resources beyond the lifetime of a single commit call. */
    @Override
    public void close() {
    }

    /**
     * Verifies that a flush reported the offset that was requested.
     *
     * @param producerId identifier of the producer whose records were being committed
     * @param requestedOffset offset passed to {@code flushRows}
     * @param returnedOffset offset reported by the {@code flushRows} response
     * @throws BigQueryConnectorException if the two offsets differ
     */
    private static void ensureOffsetMatches(
            long producerId, long requestedOffset, long returnedOffset) {
        if (returnedOffset != requestedOffset) {
            LOG.error(
                    "BigQuery FlushRows API failed. Returned offset {}, expected {}",
                    returnedOffset,
                    requestedOffset);
            throw new BigQueryConnectorException(
                    String.format("Commit operation failed for producer %d", producerId));
        }
    }
}

