/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSink;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Joiner;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Strings;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Utf8;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ForwardingMap;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.HashMultimap;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableMap;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
import com.google.cloud.dataflow.sdk.runners.DataflowJobAlreadyExistsException;
import com.google.cloud.dataflow.sdk.runners.DataflowJobAlreadyUpdatedException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunnerHooks;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowUnboundedReadFromBoundedSource;
import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.WithKeys;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.PCollectionViews;
import com.google.cloud.dataflow.sdk.util.PathValidator;
import com.google.cloud.dataflow.sdk.util.Reshuffle;
import com.google.cloud.dataflow.sdk.util.StringUtils;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowPipelineRunner
extends PipelineRunner<DataflowPipelineJob> {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineRunner.class);
    private final DataflowPipelineOptions options;
    private final Dataflow dataflowClient;
    private final DataflowPipelineTranslator translator;
    private final Map<Class<?>, Class<?>> overrides;
    private DataflowPipelineRunnerHooks hooks;
    private static final String ENVIRONMENT_MAJOR_VERSION = "5";
    public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE = "dataflow.gcr.io/v1beta3/java-batch:1.8.0";
    public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE = "dataflow.gcr.io/v1beta3/java-streaming:1.8.0";
    private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 0xA00000;
    @VisibleForTesting
    static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 0x100000;
    private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
    public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]";
    private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
    Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;

    public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
        IOChannelUtils.registerStandardIOFactories(options);
        DataflowPipelineOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options);
        ArrayList<String> missing = new ArrayList<String>();
        if (dataflowOptions.getAppName() == null) {
            missing.add("appName");
        }
        if (missing.size() > 0) {
            throw new IllegalArgumentException("Missing required values: " + Joiner.on(',').join(missing));
        }
        PathValidator validator = dataflowOptions.getPathValidator();
        Preconditions.checkArgument(!Strings.isNullOrEmpty(dataflowOptions.getTempLocation()) || !Strings.isNullOrEmpty(dataflowOptions.getStagingLocation()), "Missing required value: at least one of tempLocation or stagingLocation must be set.");
        if (dataflowOptions.getStagingLocation() != null) {
            validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
        }
        if (dataflowOptions.getTempLocation() != null) {
            validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation());
        }
        if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) {
            dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation());
        } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) {
            try {
                dataflowOptions.setStagingLocation(IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging"));
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation from PipelineOptions.tempLocation. Please set the staging location explicitly.", e);
            }
        }
        if (dataflowOptions.getFilesToStage() == null) {
            dataflowOptions.setFilesToStage(DataflowPipelineRunner.detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", (Object)dataflowOptions.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
        }
        String jobName = dataflowOptions.getJobName().toLowerCase();
        Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; the name must consist of only the characters [-a-z0-9], starting with a letter and ending with a letter or number");
        if (!jobName.equals(dataflowOptions.getJobName())) {
            LOG.info("PipelineOptions.jobName did not match the service requirements. Using {} instead of {}.", (Object)jobName, (Object)dataflowOptions.getJobName());
        }
        dataflowOptions.setJobName(jobName);
        String project = dataflowOptions.getProject();
        if (project.matches("[0-9]*")) {
            throw new IllegalArgumentException("Project ID '" + project + "' invalid. Please make sure you specified the Project ID, not project number.");
        }
        if (!project.matches(PROJECT_ID_REGEXP)) {
            throw new IllegalArgumentException("Project ID '" + project + "' invalid. Please make sure you specified the Project ID, not project description.");
        }
        DataflowPipelineDebugOptions debugOptions = dataflowOptions.as(DataflowPipelineDebugOptions.class);
        if (debugOptions.getNumberOfWorkerHarnessThreads() < 0) {
            throw new IllegalArgumentException("Number of worker harness threads '" + debugOptions.getNumberOfWorkerHarnessThreads() + "' invalid. Please make sure the value is non-negative.");
        }
        if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
            dataflowOptions.setGcsUploadBufferSizeBytes(0x100000);
        }
        return new DataflowPipelineRunner(dataflowOptions);
    }

    @VisibleForTesting
    protected DataflowPipelineRunner(DataflowPipelineOptions options) {
        this.options = options;
        this.dataflowClient = options.getDataflowClient();
        this.translator = DataflowPipelineTranslator.fromOptions(options);
        this.pcollectionsRequiringIndexedFormat = new HashSet();
        this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet();
        ImmutableMap.Builder<Class, Class> builder = ImmutableMap.builder();
        if (options.isStreaming()) {
            builder.put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class);
            builder.put(Create.Values.class, StreamingCreate.class);
            builder.put(View.AsMap.class, StreamingViewAsMap.class);
            builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
            builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
            builder.put(View.AsList.class, StreamingViewAsList.class);
            builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
            builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
            builder.put(Read.Bounded.class, StreamingBoundedRead.class);
            builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
            builder.put(Window.Bound.class, AssignWindows.class);
            builder.put(PubsubIO.Read.Bound.PubsubReader.class, UnsupportedIO.class);
            builder.put(PubsubIO.Write.Bound.PubsubWriter.class, UnsupportedIO.class);
            if (options.getExperiments() == null || !options.getExperiments().contains("enable_custom_pubsub_sink")) {
                builder.put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class);
            }
        } else {
            builder.put(Read.Unbounded.class, UnsupportedIO.class);
            builder.put(Window.Bound.class, AssignWindows.class);
            builder.put(Write.Bound.class, BatchWrite.class);
            builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
            builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
            if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) {
                builder.put(View.AsMap.class, BatchViewAsMap.class);
                builder.put(View.AsMultimap.class, BatchViewAsMultimap.class);
                builder.put(View.AsSingleton.class, BatchViewAsSingleton.class);
                builder.put(View.AsList.class, BatchViewAsList.class);
                builder.put(View.AsIterable.class, BatchViewAsIterable.class);
            }
            if (options.getExperiments() == null || !options.getExperiments().contains("enable_custom_bigquery_source")) {
                builder.put(BigQueryIO.Read.Bound.class, BatchBigQueryIONativeRead.class);
            }
            if (options.getExperiments() == null || !options.getExperiments().contains("enable_custom_bigquery_sink")) {
                builder.put(BigQueryIO.Write.Bound.class, BatchBigQueryIOWrite.class);
            }
        }
        this.overrides = builder.build();
    }

    @Override
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
        if (Combine.GroupedValues.class.equals(transform.getClass()) || GroupByKey.class.equals(transform.getClass())) {
            PCollection pc = (PCollection)input;
            PCollection outputT = PCollection.createPrimitiveOutputInternal(pc.getPipeline(), transform instanceof GroupByKey ? ((GroupByKey)transform).updateWindowingStrategy(pc.getWindowingStrategy()) : pc.getWindowingStrategy(), pc.isBounded());
            return (OutputT)outputT;
        }
        if (PubsubIO.Read.Bound.class.equals(transform.getClass()) && this.options.isStreaming() && (this.options.getExperiments() == null || !this.options.getExperiments().contains("enable_custom_pubsub_source"))) {
            PCollection pubsub = this.applyPubsubStreamingRead((PubsubIO.Read.Bound)transform, input);
            return (OutputT)pubsub;
        }
        if (Window.Bound.class.equals(transform.getClass())) {
            PCollection windowed = this.applyWindow((Window.Bound)transform, (PCollection)input);
            return (OutputT)windowed;
        }
        if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) && ((PCollectionList)input).size() == 0) {
            return Pipeline.applyTransform(input, Create.of(new Object[0]));
        }
        if (this.overrides.containsKey(transform.getClass())) {
            Class<?> transformClass = transform.getClass();
            Class<?> customTransformClass = this.overrides.get(transform.getClass());
            PTransform customTransform = (PTransform)InstanceBuilder.ofType(customTransformClass).withArg(DataflowPipelineRunner.class, this).withArg(transformClass, transform).build();
            return Pipeline.applyTransform(input, customTransform);
        }
        return super.apply(transform, input);
    }

    private <T> PCollection<T> applyPubsubStreamingRead(PubsubIO.Read.Bound<?> initialTransform, PInput input) {
        PubsubIO.Read.Bound<?> transform = initialTransform;
        return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED).setCoder((Coder)transform.getCoder());
    }

    private <T> PCollection<T> applyWindow(Window.Bound<?> intitialTransform, PCollection<?> initialInput) {
        Window.Bound<?> transform = intitialTransform;
        PCollection<?> input = initialInput;
        return (PCollection)super.apply(new AssignWindows(transform), input);
    }

    private String debuggerMessage(String projectId, String uniquifier) {
        return String.format("To debug your job, visit Google Cloud Debugger at: https://console.developers.google.com/debug?project=%s&dbgee=%s", projectId, uniquifier);
    }

    private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
        if (!options.getEnableCloudDebugger()) {
            return;
        }
        if (options.getDebuggee() != null) {
            throw new RuntimeException("Should not specify the debuggee");
        }
        Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
        Debuggee debuggee = this.registerDebuggee(debuggerClient, uniquifier);
        options.setDebuggee(debuggee);
        System.out.println(this.debuggerMessage(options.getProject(), debuggee.getUniquifier()));
    }

    private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
        RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
        registerReq.setDebuggee(new Debuggee().setProject(this.options.getProject()).setUniquifier(uniquifier).setDescription(uniquifier).setAgentVersion("google.com/cloud-dataflow-java/v1"));
        try {
            RegisterDebuggeeResponse registerResponse = (RegisterDebuggeeResponse)debuggerClient.controller().debuggees().register(registerReq).execute();
            Debuggee debuggee = registerResponse.getDebuggee();
            if (debuggee.getStatus() != null && debuggee.getStatus().getIsError().booleanValue()) {
                throw new RuntimeException("Unable to register with the debugger: " + debuggee.getStatus().getDescription().getFormat());
            }
            return debuggee;
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to register with the debugger: ", e);
        }
    }

    @Override
    public DataflowPipelineJob run(Pipeline pipeline) {
        Job jobResult;
        this.logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
        LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.");
        List<DataflowPackage> packages = this.options.getStager().stageFiles();
        int randomNum = new Random().nextInt(9000) + 1000;
        String requestId = DateTimeFormat.forPattern((String)"YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC).print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
        DataflowPipelineOptions dataflowOptions = this.options.as(DataflowPipelineOptions.class);
        this.maybeRegisterDebuggee(dataflowOptions, requestId);
        DataflowPipelineTranslator.JobSpecification jobSpecification = this.translator.translate(pipeline, this, packages);
        Job newJob = jobSpecification.getJob();
        newJob.setClientRequestId(requestId);
        String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
        System.out.println("Dataflow SDK version: " + version);
        newJob.getEnvironment().setUserAgent((Map)((Object)DataflowReleaseInfo.getReleaseInfo()));
        if (!Strings.isNullOrEmpty(this.options.getTempLocation())) {
            newJob.getEnvironment().setTempStoragePrefix(dataflowOptions.getPathValidator().verifyPath(this.options.getTempLocation()));
        }
        newJob.getEnvironment().setDataset(this.options.getTempDatasetId());
        newJob.getEnvironment().setExperiments(this.options.getExperiments());
        String workerHarnessContainerImage = this.options.as(DataflowPipelineWorkerPoolOptions.class).getWorkerHarnessContainerImage();
        for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
            workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
        }
        HashMap<String, String> environmentVersion = new HashMap<String, String>();
        environmentVersion.put("major", ENVIRONMENT_MAJOR_VERSION);
        newJob.getEnvironment().setVersion(environmentVersion);
        String jobType = "JAVA_BATCH_AUTOSCALING";
        if (this.options.isStreaming()) {
            jobType = "STREAMING";
        }
        environmentVersion.put("job_type", jobType);
        if (this.hooks != null) {
            this.hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
        }
        if (!Strings.isNullOrEmpty(this.options.getDataflowJobFile())) {
            this.runJobFileHooks(newJob);
        }
        if (this.hooks != null && !this.hooks.shouldActuallyRunJob()) {
            return null;
        }
        String jobIdToUpdate = null;
        if (this.options.getUpdate()) {
            jobIdToUpdate = this.getJobIdFromName(this.options.getJobName());
            newJob.setTransformNameMapping(this.options.getTransformNameMapping());
            newJob.setReplaceJobId(jobIdToUpdate);
        }
        try {
            jobResult = (Job)this.dataflowClient.projects().jobs().create(this.options.getProject(), newJob).execute();
        }
        catch (GoogleJsonResponseException e) {
            String errorMessages = "Unexpected errors";
            if (e.getDetails() != null) {
                errorMessages = Utf8.encodedLength(newJob.toString()) >= 0xA00000 ? "The size of the serialized JSON representation of the pipeline exceeds the allowable limit. For more information, please check the FAQ link below:\nhttps://cloud.google.com/dataflow/faq" : e.getDetails().getMessage();
            }
            throw new RuntimeException("Failed to create a workflow job: " + errorMessages, e);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create a workflow job", e);
        }
        AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline);
        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = aggregatorExtractor.getAggregatorSteps();
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());
        DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(this.options.getProject(), jobResult.getId(), Transport.newRawDataflowClient(this.options).build(), aggregatorTransforms);
        if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty() && !jobResult.getClientRequestId().equals(requestId)) {
            if (this.options.getUpdate()) {
                throw new DataflowJobAlreadyUpdatedException(dataflowPipelineJob, String.format("The job named %s with id: %s has already been updated into job id: %s and cannot be updated again.", newJob.getName(), jobIdToUpdate, jobResult.getId()));
            }
            throw new DataflowJobAlreadyExistsException(dataflowPipelineJob, String.format("There is already an active job named %s with id: %s. If you want to submit a second job, try again by setting a different name using --jobName.", newJob.getName(), jobResult.getId()));
        }
        LOG.info("To access the Dataflow monitoring console, please navigate to {}", (Object)MonitoringUtil.getJobMonitoringPageURL(this.options.getProject(), jobResult.getId()));
        System.out.println("Submitted job: " + jobResult.getId());
        LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", (Object)MonitoringUtil.getGcloudCancelCommand(this.options, jobResult.getId()));
        return dataflowPipelineJob;
    }

    public DataflowPipelineTranslator getTranslator() {
        return this.translator;
    }

    @Experimental
    public void setHooks(DataflowPipelineRunnerHooks hooks) {
        this.hooks = hooks;
    }

    private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
        if (!this.ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
            final TreeSet ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet();
            pipeline.traverseTopologically(new Pipeline.PipelineVisitor(){

                @Override
                public void visitValue(PValue value, TransformTreeNode producer) {
                }

                @Override
                public void visitTransform(TransformTreeNode node) {
                    if (DataflowPipelineRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                }

                @Override
                public void enterCompositeTransform(TransformTreeNode node) {
                    if (DataflowPipelineRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                }

                @Override
                public void leaveCompositeTransform(TransformTreeNode node) {
                }
            });
            LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} because the key coder is not deterministic. Falling back to singleton implementation which may cause memory and/or performance problems. Future major versions of Dataflow will require deterministic key coders.", ptransformViewNamesWithNonDeterministicKeyCoders);
        }
    }

    private void runJobFileHooks(Job newJob) {
        try {
            WritableByteChannel writer = IOChannelUtils.create(this.options.getDataflowJobFile(), "text/plain");
            PrintWriter printWriter = new PrintWriter(Channels.newOutputStream(writer));
            String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
            printWriter.print(workSpecJson);
            printWriter.flush();
            printWriter.close();
            LOG.info("Printed job specification to {}", (Object)this.options.getDataflowJobFile());
        }
        catch (IllegalStateException ex) {
            String error = "Cannot translate workflow spec to JSON.";
            if (this.hooks != null && this.hooks.failOnJobFileWriteFailure()) {
                throw new RuntimeException(error, ex);
            }
            LOG.warn(error, (Throwable)ex);
        }
        catch (IOException ex) {
            String error = String.format("Cannot create output file at {}", this.options.getDataflowJobFile());
            if (this.hooks != null && this.hooks.failOnJobFileWriteFailure()) {
                throw new RuntimeException(error, ex);
            }
            LOG.warn(error, (Throwable)ex);
        }
    }

    boolean doesPCollectionRequireIndexedFormat(PCollection<?> pcol) {
        return this.pcollectionsRequiringIndexedFormat.contains(pcol);
    }

    private void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
        this.pcollectionsRequiringIndexedFormat.add(pcol);
    }

    private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
        this.ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
    }

    public String toString() {
        return "DataflowPipelineRunner#" + this.options.getJobName();
    }

    protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
        if (!(classLoader instanceof URLClassLoader)) {
            String message = String.format("Unable to use ClassLoader to detect classpath elements. Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
            LOG.error(message);
            throw new IllegalArgumentException(message);
        }
        ArrayList<String> files = new ArrayList<String>();
        for (URL url : ((URLClassLoader)classLoader).getURLs()) {
            try {
                files.add(new File(url.toURI()).getAbsolutePath());
            }
            catch (IllegalArgumentException | URISyntaxException e) {
                String message = String.format("Unable to convert url (%s) to file.", url);
                LOG.error(message);
                throw new IllegalArgumentException(message, e);
            }
        }
        return files;
    }

    private String getJobIdFromName(String jobName) {
        try {
            String token = null;
            do {
                ListJobsResponse listResult = (ListJobsResponse)this.dataflowClient.projects().jobs().list(this.options.getProject()).setPageToken(token).execute();
                token = listResult.getNextPageToken();
                for (Job job : listResult.getJobs()) {
                    if (!job.getName().equals(jobName) || !MonitoringUtil.toState(job.getCurrentState()).equals((Object)PipelineResult.State.RUNNING)) continue;
                    return job.getId();
                }
            } while (token != null);
        }
        catch (GoogleJsonResponseException e) {
            throw new RuntimeException("Got error while looking up jobs: " + (e.getDetails() != null ? e.getDetails().getMessage() : e), e);
        }
        catch (IOException e) {
            throw new RuntimeException("Got error while looking up jobs: ", e);
        }
        throw new IllegalArgumentException("Could not find running job named " + jobName);
    }

    static {
        DataflowPipelineTranslator.registerTransformTranslator(PubsubIO.Read.Bound.class, new StreamingPubsubIOReadTranslator());
    }

    private static class UnsupportedIO<InputT extends PInput, OutputT extends POutput>
    extends PTransform<InputT, OutputT> {
        @Nullable
        private PTransform<?, ?> transform;
        @Nullable
        private DoFn<?, ?> doFn;

        public UnsupportedIO(DataflowPipelineRunner runner, AvroIO.Read.Bound<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, BigQueryIO.Read.Bound transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Read.Bound<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, Read.Bounded<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, Read.Unbounded<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, AvroIO.Write.Bound<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Write.Bound<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, PubsubIO.Read.Bound.PubsubReader doFn) {
            this.doFn = doFn;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, PubsubIO.Write.Bound.PubsubWriter doFn) {
            this.doFn = doFn;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource<?> transform) {
            this.transform = transform;
        }

        public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink<?> transform) {
            this.transform = transform;
        }

        @Override
        public OutputT apply(InputT input) {
            String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming() ? "streaming" : "batch";
            String name = this.transform == null ? StringUtils.approximateSimpleName(this.doFn.getClass()) : StringUtils.approximatePTransformName(this.transform.getClass());
            throw new UnsupportedOperationException(String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name));
        }
    }

    private static class Concatenate<T>
    extends Combine.CombineFn<T, List<T>, List<T>> {
        private Concatenate() {
        }

        @Override
        public List<T> createAccumulator() {
            return new ArrayList();
        }

        @Override
        public List<T> addInput(List<T> accumulator, T input) {
            accumulator.add(input);
            return accumulator;
        }

        @Override
        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
            Object result = this.createAccumulator();
            for (List<T> accumulator : accumulators) {
                result.addAll(accumulator);
            }
            return result;
        }

        @Override
        public List<T> extractOutput(List<T> accumulator) {
            return accumulator;
        }

        @Override
        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }

        @Override
        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }
    }

    private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
    extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
        Combine.GloballyAsSingletonView<InputT, OutputT> transform;

        public StreamingCombineGloballyAsSingletonView(DataflowPipelineRunner runner, Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
            this.transform = transform;
        }

        @Override
        public PCollectionView<OutputT> apply(PCollection<InputT> input) {
            PCollection combined = (PCollection)input.apply(Combine.globally(this.transform.getCombineFn()).withoutDefaults().withFanout(this.transform.getFanout()));
            PCollectionView<Object> view = PCollectionViews.singletonView(combined.getPipeline(), combined.getWindowingStrategy(), this.transform.getInsertDefault(), this.transform.getInsertDefault() ? (Object)this.transform.getCombineFn().defaultValue() : null, combined.getCoder());
            return ((PCollection)((PCollection)combined.apply(ParDo.of(new WrapAsList()))).apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, combined.getCoder())))).apply(View.CreatePCollectionView.of(view));
        }

        @Override
        protected String getKindString() {
            return "StreamingCombineGloballyAsSingletonView";
        }
    }

    private static class StreamingViewAsSingleton<T>
    extends PTransform<PCollection<T>, PCollectionView<T>> {
        private View.AsSingleton<T> transform;

        public StreamingViewAsSingleton(DataflowPipelineRunner runner, View.AsSingleton<T> transform) {
            this.transform = transform;
        }

        @Override
        public PCollectionView<T> apply(PCollection<T> input) {
            Combine.Globally combine = Combine.globally(new SingletonCombine<T>(this.transform.hasDefaultValue(), this.transform.defaultValue()));
            if (!this.transform.hasDefaultValue()) {
                combine = combine.withoutDefaults();
            }
            return (PCollectionView)input.apply(combine.asSingletonView());
        }

        @Override
        protected String getKindString() {
            return "StreamingViewAsSingleton";
        }

        private static class SingletonCombine<T>
        extends Combine.BinaryCombineFn<T> {
            private boolean hasDefaultValue;
            private T defaultValue;

            SingletonCombine(boolean hasDefaultValue, T defaultValue) {
                this.hasDefaultValue = hasDefaultValue;
                this.defaultValue = defaultValue;
            }

            @Override
            public T apply(T left, T right) {
                throw new IllegalArgumentException("PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value");
            }

            @Override
            public T identity() {
                if (this.hasDefaultValue) {
                    return this.defaultValue;
                }
                throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. Consider setting withDefault to provide a default value");
            }
        }
    }

    private static class WrapAsList<T>
    extends DoFn<T, List<T>> {
        private WrapAsList() {
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            c.output(Arrays.asList(c.element()));
        }
    }

    private static class StreamingViewAsIterable<T>
    extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
        public StreamingViewAsIterable(DataflowPipelineRunner runner, View.AsIterable<T> transform) {
        }

        @Override
        public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
            PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            return (PCollectionView)((Object)((PCollection)((PCollection)input.apply(Combine.globally(new Concatenate()).withoutDefaults())).apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))).apply(View.CreatePCollectionView.of(view)));
        }

        @Override
        protected String getKindString() {
            return "StreamingViewAsIterable";
        }
    }

    private static class StreamingViewAsList<T>
    extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
        public StreamingViewAsList(DataflowPipelineRunner runner, View.AsList<T> transform) {
        }

        @Override
        public PCollectionView<List<T>> apply(PCollection<T> input) {
            PCollectionView<List<T>> view = PCollectionViews.listView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            return (PCollectionView)((Object)((PCollection)((PCollection)input.apply(Combine.globally(new Concatenate()).withoutDefaults())).apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))).apply(View.CreatePCollectionView.of(view)));
        }

        @Override
        protected String getKindString() {
            return "StreamingViewAsList";
        }
    }

    private static class StreamingViewAsMultimap<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
        private final DataflowPipelineRunner runner;

        public StreamingViewAsMultimap(DataflowPipelineRunner runner, View.AsMultimap<K, V> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
            PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            KvCoder inputCoder = (KvCoder)input.getCoder();
            try {
                inputCoder.getKeyCoder().verifyDeterministic();
            }
            catch (Coder.NonDeterministicException e) {
                this.runner.recordViewUsesNonDeterministicKeyCoder(this);
            }
            return (PCollectionView)((Object)((PCollection)((PCollection)input.apply(Combine.globally(new Concatenate()).withoutDefaults())).apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))).apply(View.CreatePCollectionView.of(view)));
        }

        @Override
        protected String getKindString() {
            return "StreamingViewAsMultimap";
        }
    }

    private static class StreamingViewAsMap<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
        private final DataflowPipelineRunner runner;

        public StreamingViewAsMap(DataflowPipelineRunner runner, View.AsMap<K, V> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
            PCollectionView<Map<K, V>> view = PCollectionViews.mapView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            KvCoder inputCoder = (KvCoder)input.getCoder();
            try {
                inputCoder.getKeyCoder().verifyDeterministic();
            }
            catch (Coder.NonDeterministicException e) {
                this.runner.recordViewUsesNonDeterministicKeyCoder(this);
            }
            return (PCollectionView)((Object)((PCollection)((PCollection)input.apply(Combine.globally(new Concatenate()).withoutDefaults())).apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))).apply(View.CreatePCollectionView.of(view)));
        }

        @Override
        protected String getKindString() {
            return "StreamingViewAsMap";
        }
    }

    private static class StreamingPCollectionViewWriterFn<T>
    extends DoFn<Iterable<T>, T>
    implements DoFn.RequiresWindowAccess {
        private final PCollectionView<?> view;
        private final Coder<T> dataCoder;

        public static <T> StreamingPCollectionViewWriterFn<T> create(PCollectionView<?> view, Coder<T> dataCoder) {
            return new StreamingPCollectionViewWriterFn<T>(view, dataCoder);
        }

        private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> dataCoder) {
            this.view = view;
            this.dataCoder = dataCoder;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) throws Exception {
            ArrayList output = new ArrayList();
            for (Object elem : (Iterable)c.element()) {
                output.add(WindowedValue.of(elem, c.timestamp(), c.window(), c.pane()));
            }
            c.windowingInternals().writePCollectionViewData(this.view.getTagInternal(), output, this.dataCoder);
        }
    }

    private static class StreamingCreate<T>
    extends PTransform<PInput, PCollection<T>> {
        private final Create.Values<T> transform;

        public StreamingCreate(DataflowPipelineRunner runner, Create.Values<T> transform) {
            this.transform = transform;
        }

        @Override
        public PCollection<T> apply(PInput input) {
            try {
                Coder<T> coder = this.transform.getDefaultOutputCoder(input);
                return ((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)((Object)((PCollection)Pipeline.applyTransform(input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/"))).apply(ParDo.of(new OutputNullKv())))).apply("GlobalSingleton", Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())).apply(GroupByKey.create())).setWindowingStrategyInternal(WindowingStrategy.globalDefault()).apply(Window.into(new GlobalWindows()))).apply(ParDo.of(new OutputElements<T>(this.transform.getElements(), coder)))).setCoder((Coder)coder)).setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
            }
            catch (CannotProvideCoderException e) {
                throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly.", e);
            }
        }

        @Override
        protected String getKindString() {
            return "StreamingCreate";
        }

        private static class OutputElements<T>
        extends DoFn<Object, T> {
            private final Coder<T> coder;
            private final List<byte[]> encodedElements;

            public OutputElements(Iterable<T> elems, Coder<T> coder) {
                this.coder = coder;
                this.encodedElements = new ArrayList<byte[]>();
                for (T t : elems) {
                    try {
                        this.encodedElements.add(CoderUtils.encodeToByteArray(coder, t));
                    }
                    catch (CoderException e) {
                        throw new IllegalArgumentException("Unable to encode value " + t + " with coder " + coder, e);
                    }
                }
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws IOException {
                for (byte[] encodedElement : this.encodedElements) {
                    c.output(CoderUtils.decodeFromByteArray(this.coder, encodedElement));
                }
            }
        }

        private static class OutputNullKv
        extends DoFn<String, KV<Void, Void>> {
            private OutputNullKv() {
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                c.output(KV.of(null, null));
            }
        }
    }

    private static class StreamingBoundedRead<T>
    extends PTransform<PInput, PCollection<T>> {
        private final BoundedSource<T> source;

        public StreamingBoundedRead(DataflowPipelineRunner runner, Read.Bounded<T> transform) {
            this.source = transform.getSource();
        }

        @Override
        protected Coder<T> getDefaultOutputCoder() {
            return this.source.getDefaultOutputCoder();
        }

        @Override
        public final PCollection<T> apply(PInput input) {
            this.source.validate();
            return ((PCollection)Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<T>(this.source))).setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
        }
    }

    private static class Deduplicate<T>
    extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
        private static final int NUM_RESHARD_KEYS = 10000;

        private Deduplicate() {
        }

        @Override
        public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) {
            return (PCollection)((PCollection)((PCollection)input.apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>(){

                @Override
                public Integer apply(ValueWithRecordId<T> value) {
                    return Arrays.hashCode(value.getId()) % 10000;
                }
            }))).apply(Reshuffle.of())).apply(ParDo.named("StripIds").of(new DoFn<KV<Integer, ValueWithRecordId<T>>, T>(){

                @Override
                public void processElement(DoFn.ProcessContext c) {
                    c.output(((ValueWithRecordId)((KV)c.element()).getValue()).getValue());
                }
            }));
        }
    }

    private static class StreamingUnboundedRead<T>
    extends PTransform<PInput, PCollection<T>> {
        private final UnboundedSource<T, ?> source;

        public StreamingUnboundedRead(DataflowPipelineRunner runner, Read.Unbounded<T> transform) {
            this.source = transform.getSource();
        }

        @Override
        protected Coder<T> getDefaultOutputCoder() {
            return this.source.getDefaultOutputCoder();
        }

        @Override
        public final PCollection<T> apply(PInput input) {
            this.source.validate();
            if (this.source.requiresDeduping()) {
                return (PCollection)((PCollection)Pipeline.applyTransform(input, new ReadWithIds(this.source))).apply(new Deduplicate());
            }
            return ((PCollection)Pipeline.applyTransform(input, new ReadWithIds(this.source))).apply(ValueWithRecordId.stripIds());
        }

        @Override
        public String getKindString() {
            return "Read(" + StringUtils.approximateSimpleName(this.source.getClass()) + ")";
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(ReadWithIds.class, new ReadWithIdsTranslator());
        }

        private static class ReadWithIdsTranslator
        implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
            private ReadWithIdsTranslator() {
            }

            @Override
            public void translate(ReadWithIds<?> transform, DataflowPipelineTranslator.TranslationContext context) {
                ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
            }
        }

        private static class ReadWithIds<T>
        extends PTransform<PInput, PCollection<ValueWithRecordId<T>>> {
            private final UnboundedSource<T, ?> source;

            private ReadWithIds(UnboundedSource<T, ?> source) {
                this.source = source;
            }

            @Override
            public final PCollection<ValueWithRecordId<T>> apply(PInput input) {
                return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
            }

            @Override
            protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
                return ValueWithRecordId.ValueWithRecordIdCoder.of(this.source.getDefaultOutputCoder());
            }

            public UnboundedSource<T, ?> getSource() {
                return this.source;
            }
        }
    }

    private static class StreamingPubsubIOWriteTranslator
    implements DataflowPipelineTranslator.TransformTranslator<StreamingPubsubIOWrite> {
        private StreamingPubsubIOWriteTranslator() {
        }

        @Override
        public void translate(StreamingPubsubIOWrite transform, DataflowPipelineTranslator.TranslationContext context) {
            this.translateTyped(transform, context);
        }

        private <T> void translateTyped(StreamingPubsubIOWrite<T> transform, DataflowPipelineTranslator.TranslationContext context) {
            Preconditions.checkState(context.getPipelineOptions().isStreaming(), "StreamingPubsubIOWrite is only for streaming pipelines.");
            PubsubIO.Write.Bound<T> overriddenTransform = transform.getOverriddenTransform();
            context.addStep(transform, "ParallelWrite");
            context.addInput("format", "pubsub");
            context.addInput("pubsub_topic", overriddenTransform.getTopic().asV1Beta1Path());
            if (overriddenTransform.getTimestampLabel() != null) {
                context.addInput("pubsub_timestamp_label", overriddenTransform.getTimestampLabel());
            }
            if (overriddenTransform.getIdLabel() != null) {
                context.addInput("pubsub_id_label", overriddenTransform.getIdLabel());
            }
            context.addEncodingInput(WindowedValue.getValueOnlyCoder(overriddenTransform.getCoder()));
            context.addInput("parallel_input", (PInput)context.getInput(transform));
        }
    }

    private static class StreamingPubsubIOWrite<T>
    extends PTransform<PCollection<T>, PDone> {
        private final PubsubIO.Write.Bound<T> transform;

        public StreamingPubsubIOWrite(DataflowPipelineRunner runner, PubsubIO.Write.Bound<T> transform) {
            this.transform = transform;
        }

        PubsubIO.Write.Bound<T> getOverriddenTransform() {
            return this.transform;
        }

        @Override
        public PDone apply(PCollection<T> input) {
            return PDone.in(input.getPipeline());
        }

        @Override
        protected String getKindString() {
            return "StreamingPubsubIOWrite";
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
        }
    }

    private static class StreamingPubsubIOReadTranslator
    implements DataflowPipelineTranslator.TransformTranslator<PubsubIO.Read.Bound> {
        private StreamingPubsubIOReadTranslator() {
        }

        @Override
        public void translate(PubsubIO.Read.Bound transform, DataflowPipelineTranslator.TranslationContext context) {
            this.translateTyped(transform, context);
        }

        private <T> void translateTyped(PubsubIO.Read.Bound<T> transform, DataflowPipelineTranslator.TranslationContext context) {
            Preconditions.checkState(context.getPipelineOptions().isStreaming(), "StreamingPubsubIORead is only for streaming pipelines.");
            context.addStep(transform, "ParallelRead");
            context.addInput("format", "pubsub");
            if (transform.getTopic() != null) {
                context.addInput("pubsub_topic", transform.getTopic().asV1Beta1Path());
            }
            if (transform.getSubscription() != null) {
                context.addInput("pubsub_subscription", transform.getSubscription().asV1Beta1Path());
            }
            if (transform.getTimestampLabel() != null) {
                context.addInput("pubsub_timestamp_label", transform.getTimestampLabel());
            }
            if (transform.getIdLabel() != null) {
                context.addInput("pubsub_id_label", transform.getIdLabel());
            }
            context.addValueOnlyOutput("output", (PValue)context.getOutput(transform));
        }
    }

    private static class StreamingWrite<T>
    extends PTransform<PCollection<T>, PDone> {
        public StreamingWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) {
        }

        @Override
        public PDone apply(PCollection<T> input) {
            throw new UnsupportedOperationException("The Write transform is not supported by the Dataflow streaming runner.");
        }

        @Override
        protected String getKindString() {
            return "StreamingWrite";
        }
    }

    private static class BatchBigQueryIONativeWriteTranslator
    implements DataflowPipelineTranslator.TransformTranslator<BatchBigQueryIONativeWrite> {
        private BatchBigQueryIONativeWriteTranslator() {
        }

        @Override
        public void translate(BatchBigQueryIONativeWrite transform, DataflowPipelineTranslator.TranslationContext context) {
            this.translateWriteHelper(transform, transform.transform, context);
        }

        private void translateWriteHelper(BatchBigQueryIONativeWrite transform, BigQueryIO.Write.Bound originalTransform, DataflowPipelineTranslator.TranslationContext context) {
            if (context.getPipelineOptions().isStreaming()) {
                throw new AssertionError((Object)"BigQueryIO is specified to use streaming write in batch mode.");
            }
            TableReference table = originalTransform.getTable();
            context.addStep(transform, "ParallelWrite");
            context.addInput("format", "bigquery");
            context.addInput("table", table.getTableId());
            context.addInput("dataset", table.getDatasetId());
            if (table.getProjectId() != null) {
                context.addInput("project", table.getProjectId());
            }
            if (originalTransform.getSchema() != null) {
                try {
                    context.addInput("schema", JSON_FACTORY.toString((Object)originalTransform.getSchema()));
                }
                catch (IOException exn) {
                    throw new IllegalArgumentException("Invalid table schema.", exn);
                }
            }
            context.addInput("create_disposition", originalTransform.getCreateDisposition().name());
            context.addInput("write_disposition", originalTransform.getWriteDisposition().name());
            context.addEncodingInput(WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
            context.addInput("parallel_input", context.getInput(transform));
        }
    }

    private static class BatchBigQueryIONativeWrite
    extends PTransform<PCollection<TableRow>, PDone> {
        private final BigQueryIO.Write.Bound transform;

        public BatchBigQueryIONativeWrite(BigQueryIO.Write.Bound transform) {
            this.transform = transform;
        }

        @Override
        public PDone apply(PCollection<TableRow> input) {
            return PDone.in(input.getPipeline());
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            this.transform.populateDisplayData(builder);
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(BatchBigQueryIONativeWrite.class, new BatchBigQueryIONativeWriteTranslator());
        }
    }

    private static class BatchBigQueryIOWrite
    extends PTransform<PCollection<TableRow>, PDone> {
        private final BigQueryIO.Write.Bound transform;

        public BatchBigQueryIOWrite(DataflowPipelineRunner runner, BigQueryIO.Write.Bound transform) {
            this.transform = transform;
        }

        @Override
        public PDone apply(PCollection<TableRow> input) {
            if (this.transform.getTable() == null) {
                return this.transform.apply(input);
            }
            return input.apply(new BatchBigQueryIONativeWrite(this.transform));
        }
    }

    public static class BatchBigQueryIONativeReadTranslator
    implements DataflowPipelineTranslator.TransformTranslator<BatchBigQueryIONativeRead> {
        @Override
        public void translate(BatchBigQueryIONativeRead transform, DataflowPipelineTranslator.TranslationContext context) {
            this.translateWriteHelper(transform, transform.transform, context);
        }

        private void translateWriteHelper(BatchBigQueryIONativeRead transform, BigQueryIO.Read.Bound originalTransform, DataflowPipelineTranslator.TranslationContext context) {
            context.addStep(transform, "ParallelRead");
            context.addInput("format", "bigquery");
            context.addInput("bigquery_export_format", "FORMAT_AVRO");
            if (originalTransform.getQuery() != null) {
                context.addInput("bigquery_query", originalTransform.getQuery());
                context.addInput("bigquery_flatten_results", originalTransform.getFlattenResults());
                context.addInput("bigquery_use_legacy_sql", originalTransform.getUseLegacySql());
            } else {
                TableReference table = originalTransform.getTable();
                if (table.getProjectId() == null) {
                    String projectIdFromOptions = context.getPipelineOptions().getProject();
                    LOG.warn("No project specified for BigQuery table \"{}.{}\". Assuming it is in \"{}\". If the table is in a different project please specify it as a part of the BigQuery table definition.", new Object[]{table.getDatasetId(), table.getTableId(), projectIdFromOptions});
                    table.setProjectId(projectIdFromOptions);
                }
                context.addInput("table", table.getTableId());
                context.addInput("dataset", table.getDatasetId());
                if (table.getProjectId() != null) {
                    context.addInput("project", table.getProjectId());
                }
            }
            context.addValueOnlyOutput("output", context.getOutput(transform));
        }
    }

    private static class BatchBigQueryIONativeRead
    extends PTransform<PInput, PCollection<TableRow>> {
        private final BigQueryIO.Read.Bound transform;

        public BatchBigQueryIONativeRead(DataflowPipelineRunner runner, BigQueryIO.Read.Bound transform) {
            this.transform = transform;
        }

        @Override
        public PCollection<TableRow> apply(PInput input) {
            return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED).setCoder((Coder)TableRowJsonCoder.of());
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            this.transform.populateDisplayData(builder);
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(BatchBigQueryIONativeRead.class, new BatchBigQueryIONativeReadTranslator());
        }
    }

    private static class BatchWrite<T>
    extends PTransform<PCollection<T>, PDone> {
        private final DataflowPipelineRunner runner;
        private final Write.Bound<T> transform;

        public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) {
            this.runner = runner;
            this.transform = transform;
        }

        @Override
        public PDone apply(PCollection<T> input) {
            if (this.transform.getSink() instanceof FileBasedSink) {
                FileBasedSink sink = (FileBasedSink)this.transform.getSink();
                PathValidator validator = this.runner.options.getPathValidator();
                validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
            }
            return this.transform.apply(input);
        }
    }

    private static class IterableWithWindowedValuesToIterable<V>
    implements Function<Iterable<WindowedValue<V>>, Iterable<V>>,
    Serializable {
        private static final IterableWithWindowedValuesToIterable<?> INSTANCE = new IterableWithWindowedValuesToIterable();

        private IterableWithWindowedValuesToIterable() {
        }

        private static <V> IterableWithWindowedValuesToIterable<V> of() {
            return INSTANCE;
        }

        @Override
        public Iterable<V> apply(Iterable<WindowedValue<V>> input) {
            return Iterables.transform(input, WindowedValueToValue.of());
        }
    }

    private static class WindowedValueToValue<V>
    implements Function<WindowedValue<V>, V>,
    Serializable {
        private static final WindowedValueToValue<?> INSTANCE = new WindowedValueToValue();

        private WindowedValueToValue() {
        }

        private static <V> WindowedValueToValue<V> of() {
            return INSTANCE;
        }

        @Override
        public V apply(WindowedValue<V> input) {
            return input.getValue();
        }
    }

    static class TransformedMapCoder<K, V1, V2>
    extends StandardCoder<TransformedMap<K, V1, V2>> {
        private final Coder<Function<V1, V2>> transformCoder;
        private final Coder<Map<K, V1>> originalMapCoder;

        private TransformedMapCoder(Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>> originalMapCoder) {
            this.transformCoder = transformCoder;
            this.originalMapCoder = originalMapCoder;
        }

        public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>> originalMapCoder) {
            return new TransformedMapCoder<K, V1, V2>(transformCoder, originalMapCoder);
        }

        @JsonCreator
        public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(@JsonProperty(value="component_encodings") List<Coder<?>> components) {
            Preconditions.checkArgument(components.size() == 2, "Expecting 2 components, got " + components.size());
            Coder<Function<V1, V2>> transformCoder = components.get(0);
            Coder<Map<K, V1>> originalMapCoder = components.get(1);
            return TransformedMapCoder.of(transformCoder, originalMapCoder);
        }

        @Override
        public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException {
            this.transformCoder.encode(((TransformedMap)value).transform, outStream, context.nested());
            this.originalMapCoder.encode(((TransformedMap)value).originalMap, outStream, context.nested());
        }

        @Override
        public TransformedMap<K, V1, V2> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException {
            return new TransformedMap(this.transformCoder.decode(inStream, context.nested()), this.originalMapCoder.decode(inStream, context.nested()));
        }

        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.transformCoder, this.originalMapCoder);
        }

        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.verifyDeterministic("Expected transform coder to be deterministic.", this.transformCoder);
            this.verifyDeterministic("Expected map coder to be deterministic.", this.originalMapCoder);
        }
    }

    static class TransformedMap<K, V1, V2>
    extends ForwardingMap<K, V2> {
        private final Function<V1, V2> transform;
        private final Map<K, V1> originalMap;
        private final Map<K, V2> transformedMap;

        private TransformedMap(Function<V1, V2> transform, Map<K, V1> originalMap) {
            this.transform = transform;
            this.originalMap = Collections.unmodifiableMap(originalMap);
            this.transformedMap = Maps.transformValues(originalMap, transform);
        }

        @Override
        protected Map<K, V2> delegate() {
            return this.transformedMap;
        }
    }

    static class BatchViewAsMultimap<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
        private final DataflowPipelineRunner runner;

        public BatchViewAsMultimap(DataflowPipelineRunner runner, View.AsMultimap<K, V> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
            return this.applyInternal(input);
        }

        private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> applyInternal(PCollection<KV<K, V>> input) {
            KvCoder inputCoder = (KvCoder)input.getCoder();
            try {
                PCollectionView view = PCollectionViews.multimapView(input.getPipeline(), input.getWindowingStrategy(), inputCoder);
                return BatchViewAsMultimap.applyForMapLike(this.runner, input, view, false);
            }
            catch (Coder.NonDeterministicException e) {
                this.runner.recordViewUsesNonDeterministicKeyCoder(this);
                return this.applyForSingletonFallback(input);
            }
        }

        private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> applyForSingletonFallback(PCollection<KV<K, V>> input) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            KvCoder inputCoder = (KvCoder)input.getCoder();
            SerializableCoder transformCoder = SerializableCoder.of(IterableWithWindowedValuesToIterable.class);
            TransformedMapCoder finalValueCoder = TransformedMapCoder.of(transformCoder, MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(WindowedValue.FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder))));
            TransformedMap defaultValue = new TransformedMap(IterableWithWindowedValuesToIterable.of(), ImmutableMap.of());
            return BatchViewAsSingleton.applyForSingleton(this.runner, input, new ToMultimapDoFn(windowCoder), true, defaultValue, finalValueCoder);
        }

        private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(DataflowPipelineRunner runner, PCollection<KV<K, V>> input, PCollectionView<ViewT> view, boolean uniqueKeysExpected) throws Coder.NonDeterministicException {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            KvCoder inputCoder = (KvCoder)input.getCoder();
            inputCoder.getKeyCoder().verifyDeterministic();
            IsmFormat.IsmRecordCoder ismCoder = BatchViewAsMultimap.coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder());
            TupleTag mainOutputTag = new TupleTag();
            TupleTag outputForSizeTag = new TupleTag();
            TupleTag outputForEntrySetTag = new TupleTag();
            PCollectionTuple outputTuple = (PCollectionTuple)((PCollection)input.apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow(ismCoder))).apply(ParDo.of(new ToIsmRecordForMapLikeDoFn(outputForSizeTag, outputForEntrySetTag, windowCoder, inputCoder.getKeyCoder(), ismCoder, uniqueKeysExpected)).withOutputTags(mainOutputTag, TupleTagList.of(ImmutableList.of(outputForSizeTag, outputForEntrySetTag))));
            PCollection perHashWithReifiedWindows = outputTuple.get(mainOutputTag);
            perHashWithReifiedWindows.setCoder((Coder)ismCoder);
            PCollection outputForSize = outputTuple.get(outputForSizeTag);
            outputForSize.setCoder((Coder)KvCoder.of(VarIntCoder.of(), KvCoder.of(windowCoder, VarLongCoder.of())));
            PCollection windowMapSizeMetadata = (PCollection)((PCollection)outputForSize.apply("GBKaSVForSize", new GroupByKeyAndSortValuesOnly())).apply(ParDo.of(new ToIsmMetadataRecordForSizeDoFn(windowCoder)));
            windowMapSizeMetadata.setCoder((Coder)ismCoder);
            PCollection outputForEntrySet = outputTuple.get(outputForEntrySetTag);
            outputForEntrySet.setCoder((Coder)KvCoder.of(VarIntCoder.of(), KvCoder.of(windowCoder, inputCoder.getKeyCoder())));
            PCollection windowMapKeysMetadata = (PCollection)((PCollection)outputForEntrySet.apply("GBKaSVForKeys", new GroupByKeyAndSortValuesOnly())).apply(ParDo.of(new ToIsmMetadataRecordForKeyDoFn(inputCoder.getKeyCoder(), windowCoder)));
            windowMapKeysMetadata.setCoder((Coder)ismCoder);
            runner.addPCollectionRequiringIndexedFormat(perHashWithReifiedWindows);
            runner.addPCollectionRequiringIndexedFormat(windowMapSizeMetadata);
            runner.addPCollectionRequiringIndexedFormat(windowMapKeysMetadata);
            PCollectionList outputs = PCollectionList.of(ImmutableList.of(perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
            return (PCollectionView)((PCollection)Pipeline.applyTransform(outputs, Flatten.pCollections())).apply(View.CreatePCollectionView.of(view));
        }

        @Override
        protected String getKindString() {
            return "BatchViewAsMultimap";
        }

        static <V> IsmFormat.IsmRecordCoder<WindowedValue<V>> coderForMapLike(Coder<? extends BoundedWindow> windowCoder, Coder<?> keyCoder, Coder<V> valueCoder) {
            return IsmFormat.IsmRecordCoder.of(1, 2, ImmutableList.of(IsmFormat.MetadataKeyCoder.of(keyCoder), windowCoder, BigEndianLongCoder.of()), WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder));
        }

        static class ToMultimapDoFn<K, V, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, IsmFormat.IsmRecord<WindowedValue<TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>>>> {
            private final Coder<W> windowCoder;

            ToMultimapDoFn(Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Optional<Object> previousWindowStructuralValue = Optional.absent();
                Optional<Object> previousWindow = Optional.absent();
                HashMultimap multimap = HashMultimap.create();
                for (KV kv : (Iterable)((KV)c.element()).getValue()) {
                    Object currentWindowStructuralValue = this.windowCoder.structuralValue(kv.getKey());
                    if (previousWindowStructuralValue.isPresent() && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
                        Map resultMap = multimap.asMap();
                        c.output(IsmFormat.IsmRecord.of(ImmutableList.of(previousWindow.get()), WindowedValue.valueInEmptyWindows(new TransformedMap(IterableWithWindowedValuesToIterable.of(), resultMap))));
                        multimap = HashMultimap.create();
                    }
                    multimap.put(((KV)((WindowedValue)kv.getValue()).getValue()).getKey(), ((WindowedValue)kv.getValue()).withValue(((KV)((WindowedValue)kv.getValue()).getValue()).getValue()));
                    previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
                    previousWindow = Optional.of(kv.getKey());
                }
                Map resultMap = multimap.asMap();
                c.output(IsmFormat.IsmRecord.of(ImmutableList.of(previousWindow.get()), WindowedValue.valueInEmptyWindows(new TransformedMap(IterableWithWindowedValuesToIterable.of(), resultMap))));
            }
        }

        static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmFormat.IsmRecord<WindowedValue<V>>> {
            private final Coder<K> keyCoder;
            private final Coder<W> windowCoder;

            ToIsmMetadataRecordForKeyDoFn(Coder<K> keyCoder, Coder<W> windowCoder) {
                this.keyCoder = keyCoder;
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Iterator iterator = ((Iterable)((KV)c.element()).getValue()).iterator();
                KV currentValue = (KV)iterator.next();
                Object currentWindowStructuralValue = this.windowCoder.structuralValue(currentValue.getKey());
                long elementsInWindow = 1L;
                while (iterator.hasNext()) {
                    KV nextValue = (KV)iterator.next();
                    Object nextWindowStructuralValue = this.windowCoder.structuralValue(nextValue.getKey());
                    c.output(IsmFormat.IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), Long.valueOf(elementsInWindow)), CoderUtils.encodeToByteArray(this.keyCoder, currentValue.getValue())));
                    ++elementsInWindow;
                    if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
                        elementsInWindow = 1L;
                    }
                    currentValue = nextValue;
                    currentWindowStructuralValue = nextWindowStructuralValue;
                }
                c.output(IsmFormat.IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), Long.valueOf(elementsInWindow)), CoderUtils.encodeToByteArray(this.keyCoder, currentValue.getValue())));
            }
        }

        static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmFormat.IsmRecord<WindowedValue<V>>> {
            private final Coder<W> windowCoder;

            ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Iterator iterator = ((Iterable)((KV)c.element()).getValue()).iterator();
                KV currentValue = (KV)iterator.next();
                Object currentWindowStructuralValue = this.windowCoder.structuralValue(currentValue.getKey());
                long size = 0L;
                while (iterator.hasNext()) {
                    KV nextValue = (KV)iterator.next();
                    Object nextWindowStructuralValue = this.windowCoder.structuralValue(nextValue.getKey());
                    size += ((Long)currentValue.getValue()).longValue();
                    if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
                        c.output(IsmFormat.IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), Long.valueOf(0L)), CoderUtils.encodeToByteArray(VarLongCoder.of(), size)));
                        size = 0L;
                    }
                    currentValue = nextValue;
                    currentWindowStructuralValue = nextWindowStructuralValue;
                }
                c.output(IsmFormat.IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), Long.valueOf(0L)), CoderUtils.encodeToByteArray(VarLongCoder.of(), size += ((Long)currentValue.getValue()).longValue())));
            }
        }

        static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, IsmFormat.IsmRecord<WindowedValue<V>>> {
            private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
            private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet;
            private final Coder<W> windowCoder;
            private final Coder<K> keyCoder;
            private final IsmFormat.IsmRecordCoder<WindowedValue<V>> ismCoder;
            private final boolean uniqueKeysExpected;

            ToIsmRecordForMapLikeDoFn(TupleTag<KV<Integer, KV<W, Long>>> outputForSize, TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet, Coder<W> windowCoder, Coder<K> keyCoder, IsmFormat.IsmRecordCoder<WindowedValue<V>> ismCoder, boolean uniqueKeysExpected) {
                this.outputForSize = outputForSize;
                this.outputForEntrySet = outputForEntrySet;
                this.windowCoder = windowCoder;
                this.keyCoder = keyCoder;
                this.ismCoder = ismCoder;
                this.uniqueKeysExpected = uniqueKeysExpected;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                long currentKeyIndex = 0L;
                long currentUniqueKeyCounter = 1L;
                Iterator iterator = ((Iterable)((KV)c.element()).getValue()).iterator();
                KV currentValue = (KV)iterator.next();
                Object currentKeyStructuralValue = this.keyCoder.structuralValue(((KV)currentValue.getKey()).getKey());
                Object currentWindowStructuralValue = this.windowCoder.structuralValue(((KV)currentValue.getKey()).getValue());
                while (iterator.hasNext()) {
                    long nextUniqueKeyCounter;
                    long nextKeyIndex;
                    KV nextValue = (KV)iterator.next();
                    Object nextKeyStructuralValue = this.keyCoder.structuralValue(((KV)nextValue.getKey()).getKey());
                    Object nextWindowStructuralValue = this.windowCoder.structuralValue(((KV)nextValue.getKey()).getValue());
                    this.outputDataRecord(c, currentValue, currentKeyIndex);
                    if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) {
                        this.outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter);
                        this.outputMetadataRecordForEntrySet(c, currentValue);
                        nextKeyIndex = 0L;
                        nextUniqueKeyCounter = 1L;
                    } else if (!currentKeyStructuralValue.equals(nextKeyStructuralValue)) {
                        this.outputMetadataRecordForEntrySet(c, currentValue);
                        nextKeyIndex = 0L;
                        nextUniqueKeyCounter = currentUniqueKeyCounter + 1L;
                    } else if (!this.uniqueKeysExpected) {
                        nextKeyIndex = currentKeyIndex + 1L;
                        nextUniqueKeyCounter = currentUniqueKeyCounter;
                    } else {
                        throw new IllegalStateException(String.format("Unique keys are expected but found key %s with values %s and %s in window %s.", ((KV)currentValue.getKey()).getKey(), ((WindowedValue)currentValue.getValue()).getValue(), ((WindowedValue)nextValue.getValue()).getValue(), ((KV)currentValue.getKey()).getValue()));
                    }
                    currentValue = nextValue;
                    currentWindowStructuralValue = nextWindowStructuralValue;
                    currentKeyStructuralValue = nextKeyStructuralValue;
                    currentKeyIndex = nextKeyIndex;
                    currentUniqueKeyCounter = nextUniqueKeyCounter;
                }
                this.outputDataRecord(c, currentValue, currentKeyIndex);
                this.outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter);
                this.outputMetadataRecordForEntrySet(c, currentValue);
            }

            private void outputDataRecord(DoFn.ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long keyIndex) {
                IsmFormat.IsmRecord<WindowedValue<V>> ismRecord = IsmFormat.IsmRecord.of(ImmutableList.of(value.getKey().getKey(), value.getKey().getValue(), Long.valueOf(keyIndex)), value.getValue());
                c.output(ismRecord);
            }

            private void outputMetadataRecordForSize(DoFn.ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long uniqueKeyCount) {
                c.sideOutput(this.outputForSize, KV.of(this.ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), uniqueKeyCount)));
            }

            private void outputMetadataRecordForEntrySet(DoFn.ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) {
                c.sideOutput(this.outputForEntrySet, KV.of(this.ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), value.getKey().getKey())));
            }
        }

        private static class GroupByKeyHashAndSortByKeyAndWindow<K, V, W extends BoundedWindow>
        extends PTransform<PCollection<KV<K, V>>, PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>> {
            private final IsmFormat.IsmRecordCoder<?> coder;

            public GroupByKeyHashAndSortByKeyAndWindow(IsmFormat.IsmRecordCoder<?> coder) {
                this.coder = coder;
            }

            @Override
            public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>> apply(PCollection<KV<K, V>> input) {
                Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
                KvCoder inputCoder = (KvCoder)input.getCoder();
                PCollection keyedByHash = (PCollection)input.apply(ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn(this.coder)));
                keyedByHash.setCoder((Coder)KvCoder.of(VarIntCoder.of(), KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), windowCoder), WindowedValue.FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder))));
                return (PCollection)keyedByHash.apply(new GroupByKeyAndSortValuesOnly());
            }

            @SystemDoFnInternal
            private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
            extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
            implements DoFn.RequiresWindowAccess {
                private final IsmFormat.IsmRecordCoder<?> coder;

                private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmFormat.IsmRecordCoder<?> coder) {
                    this.coder = coder;
                }

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    BoundedWindow window = c.window();
                    c.output(KV.of(this.coder.hash(ImmutableList.of(((KV)c.element()).getKey())), KV.of(KV.of(((KV)c.element()).getKey(), window), WindowedValue.of(((KV)c.element()).getValue(), c.timestamp(), window, c.pane()))));
                }
            }
        }
    }

    static class BatchViewAsMap<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
        private final DataflowPipelineRunner runner;

        public BatchViewAsMap(DataflowPipelineRunner runner, View.AsMap<K, V> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
            return this.applyInternal(input);
        }

        private <W extends BoundedWindow> PCollectionView<Map<K, V>> applyInternal(PCollection<KV<K, V>> input) {
            KvCoder inputCoder = (KvCoder)input.getCoder();
            try {
                PCollectionView view = PCollectionViews.mapView(input.getPipeline(), input.getWindowingStrategy(), inputCoder);
                return BatchViewAsMultimap.applyForMapLike(this.runner, input, view, true);
            }
            catch (Coder.NonDeterministicException e) {
                this.runner.recordViewUsesNonDeterministicKeyCoder(this);
                return this.applyForSingletonFallback(input);
            }
        }

        @Override
        protected String getKindString() {
            return "BatchViewAsMap";
        }

        private <W extends BoundedWindow> PCollectionView<Map<K, V>> applyForSingletonFallback(PCollection<KV<K, V>> input) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            KvCoder inputCoder = (KvCoder)input.getCoder();
            SerializableCoder transformCoder = SerializableCoder.of(WindowedValueToValue.class);
            TransformedMapCoder finalValueCoder = TransformedMapCoder.of(transformCoder, MapCoder.of(inputCoder.getKeyCoder(), WindowedValue.FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)));
            TransformedMap defaultValue = new TransformedMap(WindowedValueToValue.of(), ImmutableMap.of());
            return BatchViewAsSingleton.applyForSingleton(this.runner, input, new ToMapDoFn(windowCoder), true, defaultValue, finalValueCoder);
        }

        static class ToMapDoFn<K, V, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, IsmFormat.IsmRecord<WindowedValue<TransformedMap<K, WindowedValue<V>, V>>>> {
            private final Coder<W> windowCoder;

            ToMapDoFn(Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Optional<Object> previousWindowStructuralValue = Optional.absent();
                Optional<Object> previousWindow = Optional.absent();
                HashMap map = new HashMap();
                for (KV kv : (Iterable)((KV)c.element()).getValue()) {
                    Object currentWindowStructuralValue = this.windowCoder.structuralValue(kv.getKey());
                    if (previousWindowStructuralValue.isPresent() && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
                        c.output(IsmFormat.IsmRecord.of(ImmutableList.of(previousWindow.get()), WindowedValue.valueInEmptyWindows(new TransformedMap(WindowedValueToValue.of(), map))));
                        map = new HashMap();
                    }
                    Preconditions.checkState(!map.containsKey(((KV)((WindowedValue)kv.getValue()).getValue()).getKey()), "Multiple values [%s, %s] found for single key [%s] within window [%s].", map.get(((KV)((WindowedValue)kv.getValue()).getValue()).getKey()), ((KV)((WindowedValue)kv.getValue()).getValue()).getValue(), kv.getKey());
                    map.put(((KV)((WindowedValue)kv.getValue()).getValue()).getKey(), ((WindowedValue)kv.getValue()).withValue(((KV)((WindowedValue)kv.getValue()).getValue()).getValue()));
                    previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
                    previousWindow = Optional.of(kv.getKey());
                }
                c.output(IsmFormat.IsmRecord.of(ImmutableList.of(previousWindow.get()), WindowedValue.valueInEmptyWindows(new TransformedMap(WindowedValueToValue.of(), map))));
            }
        }
    }

    static class BatchViewAsList<T>
    extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
        private final DataflowPipelineRunner runner;

        public BatchViewAsList(DataflowPipelineRunner runner, View.AsList<T> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<List<T>> apply(PCollection<T> input) {
            PCollectionView<List<T>> view = PCollectionViews.listView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            return BatchViewAsList.applyForIterableLike(this.runner, input, view);
        }

        static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(DataflowPipelineRunner runner, PCollection<T> input, PCollectionView<ViewT> view) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            IsmFormat.IsmRecordCoder<WindowedValue<T>> ismCoder = BatchViewAsList.coderForListLike(windowCoder, input.getCoder());
            if (input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
                PCollection reifiedPerWindowAndSorted = (PCollection)input.apply(ParDo.of(new ToIsmRecordForGlobalWindowDoFn()));
                reifiedPerWindowAndSorted.setCoder(ismCoder);
                runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
                return (PCollectionView)reifiedPerWindowAndSorted.apply(View.CreatePCollectionView.of(view));
            }
            PCollection reifiedPerWindowAndSorted = (PCollection)((PCollection)input.apply(new GroupByWindowHashAsKeyAndWindowAsSortKey(ismCoder))).apply(ParDo.of(new ToIsmRecordForNonGlobalWindowDoFn(windowCoder)));
            reifiedPerWindowAndSorted.setCoder(ismCoder);
            runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
            return (PCollectionView)reifiedPerWindowAndSorted.apply(View.CreatePCollectionView.of(view));
        }

        @Override
        protected String getKindString() {
            return "BatchViewAsList";
        }

        static <T> IsmFormat.IsmRecordCoder<WindowedValue<T>> coderForListLike(Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
            return IsmFormat.IsmRecordCoder.of(1, 0, ImmutableList.of(windowCoder, BigEndianLongCoder.of()), WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder));
        }

        @SystemDoFnInternal
        static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmFormat.IsmRecord<WindowedValue<T>>> {
            private final Coder<W> windowCoder;

            ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                long elementsInWindow = 0L;
                Optional<Object> previousWindowStructuralValue = Optional.absent();
                for (KV value : (Iterable)((KV)c.element()).getValue()) {
                    Object currentWindowStructuralValue = this.windowCoder.structuralValue(value.getKey());
                    if (previousWindowStructuralValue.isPresent() && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
                        elementsInWindow = 0L;
                    }
                    c.output(IsmFormat.IsmRecord.of(ImmutableList.of(value.getKey(), Long.valueOf(elementsInWindow)), value.getValue()));
                    previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
                    ++elementsInWindow;
                }
            }
        }

        @SystemDoFnInternal
        static class ToIsmRecordForGlobalWindowDoFn<T>
        extends DoFn<T, IsmFormat.IsmRecord<WindowedValue<T>>> {
            long indexInBundle;

            ToIsmRecordForGlobalWindowDoFn() {
            }

            @Override
            public void startBundle(DoFn.Context c) throws Exception {
                this.indexInBundle = 0L;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                c.output(IsmFormat.IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, Long.valueOf(this.indexInBundle)), WindowedValue.of(c.element(), c.timestamp(), GlobalWindow.INSTANCE, c.pane())));
                ++this.indexInBundle;
            }
        }
    }

    static class BatchViewAsIterable<T>
    extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
        private final DataflowPipelineRunner runner;

        public BatchViewAsIterable(DataflowPipelineRunner runner, View.AsIterable<T> transform) {
            this.runner = runner;
        }

        @Override
        public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
            PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
            return BatchViewAsList.applyForIterableLike(this.runner, input, view);
        }
    }

    static class BatchViewAsSingleton<T>
    extends PTransform<PCollection<T>, PCollectionView<T>> {
        private final DataflowPipelineRunner runner;
        private final View.AsSingleton<T> transform;

        public BatchViewAsSingleton(DataflowPipelineRunner runner, View.AsSingleton<T> transform) {
            this.runner = runner;
            this.transform = transform;
        }

        @Override
        public PCollectionView<T> apply(PCollection<T> input) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            return BatchViewAsSingleton.applyForSingleton(this.runner, input, new IsmRecordForSingularValuePerWindowDoFn(windowCoder), this.transform.hasDefaultValue(), this.transform.defaultValue(), input.getCoder());
        }

        static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT> applyForSingleton(DataflowPipelineRunner runner, PCollection<T> input, DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmFormat.IsmRecord<WindowedValue<FinalT>>> doFn, boolean hasDefault, FinalT defaultValue, Coder<FinalT> defaultValueCoder) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            PCollectionView<FinalT> view = PCollectionViews.singletonView(input.getPipeline(), input.getWindowingStrategy(), hasDefault, defaultValue, defaultValueCoder);
            IsmFormat.IsmRecordCoder<WindowedValue<FinalT>> ismCoder = BatchViewAsSingleton.coderForSingleton(windowCoder, defaultValueCoder);
            PCollection reifiedPerWindowAndSorted = (PCollection)((Object)((PCollection)input.apply(new GroupByWindowHashAsKeyAndWindowAsSortKey(ismCoder))).apply(ParDo.of(doFn)));
            reifiedPerWindowAndSorted.setCoder(ismCoder);
            runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
            return (PCollectionView)reifiedPerWindowAndSorted.apply(View.CreatePCollectionView.of(view));
        }

        @Override
        protected String getKindString() {
            return "BatchViewAsSingleton";
        }

        static <T> IsmFormat.IsmRecordCoder<WindowedValue<T>> coderForSingleton(Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
            return IsmFormat.IsmRecordCoder.of(1, 0, ImmutableList.of(windowCoder), WindowedValue.FullWindowedValueCoder.of(valueCoder, windowCoder));
        }

        static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmFormat.IsmRecord<WindowedValue<T>>> {
            private final Coder<W> windowCoder;

            IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
                this.windowCoder = windowCoder;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                Optional<Object> previousWindowStructuralValue = Optional.absent();
                Object previousValue = null;
                for (KV next : (Iterable)((KV)c.element()).getValue()) {
                    Object currentWindowStructuralValue = this.windowCoder.structuralValue(next.getKey());
                    Preconditions.checkState(!previousWindowStructuralValue.isPresent() || !previousWindowStructuralValue.get().equals(currentWindowStructuralValue), "Multiple values [%s, %s] found for singleton within window [%s].", previousValue, ((WindowedValue)next.getValue()).getValue(), next.getKey());
                    c.output(IsmFormat.IsmRecord.of(ImmutableList.of(next.getKey()), next.getValue()));
                    previousWindowStructuralValue = Optional.of(currentWindowStructuralValue);
                    previousValue = ((WindowedValue)next.getValue()).getValue();
                }
            }
        }
    }

    private static class GroupByWindowHashAsKeyAndWindowAsSortKey<T, W extends BoundedWindow>
    extends PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
        private final IsmFormat.IsmRecordCoder<?> ismCoderForHash;

        private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmFormat.IsmRecordCoder<?> ismCoderForHash) {
            this.ismCoderForHash = ismCoderForHash;
        }

        @Override
        public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) {
            Coder<?> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();
            PCollection rval = (PCollection)input.apply(ParDo.of(new UseWindowHashAsKeyAndWindowAsSortKeyDoFn(this.ismCoderForHash)));
            rval.setCoder((Coder)KvCoder.of(VarIntCoder.of(), KvCoder.of(windowCoder, WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowCoder))));
            return (PCollection)rval.apply(new GroupByKeyAndSortValuesOnly());
        }

        @SystemDoFnInternal
        private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
        extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>>
        implements DoFn.RequiresWindowAccess {
            private final IsmFormat.IsmRecordCoder<?> ismCoderForHash;

            private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmFormat.IsmRecordCoder<?> ismCoderForHash) {
                this.ismCoderForHash = ismCoderForHash;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                BoundedWindow window = c.window();
                c.output(KV.of(this.ismCoderForHash.hash(ImmutableList.of(window)), KV.of(window, WindowedValue.of(c.element(), c.timestamp(), c.window(), c.pane()))));
            }
        }
    }

    static class GroupByKeyAndSortValuesOnly<K1, K2, V>
    extends PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1, Iterable<KV<K2, V>>>>> {
        private GroupByKeyAndSortValuesOnly() {
        }

        @Override
        public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) {
            PCollection<KV<K1, Iterable<KV<K2, V>>>> rval = PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
            KvCoder inputCoder = (KvCoder)input.getCoder();
            rval.setCoder((Coder)KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
            return rval;
        }
    }
}

