/*
 * Decompiled with CFR 0.152.
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.BaseDataSink;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.Logs;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;

class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
extends BaseDataSink<PT, IT, OT> {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    private final BatchJobSender myBatchJobSender;
    private final IJobPersistence myJobPersistence;
    private final String myJobDefinitionId;
    private final int myJobDefinitionVersion;
    private final JobDefinitionStep<PT, OT, ?> myTargetStep;
    private final AtomicInteger myChunkCounter = new AtomicInteger(0);
    private final AtomicReference<String> myLastChunkId = new AtomicReference();
    private final boolean myGatedExecution;

    JobDataSink(@Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition<?> theDefinition, @Nonnull String theInstanceId, @Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
        super(theInstanceId, theJobWorkCursor);
        this.myBatchJobSender = theBatchJobSender;
        this.myJobPersistence = theJobPersistence;
        this.myJobDefinitionId = theDefinition.getJobDefinitionId();
        this.myJobDefinitionVersion = theDefinition.getJobDefinitionVersion();
        this.myTargetStep = theJobWorkCursor.nextStep;
        this.myGatedExecution = theDefinition.isGatedExecution();
    }

    @Override
    public void accept(WorkChunkData<OT> theData) {
        String instanceId = this.getInstanceId();
        String targetStepId = this.myTargetStep.getStepId();
        int sequence = this.myChunkCounter.getAndIncrement();
        OT dataValue = theData.getData();
        String dataValueString = JsonUtil.serialize(dataValue, (boolean)false);
        WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(this.myJobDefinitionId, this.myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
        String chunkId = this.myJobPersistence.onWorkChunkCreate(batchWorkChunk);
        this.myLastChunkId.set(chunkId);
        if (!this.myGatedExecution) {
            JobWorkNotification workNotification = new JobWorkNotification(this.myJobDefinitionId, this.myJobDefinitionVersion, instanceId, targetStepId, chunkId);
            this.myBatchJobSender.sendWorkChannelMessage(workNotification);
        }
    }

    @Override
    public int getWorkChunkCount() {
        return this.myChunkCounter.get();
    }

    public String getOnlyChunkId() {
        if (this.getWorkChunkCount() != 1) {
            String msg = String.format("Expected this sink to have exactly one work chunk but there are %d.  Job %s v%s step %s", this.getWorkChunkCount(), this.myJobDefinitionId, this.myJobDefinitionVersion, this.myTargetStep);
            throw new IllegalStateException(Msg.code((int)2082) + msg);
        }
        return this.myLastChunkId.get();
    }
}

