/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gora.flink;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.apache.gora.mapreduce.GoraInputFormat;
import org.apache.gora.mapreduce.GoraOutputFormat;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

/**
 * Connects Apache Gora data stores to Apache Flink's batch API by wrapping
 * {@link GoraInputFormat} / {@link GoraOutputFormat} in Flink's Hadoop
 * compatibility formats.
 *
 * <p>An engine built with the two-argument constructor can only create data
 * sources; creating a data sink additionally requires the output key and value
 * classes supplied through the four-argument constructor.
 *
 * @param <KeyIn>    key type read from the input data store
 * @param <ValueIn>  persistent value type read from the input data store
 * @param <KeyOut>   key type written to the output data store
 * @param <ValueOut> persistent value type written to the output data store
 */
public class GoraFlinkEngine<KeyIn, ValueIn extends PersistentBase, KeyOut, ValueOut extends PersistentBase> {
    /** Configuration key under which the sink's scratch output directory is stored. */
    private static final String OUTPUT_DIR_KEY = "mapred.output.dir";
    /** Name prefix of the scratch directory created for every data sink. */
    private static final String TEMP_DIR_PREFIX = "temp";

    private final Class<KeyIn> classKeyIn;
    private final Class<ValueIn> classValueIn;
    private final Class<KeyOut> classKeyOut;
    private final Class<ValueOut> classValueOut;

    /**
     * Creates an engine that is only able to read (no output types configured).
     *
     * @param classKeyIn   class of the input keys
     * @param classValueIn class of the input values
     */
    public GoraFlinkEngine(Class<KeyIn> classKeyIn, Class<ValueIn> classValueIn) {
        this(classKeyIn, classValueIn, null, null);
    }

    /**
     * Creates an engine able to both read from and write to Gora data stores.
     *
     * @param classKeyIn    class of the input keys
     * @param classValueIn  class of the input values
     * @param classKeyOut   class of the output keys
     * @param classValueOut class of the output values
     */
    public GoraFlinkEngine(Class<KeyIn> classKeyIn, Class<ValueIn> classValueIn, Class<KeyOut> classKeyOut, Class<ValueOut> classValueOut) {
        this.classKeyIn = classKeyIn;
        this.classValueIn = classValueIn;
        this.classKeyOut = classKeyOut;
        this.classValueOut = classValueOut;
    }

    /**
     * Creates a Flink data source backed by a data store that is instantiated
     * from {@code dataStoreClass}.
     *
     * @param env            Flink execution environment the source is registered with
     * @param conf           Hadoop configuration used to create the underlying job
     * @param dataStoreClass data store implementation to instantiate
     * @return a data source emitting (key, value) tuples
     * @throws IOException          if the job or data store cannot be set up
     * @throws NullPointerException if the input key or value class was not provided
     */
    public DataSource<Tuple2<KeyIn, ValueIn>> createDataSource(ExecutionEnvironment env, Configuration conf, Class<? extends DataStore<KeyIn, ValueIn>> dataStoreClass) throws IOException {
        requireInputTypes();
        Job job = Job.getInstance(conf);
        // The store is built from the job's configuration, not from the caller's 'conf' object.
        DataStore<KeyIn, ValueIn> dataStore = DataStoreFactory.getDataStore(dataStoreClass, classKeyIn, classValueIn, job.getConfiguration());
        return buildDataSource(env, job, dataStore);
    }

    /**
     * Creates a Flink data source backed by an existing data store.
     *
     * @param env       Flink execution environment the source is registered with
     * @param conf      Hadoop configuration used to create the underlying job
     * @param dataStore data store to read from; must not be {@code null}
     * @return a data source emitting (key, value) tuples
     * @throws IOException          if the job cannot be set up
     * @throws NullPointerException if the input key or value class was not provided,
     *                              or {@code dataStore} is {@code null}
     */
    public DataSource<Tuple2<KeyIn, ValueIn>> createDataSource(ExecutionEnvironment env, Configuration conf, DataStore<KeyIn, ValueIn> dataStore) throws IOException {
        requireInputTypes();
        Preconditions.checkNotNull(dataStore, "dataStore must not be null");
        return buildDataSource(env, Job.getInstance(conf), dataStore);
    }

