/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.datatokenization;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
import org.apache.beam.examples.complete.datatokenization.transforms.DataProtectors;
import org.apache.beam.examples.complete.datatokenization.transforms.JsonToBeamRow;
import org.apache.beam.examples.complete.datatokenization.transforms.SerializableFunctions;
import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigQueryIO;
import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigTableIO;
import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO;
import org.apache.beam.examples.complete.datatokenization.utils.DurationUtils;
import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assembles and runs the data tokenization example pipeline.
 *
 * <p>The pipeline reads rows from the file system or from a Pub/Sub topic, sends them through the
 * {@code DsgTokenization} transform, optionally writes rows that failed tokenization to a
 * dead-letter path as CSV, and writes the tokenized rows to the file system, BigQuery or BigTable.
 * Which source and sink are used is decided by which options are non-null.
 */
public class DataTokenization {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(DataTokenization.class);

    /** String/String coder for {@link FailsafeElement}; both payload coders accept null values. */
    public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
        FailsafeElementCoder.of(
            NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

    /** Suffix appended to the BigQuery table name to form the failed-insert records table name. */
    private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";

    /** JSON table schema handed to the writer of BigQuery failed-insert records. */
    private static final String DEADLETTER_TABLE_SCHEMA =
        "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"timestamp\",\n"
            + "      \"type\": \"TIMESTAMP\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"payloadString\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"payloadBytes\",\n"
            + "      \"type\": \"BYTES\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"attributes\",\n"
            + "      \"type\": \"RECORD\",\n"
            + "      \"mode\": \"REPEATED\",\n"
            + "      \"fields\": [\n"
            + "        {\n"
            + "          \"name\": \"key\",\n"
            + "          \"type\": \"STRING\",\n"
            + "          \"mode\": \"NULLABLE\"\n"
            + "        },\n"
            + "        {\n"
            + "          \"name\": \"value\",\n"
            + "          \"type\": \"STRING\",\n"
            + "          \"mode\": \"NULLABLE\"\n"
            + "        }\n"
            + "      ]\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"errorMessage\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"NULLABLE\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"stacktrace\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"NULLABLE\"\n"
            + "    }\n"
            + "  ]\n"
            + "}";

    /** Tag of the tokenization output that carries successfully tokenized rows. */
    private static final TupleTag<Row> TOKENIZATION_OUT = new TupleTag<Row>() {};

    /** Tag of the tokenization output that carries rows which failed tokenization. */
    static final TupleTag<FailsafeElement<Row, Row>> TOKENIZATION_DEADLETTER_OUT =
        new TupleTag<FailsafeElement<Row, Row>>() {};

    /**
     * Parses the command line into {@link DataTokenizationOptions}, installs them as the default
     * options for {@link FileSystems}, and runs the pipeline.
     *
     * @param args command line arguments for the pipeline
     */
    public static void main(String[] args) {
        DataTokenizationOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(DataTokenizationOptions.class);
        FileSystems.setDefaultPipelineOptions(options);
        run(options);
    }

    /**
     * Builds the pipeline described by {@code options} and starts it.
     *
     * @param options the execution options
     * @return the result of {@link Pipeline#run()}
     * @throws IllegalArgumentException if the data schema cannot be read; the underlying
     *     {@link IOException} is attached as the cause
     * @throws IllegalStateException if no source or no sink is configured
     */
    public static PipelineResult run(DataTokenizationOptions options) {
        SchemasUtils schema = loadSchema(options);

        Pipeline pipeline = Pipeline.create(options);
        registerCoders(pipeline.getCoderRegistry(), schema.getBeamSchema());

        PCollection<Row> rows = readRows(pipeline, options, schema);
        PCollectionTuple tokenizedRows = tokenize(rows, options, schema.getBeamSchema());

        if (options.getNonTokenizedDeadLetterPath() != null) {
            writeTokenizationErrors(tokenizedRows.get(TOKENIZATION_DEADLETTER_OUT), options);
        }
        writeTokenizedRows(tokenizedRows.get(TOKENIZATION_OUT), options, schema);

        return pipeline.run();
    }

    /** Reads the data schema from the configured path, failing with the I/O cause attached. */
    private static SchemasUtils loadSchema(DataTokenizationOptions options) {
        try {
            return new SchemasUtils(options.getDataSchemaPath(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            LOG.error("Failed to retrieve schema for data.", e);
            // Same exception type and message as before, but the root cause is no longer dropped.
            throw new IllegalArgumentException("Data schema is mandatory.", e);
        }
    }

    /** Registers the String/String failsafe coder, the row coder and the Row/Row failsafe coder. */
    private static void registerCoders(CoderRegistry coderRegistry, Schema beamSchema) {
        coderRegistry.registerCoderForType(
            FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
        coderRegistry.registerCoderForType(
            RowCoder.of(beamSchema).getEncodedTypeDescriptor(), RowCoder.of(beamSchema));

        FailsafeElementCoder<Row, Row> rowFailsafeCoder =
            FailsafeElementCoder.of(RowCoder.of(beamSchema), RowCoder.of(beamSchema));
        coderRegistry.registerCoderForType(
            rowFailsafeCoder.getEncodedTypeDescriptor(), rowFailsafeCoder);
    }

    /** Reads input rows from the file system if a file pattern is set, otherwise from Pub/Sub. */
    private static PCollection<Row> readRows(
            Pipeline pipeline, DataTokenizationOptions options, SchemasUtils schema) {
        if (options.getInputFilePattern() != null) {
            return new TokenizationFileSystemIO(options).read(pipeline, schema);
        }
        if (options.getPubsubTopic() == null) {
            throw new IllegalStateException(
                "No source is provided, please configure File System or Pub/Sub");
        }

        PCollection<Row> rows =
            pipeline
                .apply(
                    "ReadMessagesFromPubsub",
                    PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
                .apply(
                    "TransformToBeamRow",
                    new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema));
        // Pub/Sub input is put into fixed windows only when the sink is the file system.
        if (options.getOutputDirectory() != null) {
            rows =
                rows.apply(
                    Window.into(
                        FixedWindows.of(
                            DurationUtils.parseDuration(options.getWindowDuration()))));
        }
        return rows;
    }

    /** Keys every row with the constant 0 and applies the {@code DsgTokenization} transform. */
    private static PCollectionTuple tokenize(
            PCollection<Row> rows, DataTokenizationOptions options, Schema beamSchema) {
        return rows.setRowSchema(beamSchema)
            .apply(
                MapElements.into(
                        TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.rows()))
                    .via((Row row) -> KV.of(0, row)))
            .setCoder(KvCoder.of(VarIntCoder.of(), RowCoder.of(beamSchema)))
            .apply(
                "DsgTokenization",
                DataProtectors.RowToTokenizedRow.newBuilder()
                    .setBatchSize(options.getBatchSize())
                    .setRpcURI(options.getRpcUri())
                    .setSchema(beamSchema)
                    .setSuccessTag(TOKENIZATION_OUT)
                    .setFailureTag(TOKENIZATION_DEADLETTER_OUT)
                    .build());
    }

    /** Converts failed rows to CSV pairs and writes them to the dead-letter path. */
    private static void writeTokenizationErrors(
            PCollection<FailsafeElement<Row, Row>> failures, DataTokenizationOptions options) {
        // Only the delimiter string is captured by the lambda, never the options object.
        String csvDelimiter = options.getCsvDelimiter();
        failures
            .apply(
                "ConvertToCSV",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(
                        (FailsafeElement<Row, Row> fse) -> {
                            // One converter serves both payloads of the element.
                            // NOTE(review): assumes RowToCsv.getCsvFromRow is stateless - confirm.
                            RowToCsv converter = new RowToCsv(csvDelimiter);
                            return FailsafeElement.of(
                                converter.getCsvFromRow(fse.getOriginalPayload()),
                                converter.getCsvFromRow(fse.getPayload()));
                        }))
            .apply(
                "WriteTokenizationErrorsToFS",
                ErrorConverters.WriteErrorsToTextIO.<String, String>newBuilder()
                    .setErrorWritePath(options.getNonTokenizedDeadLetterPath())
                    .setTranslateFunction(SerializableFunctions.getCsvErrorConverter())
                    .build());
    }

    /**
     * Writes tokenized rows to the first configured sink, checked in this order: file system
     * output directory, BigQuery table, BigTable instance.
     */
    private static void writeTokenizedRows(
            PCollection<Row> tokenized, DataTokenizationOptions options, SchemasUtils schema) {
        if (options.getOutputDirectory() != null) {
            new TokenizationFileSystemIO(options).write(tokenized, schema.getBeamSchema());
        } else if (options.getBigQueryTableName() != null) {
            WriteResult writeResult =
                TokenizationBigQueryIO.write(
                    tokenized, options.getBigQueryTableName(), schema.getBigQuerySchema());
            // Failed inserts go to a sibling table named after the target table.
            writeResult
                .getFailedInsertsWithErr()
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via(TokenizationBigQueryIO::wrapBigQueryInsertError))
                .setCoder(FAILSAFE_ELEMENT_CODER)
                .apply(
                    "WriteInsertionFailedRecords",
                    ErrorConverters.WriteStringMessageErrors.newBuilder()
                        .setErrorRecordsTable(
                            options.getBigQueryTableName() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                        .setErrorRecordsTableSchema(DEADLETTER_TABLE_SCHEMA)
                        .build());
        } else if (options.getBigTableInstanceId() != null) {
            new TokenizationBigTableIO(options).write(tokenized, schema.getBeamSchema());
        } else {
            // The file system output directory is a valid sink too, so the message names it.
            throw new IllegalStateException(
                "No sink is provided, please configure File System, BigQuery or BigTable.");
        }
    }
}

