/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.spanner.sink;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.spanner.common.BytesCounter;
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
import io.cdap.plugin.gcp.spanner.sink.SpannerSinkConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;

public class SpannerOutputFormat
extends OutputFormat<NullWritable, Mutation> {
    public static void configure(Configuration configuration, SpannerSinkConfig config, @Nullable Schema schema) {
        String projectId = config.connection.getProject();
        configuration.set("project.id", projectId);
        String serviceAccount = config.connection.getServiceAccount();
        if (serviceAccount != null) {
            String type = config.connection.isServiceAccountFilePath() != false ? "serviceFilePath" : "serviceJson";
            configuration.set("service.account.type", type);
            configuration.set("service.account.path", serviceAccount);
        }
        configuration.set("instance.id", config.getInstance());
        configuration.set("database.name", config.getDatabase());
        configuration.set("table", config.getTable());
        String keys = config.getKeys();
        if (keys != null) {
            configuration.set("keys", config.getKeys());
        }
        configuration.set("spanner.write.batch.size", String.valueOf(config.getBatchSize()));
        if (schema != null) {
            configuration.set("schema", schema.toString());
        }
    }

    public RecordWriter<NullWritable, Mutation> getRecordWriter(TaskAttemptContext context) throws IOException {
        Configuration configuration = context.getConfiguration();
        SpannerUtil.verifyPresenceOrCreateDatabaseAndTable(configuration);
        String projectId = configuration.get("project.id");
        String instanceId = configuration.get("instance.id");
        String database = configuration.get("database.name");
        String serviceAccountType = configuration.get("service.account.type");
        String serviceAccount = configuration.get("service.account.path");
        BytesCounter counter = new BytesCounter();
        Spanner spanner = SpannerUtil.getSpannerServiceWithWriteInterceptor(serviceAccount, "serviceFilePath".equals(serviceAccountType), projectId, counter);
        int batchSize = Integer.parseInt(configuration.get("spanner.write.batch.size"));
        DatabaseId db = DatabaseId.of((String)projectId, (String)instanceId, (String)database);
        DatabaseClient client = spanner.getDatabaseClient(db);
        return new SpannerRecordWriter(spanner, client, batchSize, counter);
    }

    public void checkOutputSpecs(JobContext jobContext) {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
        return new OutputCommitter(){

            public void setupJob(JobContext jobContext) {
            }

            public void setupTask(TaskAttemptContext taskAttemptContext) {
            }

            public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
                return false;
            }

            public void commitTask(TaskAttemptContext taskAttemptContext) {
            }

            public void abortTask(TaskAttemptContext taskAttemptContext) {
            }
        };
    }

    protected static class SpannerRecordWriter
    extends RecordWriter<NullWritable, Mutation> {
        private final Spanner spanner;
        private final DatabaseClient databaseClient;
        private final List<Mutation> mutations;
        private final int batchSize;
        private final BytesCounter counter;

        public SpannerRecordWriter(Spanner spanner, DatabaseClient client, int batchSize, BytesCounter counter) {
            this.spanner = spanner;
            this.databaseClient = client;
            this.mutations = new ArrayList<Mutation>();
            this.batchSize = batchSize;
            this.counter = counter;
        }

        public void write(NullWritable nullWritable, Mutation mutation) {
            this.mutations.add(mutation);
            if (this.mutations.size() > this.batchSize) {
                this.databaseClient.write(this.mutations);
                this.mutations.clear();
            }
        }

        public void close(TaskAttemptContext taskAttemptContext) {
            try {
                if (this.mutations.size() > 0) {
                    this.databaseClient.write(this.mutations);
                    taskAttemptContext.getCounter((Enum)FileOutputFormatCounter.BYTES_WRITTEN).increment(this.counter.getValue());
                    this.mutations.clear();
                }
            }
            finally {
                this.spanner.close();
            }
        }
    }
}

