/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.WithKeys;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class Write {
    private static final Logger LOG = LoggerFactory.getLogger(Write.class);

    public static <T> Bound<T> to(Sink<T> sink) {
        Preconditions.checkNotNull(sink, "sink");
        return new Bound(sink, 0);
    }

    public static class Bound<T>
    extends PTransform<PCollection<T>, PDone> {
        private final Sink<T> sink;
        private int numShards;

        private Bound(Sink<T> sink, int numShards) {
            this.sink = sink;
            this.numShards = numShards;
        }

        @Override
        public PDone apply(PCollection<T> input) {
            Preconditions.checkArgument(PCollection.IsBounded.BOUNDED == input.isBounded(), "%s can only be applied to a Bounded PCollection", Write.class.getSimpleName());
            PipelineOptions options = input.getPipeline().getOptions();
            this.sink.validate(options);
            return this.createWrite(input, this.sink.createWriteOperation(options));
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("sink", this.sink.getClass()).withLabel("Write Sink")).include(this.sink).addIfNotDefault(DisplayData.item("numShards", this.getNumShards()).withLabel("Fixed Number of Shards"), 0);
        }

        public int getNumShards() {
            return this.numShards;
        }

        public Sink<T> getSink() {
            return this.sink;
        }

        public Bound<T> withNumShards(int numShards) {
            return new Bound<T>(this.sink, Math.max(numShards, 0));
        }

        private <WriteT> PDone createWrite(PCollection<T> input, Sink.WriteOperation<T, WriteT> writeOperation) {
            Pipeline p = input.getPipeline();
            SerializableCoder<?> operationCoder = SerializableCoder.of(writeOperation.getClass());
            TypedPValue operationCollection = (PCollection)p.apply(Create.of(writeOperation).withCoder(operationCoder));
            operationCollection = ((PCollection)((PCollection)operationCollection).apply("Initialize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>(){

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    Sink.WriteOperation writeOperation = (Sink.WriteOperation)c.element();
                    LOG.info("Initializing write operation {}", (Object)writeOperation);
                    writeOperation.initialize(c.getPipelineOptions());
                    LOG.debug("Done initializing write operation {}", (Object)writeOperation);
                    c.output(writeOperation);
                }
            }))).setCoder((Coder)operationCoder);
            PCollectionView writeOperationView = (PCollectionView)((PCollection)operationCollection).apply(View.asSingleton());
            PCollection inputInGlobalWindow = (PCollection)input.apply(Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes());
            PCollection results = this.getNumShards() <= 0 ? (PCollection)inputInGlobalWindow.apply("WriteBundles", ParDo.of(new WriteBundles(writeOperationView)).withSideInputs(writeOperationView)) : (PCollection)((PCollection)((PCollection)inputInGlobalWindow.apply("ApplyShardLabel", WithKeys.of(new ApplyShardingKey(this.getNumShards())))).apply("GroupIntoShards", GroupByKey.create())).apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles(writeOperationView)).withSideInputs(writeOperationView));
            results.setCoder((Coder)writeOperation.getWriterResultCoder());
            final PCollectionView resultsView = (PCollectionView)results.apply(View.asIterable());
            PCollection done = (PCollection)((PCollection)operationCollection).apply("Finalize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Integer>(){

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    Sink.WriteOperation writeOperation = (Sink.WriteOperation)c.element();
                    LOG.info("Finalizing write operation {}.", (Object)writeOperation);
                    ArrayList results = Lists.newArrayList((Iterable)c.sideInput(resultsView));
                    LOG.debug("Side input initialized to finalize write operation {}.", (Object)writeOperation);
                    int minShardsNeeded = Math.max(1, Bound.this.getNumShards());
                    int extraShardsNeeded = minShardsNeeded - results.size();
                    if (extraShardsNeeded > 0) {
                        LOG.info("Creating {} empty output shards in addition to {} written for a total of {}.", new Object[]{extraShardsNeeded, results.size(), minShardsNeeded});
                        for (int i = 0; i < extraShardsNeeded; ++i) {
                            Sink.Writer writer = writeOperation.createWriter(c.getPipelineOptions());
                            writer.open(UUID.randomUUID().toString());
                            Object emptyWrite = writer.close();
                            results.add(emptyWrite);
                        }
                        LOG.debug("Done creating extra shards.");
                    }
                    writeOperation.finalize(results, c.getPipelineOptions());
                    LOG.debug("Done finalizing write operation {}", (Object)writeOperation);
                }
            }).withSideInputs(resultsView));
            return PDone.in(input.getPipeline());
        }

        private static class ApplyShardingKey<T>
        implements SerializableFunction<T, Integer> {
            private final int numShards;
            private int shardNumber;

            ApplyShardingKey(int numShards) {
                this.numShards = numShards;
                this.shardNumber = -1;
            }

            @Override
            public Integer apply(T input) {
                this.shardNumber = this.shardNumber == -1 ? ThreadLocalRandom.current().nextInt(this.numShards) : (this.shardNumber + 1) % this.numShards;
                return this.shardNumber;
            }
        }

        private class WriteShardedBundles<WriteT>
        extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
            private final PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView;

            WriteShardedBundles(PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView) {
                this.writeOperationView = writeOperationView;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Sink.WriteOperation writeOperation = c.sideInput(this.writeOperationView);
                LOG.info("Opening writer for write operation {}", writeOperation);
                Sink.Writer writer = writeOperation.createWriter(c.getPipelineOptions());
                writer.open(UUID.randomUUID().toString());
                LOG.debug("Done opening writer {} for operation {}", writer, this.writeOperationView);
                try {
                    for (Object t : (Iterable)((KV)c.element()).getValue()) {
                        writer.write(t);
                    }
                }
                catch (Exception e) {
                    try {
                        writer.close();
                    }
                    catch (Exception closeException) {
                        if (closeException instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        e.addSuppressed(closeException);
                    }
                    throw e;
                }
                WriteT result = writer.close();
                c.output(result);
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                Bound.this.populateDisplayData(builder);
            }
        }

        private class WriteBundles<WriteT>
        extends DoFn<T, WriteT> {
            private Sink.Writer<T, WriteT> writer = null;
            private final PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView;

            WriteBundles(PCollectionView<Sink.WriteOperation<T, WriteT>> writeOperationView) {
                this.writeOperationView = writeOperationView;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                if (this.writer == null) {
                    Sink.WriteOperation writeOperation = c.sideInput(this.writeOperationView);
                    LOG.info("Opening writer for write operation {}", writeOperation);
                    this.writer = writeOperation.createWriter(c.getPipelineOptions());
                    this.writer.open(UUID.randomUUID().toString());
                    LOG.debug("Done opening writer {} for operation {}", this.writer, this.writeOperationView);
                }
                try {
                    this.writer.write(c.element());
                }
                catch (Exception e) {
                    try {
                        this.writer.close();
                    }
                    catch (Exception closeException) {
                        if (closeException instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        e.addSuppressed(closeException);
                    }
                    throw e;
                }
            }

            @Override
            public void finishBundle(DoFn.Context c) throws Exception {
                if (this.writer != null) {
                    WriteT result = this.writer.close();
                    c.output(result);
                    this.writer = null;
                }
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                Bound.this.populateDisplayData(builder);
            }
        }
    }
}

