/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.flow.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.Batcher;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.FilteredForestConfiguration;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.impl.JobTicketImpl;
import com.marklogic.client.extensions.ResourceManager;
import com.marklogic.client.extensions.ResourceServices;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractReadHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.util.RequestParameters;
import com.marklogic.hub.DatabaseKind;
import com.marklogic.hub.HubConfig;
import com.marklogic.hub.collector.Collector;
import com.marklogic.hub.collector.DiskQueue;
import com.marklogic.hub.flow.CodeFormat;
import com.marklogic.hub.flow.Flow;
import com.marklogic.hub.flow.FlowFinishedListener;
import com.marklogic.hub.flow.FlowItemCompleteListener;
import com.marklogic.hub.flow.FlowItemFailureListener;
import com.marklogic.hub.flow.FlowRunner;
import com.marklogic.hub.flow.FlowStatusListener;
import com.marklogic.hub.flow.RunFlowResponse;
import com.marklogic.hub.job.Job;
import com.marklogic.hub.job.JobManager;
import com.marklogic.hub.job.JobStatus;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class FlowRunnerImpl
implements FlowRunner {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFAULT_THREAD_COUNT = 4;
    private static final int MAX_ERROR_MESSAGES = 10;
    private Flow flow;
    private int batchSize = 100;
    private int threadCount = 4;
    private DatabaseClient sourceClient;
    private String destinationDatabase;
    private Map<String, Object> options;
    private int previousPercentComplete;
    private boolean stopOnFailure = false;
    private List<FlowItemCompleteListener> flowItemCompleteListeners = new ArrayList<FlowItemCompleteListener>();
    private List<FlowItemFailureListener> flowItemFailureListeners = new ArrayList<FlowItemFailureListener>();
    private List<FlowStatusListener> flowStatusListeners = new ArrayList<FlowStatusListener>();
    private List<FlowFinishedListener> flowFinishedListeners = new ArrayList<FlowFinishedListener>();
    private HubConfig hubConfig;
    private Thread runningThread = null;

    public FlowRunnerImpl(HubConfig hubConfig) {
        this.hubConfig = hubConfig;
        this.sourceClient = hubConfig.newStagingClient();
        this.destinationDatabase = hubConfig.getDbName(DatabaseKind.FINAL);
    }

    @Override
    public FlowRunner withFlow(Flow flow) {
        this.flow = flow;
        return this;
    }

    @Override
    public FlowRunner withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    @Override
    public FlowRunner withThreadCount(int threadCount) {
        this.threadCount = threadCount;
        return this;
    }

    @Override
    public FlowRunner withSourceClient(DatabaseClient sourceClient) {
        this.sourceClient = sourceClient;
        return this;
    }

    @Override
    public FlowRunner withDestinationDatabase(String destinationDatabase) {
        this.destinationDatabase = destinationDatabase;
        return this;
    }

    @Override
    public FlowRunner withStopOnFailure(boolean stopOnFailure) {
        this.stopOnFailure = stopOnFailure;
        return this;
    }

    @Override
    public FlowRunner withOptions(Map<String, Object> options) {
        this.options = options;
        return this;
    }

    @Override
    public FlowRunner onItemComplete(FlowItemCompleteListener listener) {
        this.flowItemCompleteListeners.add(listener);
        return this;
    }

    @Override
    public FlowRunner onItemFailed(FlowItemFailureListener listener) {
        this.flowItemFailureListeners.add(listener);
        return this;
    }

    @Override
    public FlowRunner onStatusChanged(FlowStatusListener listener) {
        this.flowStatusListeners.add(listener);
        return this;
    }

    @Override
    public FlowRunner onFinished(FlowFinishedListener listener) {
        this.flowFinishedListeners.add(listener);
        return this;
    }

    @Override
    public void awaitCompletion() {
        try {
            this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.runningThread != null) {
            this.runningThread.join(unit.convert(timeout, TimeUnit.MILLISECONDS));
        }
    }

    @Override
    public JobTicket run() {
        DiskQueue<String> uris;
        String jobId = UUID.randomUUID().toString();
        JobManager jobManager = JobManager.create(this.hubConfig.newJobDbClient(), this.hubConfig.newTraceDbClient());
        Job job = Job.withFlow(this.flow).withJobId(jobId);
        jobManager.saveJob(job);
        Collector c = this.flow.getCollector();
        c.setHubConfig(this.hubConfig);
        c.setClient(this.sourceClient);
        AtomicLong successfulEvents = new AtomicLong(0L);
        AtomicLong failedEvents = new AtomicLong(0L);
        AtomicLong successfulBatches = new AtomicLong(0L);
        AtomicLong failedBatches = new AtomicLong(0L);
        if (this.options == null) {
            this.options = new HashMap<String, Object>();
        }
        this.options.put("entity", this.flow.getEntityName());
        this.options.put("flow", this.flow.getName());
        this.options.put("flowType", this.flow.getType().toString());
        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "running collector"));
        jobManager.saveJob(job.withStatus(JobStatus.RUNNING_COLLECTOR));
        try {
            uris = c.run(jobId, this.flow.getEntityName(), this.flow.getName(), this.threadCount, this.options);
        }
        catch (Exception e) {
            job.setCounts(0L, 0L, 0L, 0L).withStatus(JobStatus.FAILED).withEndTime(new Date());
            StringWriter errors = new StringWriter();
            e.printStackTrace(new PrintWriter(errors));
            job.withJobOutput(errors.toString());
            jobManager.saveJob(job);
            return new JobTicketImpl(jobId, JobTicket.JobType.QUERY_BATCHER);
        }
        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 0, "starting harmonization"));
        Vector errorMessages = new Vector();
        DataMovementManager dataMovementManager = this.sourceClient.newDataMovementManager();
        double batchCount = Math.ceil((double)uris.size() / (double)this.batchSize);
        HashMap<String, JobTicket> ticketWrapper = new HashMap<String, JobTicket>();
        QueryBatcher tempQueryBatcher = dataMovementManager.newQueryBatcher(uris.iterator()).withBatchSize(this.batchSize).withThreadCount(this.threadCount).withJobId(jobId).onUrisReady(batch -> {
            block9: {
                try {
                    JobTicket jobTicket;
                    FlowResource flowRunner = new FlowResource(batch.getClient(), this.destinationDatabase, this.flow);
                    RunFlowResponse response = flowRunner.run(jobId, (String[])batch.getItems(), this.options);
                    failedEvents.addAndGet(response.errorCount);
                    successfulEvents.addAndGet(response.totalCount - response.errorCount);
                    if (response.errors != null && errorMessages.size() < 10) {
                        errorMessages.addAll(response.errors.stream().map(jsonNode -> this.jsonToString((JsonNode)jsonNode)).collect(Collectors.toList()));
                    }
                    if (response.errorCount < response.totalCount) {
                        successfulBatches.addAndGet(1L);
                    } else {
                        failedBatches.addAndGet(1L);
                    }
                    int percentComplete = (int)((double)successfulBatches.get() / batchCount * 100.0);
                    if (percentComplete != this.previousPercentComplete && percentComplete % 5 == 0) {
                        this.previousPercentComplete = percentComplete;
                        this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, percentComplete, ""));
                    }
                    if (this.flowItemCompleteListeners.size() > 0) {
                        response.completedItems.forEach(item -> this.flowItemCompleteListeners.forEach(listener -> listener.processCompletion(jobId, (String)item)));
                    }
                    if (this.flowItemFailureListeners.size() > 0) {
                        response.failedItems.forEach(item -> this.flowItemFailureListeners.forEach(listener -> listener.processFailure(jobId, (String)item)));
                    }
                    if (this.stopOnFailure && response.errorCount > 0L && (jobTicket = (JobTicket)ticketWrapper.get("jobTicket")) != null) {
                        dataMovementManager.stopJob(jobTicket);
                    }
                }
                catch (Exception e) {
                    if (errorMessages.size() >= 10) break block9;
                    errorMessages.add(e.toString());
                }
            }
        }).onQueryFailure(failure -> {
            failedBatches.addAndGet(1L);
            failedEvents.addAndGet(this.batchSize);
        });
        if (this.hubConfig.getLoadBalancerHosts() != null && this.hubConfig.getLoadBalancerHosts().length > 0) {
            tempQueryBatcher = tempQueryBatcher.withForestConfig((ForestConfiguration)new FilteredForestConfiguration(dataMovementManager.readForestConfig()).withWhiteList(this.hubConfig.getLoadBalancerHosts()));
        }
        QueryBatcher queryBatcher = tempQueryBatcher;
        JobTicket jobTicket = dataMovementManager.startJob(queryBatcher);
        ticketWrapper.put("jobTicket", jobTicket);
        jobManager.saveJob(job.withStatus(JobStatus.RUNNING_HARMONIZE));
        this.runningThread = new Thread(() -> {
            queryBatcher.awaitCompletion();
            this.flowStatusListeners.forEach(listener -> listener.onStatusChange(jobId, 100, ""));
            this.flowFinishedListeners.forEach(FlowFinishedListener::onFlowFinished);
            dataMovementManager.stopJob((Batcher)queryBatcher);
            JobStatus status = failedEvents.get() > 0L && this.stopOnFailure ? JobStatus.STOP_ON_ERROR : (failedEvents.get() + successfulEvents.get() != (long)uris.size() ? JobStatus.CANCELED : (failedEvents.get() > 0L && successfulEvents.get() > 0L ? JobStatus.FINISHED_WITH_ERRORS : (failedEvents.get() == 0L && successfulEvents.get() > 0L ? JobStatus.FINISHED : JobStatus.FAILED)));
            job.setCounts(successfulEvents.get(), failedEvents.get(), successfulBatches.get(), failedBatches.get()).withStatus(status).withEndTime(new Date());
            if (errorMessages.size() > 0) {
                job.withJobOutput(errorMessages);
            }
            jobManager.saveJob(job);
        });
        this.runningThread.start();
        return jobTicket;
    }

    private String jsonToString(JsonNode node) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString((Object)node);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    class FlowResource
    extends ResourceManager {
        private DatabaseClient srcClient;
        private String targetDatabase;
        private Flow flow;

        public FlowResource(DatabaseClient srcClient, String targetDatabase, Flow flow) {
            this.flow = flow;
            this.srcClient = srcClient;
            this.targetDatabase = targetDatabase;
            this.srcClient.init(flow.getCodeFormat().equals((Object)CodeFormat.JAVASCRIPT) ? "ml:sjsFlow" : "ml:flow", (ResourceManager)this);
        }

        public RunFlowResponse run(String jobId, String[] items) {
            return this.run(jobId, items, null);
        }

        public RunFlowResponse run(String jobId, String[] items, Map<String, Object> options) {
            RunFlowResponse resp;
            try {
                ResourceServices.ServiceResultIterator resultItr;
                RequestParameters params = new RequestParameters();
                params.add("entity-name", this.flow.getEntityName());
                params.add("flow-name", this.flow.getName());
                params.put("job-id", jobId);
                params.put("identifiers", items);
                params.put("target-database", this.targetDatabase);
                if (options != null) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    params.put("options", objectMapper.writeValueAsString(options));
                }
                if ((resultItr = this.getServices().post(params, (AbstractWriteHandle)new StringHandle("{}").withFormat(Format.JSON), new String[0])) == null || !resultItr.hasNext()) {
                    resp = new RunFlowResponse();
                } else {
                    ResourceServices.ServiceResult res = (ResourceServices.ServiceResult)resultItr.next();
                    StringHandle handle = new StringHandle();
                    ObjectMapper objectMapper = new ObjectMapper();
                    resp = (RunFlowResponse)objectMapper.readValue(((StringHandle)res.getContent((AbstractReadHandle)handle)).get(), RunFlowResponse.class);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
            return resp;
        }
    }
}

