/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.datastore.sink;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.common.collect.MoreCollectors;
import com.google.datastore.v1.AllocateIdsRequest;
import com.google.datastore.v1.AllocateIdsResponse;
import com.google.datastore.v1.BeginTransactionRequest;
import com.google.datastore.v1.BeginTransactionResponse;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatastoreRecordWriter
extends RecordWriter<NullWritable, Entity> {
    private static final Logger LOG = LoggerFactory.getLogger(DatastoreRecordWriter.class);
    private final Datastore datastore;
    private final int batchSize;
    private final boolean useAutogeneratedKey;
    private final boolean useTransactions;
    private CommitRequest.Builder builder;
    private int totalCount;
    private int numberOfRecordsInBatch;
    private String projectId;
    private Counter counter;
    private Sleeper sleeper;
    private BackOff flushBackoff;

    public DatastoreRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration config = taskAttemptContext.getConfiguration();
        this.projectId = config.get("mapred.gcd.output.project");
        String serviceAccount = config.get("mapred.gcd.output.service.account");
        Boolean isServiceAccountFilePath = config.getBoolean("mapred.gcd.output.service.account.isfilepath", true);
        this.batchSize = config.getInt("mapred.gcd.output.batch.size", 25);
        this.useAutogeneratedKey = config.getBoolean("mapred.gcd.output.use.autogenerated.key", false);
        this.useTransactions = config.getBoolean("mapred.gcd.output.use.transactions", true);
        LOG.debug("Initialize RecordWriter(projectId={}, batchSize={}, useAutogeneratedKey={}, serviceAccount={})", new Object[]{this.projectId, this.batchSize, this.useAutogeneratedKey, serviceAccount});
        this.datastore = DatastoreUtil.getDatastoreV1(serviceAccount, isServiceAccountFilePath, this.projectId);
        this.totalCount = 0;
        this.numberOfRecordsInBatch = 0;
        this.builder = this.newCommitRequest();
        this.counter = taskAttemptContext.getCounter((Enum)FileOutputFormatCounter.BYTES_WRITTEN);
        this.sleeper = Sleeper.DEFAULT;
        this.flushBackoff = new ExponentialBackOff.Builder().setMaxIntervalMillis(12000).setInitialIntervalMillis(1000).setMaxElapsedTimeMillis(60000).setRandomizationFactor(0.25).build();
    }

    private CommitRequest.Builder newCommitRequest() throws IOException {
        CommitRequest.Builder builder = CommitRequest.newBuilder();
        builder.setProjectId(this.projectId);
        if (this.useTransactions) {
            BeginTransactionResponse tres;
            builder.setMode(CommitRequest.Mode.TRANSACTIONAL);
            BeginTransactionRequest.Builder treq = BeginTransactionRequest.newBuilder();
            try {
                tres = this.datastore.beginTransaction(treq.build());
            }
            catch (DatastoreException e) {
                throw new IOException("Failed to begin datastore transaction", e);
            }
            ByteString tx = tres.getTransaction();
            builder.setTransaction(tx);
        } else {
            builder.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
        }
        return builder;
    }

    public void write(NullWritable key, Entity entity) throws IOException, InterruptedException {
        LOG.trace("RecordWriter write({})", (Object)entity);
        if (this.useAutogeneratedKey) {
            AllocateIdsResponse response;
            AllocateIdsRequest request = AllocateIdsRequest.newBuilder().setProjectId(this.projectId).addKeys(entity.getKey()).build();
            try {
                response = this.datastore.allocateIds(request);
            }
            catch (DatastoreException e) {
                throw new IOException("Failed to allocate id", e);
            }
            Entity fullEntity = Entity.newBuilder().setKey((Key)response.getKeysList().stream().collect(MoreCollectors.onlyElement())).putAllProperties(entity.getPropertiesMap()).build();
            this.builder.addMutations(DatastoreHelper.makeInsert((Entity)fullEntity).build());
        } else {
            this.builder.addMutations(DatastoreHelper.makeUpsert((Entity)entity).build());
        }
        ++this.totalCount;
        ++this.numberOfRecordsInBatch;
        if (this.totalCount % this.batchSize == 0) {
            this.flush();
        }
    }

    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.flush();
        LOG.debug("Total number of values written to Cloud Datastore: {}", (Object)this.totalCount);
    }

    private void flush() throws IOException, InterruptedException {
        if (this.numberOfRecordsInBatch > 0) {
            LOG.debug("Writing a batch of {} values to Cloud Datastore.", (Object)this.numberOfRecordsInBatch);
            while (true) {
                try {
                    this.flushInternal();
                }
                catch (DatastoreException e) {
                    long backoff = this.flushBackoff.nextBackOffMillis();
                    if (backoff != -1L && this.isRetryable(e)) {
                        LOG.warn("Retrying flush after {} ms", (Object)backoff);
                        this.sleeper.sleep(backoff);
                        continue;
                    }
                    LOG.error("Datastore commit failed with code {}: {}", (Object)e.getCode(), (Object)e.toString());
                    throw new IOException("Datastore commit failed", e);
                }
                break;
            }
            this.builder = this.newCommitRequest();
            this.numberOfRecordsInBatch = 0;
            this.flushBackoff.reset();
        }
    }

    private void flushInternal() throws DatastoreException {
        CommitRequest request = this.builder.build();
        this.datastore.commit(request);
        this.counter.increment((long)request.getSerializedSize());
    }

    private boolean isRetryable(DatastoreException e) {
        return e.getCode() == Code.ABORTED || e.getCode() == Code.DEADLINE_EXCEEDED || e.getCode() == Code.RESOURCE_EXHAUSTED || e.getCode() == Code.UNAVAILABLE;
    }
}

