/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.common.ElementTreePathBuilder;
import io.camunda.zeebe.engine.processing.job.JobBatchCollector;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.immutable.ProcessingState;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.protocol.record.value.JobKind;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
import io.camunda.zeebe.util.ByteValue;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.Collections;
import java.util.Map;
import org.agrona.DirectBuffer;

public final class JobBatchActivateProcessor
implements TypedRecordProcessor<JobBatchRecord> {
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final TypedResponseWriter responseWriter;
    private final JobBatchCollector jobBatchCollector;
    private final KeyGenerator keyGenerator;
    private final JobMetrics jobMetrics;
    private final ElementInstanceState elementInstanceState;
    private final ProcessState processState;

    public JobBatchActivateProcessor(Writers writers, ProcessingState state, KeyGenerator keyGenerator, JobMetrics jobMetrics) {
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.responseWriter = writers.response();
        this.jobBatchCollector = new JobBatchCollector(state.getJobState(), state.getVariableState(), this.stateWriter::canWriteEventOfLength);
        this.keyGenerator = keyGenerator;
        this.jobMetrics = jobMetrics;
        this.elementInstanceState = state.getElementInstanceState();
        this.processState = state.getProcessState();
    }

    @Override
    public void processRecord(TypedRecord<JobBatchRecord> record) {
        JobBatchRecord value = (JobBatchRecord)record.getValue();
        if (this.isValid(value)) {
            this.activateJobs(record);
        } else {
            this.rejectCommand(record);
        }
    }

    private boolean isValid(JobBatchRecord record) {
        return record.getMaxJobsToActivate() > 0 && record.getTimeout() > 0L && record.getTypeBuffer().capacity() > 0;
    }

    private void activateJobs(TypedRecord<JobBatchRecord> record) {
        JobBatchRecord value = (JobBatchRecord)record.getValue();
        long jobBatchKey = this.keyGenerator.nextKey();
        Either<JobBatchCollector.TooLargeJob, Map<JobKind, Integer>> result = this.jobBatchCollector.collectJobs(record);
        Map activatedJobCountPerJobKind = (Map)result.getOrElse(Collections.emptyMap());
        result.ifLeft(largeJob -> this.raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.jobRecord(), largeJob.expectedEventLength()));
        this.activateJobBatch(record, value, jobBatchKey, activatedJobCountPerJobKind);
    }

    private void rejectCommand(TypedRecord<JobBatchRecord> record) {
        String rejectionReason;
        RejectionType rejectionType;
        JobBatchRecord value = (JobBatchRecord)record.getValue();
        String format = "Expected to activate job batch with %s to be %s, but it was %s";
        if (value.getMaxJobsToActivate() < 1) {
            rejectionType = RejectionType.INVALID_ARGUMENT;
            rejectionReason = String.format("Expected to activate job batch with %s to be %s, but it was %s", "max jobs to activate", "greater than zero", String.format("'%d'", value.getMaxJobsToActivate()));
        } else if (value.getTimeout() < 1L) {
            rejectionType = RejectionType.INVALID_ARGUMENT;
            rejectionReason = String.format("Expected to activate job batch with %s to be %s, but it was %s", "timeout", "greater than zero", String.format("'%d'", value.getTimeout()));
        } else if (value.getTypeBuffer().capacity() < 1) {
            rejectionType = RejectionType.INVALID_ARGUMENT;
            rejectionReason = String.format("Expected to activate job batch with %s to be %s, but it was %s", "type", "present", "blank");
        } else {
            throw new IllegalStateException("Expected to reject an invalid activate job batch command, but it appears to be valid");
        }
        this.rejectionWriter.appendRejection(record, rejectionType, rejectionReason);
        this.responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason);
    }

    private void activateJobBatch(TypedRecord<JobBatchRecord> record, JobBatchRecord value, long jobBatchKey, Map<JobKind, Integer> activatedJobsCountPerJobKind) {
        this.stateWriter.appendFollowUpEvent(jobBatchKey, (Intent)JobBatchIntent.ACTIVATED, (RecordValue)value);
        this.responseWriter.writeEventOnCommand(jobBatchKey, (Intent)JobBatchIntent.ACTIVATED, (UnpackedObject)value, record);
        activatedJobsCountPerJobKind.forEach((jobKind, count) -> this.jobMetrics.jobActivated(value.getType(), (JobKind)jobKind, (int)count));
    }

    private void raiseIncidentJobTooLargeForMessageSize(long jobKey, JobRecord job, int expectedJobRecordSize) {
        String jobSize = ByteValue.prettyPrint((long)expectedJobRecordSize);
        DirectBuffer incidentMessage = BufferUtil.wrapString((String)String.format("The job with key '%s' can not be activated, because with %s it is larger than the configured message size (per default is 4 MB). Try to reduce the size by reducing the number of fetched variables or modifying the variable values.", jobKey, jobSize));
        ElementTreePathBuilder.ElementTreePathProperties treePathProperties = new ElementTreePathBuilder().withElementInstanceState(this.elementInstanceState).withProcessState(this.processState).withElementInstanceKey(job.getElementInstanceKey()).build();
        IncidentRecord incidentEvent = new IncidentRecord().setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED).setErrorMessage(incidentMessage).setBpmnProcessId(job.getBpmnProcessIdBuffer()).setProcessDefinitionKey(job.getProcessDefinitionKey()).setProcessInstanceKey(job.getProcessInstanceKey()).setElementId(job.getElementIdBuffer()).setElementInstanceKey(job.getElementInstanceKey()).setJobKey(jobKey).setTenantId(job.getTenantId()).setVariableScopeKey(job.getElementInstanceKey()).setElementInstancePath(treePathProperties.elementInstancePath()).setProcessDefinitionPath(treePathProperties.processDefinitionPath()).setCallingElementPath(treePathProperties.callingElementPath());
        this.stateWriter.appendFollowUpEvent(this.keyGenerator.nextKey(), (Intent)IncidentIntent.CREATED, (RecordValue)incidentEvent);
    }
}

