/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.datatokenization.transforms;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.examples.complete.datatokenization.transforms.AutoValue_DataProtectors_RowToTokenizedRow;
import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.Gson;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.JsonArray;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.JsonObject;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataProtectors {
    private static final Logger LOG = LoggerFactory.getLogger(DataProtectors.class);
    public static final String ID_FIELD_NAME = "ID";
    private static final Long MAX_BUFFERING_DURATION_MS = Long.valueOf(System.getenv().getOrDefault("MAX_BUFFERING_DURATION_MS", "100"));

    public static class TokenizationFn
    extends DoFn<KV<Integer, Iterable<Row>>, Row> {
        private static Schema schemaToRpc;
        private static CloseableHttpClient httpclient;
        private static ObjectMapper objectMapperSerializerForSchema;
        private static ObjectMapper objectMapperDeserializerForSchema;
        private final Schema schema;
        private final String rpcURI;
        private final TupleTag<FailsafeElement<Row, Row>> failureTag;
        private Map<String, Row> inputRowsWithIds;

        public TokenizationFn(Schema schema, String rpcURI, TupleTag<FailsafeElement<Row, Row>> failureTag) {
            this.schema = schema;
            this.rpcURI = rpcURI;
            this.failureTag = failureTag;
            this.inputRowsWithIds = new HashMap<String, Row>();
        }

        @DoFn.Setup
        public void setup() {
            List fields = this.schema.getFields();
            fields.add(Schema.Field.of((String)DataProtectors.ID_FIELD_NAME, (Schema.FieldType)Schema.FieldType.STRING));
            schemaToRpc = new Schema(fields);
            objectMapperSerializerForSchema = RowJsonUtils.newObjectMapperWith((RowJson.RowJsonSerializer)RowJson.RowJsonSerializer.forSchema((Schema)schemaToRpc));
            objectMapperDeserializerForSchema = RowJsonUtils.newObjectMapperWith((RowJson.RowJsonDeserializer)RowJson.RowJsonDeserializer.forSchema((Schema)schemaToRpc));
            httpclient = HttpClients.createDefault();
        }

        @DoFn.Teardown
        public void close() {
            block2: {
                try {
                    httpclient.close();
                }
                catch (IOException exception) {
                    String exceptionMessage = exception.getMessage();
                    if (exceptionMessage == null) break block2;
                    LOG.warn("Can't close connection: {}", (Object)exceptionMessage);
                }
            }
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<Integer, Iterable<Row>> element, DoFn.ProcessContext context) {
            Iterable rows = (Iterable)element.getValue();
            try {
                for (Row outputRow : this.getTokenizedRow(rows)) {
                    context.output((Object)outputRow);
                }
            }
            catch (Exception e) {
                for (Row outputRow : rows) {
                    context.output(this.failureTag, FailsafeElement.of(outputRow, outputRow).setErrorMessage(e.getMessage()).setStacktrace(Throwables.getStackTraceAsString((Throwable)e)));
                }
            }
        }

        private ArrayList<String> rowsToJsons(Iterable<Row> inputRows) {
            ArrayList<String> jsons = new ArrayList<String>();
            HashMap<String, Row> inputRowsWithIds = new HashMap<String, Row>();
            for (Row inputRow : inputRows) {
                Row.Builder builder = Row.withSchema((Schema)schemaToRpc);
                for (Schema.Field field : schemaToRpc.getFields()) {
                    if (!inputRow.getSchema().hasField(field.getName())) continue;
                    builder = builder.addValue(inputRow.getValue(field.getName()));
                }
                String id = UUID.randomUUID().toString();
                builder = builder.addValue((Object)id);
                inputRowsWithIds.put(id, inputRow);
                Row row = builder.build();
                jsons.add(RowJsonUtils.rowToJson((ObjectMapper)objectMapperSerializerForSchema, (Row)row));
            }
            this.inputRowsWithIds = inputRowsWithIds;
            return jsons;
        }

        private String formatJsonsToRpcBatch(Iterable<String> jsons) {
            StringBuilder stringBuilder = new StringBuilder(String.join((CharSequence)",", jsons));
            stringBuilder.append("]").insert(0, "{\"data\": [").append("}");
            return stringBuilder.toString();
        }

        private ArrayList<Row> getTokenizedRow(Iterable<Row> inputRows) throws IOException {
            ArrayList<Row> outputRows = new ArrayList<Row>();
            CloseableHttpResponse response = this.sendRpc(this.formatJsonsToRpcBatch(this.rowsToJsons(inputRows)).getBytes(Charset.defaultCharset()));
            if (response.getStatusLine().getStatusCode() != 200) {
                LOG.error("Send to RPC '{}' failed with '{}'", (Object)this.rpcURI, (Object)response.getStatusLine());
            }
            String tokenizedData = IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)StandardCharsets.UTF_8);
            Gson gson = new Gson();
            JsonArray jsonTokenizedRows = ((JsonObject)gson.fromJson(tokenizedData, JsonObject.class)).getAsJsonArray("data");
            for (int i = 0; i < jsonTokenizedRows.size(); ++i) {
                Row tokenizedRow = RowJsonUtils.jsonToRow((ObjectMapper)objectMapperDeserializerForSchema, (String)jsonTokenizedRows.get(i).toString());
                Row.FieldValueBuilder rowBuilder = Row.fromRow((Row)this.inputRowsWithIds.get(tokenizedRow.getString(DataProtectors.ID_FIELD_NAME)));
                for (Schema.Field field : schemaToRpc.getFields()) {
                    if (field.getName().equals(DataProtectors.ID_FIELD_NAME)) continue;
                    rowBuilder = rowBuilder.withFieldValue(field.getName(), tokenizedRow.getValue(field.getName()));
                }
                outputRows.add(rowBuilder.build());
            }
            return outputRows;
        }

        private CloseableHttpResponse sendRpc(byte[] data) throws IOException {
            HttpPost httpPost = new HttpPost(this.rpcURI);
            ByteArrayEntity stringEntity = new ByteArrayEntity(data, ContentType.APPLICATION_JSON);
            httpPost.setEntity((HttpEntity)stringEntity);
            return httpclient.execute((HttpUriRequest)httpPost);
        }
    }

    @AutoValue
    public static abstract class RowToTokenizedRow<T>
    extends PTransform<PCollection<KV<Integer, Row>>, PCollectionTuple> {
        public static <T> Builder<T> newBuilder() {
            return new AutoValue_DataProtectors_RowToTokenizedRow.Builder();
        }

        public abstract TupleTag<Row> successTag();

        public abstract TupleTag<FailsafeElement<Row, Row>> failureTag();

        public abstract Schema schema();

        public abstract int batchSize();

        public abstract String rpcURI();

        public PCollectionTuple expand(PCollection<KV<Integer, Row>> inputRows) {
            FailsafeElementCoder coder = FailsafeElementCoder.of(RowCoder.of((Schema)this.schema()), RowCoder.of((Schema)this.schema()));
            Duration maxBuffering = Duration.millis((long)MAX_BUFFERING_DURATION_MS);
            PCollectionTuple pCollectionTuple = (PCollectionTuple)((PCollection)inputRows.apply("GroupRowsIntoBatches", (PTransform)GroupIntoBatches.ofSize((long)this.batchSize()).withMaxBufferingDuration(maxBuffering))).apply("Tokenize", (PTransform)ParDo.of((DoFn)new TokenizationFn(this.schema(), this.rpcURI(), this.failureTag())).withOutputTags(this.successTag(), TupleTagList.of(this.failureTag())));
            return PCollectionTuple.of(this.successTag(), (PCollection)pCollectionTuple.get(this.successTag()).setRowSchema(this.schema())).and(this.failureTag(), pCollectionTuple.get(this.failureTag()).setCoder(coder));
        }

        @AutoValue.Builder
        public static abstract class Builder<T> {
            public abstract Builder<T> setSuccessTag(TupleTag<Row> var1);

            public abstract Builder<T> setFailureTag(TupleTag<FailsafeElement<Row, Row>> var1);

            public abstract Builder<T> setSchema(Schema var1);

            public abstract Builder<T> setBatchSize(int var1);

            public abstract Builder<T> setRpcURI(String var1);

            public abstract RowToTokenizedRow<T> build();
        }
    }
}

