/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestHoodieDeltaStreamerWithMultiWriter
extends HoodieDeltaStreamerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
    String basePath;
    String propsFilePath;
    String tableBasePath;

    /**
     * Per-test initialisation: copies the base path exposed by {@link UtilitiesTestBase} into this
     * instance and then delegates to the parent class' {@code setupTest()}.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        String sharedBasePath = UtilitiesTestBase.basePath;
        this.basePath = sharedBasePath;
        super.setupTest();
    }

    /**
     * Per-test cleanup: resets the {@link TestDataSource} data generator and deletes the directory
     * that {@code basePath} points to.
     *
     * @throws Exception if the directory cannot be deleted
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        TestDataSource.resetDataGen();
        File baseDir = new File(this.basePath);
        FileIOUtils.deleteDirectory(baseDir);
    }

    /**
     * Seeds a table with a continuous-mode prep job, then starts a continuous UPSERT ingestion job and a
     * one-shot UPSERT backfill job (resuming from the checkpoint of the first completed commit) and hands
     * both to {@code runJobsInParallel} with {@code expectConflict = true}.
     *
     * @param tableType table type under test; every {@link HoodieTableType} value is supplied
     * @throws Exception if set-up or either job fails in a way the conflict handling does not accept
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
        // Drop a trailing slash and convert the base URI into a plain path string.
        String trimmedBase = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBase)).toString();
        this.propsFilePath = this.basePath + "/" + "test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testUpsertsContinuousModeWithMultipleWritersForConflicts_" + tableType;

        // Writer properties: start from the multi-writer defaults, then switch to the in-process lock provider.
        prepareInitialConfigs(storage, this.basePath, "foo");
        TypedProperties writerProps = prepareMultiWriterProps(storage, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        writerProps.setProperty("hoodie.merge.use.record.positions", "false");
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, storage, this.propsFilePath);

        // Values shared by all three job configurations below.
        int totalRecords = 3000;
        String tableTypeName = tableType.name();
        String transformerClass = TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName();
        String maxUniqueRecordsConf = String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords);
        String autoCleanOffConf = String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key());

        // Prep job: populate the table before the concurrent writers start.
        HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(transformerClass));
        prepJobConfig.continuousMode = true;
        prepJobConfig.configs.add(maxUniqueRecordsConf);
        prepJobConfig.configs.add(autoCleanOffConf);
        if (tableType == HoodieTableType.MERGE_ON_READ) {
            prepJobConfig.configs.add(String.format("%s=3", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()));
            prepJobConfig.configs.add(String.format("%s=0", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()));
        }
        HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(prepJob, prepJobConfig, ignored -> {
            if (tableType == HoodieTableType.MERGE_ON_READ) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath);
            }
            this.assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            this.assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // Continuous ingestion writer.
        HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(transformerClass));
        cfgIngestionJob.continuousMode = true;
        cfgIngestionJob.configs.add(maxUniqueRecordsConf);
        cfgIngestionJob.configs.add(autoCleanOffConf);

        // One-shot backfill writer, resuming from the checkpoint stored in the first completed commit.
        HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(transformerClass));
        cfgBackfillJob.continuousMode = false;
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(hadoopConf), (String)this.tableBasePath);
        HoodieTimeline completedCommits = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieInstant firstCompleted = (HoodieInstant)completedCommits.firstInstant().get();
        HoodieCommitMetadata firstCommitMetadata = completedCommits.readCommitMetadata(firstCompleted);
        cfgBackfillJob.checkpoint = firstCommitMetadata.getMetadata("deltastreamer.checkpoint.key");
        cfgBackfillJob.configs.add(maxUniqueRecordsConf);
        cfgBackfillJob.configs.add(autoCleanOffConf);

        HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
        HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
        this.runJobsInParallel(this.tableBasePath, tableType, totalRecords, ingestionJob, cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");
    }

    /**
     * Seeds a table with a continuous-mode prep job, rewrites the writer properties with
     * {@code hoodie.test.source.generate.inserts=true}, then starts a one-shot INSERT backfill job and a
     * continuous UPSERT ingestion job and hands both to {@code runJobsInParallel} with
     * {@code expectConflict = false}.
     *
     * @param tableType table type under test; every {@link HoodieTableType} value is supplied
     * @throws Exception if set-up or either job fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
        // Drop a trailing slash and convert the base URI into a plain path string.
        String trimmedBase = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBase)).toString();
        this.propsFilePath = this.basePath + "/" + "test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testUpsertsContinuousModeWithMultipleWritersWithoutConflicts_" + tableType;

        // Writer properties for the prep job: multi-writer defaults plus the in-process lock provider.
        prepareInitialConfigs(storage, this.basePath, "foo");
        TypedProperties writerProps = prepareMultiWriterProps(storage, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, storage, this.propsFilePath);

        // Values shared by the job configurations below.
        int totalRecords = 3000;
        String tableTypeName = tableType.name();
        String identityTransformer = TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName();
        String maxUniqueRecordsConf = String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords);
        String autoCleanOffConf = String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key());

        // Prep job: populate the table before the concurrent writers start.
        HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
        prepJobConfig.continuousMode = true;
        prepJobConfig.configs.add(maxUniqueRecordsConf);
        prepJobConfig.configs.add(autoCleanOffConf);
        HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(prepJob, prepJobConfig, ignored -> {
            if (tableType == HoodieTableType.MERGE_ON_READ) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath);
            }
            this.assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            this.assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // Rewrite the properties file for the concurrent phase, now with insert generation switched on.
        writerProps = prepareMultiWriterProps(storage, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        writerProps.setProperty("hoodie.test.source.generate.inserts", "true");
        // propsFilePath still equals basePath + "/test-multi-writer.properties" at this point.
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, storage, this.propsFilePath);

        // One-shot INSERT backfill writer, resuming from the checkpoint stored in the first completed commit.
        HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.INSERT, this.propsFilePath, Collections.singletonList(identityTransformer));
        cfgBackfillJob.continuousMode = false;
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(hadoopConf), (String)this.tableBasePath);
        HoodieTimeline completedCommits = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieInstant firstCompleted = (HoodieInstant)completedCommits.firstInstant().get();
        HoodieCommitMetadata firstCommitMetadata = completedCommits.readCommitMetadata(firstCompleted);
        cfgBackfillJob.checkpoint = firstCommitMetadata.getMetadata("deltastreamer.checkpoint.key");
        cfgBackfillJob.configs.add(maxUniqueRecordsConf);
        cfgBackfillJob.configs.add(autoCleanOffConf);

        // Continuous UPSERT ingestion writer.
        HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(identityTransformer));
        cfgIngestionJob.continuousMode = true;
        cfgIngestionJob.configs.add(maxUniqueRecordsConf);
        cfgIngestionJob.configs.add(autoCleanOffConf);

        HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
        HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
        this.runJobsInParallel(this.tableBasePath, tableType, totalRecords, ingestionJob, cfgIngestionJob, backfillJob, cfgBackfillJob, false, "batch2");
    }

    /**
     * Checks how the delta streamer checkpoint is carried across commits.
     *
     * <p>Sequence: seed the table with a continuous prep job; run a one-shot backfill from the checkpoint
     * of the latest commit; add a commit through {@code addCommitToTimeline} and verify that reading a
     * checkpoint from the latest commit throws {@link HoodieException}; run a second one-shot sync with no
     * explicit checkpoint and verify that the completed-commit count grew by two (relative to the count
     * taken after the first backfill) and that the latest commit carries checkpoint key {@code "00008"}.
     *
     * @param tableType table type under test; restricted to {@code COPY_ON_WRITE} by the enum source
     * @throws Exception if set-up, a sync, or a verification step fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class, names={"COPY_ON_WRITE"})
    void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
        // Drop a trailing slash and convert the base URI into a plain path string.
        String trimmedBase = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBase)).toString();
        this.propsFilePath = this.basePath + "/" + "test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testLatestCheckpointCarryOverWithMultipleWriters_" + tableType;

        // Writer properties: multi-writer defaults plus the in-process lock provider.
        prepareInitialConfigs(storage, this.basePath, "foo");
        TypedProperties writerProps = prepareMultiWriterProps(storage, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, storage, this.propsFilePath);

        // Values shared by the job configurations below.
        int totalRecords = 3000;
        String tableTypeName = tableType.name();
        String transformerClass = TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName();
        String maxUniqueRecordsConf = String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords);
        String autoCleanOffConf = String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key());

        // Prep job: populate the table.
        HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(transformerClass));
        prepJobConfig.continuousMode = true;
        prepJobConfig.configs.add(maxUniqueRecordsConf);
        prepJobConfig.configs.add(autoCleanOffConf);
        HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(prepJob, prepJobConfig, ignored -> {
            if (tableType == HoodieTableType.MERGE_ON_READ) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath);
            }
            this.assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            this.assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // One-shot backfill job; the properties file is regenerated and saved again before it runs.
        HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, Collections.singletonList(transformerClass));
        cfgBackfillJob.continuousMode = false;
        writerProps = prepareMultiWriterProps(storage, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, storage, this.propsFilePath);

        // Resume the backfill from the checkpoint recorded in the latest completed commit.
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(hadoopConf), (String)this.tableBasePath);
        HoodieCommitMetadata latestCommitMetadata = getLatestMetadata(meta);
        cfgBackfillJob.checkpoint = latestCommitMetadata.getMetadata("deltastreamer.checkpoint.key");
        cfgBackfillJob.configs.add(maxUniqueRecordsConf);
        cfgBackfillJob.configs.add(autoCleanOffConf);
        HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
        backfillJob.sync();

        // Baseline commit count after the first backfill.
        meta.reloadActiveTimeline();
        int totalCommits = meta.getCommitsTimeline().filterCompletedInstants().countInstants();

        // Add a commit through the helper; the latest commit must then have no readable checkpoint.
        addCommitToTimeline(meta);
        meta.reloadActiveTimeline();
        this.verifyCommitMetadataCheckpoint(meta, null);

        // Second sync without an explicit checkpoint.
        cfgBackfillJob.checkpoint = null;
        HoodieDeltaStreamer secondBackfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
        secondBackfillJob.sync();
        meta.reloadActiveTimeline();
        int expectedCommits = totalCommits + 2;
        int actualCommits = meta.getCommitsTimeline().filterCompletedInstants().countInstants();
        Assertions.assertEquals(expectedCommits, actualCommits);
        this.verifyCommitMetadataCheckpoint(meta, "00008");
    }

    /**
     * Verifies the checkpoint stored in the latest completed commit of the given table.
     *
     * @param metaClient         meta client of the table to inspect
     * @param expectedCheckpoint expected checkpoint key, or {@code null} to assert that reading the
     *                           checkpoint throws {@link HoodieException}
     * @throws IOException if the latest commit metadata cannot be read
     */
    private void verifyCommitMetadataCheckpoint(HoodieTableMetaClient metaClient, String expectedCheckpoint) throws IOException {
        HoodieCommitMetadata latestCommit = getLatestMetadata(metaClient);
        if (expectedCheckpoint != null) {
            Assertions.assertEquals((Object)expectedCheckpoint, (Object)CheckpointUtils.getCheckpoint((HoodieCommitMetadata)latestCommit).getCheckpointKey());
            return;
        }
        // No checkpoint expected: looking it up must fail.
        Assertions.assertThrows(HoodieException.class, () -> CheckpointUtils.getCheckpoint((HoodieCommitMetadata)latestCommit));
    }

    /**
     * Reads the commit metadata of the last completed instant on the reloaded commits timeline.
     *
     * @param meta meta client of the table to inspect
     * @return commit metadata of the last completed commit instant
     * @throws IOException if the commit metadata cannot be read
     */
    private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException {
        HoodieTimeline completedCommits = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
        HoodieInstant latestInstant = (HoodieInstant)completedCommits.lastInstant().get();
        return completedCommits.readCommitMetadata(latestInstant);
    }

    /**
     * Builds the property set shared by the multi-writer tests, saves it to {@code propsFilePath} and
     * returns it so callers can apply further overrides.
     *
     * @param storage       storage the properties file is written to
     * @param basePath      base directory used for the common properties and the schema file locations
     * @param propsFilePath destination of the saved properties file
     * @return the populated properties
     * @throws IOException if the properties cannot be saved
     */
    private static TypedProperties prepareMultiWriterProps(HoodieStorage storage, String basePath, String propsFilePath) throws IOException {
        final String parallelism = "4";
        TypedProperties multiWriterProps = new TypedProperties();
        populateCommonProps(multiWriterProps, basePath);
        populateCommonHiveProps(multiWriterProps);

        // Key generation and schema files.
        multiWriterProps.setProperty("include", "sql-transformer.properties");
        multiWriterProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
        multiWriterProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        multiWriterProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        multiWriterProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
        multiWriterProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target.avsc");

        // NOTE: the "include" key is set a second time here, after the value set above.
        multiWriterProps.setProperty("include", "base.properties");

        // Concurrency control and lock provider settings.
        multiWriterProps.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        multiWriterProps.setProperty("hoodie.clean.failed.writes.policy", "LAZY");
        multiWriterProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        multiWriterProps.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
        multiWriterProps.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
        multiWriterProps.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1");
        multiWriterProps.setProperty("hoodie.write.lock.zookeeper.port", "2828");
        multiWriterProps.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
        multiWriterProps.setProperty("hoodie.write.lock.num_retries", "10");
        multiWriterProps.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
        multiWriterProps.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");

        // Write parallelism and bulk-insert sort mode.
        multiWriterProps.setProperty(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), parallelism);
        multiWriterProps.setProperty(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), parallelism);
        multiWriterProps.setProperty(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), parallelism);
        multiWriterProps.setProperty(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), parallelism);
        multiWriterProps.setProperty(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name());

        UtilitiesTestBase.Helpers.savePropsToDFS(multiWriterProps, storage, propsFilePath);
        return multiWriterProps;
    }

    /**
     * Creates a delta streamer configuration that reads from {@link TestDataSource} and writes to the
     * {@code hoodie_trips} table at the given base path, with Hive sync disabled.
     *
     * @param basePath              target table base path
     * @param tableType             table type name
     * @param op                    write operation to use
     * @param propsFilePath         location of the properties file the job should load
     * @param transformerClassNames transformer class names to apply
     * @return the populated configuration
     */
    private static HoodieDeltaStreamer.Config getDeltaStreamerConfig(String basePath, String tableType, WriteOperationType op, String propsFilePath, List<String> transformerClassNames) {
        HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();

        // Target table.
        config.targetTableName = "hoodie_trips";
        config.targetBasePath = basePath;
        config.tableType = tableType;
        config.operation = op;
        config.enableHiveSync = false;

        // Source, schema and transformation.
        config.sourceClassName = TestDataSource.class.getName();
        config.sourceOrderingField = "timestamp";
        config.sourceLimit = 1000L;
        config.schemaProviderClassName = defaultSchemaProviderClassName;
        config.transformerClassNames = transformerClassNames;

        config.propsFilePath = propsFilePath;
        return config;
    }

    /**
     * Runs the ingestion job and the backfill job concurrently on a two-thread pool and checks the
     * outcome against {@code expectConflict}.
     *
     * <p>The ingestion job is driven through {@code TestHoodieDeltaStreamer.deltaStreamerTestRunner} with a
     * condition that asserts at least three new commits after the last commit that was complete when this
     * method started, plus matching record and distance counts. The backfill job first waits (bounded) for
     * a commit after that same instant and then performs a single {@code sync()}.
     *
     * <p>Outcome handling:
     * <ul>
     *   <li>both jobs succeed: the test fails if a conflict was expected, otherwise the method returns;</li>
     *   <li>a conflict was expected and only the backfill job failed with a message naming
     *       {@link ConcurrentModificationException}: the ingestion job is shut down gracefully;</li>
     *   <li>a conflict was expected and only the ingestion job failed with "Ingestion service was shut down
     *       with exception": the backfill job is shut down gracefully;</li>
     *   <li>both jobs failed in one of those two cases: a {@link HoodieException} is thrown;</li>
     *   <li>anything else: the original exception is rethrown.</li>
     * </ul>
     *
     * @param tableBasePath   base path of the table both jobs write to
     * @param tableType       table type, selects which commit assertion the ingestion condition uses
     * @param totalRecords    expected record count asserted by the ingestion condition
     * @param ingestionJob    ingestion job instance
     * @param cfgIngestionJob configuration of the ingestion job
     * @param backfillJob     backfill job instance
     * @param cfgBackfillJob  configuration of the backfill job (not read by this method)
     * @param expectConflict  whether a write conflict between the two jobs is the expected outcome
     * @param jobId           identifier passed to the test runner and used in log messages
     * @throws Exception the unexpected failure of either job, or a {@link HoodieException} if both failed
     */
    private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
        HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient((StorageConfiguration)new HadoopStorageConfiguration(hadoopConf), (String)tableBasePath);
        HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        String lastSuccessfulCommit = ((HoodieInstant)timeline.lastInstant().get()).requestedTime();
        Function<Boolean, Boolean> conditionForRegularIngestion = r -> {
            if (tableType.equals((Object)HoodieTableType.MERGE_ON_READ)) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath);
            }
            this.assertRecordCount(totalRecords, tableBasePath, sqlContext);
            this.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
            return true;
        };
        AtomicBoolean continuousFailed = new AtomicBoolean(false);
        AtomicBoolean backfillFailed = new AtomicBoolean(false);
        // The pool is created only after the fallible set-up above, immediately before the try/finally
        // that shuts it down, so a set-up failure cannot leave the pool behind.
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            Future<?> regularIngestionJobFuture = service.submit(() -> {
                try {
                    TestHoodieDeltaStreamer.deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
                }
                catch (Throwable ex) {
                    continuousFailed.set(true);
                    // Pass the throwable so the stack trace is logged, not just the message.
                    LOG.error("Continuous job failed " + ex.getMessage(), ex);
                    throw new RuntimeException(ex);
                }
            });
            Future<?> backfillJobFuture = service.submit(() -> {
                try {
                    TestHoodieDeltaStreamerWithMultiWriter.awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
                    backfillJob.sync();
                }
                catch (Throwable ex) {
                    LOG.error("Backfilling job failed " + ex.getMessage(), ex);
                    backfillFailed.set(true);
                    throw new RuntimeException(ex);
                }
            });
            backfillJobFuture.get();
            regularIngestionJobFuture.get();
            if (expectConflict) {
                Assertions.fail("Failed to handle concurrent writes");
            }
        }
        catch (Exception e) {
            // Future.get() normally wraps a task failure in an ExecutionException, but the caught exception
            // may have no cause (e.g. InterruptedException) or a cause without a message. Treat both as
            // "no match" instead of failing with a NullPointerException that hides the real error.
            Throwable cause = e.getCause();
            String causeMessage = cause == null || cause.getMessage() == null ? "" : cause.getMessage();

            if (expectConflict && backfillFailed.get() && causeMessage.contains(ConcurrentModificationException.class.getName())) {
                if (continuousFailed.get()) {
                    throw new HoodieException("Both backfilling and ingestion job failed ", (Throwable)e);
                }
                LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId);
                ingestionJob.shutdownGracefully();
                return;
            }
            if (expectConflict && continuousFailed.get() && causeMessage.contains("Ingestion service was shut down with exception")) {
                if (backfillFailed.get()) {
                    throw new HoodieException("Both backfilling and ingestion job failed ", (Throwable)e);
                }
                LOG.warn("Calling shutdown on backfill job since the ingstion/continuous job has failed for " + jobId);
                backfillJob.shutdownGracefully();
                return;
            }
            LOG.error("Conflict happened, but not expected " + causeMessage, e);
            throw e;
        }
        finally {
            service.shutdown();
        }
    }

    /**
     * Polls the callback every 500 ms until it reports at least one commit after the reference instant,
     * or until the accumulated nominal wait exceeds 5000 ms, then logs the elapsed wall-clock time.
     *
     * @param callback source of the "commits after instant" count
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException {
        long startedAt = System.currentTimeMillis();
        long waitedMs = 0L;
        while (waitedMs <= 5000L) {
            if (callback.getCommitsAfterInstant() > 0L) {
                break;
            }
            Thread.sleep(500L);
            waitedMs += 500L;
        }
        LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startedAt));
    }

    /**
     * Counts the instants on a table's all-commits timeline that come after a fixed instant.
     * The meta client is created once in the constructor; every query reloads its active timeline.
     */
    class GetCommitsAfterInstant {
        String basePath;
        String lastSuccessfulCommit;
        HoodieTableMetaClient meta;

        /**
         * @param basePath             base path of the table to inspect
         * @param lastSuccessfulCommit instant after which commits are counted
         */
        GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
            this.meta = HoodieTestUtils.createMetaClient((HoodieStorage)storage, (String)basePath);
            this.lastSuccessfulCommit = lastSuccessfulCommit;
            this.basePath = basePath;
        }

        /**
         * @return number of instants found after {@code lastSuccessfulCommit} on the freshly reloaded
         *         all-commits timeline
         */
        long getCommitsAfterInstant() {
            return this.meta.reloadActiveTimeline()
                .getAllCommitsTimeline()
                .findInstantsAfter(this.lastSuccessfulCommit)
                .countInstants();
        }
    }
}