    /**
     * Creates a Flink output format that writes into an existing data store.
     *
     * @param conf      Hadoop configuration used to create the underlying job
     * @param dataStore data store to write to; must not be {@code null}
     * @return an output format accepting (key, value) tuples
     * @throws IOException          if the job or the scratch directory cannot be created
     * @throws NullPointerException if the output key or value class was not provided,
     *                              or {@code dataStore} is {@code null}
     */
    public OutputFormat<Tuple2<KeyOut, ValueOut>> createDataSink(Configuration conf, DataStore<KeyOut, ValueOut> dataStore) throws IOException {
        requireOutputTypes();
        Preconditions.checkNotNull(dataStore, "dataStore must not be null");
        return buildDataSink(Job.getInstance(conf), dataStore);
    }

    /**
     * Creates a Flink output format that writes into a data store instantiated
     * from {@code dataStoreClass}.
     *
     * @param conf           Hadoop configuration used to create the underlying job
     * @param dataStoreClass data store implementation to instantiate
     * @return an output format accepting (key, value) tuples
     * @throws IOException          if the job, data store or scratch directory cannot be created
     * @throws NullPointerException if the output key or value class was not provided
     */
    public OutputFormat<Tuple2<KeyOut, ValueOut>> createDataSink(Configuration conf, Class<? extends DataStore<KeyOut, ValueOut>> dataStoreClass) throws IOException {
        requireOutputTypes();
        Job job = Job.getInstance(conf);
        // The store is built from the job's configuration, not from the caller's 'conf' object.
        DataStore<KeyOut, ValueOut> dataStore = DataStoreFactory.getDataStore(dataStoreClass, classKeyOut, classValueOut, job.getConfiguration());
        return buildDataSink(job, dataStore);
    }

    /** Fails fast when the engine was constructed without input key/value classes. */
    private void requireInputTypes() {
        Preconditions.checkNotNull(classKeyIn, "classKeyIn must not be null");
        Preconditions.checkNotNull(classValueIn, "classValueIn must not be null");
    }

    /** Fails fast when the engine was constructed without output key/value classes. */
    private void requireOutputTypes() {
        Preconditions.checkNotNull(classKeyOut, "classKeyOut must not be null; use the four-argument constructor to create sinks");
        Preconditions.checkNotNull(classValueOut, "classValueOut must not be null; use the four-argument constructor to create sinks");
    }

    /** Registers a full-store query on the job and exposes it as a Flink data source. */
    private DataSource<Tuple2<KeyIn, ValueIn>> buildDataSource(ExecutionEnvironment env, Job job, DataStore<KeyIn, ValueIn> dataStore) throws IOException {
        GoraInputFormat.setInput(job, dataStore.newQuery(), true);
        HadoopInputFormat<KeyIn, ValueIn> wrappedGoraInput =
                new HadoopInputFormat<>(new GoraInputFormat<KeyIn, ValueIn>(), classKeyIn, classValueIn, job);
        return env.createInput(wrappedGoraInput);
    }

    /** Registers the store as the job's output and wraps it in Flink's Hadoop output format. */
    private OutputFormat<Tuple2<KeyOut, ValueOut>> buildDataSink(Job job, DataStore<KeyOut, ValueOut> dataStore) throws IOException {
        GoraOutputFormat.setOutput(job, dataStore, true);
        HadoopOutputFormat<KeyOut, ValueOut> wrappedGoraOutput =
                new HadoopOutputFormat<>(new GoraOutputFormat<KeyOut, ValueOut>(), job);
        // A fresh scratch directory is configured as the job's output directory.
        // NOTE(review): presumably the Hadoop output wrapper requires this key to be set
        // even though records go to the data store - confirm. The directory is not
        // removed by this class.
        Path tempPath = Files.createTempDirectory(TEMP_DIR_PREFIX);
        job.getConfiguration().set(OUTPUT_DIR_KEY, tempPath.toAbsolutePath().toString());
        return wrappedGoraOutput;
    }
}

