/*
 * Decompiled with CFR 0.152.
 */
package com.ksyun.kmr.hadoop.fs.ks3.parallel.conveyor;

import com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystemStore;
import com.ksyun.kmr.hadoop.fs.ks3.Utils;
import com.ksyun.kmr.hadoop.fs.ks3.committer.PendingCommit;
import com.ksyun.kmr.hadoop.fs.ks3.committer.PendingCommitList;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.ActionSource;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.Conveyor;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.EngineShutter;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.MultiActionEngine;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shadedforhadoopks3.com.google.common.util.concurrent.RateLimiter;

/**
 * Conveyor that chains two {@link MultiActionEngine}s to finish pending commits.
 *
 * <p>The engine named "parallel unserialize" receives a {@link Path}, loads the
 * {@link PendingCommitList} stored under the corresponding key and forwards each
 * {@link PendingCommit} it contains to the engine named "parallel commit". That
 * second engine either creates an empty object (when the commit carries no part
 * ETags) or completes the multipart upload.
 */
public class CommitAction
extends Conveyor
implements ActionSource {
    private static final Logger LOG = LoggerFactory.getLogger(CommitAction.class);

    /** Store supplying the engine sizing/speed settings and the KS3 operations used here. */
    Ks3FileSystemStore store;
    /** Engine that performs the actual commit of a single {@link PendingCommit}. */
    MultiActionEngine engineForCommit;
    /** Engine that loads pending-commit lists; exposed through {@link #source()}. */
    MultiActionEngine engine;
    /** Optional callback; when non-null it is invoked with the key of every processed commit list. */
    public Sink sink;

    /**
     * Creates an action with its own, initially empty, exception holder.
     *
     * @param store store used for all KS3 operations and engine settings
     */
    public CommitAction(Ks3FileSystemStore store) {
        this(store, new AtomicReference<Exception>());
    }

    /**
     * Creates an action that hands the given exception holder to the {@link Conveyor} superclass.
     *
     * @param store store used for all KS3 operations and engine settings
     * @param exceptionAtomicReference exception holder passed to the superclass constructor
     */
    public CommitAction(Ks3FileSystemStore store, AtomicReference<Exception> exceptionAtomicReference) {
        super(exceptionAtomicReference);
        this.store = store;
    }

    /**
     * Builds the commit engine and the unserialize engine, then publishes both into the
     * {@link #engineForCommit} and {@link #engine} fields.
     */
    @Override
    public void startEnginesWithoutCheckStarted() {
        // Stage 2: turn one PendingCommit into a finished object.
        MultiActionEngine commitEngine = new MultiActionEngine("parallel commit", this.store.parallel_commit_pool_size, this.store.parallel_commit_thread_size, this.store.parallel_commit_speed_limit, (AtomicReference<Exception>)this.exceptionAtomicReference, (received, ctx) -> {
            RateLimiter limiter = ctx.getRateLimiter();
            PendingCommit pending = (PendingCommit)received.getValue("data");
            if (pending.getKs3PartETags().size() != 0) {
                Utils.recordCostTime(LOG, "multicommit", () -> this.store.completeMultipartUpload(pending.key, pending.uploadId, pending.getKs3PartETags(), limiter));
            } else {
                // No uploaded parts: there is nothing to complete, so an empty object is created instead.
                Utils.recordCostTime(LOG, "createEmpty", () -> this.store.createEmptyObject(pending.key, limiter));
            }
        });

        // Stage 1: load a pending-commit list and fan its entries out to stage 2.
        MultiActionEngine unserializeEngine = new MultiActionEngine("parallel unserialize", this.store.parallel_unser_pool_size, this.store.parallel_unser_thread_size, this.store.parallel_unser_speed_limit, (AtomicReference<Exception>)this.exceptionAtomicReference, (received, ctx) -> {
            RateLimiter limiter = ctx.getRateLimiter();
            Path listPath = (Path)received.getValue("data");
            String listKey = this.store.pathToKey(listPath);
            PendingCommitList pendingList = PendingCommitList.load(this.store, listKey, limiter);
            for (PendingCommit pending : pendingList.data) {
                boolean accepted = commitEngine.sendData(Collections.singletonMap("data", pending));
                if (!accepted) {
                    // Stop forwarding the remaining entries once the commit engine refuses data.
                    break;
                }
            }
            // The sink is notified whether or not every entry was forwarded.
            if (this.sink != null) {
                this.sink.callback(listKey);
            }
        });

        // Fields are only assigned after both engines were constructed successfully.
        this.engineForCommit = commitEngine;
        this.engine = unserializeEngine;
    }

    /**
     * Hands both engines to {@link EngineShutter#shutdownAll}, the unserialize engine first.
     */
    @Override
    public void shutdown() {
        EngineShutter.shutdownAll(this.engine, this.engineForCommit);
    }

    /**
     * Returns the entry point of this conveyor.
     *
     * @return the unserialize engine held in {@link #engine}
     */
    @Override
    public MultiActionEngine source() {
        return this.engine;
    }

    /**
     * Callback notified with the key of a pending-commit list once that list has been processed.
     */
    public interface Sink {
        /**
         * @param var1 key of the pending-commit list that was processed
         */
        void callback(String var1);
    }
}

