/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable.artifact;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.runners.direct.portable.artifact.LocalArtifactStagingLocation;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalFileSystemArtifactStagerService
extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase
implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemArtifactStagerService.class);
    private final LocalArtifactStagingLocation location;

    public static LocalFileSystemArtifactStagerService forRootDirectory(File base) {
        return new LocalFileSystemArtifactStagerService(base);
    }

    private LocalFileSystemArtifactStagerService(File stagingBase) {
        this.location = LocalArtifactStagingLocation.createAt(stagingBase);
    }

    public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
        return new CreateAndWriteFileObserver(responseObserver);
    }

    public void commitManifest(ArtifactApi.CommitManifestRequest request, StreamObserver<ArtifactApi.CommitManifestResponse> responseObserver) {
        try {
            this.commitManifestOrThrow(request, responseObserver);
        }
        catch (StatusRuntimeException e) {
            responseObserver.onError((Throwable)e);
            LOG.error("Failed to commit Manifest {}", (Object)request.getManifest(), (Object)e);
        }
        catch (Exception e) {
            responseObserver.onError((Throwable)Status.INTERNAL.withCause((Throwable)e).withDescription(Throwables.getStackTraceAsString((Throwable)e)).asRuntimeException());
            LOG.error("Failed to commit Manifest {}", (Object)request.getManifest(), (Object)e);
        }
    }

    private void commitManifestOrThrow(ArtifactApi.CommitManifestRequest request, StreamObserver<ArtifactApi.CommitManifestResponse> responseObserver) throws IOException {
        ArrayList<ArtifactApi.ArtifactMetadata> missing = new ArrayList<ArtifactApi.ArtifactMetadata>();
        for (ArtifactApi.ArtifactMetadata artifact : request.getManifest().getArtifactList()) {
            if (this.location.getArtifactFile(artifact.getName()).exists()) continue;
            missing.add(artifact);
        }
        if (!missing.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription(String.format("Attempted to commit manifest with missing Artifacts: [%s]", missing)).asRuntimeException();
        }
        File mf = this.location.getManifestFile();
        Preconditions.checkState((boolean)mf.createNewFile(), (Object)"Could not create file to store manifest");
        try (FileOutputStream mfOut = new FileOutputStream(mf);){
            request.getManifest().writeTo((OutputStream)mfOut);
        }
        responseObserver.onNext((Object)ArtifactApi.CommitManifestResponse.newBuilder().setRetrievalToken(this.location.getRootPath()).build());
        responseObserver.onCompleted();
    }

    @Override
    public void close() throws Exception {
    }

    @VisibleForTesting
    LocalArtifactStagingLocation getLocation() {
        return this.location;
    }

    private static class FileWritingObserver
    implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final File destination;
        private final OutputStream target;
        private final StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver;

        private FileWritingObserver(File destination, OutputStream target, StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
            this.destination = destination;
            this.target = target;
            this.responseObserver = responseObserver;
        }

        public void onNext(ArtifactApi.PutArtifactRequest value) {
            try {
                if (value.getData() == null) {
                    StatusRuntimeException e = Status.INVALID_ARGUMENT.withDescription(String.format("Expected all chunks in the current stream state to contain data, got %s", value.getContentCase())).asRuntimeException();
                    throw e;
                }
                value.getData().getData().writeTo(this.target);
            }
            catch (Exception e) {
                this.cleanedUp(e);
            }
        }

        public void onError(Throwable t) {
            if (this.cleanedUp(null)) {
                this.responseObserver.onCompleted();
            }
        }

        public void onCompleted() {
            try {
                this.target.close();
            }
            catch (IOException e) {
                LOG.error("Failed to complete writing file {}", (Object)this.destination, (Object)e);
                this.cleanedUp(e);
                return;
            }
            this.responseObserver.onNext((Object)ArtifactApi.PutArtifactResponse.getDefaultInstance());
            this.responseObserver.onCompleted();
        }

        private boolean cleanedUp(@Nullable Throwable whyFailed) {
            Throwable actual = whyFailed;
            try {
                this.target.close();
                if (!this.destination.delete()) {
                    LOG.debug("Couldn't delete failed write at {}", (Object)this.destination);
                }
            }
            catch (IOException e) {
                if (whyFailed == null) {
                    actual = e;
                } else {
                    actual.addSuppressed(e);
                }
                LOG.error("Failed to clean up after writing file {}", (Object)this.destination, (Object)e);
            }
            if (actual != null) {
                if (actual instanceof StatusException || actual instanceof StatusRuntimeException) {
                    this.responseObserver.onError(actual);
                } else {
                    Status status = Status.INTERNAL.withCause(actual).withDescription(Throwables.getStackTraceAsString((Throwable)actual));
                    this.responseObserver.onError((Throwable)status.asException());
                }
            }
            return actual == null;
        }
    }

    private class CreateAndWriteFileObserver
    implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver;
        private FileWritingObserver writer;

        private CreateAndWriteFileObserver(StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
            this.responseObserver = responseObserver;
        }

        public void onNext(ArtifactApi.PutArtifactRequest value) {
            try {
                if (this.writer == null) {
                    if (!value.getContentCase().equals((Object)ArtifactApi.PutArtifactRequest.ContentCase.METADATA)) {
                        throw Status.INVALID_ARGUMENT.withDescription(String.format("Expected the first %s to contain the Artifact Name, got %s", ArtifactApi.PutArtifactRequest.class.getSimpleName(), value.getContentCase())).asRuntimeException();
                    }
                    this.writer = this.createFile(value.getMetadata().getMetadata());
                } else {
                    this.writer.onNext(value);
                }
            }
            catch (StatusRuntimeException e) {
                this.responseObserver.onError((Throwable)e);
            }
            catch (Exception e) {
                this.responseObserver.onError((Throwable)Status.INTERNAL.withCause((Throwable)e).withDescription(Throwables.getStackTraceAsString((Throwable)e)).asRuntimeException());
            }
        }

        private FileWritingObserver createFile(ArtifactApi.ArtifactMetadata metadata) throws IOException {
            File destination = LocalFileSystemArtifactStagerService.this.location.getArtifactFile(metadata.getName());
            if (!destination.createNewFile()) {
                throw Status.ALREADY_EXISTS.withDescription(String.format("Artifact with name %s already exists", metadata)).asRuntimeException();
            }
            return new FileWritingObserver(destination, new FileOutputStream(destination), this.responseObserver);
        }

        public void onError(Throwable t) {
            if (this.writer != null) {
                this.writer.onError(t);
            } else {
                this.responseObserver.onCompleted();
            }
        }

        public void onCompleted() {
            if (this.writer != null) {
                this.writer.onCompleted();
            } else {
                this.responseObserver.onCompleted();
            }
        }
    }
}

