/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides;
import org.apache.beam.runners.dataflow.BatchViewOverrides;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.DataflowJobAlreadyExistsException;
import org.apache.beam.runners.dataflow.DataflowJobAlreadyUpdatedException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
import org.apache.beam.runners.dataflow.DataflowRunnerHooks;
import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory;
import org.apache.beam.runners.dataflow.ReadTranslator;
import org.apache.beam.runners.dataflow.ReshuffleOverrideFactory;
import org.apache.beam.runners.dataflow.SplittableParDoOverrides;
import org.apache.beam.runners.dataflow.StreamingViewOverrides;
import org.apache.beam.runners.dataflow.TransformTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Joiner;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Strings;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Utf8;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowRunner
extends PipelineRunner<DataflowPipelineJob> {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
    private final DataflowPipelineOptions options;
    private final DataflowClient dataflowClient;
    private final DataflowPipelineTranslator translator;
    private DataflowRunnerHooks hooks;
    private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 0xA00000;
    @VisibleForTesting
    static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 0x100000;
    private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
    public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]";
    private Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;

    public static DataflowRunner fromOptions(PipelineOptions options) {
        String stagingLocation;
        String gcpTempLocation;
        DataflowPipelineOptions dataflowOptions = (DataflowPipelineOptions)PipelineOptionsValidator.validate(DataflowPipelineOptions.class, (PipelineOptions)options);
        ArrayList<String> missing = new ArrayList<String>();
        if (dataflowOptions.getAppName() == null) {
            missing.add("appName");
        }
        if (missing.size() > 0) {
            throw new IllegalArgumentException("Missing required values: " + Joiner.on(',').join(missing));
        }
        PathValidator validator = dataflowOptions.getPathValidator();
        try {
            gcpTempLocation = dataflowOptions.getGcpTempLocation();
        }
        catch (Exception e) {
            throw new IllegalArgumentException("DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions", e);
        }
        validator.validateOutputFilePrefixSupported(gcpTempLocation);
        try {
            stagingLocation = dataflowOptions.getStagingLocation();
        }
        catch (Exception e) {
            throw new IllegalArgumentException("DataflowRunner requires stagingLocation, but failed to retrieve a value from PipelineOptions", e);
        }
        validator.validateOutputFilePrefixSupported(stagingLocation);
        if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
            validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());
        }
        if (dataflowOptions.getFilesToStage() == null) {
            dataflowOptions.setFilesToStage(DataflowRunner.detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));
            if (dataflowOptions.getFilesToStage().isEmpty()) {
                throw new IllegalArgumentException("No files to stage has been found.");
            }
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", (Object)dataflowOptions.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", (Object)dataflowOptions.getFilesToStage());
        }
        String jobName = dataflowOptions.getJobName().toLowerCase();
        Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; the name must consist of only the characters [-a-z0-9], starting with a letter and ending with a letter or number");
        if (!jobName.equals(dataflowOptions.getJobName())) {
            LOG.info("PipelineOptions.jobName did not match the service requirements. Using {} instead of {}.", (Object)jobName, (Object)dataflowOptions.getJobName());
        }
        dataflowOptions.setJobName(jobName);
        String project = dataflowOptions.getProject();
        if (project.matches("[0-9]*")) {
            throw new IllegalArgumentException("Project ID '" + project + "' invalid. Please make sure you specified the Project ID, not project number.");
        }
        if (!project.matches(PROJECT_ID_REGEXP)) {
            throw new IllegalArgumentException("Project ID '" + project + "' invalid. Please make sure you specified the Project ID, not project description.");
        }
        DataflowPipelineDebugOptions debugOptions = (DataflowPipelineDebugOptions)dataflowOptions.as(DataflowPipelineDebugOptions.class);
        if (debugOptions.getNumberOfWorkerHarnessThreads() < 0) {
            throw new IllegalArgumentException("Number of worker harness threads '" + debugOptions.getNumberOfWorkerHarnessThreads() + "' invalid. Please make sure the value is non-negative.");
        }
        if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
            dataflowOptions.setGcsUploadBufferSizeBytes(0x100000);
        }
        DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
        String userAgent = String.format("%s/%s", dataflowRunnerInfo.getName(), dataflowRunnerInfo.getVersion()).replace(" ", "_");
        dataflowOptions.setUserAgent(userAgent);
        return new DataflowRunner(dataflowOptions);
    }

    @VisibleForTesting
    protected DataflowRunner(DataflowPipelineOptions options) {
        this.options = options;
        this.dataflowClient = DataflowClient.create(options);
        this.translator = DataflowPipelineTranslator.fromOptions(options);
        this.pcollectionsRequiringIndexedFormat = new HashSet();
        this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet();
    }

    private List<PTransformOverride> getOverrides(boolean streaming) {
        ImmutableList.Builder overridesBuilder = ImmutableList.builder();
        ((ImmutableList.Builder)overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.flattenWithDuplicateInputs(), DeduplicatedFlattenFactory.create()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance()));
        if (streaming) {
            if (!DataflowRunner.hasExperiment(this.options, "enable_custom_pubsub_source")) {
                overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), (PTransformOverrideFactory)new StreamingPubsubIOReadOverrideFactory()));
            }
            if (!DataflowRunner.hasExperiment(this.options, "enable_custom_pubsub_sink")) {
                overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), (PTransformOverrideFactory)new StreamingPubsubIOWriteOverrideFactory(this)));
            }
            if (DataflowRunner.hasExperiment(this.options, "beam_fn_api")) {
                overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(Create.Values.class), new StreamingFnApiCreateOverrideFactory()));
            }
            ((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableParDoSingle(), new ReflectiveOneToOneOverrideFactory(SplittableParDoOverrides.ParDoSingleViaMulti.class, this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrides.SplittableParDoOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.writeWithRunnerDeterminedSharding(), new StreamingShardedWriteFactory(this.options)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(Read.Bounded.class), new StreamingBoundedReadOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(Read.Unbounded.class), new StreamingUnboundedReadOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), new StreamingViewOverrides.StreamingCreatePCollectionViewFactory()));
        } else {
            ((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.stateOrTimerParDoMulti(), BatchStatefulParDoOverrides.multiOutputOverrideFactory(this.options)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.stateOrTimerParDoSingle(), BatchStatefulParDoOverrides.singleOutputOverrideFactory(this.options)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class), new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class), new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMultimap.class, this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class), new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsSingleton.class, this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class), new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsList.class, this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class), new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsIterable.class, this)));
        }
        ((ImmutableList.Builder)((ImmutableList.Builder)overridesBuilder.add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(Combine.GroupedValues.class), new PrimitiveCombineGroupedValuesOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.classEqualTo(ParDo.SingleOutput.class), new PrimitiveParDoSingleFactory()));
        return overridesBuilder.build();
    }

    private String debuggerMessage(String projectId, String uniquifier) {
        return String.format("To debug your job, visit Google Cloud Debugger at: https://console.developers.google.com/debug?project=%s&dbgee=%s", projectId, uniquifier);
    }

    private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
        if (!options.getEnableCloudDebugger()) {
            return;
        }
        if (options.getDebuggee() != null) {
            throw new RuntimeException("Should not specify the debuggee");
        }
        Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build();
        Debuggee debuggee = this.registerDebuggee(debuggerClient, uniquifier);
        options.setDebuggee(debuggee);
        System.out.println(this.debuggerMessage(options.getProject(), debuggee.getUniquifier()));
    }

    private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
        RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
        registerReq.setDebuggee(new Debuggee().setProject(this.options.getProject()).setUniquifier(uniquifier).setDescription(uniquifier).setAgentVersion("google.com/cloud-dataflow-java/v1"));
        try {
            RegisterDebuggeeResponse registerResponse = (RegisterDebuggeeResponse)debuggerClient.controller().debuggees().register(registerReq).execute();
            Debuggee debuggee = registerResponse.getDebuggee();
            if (debuggee.getStatus() != null && debuggee.getStatus().getIsError().booleanValue()) {
                throw new RuntimeException("Unable to register with the debugger: " + debuggee.getStatus().getDescription().getFormat());
            }
            return debuggee;
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to register with the debugger: ", e);
        }
    }

    public DataflowPipelineJob run(Pipeline pipeline) {
        Job jobResult;
        this.logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
        if (this.containsUnboundedPCollection(pipeline)) {
            this.options.setStreaming(true);
        }
        this.replaceTransforms(pipeline);
        LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.");
        List<DataflowPackage> packages = this.options.getStager().stageFiles();
        int randomNum = new Random().nextInt(9000) + 1000;
        String requestId = DateTimeFormat.forPattern((String)"YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC).print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
        DataflowPipelineOptions dataflowOptions = (DataflowPipelineOptions)this.options.as(DataflowPipelineOptions.class);
        this.maybeRegisterDebuggee(dataflowOptions, requestId);
        DataflowPipelineTranslator.JobSpecification jobSpecification = this.translator.translate(pipeline, this, packages);
        Job newJob = jobSpecification.getJob();
        newJob.setClientRequestId(requestId);
        DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
        String version = dataflowRunnerInfo.getVersion();
        Preconditions.checkState(!version.equals("${pom.version}"), "Unable to submit a job to the Dataflow service with unset version ${pom.version}");
        System.out.println("Dataflow SDK version: " + version);
        newJob.getEnvironment().setUserAgent(dataflowRunnerInfo.getProperties());
        if (!Strings.isNullOrEmpty(this.options.getGcpTempLocation())) {
            newJob.getEnvironment().setTempStoragePrefix(dataflowOptions.getPathValidator().verifyPath(this.options.getGcpTempLocation()));
        }
        newJob.getEnvironment().setDataset(this.options.getTempDatasetId());
        newJob.getEnvironment().setExperiments(this.options.getExperiments());
        String workerHarnessContainerImage = DataflowRunner.getContainerImageForJob(this.options);
        for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
            workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
        }
        newJob.getEnvironment().setVersion(DataflowRunner.getEnvironmentVersion(this.options));
        if (this.hooks != null) {
            this.hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
        }
        if (!Strings.isNullOrEmpty(this.options.getDataflowJobFile()) || !Strings.isNullOrEmpty(this.options.getTemplateLocation())) {
            String fileLocation;
            boolean isTemplate;
            boolean bl = isTemplate = !Strings.isNullOrEmpty(this.options.getTemplateLocation());
            if (isTemplate) {
                Preconditions.checkArgument(Strings.isNullOrEmpty(this.options.getDataflowJobFile()), "--dataflowJobFile and --templateLocation are mutually exclusive.");
            }
            Preconditions.checkArgument((fileLocation = MoreObjects.firstNonNull(this.options.getTemplateLocation(), this.options.getDataflowJobFile())).startsWith("/") || fileLocation.startsWith("gs://"), "Location must be local or on Cloud Storage, got %s.", (Object)fileLocation);
            ResourceId fileResource = FileSystems.matchNewResource((String)fileLocation, (boolean)false);
            String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
            try (PrintWriter printWriter = new PrintWriter(Channels.newOutputStream(FileSystems.create((ResourceId)fileResource, (String)"text/plain")));){
                printWriter.print(workSpecJson);
                LOG.info("Printed job specification to {}", (Object)fileLocation);
            }
            catch (IOException ex) {
                String error = String.format("Cannot create output file at %s", fileLocation);
                if (isTemplate) {
                    throw new RuntimeException(error, ex);
                }
                LOG.warn(error, (Throwable)ex);
            }
            if (isTemplate) {
                LOG.info("Template successfully created.");
                return new DataflowTemplateJob();
            }
        }
        String jobIdToUpdate = null;
        if (this.options.isUpdate()) {
            jobIdToUpdate = this.getJobIdFromName(this.options.getJobName());
            newJob.setTransformNameMapping(this.options.getTransformNameMapping());
            newJob.setReplaceJobId(jobIdToUpdate);
        }
        try {
            jobResult = this.dataflowClient.createJob(newJob);
        }
        catch (GoogleJsonResponseException e) {
            String errorMessages = "Unexpected errors";
            if (e.getDetails() != null) {
                errorMessages = Utf8.encodedLength(newJob.toString()) >= 0xA00000 ? "The size of the serialized JSON representation of the pipeline exceeds the allowable limit. For more information, please check the FAQ link below:\nhttps://cloud.google.com/dataflow/faq" : e.getDetails().getMessage();
            }
            throw new RuntimeException("Failed to create a workflow job: " + errorMessages, e);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create a workflow job", e);
        }
        DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(DataflowClient.create(this.options), jobResult.getId(), this.options, jobSpecification.getStepNames());
        if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty() && !jobResult.getClientRequestId().equals(requestId)) {
            if (this.options.isUpdate()) {
                throw new DataflowJobAlreadyUpdatedException(dataflowPipelineJob, String.format("The job named %s with id: %s has already been updated into job id: %s and cannot be updated again.", newJob.getName(), jobIdToUpdate, jobResult.getId()));
            }
            throw new DataflowJobAlreadyExistsException(dataflowPipelineJob, String.format("There is already an active job named %s with id: %s. If you want to submit a second job, try again by setting a different name using --jobName.", newJob.getName(), jobResult.getId()));
        }
        LOG.info("To access the Dataflow monitoring console, please navigate to {}", (Object)MonitoringUtil.getJobMonitoringPageURL(this.options.getProject(), this.options.getRegion(), jobResult.getId()));
        System.out.println("Submitted job: " + jobResult.getId());
        LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", (Object)MonitoringUtil.getGcloudCancelCommand(this.options, jobResult.getId()));
        return dataflowPipelineJob;
    }

    public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) {
        List experiments = MoreObjects.firstNonNull(options.getExperiments(), Collections.emptyList());
        return experiments.contains(experiment);
    }

    private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions options) {
        String jobType;
        String majorVersion;
        DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
        if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
            majorVersion = runnerInfo.getFnApiEnvironmentMajorVersion();
            jobType = options.isStreaming() ? "FNAPI_STREAMING" : "FNAPI_BATCH";
        } else {
            majorVersion = runnerInfo.getLegacyEnvironmentMajorVersion();
            jobType = options.isStreaming() ? "STREAMING" : "JAVA_BATCH_AUTOSCALING";
        }
        return ImmutableMap.of("major", majorVersion, "job_type", jobType);
    }

    @VisibleForTesting
    void replaceTransforms(Pipeline pipeline) {
        boolean streaming = this.options.isStreaming() || this.containsUnboundedPCollection(pipeline);
        UnconsumedReads.ensureAllReadsConsumed(pipeline);
        pipeline.replaceAll(this.getOverrides(streaming));
    }

    private boolean containsUnboundedPCollection(Pipeline p) {
        class BoundednessVisitor
        extends Pipeline.PipelineVisitor.Defaults {
            PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;

            BoundednessVisitor() {
            }

            public void visitValue(PValue value, TransformHierarchy.Node producer) {
                if (value instanceof PCollection) {
                    this.boundedness = this.boundedness.and(((PCollection)value).isBounded());
                }
            }
        }
        BoundednessVisitor visitor = new BoundednessVisitor();
        p.traverseTopologically((Pipeline.PipelineVisitor)visitor);
        return visitor.boundedness == PCollection.IsBounded.UNBOUNDED;
    }

    public DataflowPipelineTranslator getTranslator() {
        return this.translator;
    }

    @Experimental
    public void setHooks(DataflowRunnerHooks hooks) {
        this.hooks = hooks;
    }

    private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
        if (!this.ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
            final TreeSet ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet();
            pipeline.traverseTopologically((Pipeline.PipelineVisitor)new Pipeline.PipelineVisitor.Defaults(){

                public void visitValue(PValue value, TransformHierarchy.Node producer) {
                }

                public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                    if (DataflowRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                }

                public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
                    if (node.getTransform() instanceof View.AsMap || node.getTransform() instanceof View.AsMultimap) {
                        PCollection input = (PCollection)Iterables.getOnlyElement(node.getInputs().values());
                        KvCoder inputCoder = (KvCoder)input.getCoder();
                        try {
                            inputCoder.getKeyCoder().verifyDeterministic();
                        }
                        catch (Coder.NonDeterministicException e) {
                            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                        }
                    }
                    if (DataflowRunner.this.ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
                        ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
                    }
                    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
                }

                public void leaveCompositeTransform(TransformHierarchy.Node node) {
                }
            });
            LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} because the key coder is not deterministic. Falling back to singleton implementation which may cause memory and/or performance problems. Future major versions of Dataflow will require deterministic key coders.", ptransformViewNamesWithNonDeterministicKeyCoders);
        }
    }

    boolean doesPCollectionRequireIndexedFormat(PCollection<?> pcol) {
        return this.pcollectionsRequiringIndexedFormat.contains(pcol);
    }

    void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
        this.pcollectionsRequiringIndexedFormat.add(pcol);
    }

    void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
        this.ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
    }

    public String toString() {
        return "DataflowRunner#" + this.options.getJobName();
    }

    protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
        if (!(classLoader instanceof URLClassLoader)) {
            String message = String.format("Unable to use ClassLoader to detect classpath elements. Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
            LOG.error(message);
            throw new IllegalArgumentException(message);
        }
        ArrayList<String> files = new ArrayList<String>();
        for (URL url : ((URLClassLoader)classLoader).getURLs()) {
            try {
                files.add(new File(url.toURI()).getAbsolutePath());
            }
            catch (IllegalArgumentException | URISyntaxException e) {
                String message = String.format("Unable to convert url (%s) to file.", url);
                LOG.error(message);
                throw new IllegalArgumentException(message, e);
            }
        }
        return files;
    }

    private String getJobIdFromName(String jobName) {
        try {
            String token = null;
            do {
                ListJobsResponse listResult = this.dataflowClient.listJobs(token);
                token = listResult.getNextPageToken();
                for (Job job : listResult.getJobs()) {
                    if (!job.getName().equals(jobName) || !MonitoringUtil.toState(job.getCurrentState()).equals((Object)PipelineResult.State.RUNNING)) continue;
                    return job.getId();
                }
            } while (token != null);
        }
        catch (GoogleJsonResponseException e) {
            throw new RuntimeException("Got error while looking up jobs: " + (e.getDetails() != null ? e.getDetails().getMessage() : e), e);
        }
        catch (IOException e) {
            throw new RuntimeException("Got error while looking up jobs: ", e);
        }
        throw new IllegalArgumentException("Could not find running job named " + jobName);
    }

    @VisibleForTesting
    static String getContainerImageForJob(DataflowPipelineOptions options) {
        String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
        if (!workerHarnessContainerImage.contains("IMAGE")) {
            return workerHarnessContainerImage;
        }
        if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
            return workerHarnessContainerImage.replace("IMAGE", "java");
        }
        if (options.isStreaming()) {
            return workerHarnessContainerImage.replace("IMAGE", "beam-java-streaming");
        }
        return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
    }

    static void verifyStateSupported(DoFn<?, ?> fn) {
        DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
        for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
            if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
                throw new UnsupportedOperationException(String.format("%s does not currently support %s", DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
            }
            if (!stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) continue;
            throw new UnsupportedOperationException(String.format("%s does not currently support %s", DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName()));
        }
    }

    static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
        if (!strategy.getWindowFn().isNonMerging()) {
            throw new UnsupportedOperationException(String.format("%s does not currently support state or timers with merging windows", DataflowRunner.class.getSimpleName()));
        }
    }

    @VisibleForTesting
    static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
    implements PTransformOverrideFactory<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> {
        static final int DEFAULT_NUM_SHARDS = 10;
        DataflowPipelineWorkerPoolOptions options;

        StreamingShardedWriteFactory(PipelineOptions options) {
            this.options = (DataflowPipelineWorkerPoolOptions)options.as(DataflowPipelineWorkerPoolOptions.class);
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>> getReplacementTransform(AppliedPTransform<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> transform) {
            int numShards = this.options.getMaxNumWorkers() > 0 ? this.options.getMaxNumWorkers() * 2 : (this.options.getNumWorkers() > 0 ? this.options.getNumWorkers() * 2 : 10);
            try {
                List<PCollectionView<?>> sideInputs = WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
                FileBasedSink sink = WriteFilesTranslation.getSink(transform);
                WriteFiles replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
                if (WriteFilesTranslation.isWindowedWrites(transform)) {
                    replacement = replacement.withWindowedWrites();
                }
                return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform)replacement.withNumShards(numShards));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
            return Collections.emptyMap();
        }
    }

    private class StreamingPubsubIOWriteOverrideFactory
    implements PTransformOverrideFactory<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> {
        private final DataflowRunner runner;

        private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
            this.runner = runner;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<PubsubMessage>, PDone> getReplacementTransform(AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> transform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform)new StreamingPubsubIOWrite(this.runner, (PubsubUnboundedSink)transform.getTransform()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
            return Collections.emptyMap();
        }
    }

    private static class PrimitiveCombineGroupedValuesOverrideFactory<K, InputT, OutputT>
    implements PTransformOverrideFactory<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>, Combine.GroupedValues<K, InputT, OutputT>> {
        private PrimitiveCombineGroupedValuesOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>, Combine.GroupedValues<K, InputT, OutputT>> transform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), new CombineGroupedValues((Combine.GroupedValues)transform.getTransform(), PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<KV<K, OutputT>> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    static class CombineGroupedValues<K, InputT, OutputT>
    extends PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
        private final Combine.GroupedValues<K, InputT, OutputT> original;
        private final Coder<KV<K, OutputT>> outputCoder;

        CombineGroupedValues(Combine.GroupedValues<K, InputT, OutputT> original, Coder<KV<K, OutputT>> outputCoder) {
            this.original = original;
            this.outputCoder = outputCoder;
        }

        public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<InputT>>> input) {
            return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)input.getWindowingStrategy(), (PCollection.IsBounded)input.isBounded(), this.outputCoder);
        }

        public Combine.GroupedValues<K, InputT, OutputT> getOriginalCombine() {
            return this.original;
        }
    }

    @Internal
    public static class StreamingPCollectionViewWriterFn<T>
    extends DoFn<Iterable<T>, T> {
        private final PCollectionView<?> view;
        private final Coder<T> dataCoder;

        public static <T> StreamingPCollectionViewWriterFn<T> create(PCollectionView<?> view, Coder<T> dataCoder) {
            return new StreamingPCollectionViewWriterFn<T>(view, dataCoder);
        }

        private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> dataCoder) {
            this.view = view;
            this.dataCoder = dataCoder;
        }

        public PCollectionView<?> getView() {
            return this.view;
        }

        public Coder<T> getDataCoder() {
            return this.dataCoder;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, BoundedWindow w) throws Exception {
            throw new UnsupportedOperationException(String.format("%s is a marker class only and should never be executed.", ((Object)((Object)this)).getClass().getName()));
        }
    }

    private static class StreamingBoundedRead<T>
    extends PTransform<PBegin, PCollection<T>> {
        private final BoundedSource<T> source;

        public StreamingBoundedRead(Read.Bounded<T> transform) {
            this.source = transform.getSource();
        }

        public final PCollection<T> expand(PBegin input) {
            this.source.validate();
            return ((PCollection)Pipeline.applyTransform((PInput)input, new UnboundedReadFromBoundedSource<T>(this.source))).setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
        }
    }

    private static class StreamingBoundedReadOverrideFactory<T>
    implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Bounded<T>> {
        private StreamingBoundedReadOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
            return PTransformOverrideFactory.PTransformReplacement.of((PInput)transform.getPipeline().begin(), new StreamingBoundedRead((Read.Bounded)transform.getTransform()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    private static class Deduplicate<T>
    extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
        private static final int NUM_RESHARD_KEYS = 10000;

        private Deduplicate() {
        }

        public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
            return (PCollection)((PCollection)((PCollection)input.apply((PTransform)WithKeys.of((SerializableFunction)new SerializableFunction<ValueWithRecordId<T>, Integer>(){

                public Integer apply(ValueWithRecordId<T> value) {
                    return Arrays.hashCode(value.getId()) % 10000;
                }
            }))).apply((PTransform)Reshuffle.of())).apply("StripIds", (PTransform)ParDo.of((DoFn)new DoFn<KV<Integer, ValueWithRecordId<T>>, T>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    c.output(((ValueWithRecordId)((KV)c.element()).getValue()).getValue());
                }
            }));
        }
    }

    private static class StreamingUnboundedRead<T>
    extends PTransform<PBegin, PCollection<T>> {
        private final UnboundedSource<T, ?> source;

        public StreamingUnboundedRead(Read.Unbounded<T> transform) {
            this.source = transform.getSource();
        }

        public final PCollection<T> expand(PBegin input) {
            this.source.validate();
            if (this.source.requiresDeduping()) {
                return (PCollection)((PCollection)Pipeline.applyTransform((PInput)input, new ReadWithIds(this.source))).apply(new Deduplicate());
            }
            return (PCollection)((PCollection)Pipeline.applyTransform((PInput)input, new ReadWithIds(this.source))).apply("StripIds", (PTransform)ParDo.of((DoFn)new ValueWithRecordId.StripIdsDoFn()));
        }

        public String getKindString() {
            return String.format("Read(%s)", NameUtils.approximateSimpleName(this.source));
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(ReadWithIds.class, new ReadWithIdsTranslator());
        }

        private static class ReadWithIdsTranslator
        implements TransformTranslator<ReadWithIds<?>> {
            private ReadWithIdsTranslator() {
            }

            @Override
            public void translate(ReadWithIds<?> transform, TransformTranslator.TranslationContext context) {
                ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
            }
        }

        private static class ReadWithIds<T>
        extends PTransform<PInput, PCollection<ValueWithRecordId<T>>> {
            private final UnboundedSource<T, ?> source;

            private ReadWithIds(UnboundedSource<T, ?> source) {
                this.source = source;
            }

            public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
                return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)PCollection.IsBounded.UNBOUNDED, (Coder)ValueWithRecordId.ValueWithRecordIdCoder.of((Coder)this.source.getOutputCoder()));
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.delegate(this.source);
            }

            public UnboundedSource<T, ?> getSource() {
                return this.source;
            }
        }
    }

    private static class StreamingUnboundedReadOverrideFactory<T>
    implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Unbounded<T>> {
        private StreamingUnboundedReadOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, Read.Unbounded<T>> transform) {
            return PTransformOverrideFactory.PTransformReplacement.of((PInput)transform.getPipeline().begin(), new StreamingUnboundedRead((Read.Unbounded)transform.getTransform()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    private static class Impulse
    extends PTransform<PBegin, PCollection<byte[]>> {
        private final PCollection.IsBounded isBounded;

        private Impulse(PCollection.IsBounded isBounded) {
            this.isBounded = isBounded;
        }

        public PCollection<byte[]> expand(PBegin input) {
            return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)this.isBounded, (Coder)ByteArrayCoder.of());
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(Impulse.class, new Translator());
        }

        private static class Translator
        implements TransformTranslator<Impulse> {
            private Translator() {
            }

            @Override
            public void translate(Impulse transform, TransformTranslator.TranslationContext context) {
                if (!context.getPipelineOptions().isStreaming()) {
                    throw new UnsupportedOperationException("Impulse source for batch pipelines has not been defined.");
                }
                TransformTranslator.StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
                stepContext.addInput("format", "pubsub");
                stepContext.addInput("pubsub_subscription", "_starting_signal/");
                stepContext.addOutput(context.getOutput(transform));
            }
        }
    }

    private static class StreamingFnApiCreate<T>
    extends PTransform<PBegin, PCollection<T>> {
        private final Create.Values<T> transform;
        private final PCollection<T> originalOutput;

        private StreamingFnApiCreate(Create.Values<T> transform, PCollection<T> originalOutput) {
            this.transform = transform;
            this.originalOutput = originalOutput;
        }

        public final PCollection<T> expand(PBegin input) {
            try {
                PCollection pc = (PCollection)((PCollection)Pipeline.applyTransform((PInput)input, (PTransform)new Impulse(PCollection.IsBounded.BOUNDED))).apply((PTransform)ParDo.of(DecodeAndEmitDoFn.fromIterable(this.transform.getElements(), this.originalOutput.getCoder())));
                pc.setCoder(this.originalOutput.getCoder());
                return pc;
            }
            catch (IOException e) {
                throw new IllegalStateException("Unable to encode elements.", e);
            }
        }

        private static class DecodeAndEmitDoFn<T>
        extends DoFn<byte[], T> {
            private final Collection<byte[]> elements;
            private final RunnerApi.MessageWithComponents coderSpec;
            private transient Coder<T> coder;

            public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder) throws IOException {
                ImmutableList.Builder allElementsBytes = ImmutableList.builder();
                for (T element : elements) {
                    byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
                    allElementsBytes.add((Object)bytes);
                }
                return new DecodeAndEmitDoFn<T>(allElementsBytes.build(), elemCoder);
            }

            private Coder<T> getCoder() throws IOException {
                if (this.coder == null) {
                    this.coder = CoderTranslation.fromProto(this.coderSpec.getCoder(), RehydratedComponents.forComponents(this.coderSpec.getComponents()));
                }
                return this.coder;
            }

            private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
                this.elements = elements;
                this.coderSpec = CoderTranslation.toProto(coder);
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws IOException {
                for (byte[] element : this.elements) {
                    context.output(CoderUtils.decodeFromByteArray(this.getCoder(), (byte[])element));
                }
            }
        }
    }

    private static class StreamingFnApiCreateOverrideFactory<T>
    implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
        private StreamingFnApiCreateOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) {
            Create.Values original = (Create.Values)transform.getTransform();
            PCollection output = (PCollection)Iterables.getOnlyElement(transform.getOutputs().values());
            return PTransformOverrideFactory.PTransformReplacement.of((PInput)transform.getPipeline().begin(), new StreamingFnApiCreate(original, output));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    private static class StreamingPubsubIOWriteTranslator
    implements TransformTranslator<StreamingPubsubIOWrite> {
        private StreamingPubsubIOWriteTranslator() {
        }

        @Override
        public void translate(StreamingPubsubIOWrite transform, TransformTranslator.TranslationContext context) {
            Preconditions.checkArgument(context.getPipelineOptions().isStreaming(), "StreamingPubsubIOWrite is only for streaming pipelines.");
            PubsubUnboundedSink overriddenTransform = transform.getOverriddenTransform();
            TransformTranslator.StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
            stepContext.addInput("format", "pubsub");
            if (overriddenTransform.getTopicProvider().isAccessible()) {
                stepContext.addInput("pubsub_topic", overriddenTransform.getTopic().getV1Beta1Path());
            } else {
                stepContext.addInput("pubsub_topic_runtime_override", ((ValueProvider.NestedValueProvider)overriddenTransform.getTopicProvider()).propertyName());
            }
            if (overriddenTransform.getTimestampAttribute() != null) {
                stepContext.addInput("pubsub_timestamp_label", overriddenTransform.getTimestampAttribute());
            }
            if (overriddenTransform.getIdAttribute() != null) {
                stepContext.addInput("pubsub_id_label", overriddenTransform.getIdAttribute());
            }
            stepContext.addInput("pubsub_serialized_attributes_fn", StringUtils.byteArrayToJsonString((byte[])SerializableUtils.serializeToByteArray((Serializable)((Object)new IdentityMessageFn()))));
            stepContext.addEncodingInput((Coder<?>)WindowedValue.getValueOnlyCoder((Coder)VoidCoder.of()));
            stepContext.addInput("parallel_input", (PInput)context.getInput(transform));
        }
    }

    private static class StreamingPubsubIOWrite
    extends PTransform<PCollection<PubsubMessage>, PDone> {
        private final PubsubUnboundedSink transform;

        public StreamingPubsubIOWrite(DataflowRunner runner, PubsubUnboundedSink transform) {
            this.transform = transform;
        }

        PubsubUnboundedSink getOverriddenTransform() {
            return this.transform;
        }

        public PDone expand(PCollection<PubsubMessage> input) {
            return PDone.in((Pipeline)input.getPipeline());
        }

        protected String getKindString() {
            return "StreamingPubsubIOWrite";
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
        }
    }

    private static class IdentityMessageFn
    extends SimpleFunction<PubsubMessage, PubsubMessage> {
        private IdentityMessageFn() {
        }

        public PubsubMessage apply(PubsubMessage input) {
            return input;
        }
    }

    private static class StreamingPubsubIOReadTranslator
    implements TransformTranslator<StreamingPubsubIORead> {
        private StreamingPubsubIOReadTranslator() {
        }

        @Override
        public void translate(StreamingPubsubIORead transform, TransformTranslator.TranslationContext context) {
            Preconditions.checkArgument(context.getPipelineOptions().isStreaming(), "StreamingPubsubIORead is only for streaming pipelines.");
            PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform();
            TransformTranslator.StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
            stepContext.addInput("format", "pubsub");
            if (overriddenTransform.getTopicProvider() != null) {
                if (overriddenTransform.getTopicProvider().isAccessible()) {
                    stepContext.addInput("pubsub_topic", overriddenTransform.getTopic().getV1Beta1Path());
                } else {
                    stepContext.addInput("pubsub_topic_runtime_override", ((ValueProvider.NestedValueProvider)overriddenTransform.getTopicProvider()).propertyName());
                }
            }
            if (overriddenTransform.getSubscriptionProvider() != null) {
                if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
                    stepContext.addInput("pubsub_subscription", overriddenTransform.getSubscription().getV1Beta1Path());
                } else {
                    stepContext.addInput("pubsub_subscription_runtime_override", ((ValueProvider.NestedValueProvider)overriddenTransform.getSubscriptionProvider()).propertyName());
                }
            }
            if (overriddenTransform.getTimestampAttribute() != null) {
                stepContext.addInput("pubsub_timestamp_label", overriddenTransform.getTimestampAttribute());
            }
            if (overriddenTransform.getIdAttribute() != null) {
                stepContext.addInput("pubsub_id_label", overriddenTransform.getIdAttribute());
            }
            if (overriddenTransform.getNeedsAttributes()) {
                stepContext.addInput("pubsub_serialized_attributes_fn", StringUtils.byteArrayToJsonString((byte[])SerializableUtils.serializeToByteArray((Serializable)((Object)new IdentityMessageFn()))));
            }
            stepContext.addOutput(context.getOutput(transform));
        }
    }

    private static class StreamingPubsubIORead
    extends PTransform<PBegin, PCollection<PubsubMessage>> {
        private final PubsubUnboundedSource transform;

        public StreamingPubsubIORead(PubsubUnboundedSource transform) {
            this.transform = transform;
        }

        PubsubUnboundedSource getOverriddenTransform() {
            return this.transform;
        }

        public PCollection<PubsubMessage> expand(PBegin input) {
            return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)PCollection.IsBounded.UNBOUNDED, (Coder)new PubsubMessageWithAttributesCoder());
        }

        protected String getKindString() {
            return "StreamingPubsubIORead";
        }

        static {
            DataflowPipelineTranslator.registerTransformTranslator(StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
        }
    }

    private static class StreamingPubsubIOReadOverrideFactory
    implements PTransformOverrideFactory<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
        private StreamingPubsubIOReadOverrideFactory() {
        }

        public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
            return PTransformOverrideFactory.PTransformReplacement.of((PInput)transform.getPipeline().begin(), (PTransform)new StreamingPubsubIORead((PubsubUnboundedSource)transform.getTransform()));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollection<PubsubMessage> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }
    }

    private static class ReflectiveOneToOneOverrideFactory<InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
    extends SingleInputOutputOverrideFactory<PCollection<InputT>, PCollection<OutputT>, TransformT> {
        private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
        private final DataflowRunner runner;

        private ReflectiveOneToOneOverrideFactory(Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement, DataflowRunner runner) {
            this.replacement = replacement;
            this.runner = runner;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
            PTransform rep = (PTransform)InstanceBuilder.ofType(this.replacement).withArg(DataflowRunner.class, (Object)this.runner).withArg(transform.getTransform().getClass(), (Object)transform.getTransform()).build();
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform)rep);
        }
    }
}

