/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.document.Array;
import io.debezium.document.ArrayReader;
import io.debezium.document.ArrayWriter;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
import io.debezium.embedded.AvailableVariables;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.MismatchRecordException;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Iterators;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import io.debezium.util.Threads;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConnectorOutputTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorOutputTest.class);
    /** File name of the connector configuration, resolved against a spec directory by {@code usingSpec(String, Path)}. */
    public static final String DEFAULT_CONNECTOR_PROPERTIES_FILENAME = "connector.properties";
    /** File name of the environment properties, resolved against a spec directory by {@code usingSpec(String, Path)}. */
    public static final String DEFAULT_ENV_PROPERTIES_FILENAME = "env.properties";
    /** File name of the expected-records JSON, resolved against a spec directory by {@code usingSpec(String, Path)}. */
    public static final String DEFAULT_EXPECTED_RECORDS_FILENAME = "expected-records.json";
    /**
     * Environment key read by {@code runConnector} for the connector timeout in seconds (10 when absent).
     * NOTE(review): the leading '-' in the key looks unintentional - confirm against the env.properties files in use.
     */
    public static final String ENV_CONNECTOR_TIMEOUT_IN_SECONDS = "-connector.timeout.in.seconds";
    /** Environment key whose value {@code runConnector} splits on commas to obtain the field names to ignore. */
    public static final String ENV_IGNORE_FIELDS = "ignore.fields";
    /** Testing path deleted before each test by {@link #cleanOffsetStorage()}. */
    public static final Path CONNECTOR_OUTPUT_PATH = Testing.Files.createTestingPath((String)"connector-output");
    /** Absolute path used as the default {@code offset.storage.file.filename}; deleted before each test. */
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath((String)"integration-test-connector-offsets.data").toAbsolutePath();
    /** Field whose presence marks an expected-records document as a control command rather than a record. */
    public static final String CONTROL_KEY = "connector";
    /** Command value (compared case-insensitively) that stops the engine and requests another run. */
    public static final String CONTROL_RESTART = "restart";
    /** Command value (compared case-insensitively) that stops the engine. */
    public static final String CONTROL_STOP = "stop";
    /** Command value (compared case-insensitively) after which any further actual record is reported as a mismatch. */
    public static final String CONTROL_END = "end";

    /**
     * Hook for subclasses to register custom value comparators. {@code runConnector} calls this
     * with a consumer that stores each pair in a map, which is later handed to
     * {@code VerifyRecord.assertEquals} as the by-name comparators. The default registers nothing.
     *
     * @param comparatorsByPath receives each (key, comparator) pair to register
     */
    protected void addValueComparatorsByFieldPath(BiConsumer<String, VerifyRecord.RecordValueComparator> comparatorsByPath) {
    }

    /**
     * Hook for subclasses to register custom value comparators. {@code runConnector} calls this
     * with a consumer that stores each pair in a map, which is later handed to
     * {@code VerifyRecord.assertEquals} as the by-schema-name comparators. The default registers nothing.
     *
     * @param comparatorsByPath receives each (key, comparator) pair to register
     */
    protected void addValueComparatorsBySchemaName(BiConsumer<String, VerifyRecord.RecordValueComparator> comparatorsByPath) {
    }

    /**
     * Creates a bare specification that carries only a name; configuration, environment,
     * test data and variables are all passed as {@code null} to the constructor.
     *
     * @param name the test name
     * @return a new specification
     */
    protected TestSpecification usingSpec(String name) {
        final TestSpecification bareSpec = new TestSpecification(name, null, null, null, null);
        return bareSpec;
    }

    /**
     * Creates a specification from the default files found in the named directory.
     * <p>
     * Unless the {@code java.version} property starts with {@code "1."} or the absolute form of
     * the directory already contains {@code "debezium-embedded"}, the directory is prefixed with
     * {@code "debezium-embedded/"} before being resolved.
     *
     * @param name the test name
     * @param directory the directory holding the specification files
     * @return the specification built by {@code usingSpec(String, Path)}
     */
    protected TestSpecification usingSpec(String name, String directory) {
        String prefix = "debezium-embedded/";
        // Same short-circuit order as before: the path is only inspected for non-"1.x" versions.
        if (System.getProperty("java.version").startsWith("1.")
                || Paths.get(directory).toAbsolutePath().toString().contains("debezium-embedded")) {
            prefix = "";
        }
        final Path resolved = Paths.get(prefix + directory);
        return this.usingSpec(name, resolved);
    }

    /**
     * Creates a specification from the default connector, environment and expected-records
     * files located directly under the given directory (made absolute first).
     *
     * @param name the test name
     * @param directory the directory holding the specification files
     * @return the configured specification
     */
    protected TestSpecification usingSpec(String name, Path directory) {
        final Path base = directory.toAbsolutePath();
        final File connectorProperties = base.resolve(DEFAULT_CONNECTOR_PROPERTIES_FILENAME).toFile();
        final Path expectedRecords = base.resolve(DEFAULT_EXPECTED_RECORDS_FILENAME);
        final File envProperties = base.resolve(DEFAULT_ENV_PROPERTIES_FILENAME).toFile();
        TestSpecification spec = this.usingSpec(name);
        spec = spec.withConfiguration(connectorProperties);
        spec = spec.withEnvironment(envProperties);
        return spec.withReadOrWriteTestData(expectedRecords);
    }

    /**
     * Creates a specification from explicitly named files, converting each name to a
     * {@link Path} and delegating to the {@code Path}-based overload.
     *
     * @param name the test name
     * @param configFile path of the connector configuration file
     * @param expectedRecordsFile path of the expected-records file
     * @param envFile path of the environment file; a {@code null} is forwarded as a {@code null} path
     * @return the configured specification
     */
    protected TestSpecification usingSpec(String name, String configFile, String expectedRecordsFile, String envFile) {
        final Path configPath = Paths.get(configFile);
        final Path expectedRecordsPath = Paths.get(expectedRecordsFile);
        Path envPath = null;
        if (envFile != null) {
            envPath = Paths.get(envFile);
        }
        return this.usingSpec(name, configPath, expectedRecordsPath, envPath);
    }

    /**
     * Creates a specification from explicitly given files.
     *
     * @param name the test name
     * @param configFile the connector configuration file
     * @param expectedRecordsFile the expected-records file (read when it exists, written otherwise)
     * @param envFile the environment file, or {@code null} to keep the specification's empty
     *            default environment (the {@code String}-based overload forwards {@code null} here)
     * @return the configured specification
     */
    protected TestSpecification usingSpec(String name, Path configFile, Path expectedRecordsFile, Path envFile) {
        TestSpecification spec = this.usingSpec(name).withConfiguration(configFile.toFile());
        // A null environment file is a supported input; dereferencing it would throw an NPE.
        if (envFile != null) {
            spec = spec.withEnvironment(envFile.toFile());
        }
        return spec.withReadOrWriteTestData(expectedRecordsFile);
    }

    /**
     * Prepares the file system before each test: enables test printing, deletes the connector
     * output and offset store paths, and creates the offset store's parent directory.
     */
    @Before
    public void cleanOffsetStorage() {
        Testing.Print.enable();
        Testing.Files.delete(CONNECTOR_OUTPUT_PATH);
        Testing.Files.delete(OFFSET_STORE_PATH);
        final File offsetStoreDirectory = OFFSET_STORE_PATH.getParent().toFile();
        offsetStoreDirectory.mkdirs();
    }

    /**
     * Disables test printing after each test, undoing the {@code Testing.Print.enable()} call
     * made in {@link #cleanOffsetStorage()}.
     */
    @After
    public void afterEachTestMethod() {
        Testing.Print.disable();
    }

    /**
     * Hook for subclasses to name fields that are ignored in every test of the class.
     * {@code runConnector} trims the returned names and adds them to the set of ignorable
     * fields; a {@code null} or empty array adds nothing.
     *
     * @return the field names to ignore, or {@code null} (the default) for none
     */
    protected String[] globallyIgnorableFieldNames() {
        return null;
    }

    /**
     * Runs the connector using the specification files found in the named directory.
     *
     * @param testName the test name
     * @param directory the directory holding the specification files
     */
    protected void runConnector(String testName, String directory) {
        final TestSpecification spec = this.usingSpec(testName, directory);
        this.runConnector(spec);
    }

    /**
     * Runs the connector using the specification files found in the given directory.
     *
     * @param testName the test name
     * @param directory the directory holding the specification files
     */
    protected void runConnector(String testName, Path directory) {
        final TestSpecification spec = this.usingSpec(testName, directory);
        this.runConnector(spec);
    }

    /**
     * Runs the connector using explicitly named specification files.
     *
     * @param testName the test name
     * @param configFile path of the connector configuration file
     * @param expectedRecordsFile path of the expected-records file
     * @param envFile path of the environment file
     */
    protected void runConnector(String testName, String configFile, String expectedRecordsFile, String envFile) {
        final TestSpecification spec = this.usingSpec(testName, configFile, expectedRecordsFile, envFile);
        this.runConnector(spec);
    }

    /**
     * Runs the connector using explicitly given specification files.
     *
     * @param testName the test name
     * @param configFile the connector configuration file
     * @param expectedRecordsFile the expected-records file
     * @param envFile the environment file
     */
    protected void runConnector(String testName, Path configFile, Path expectedRecordsFile, Path envFile) {
        final TestSpecification spec = this.usingSpec(testName, configFile, expectedRecordsFile, envFile);
        this.runConnector(spec);
    }

    /**
     * Runs the connector for the given specification without a completion callback.
     *
     * @param spec the test specification
     */
    protected void runConnector(TestSpecification spec) {
        final DebeziumEngine.CompletionCallback noCallback = null;
        this.runConnector(spec, noCallback);
    }

    /**
     * Runs the connector described by {@code spec} in an embedded engine and verifies or records
     * every source record it emits.
     * <p>
     * Each emitted record is serialized and passed to the specification's test data
     * ({@code TestData.write}). When the test data supplies expected documents, each record is
     * also compared with the next expected document; documents carrying a {@link #CONTROL_KEY}
     * field are treated as commands that can stop or restart the engine. The engine is run again
     * for as long as the consumer reports {@code ExecutionResult.RESTART_REQUESTED}.
     * <p>
     * A watchdog thread closes the engine when no record arrives within the timeout configured
     * under {@link #ENV_CONNECTOR_TIMEOUT_IN_SECONDS} (10 seconds by default).
     *
     * @param spec the test specification providing configuration, environment and test data
     * @param callback completion callback wrapped in an {@code EmbeddedEngine.CompletionResult};
     *            {@code runConnector(TestSpecification)} passes {@code null}
     * @throws RuntimeException if reading or closing the test data fails, or if the engine
     *             completed with a non-assertion error
     * @throws AssertionError if an actual record did not match the expected data
     */
    protected void runConnector(TestSpecification spec, DebeziumEngine.CompletionCallback callback) {
        LoggingContext.PreviousContext preRunContext = LoggingContext.forConnector(this.getClass().getSimpleName(), "runner", spec.name());
        Configuration environmentConfig = Configuration.copy(spec.environment()).build();
        Configuration connectorConfig = spec.config();

        // Fields to ignore come from the environment (comma separated) plus the subclass hook.
        String[] ignorableFieldNames = environmentConfig.getString(ENV_IGNORE_FIELDS, "").split(",");
        Set<String> ignorableFields = Arrays.stream(ignorableFieldNames).map(String::trim).collect(Collectors.toSet());
        String[] globallyIgnorableFieldNames = this.globallyIgnorableFieldNames();
        if (globallyIgnorableFieldNames != null && globallyIgnorableFieldNames.length != 0) {
            ignorableFields.addAll(Arrays.stream(globallyIgnorableFieldNames).map(String::trim).collect(Collectors.toSet()));
        }

        SchemaAndValueConverter keyConverter = new SchemaAndValueConverter(environmentConfig, true);
        SchemaAndValueConverter valueConverter = new SchemaAndValueConverter(environmentConfig, false);
        TestData testData = spec.testData();
        HashMap<String, VerifyRecord.RecordValueComparator> comparatorsByFieldName = new HashMap<>();
        this.addValueComparatorsByFieldPath(comparatorsByFieldName::put);
        HashMap<String, VerifyRecord.RecordValueComparator> comparatorsBySchemaName = new HashMap<>();
        this.addValueComparatorsBySchemaName(comparatorsBySchemaName::put);

        RuntimeException runError = null;
        EmbeddedEngine.CompletionResult problem = new EmbeddedEngine.CompletionResult(callback);
        try {
            Iterators.PreviewIterator<Document> expectedRecords = Iterators.preview(testData.read());
            Consumer<Document> recorder = testData::write;
            Threads.TimeSince timeSinceLastRecord = Threads.timeSince((Clock)Clock.SYSTEM);
            // Only the most recent records are kept, for reporting in case of a mismatch.
            Queue<SourceRecord> actualRecordHistory = this.fixedSizeQueue(10);
            Queue<SourceRecord> expectedRecordHistory = this.fixedSizeQueue(10);
            ConsumerCompletion result = new ConsumerCompletion();
            Consumer<RecordChangeEvent> consumer = actualRecord -> {
                LoggingContext.PreviousContext prev = LoggingContext.forConnector(this.getClass().getSimpleName(), "runner", spec.name());
                try {
                    Testing.debug((Object)("actual record:    " + SchemaUtil.asString((Object)actualRecord)));
                    timeSinceLastRecord.reset();
                    actualRecordHistory.add((SourceRecord)actualRecord.record());

                    // Always record what was actually produced.
                    try {
                        Document jsonRecord = this.serializeSourceRecord((SourceRecord)actualRecord.record(), keyConverter, valueConverter);
                        if (jsonRecord != null) {
                            recorder.accept(jsonRecord);
                        }
                    }
                    catch (IOException e) {
                        String msg = "Error converting JSON to SourceRecord";
                        Testing.debug((Object)msg);
                        throw new ConnectException(msg, (Throwable)e);
                    }

                    // Nothing to compare against; the finally block still restores the context.
                    if (expectedRecords == null) {
                        return;
                    }
                    if (!expectedRecords.hasNext()) {
                        String msg = "Source record found but nothing expected";
                        result.error();
                        Testing.debug((Object)msg);
                        throw new MismatchRecordException(msg, actualRecordHistory, expectedRecordHistory);
                    }
                    Document expected = expectedRecords.next();
                    if (this.isEndCommand(expected)) {
                        result.error();
                        String msg = "Source record was found but not expected: " + SchemaUtil.asString((Object)actualRecord);
                        Testing.debug((Object)msg);
                        throw new MismatchRecordException(msg, actualRecordHistory, expectedRecordHistory);
                    }
                    if (this.isCommand(expected)) {
                        Testing.debug((Object)("applying command: " + SchemaUtil.asString((Object)expected)));
                        this.applyCommand(expected, result);
                    } else {
                        try {
                            SourceRecord expectedRecord = this.rehydrateSourceRecord(expected, keyConverter, valueConverter);
                            expectedRecordHistory.add(expectedRecord);
                            Testing.debug((Object)("expected record:  " + SchemaUtil.asString(expectedRecord)));
                            try {
                                this.assertSourceRecordMatch((SourceRecord)actualRecord.record(), expectedRecord, ignorableFields::contains, comparatorsByFieldName, comparatorsBySchemaName);
                            }
                            catch (AssertionError e) {
                                result.error();
                                String msg = "Source record with key " + SchemaUtil.asString((Object)((SourceRecord)actualRecord.record()).key()) + " did not match expected record: " + e.getMessage();
                                Testing.debug((Object)msg);
                                throw new MismatchRecordException(e, msg, actualRecordHistory, expectedRecordHistory);
                            }
                        }
                        catch (IOException e) {
                            result.exception();
                            String msg = "Error converting JSON to SourceRecord";
                            Testing.debug((Object)msg);
                            throw new ConnectException(msg, (Throwable)e);
                        }
                    }
                    if (!expectedRecords.hasNext()) {
                        result.stop();
                        String msg = "Stopping connector after no more expected records found";
                        Testing.debug((Object)msg);
                        throw new StopEngineException(msg);
                    }
                    // A command that directly follows this record is applied right away.
                    Document nextExpectedRecord = expectedRecords.peek();
                    if (this.isCommand(nextExpectedRecord)) {
                        this.applyCommand(expectedRecords.next(), result);
                    }
                }
                finally {
                    prev.restore();
                }
            };
            Configuration engineConfig = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.copy((Configuration)connectorConfig).withDefault(environmentConfig)).withDefault(EmbeddedEngineConfig.ENGINE_NAME, spec.name())).withDefault("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).withDefault(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)).build();
            final DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(engineConfig.asProperties()).notifying(consumer).using(this.getClass().getClassLoader()).using((DebeziumEngine.CompletionCallback)problem).build();
            long connectorTimeoutInSeconds = environmentConfig.getLong(ENV_CONNECTOR_TIMEOUT_IN_SECONDS, 10L);
            do {
                // Watchdog: closes the engine once no record has been seen for the timeout.
                Thread timeoutThread = Threads.timeout(spec.name() + "-connector-output", connectorTimeoutInSeconds, TimeUnit.SECONDS, timeSinceLastRecord, (Runnable)new Runnable(){

                    @Override
                    public void run() {
                        try {
                            engine.close();
                        }
                        catch (IOException e) {
                            LOGGER.warn("Failed to stop the engine: ", (Throwable)e);
                        }
                    }
                });
                result.uponCompletion(timeoutThread::interrupt);
                timeoutThread.start();
                Testing.debug((Object)"Starting connector");
                result.reset();
                engine.run();
            } while (result.get() == ExecutionResult.RESTART_REQUESTED);
        }
        catch (IOException e) {
            runError = new RuntimeException("Error reading test data: " + e.getMessage(), e);
        }
        catch (RuntimeException t) {
            runError = t;
        }
        finally {
            try {
                testData.close();
            }
            catch (IOException e) {
                // Report a close failure when it is the only problem; otherwise keep the
                // original run error as primary and attach the close failure to it.
                if (runError == null) {
                    runError = new RuntimeException("Error closing test data: " + e.getMessage(), e);
                } else {
                    runError.addSuppressed(e);
                }
            }
            finally {
                try {
                    keyConverter.close();
                }
                finally {
                    try {
                        valueConverter.close();
                    }
                    finally {
                        preRunContext.restore();
                    }
                }
            }
        }
        if (runError != null) {
            throw runError;
        }
        if (!problem.hasError()) {
            return;
        }
        Throwable error = problem.error();
        if (error instanceof AssertionError) {
            Assert.fail((String)problem.message());
            return;
        }
        if (!(error instanceof MismatchRecordException)) {
            if (error instanceof RuntimeException) {
                throw (RuntimeException)error;
            }
            throw new RuntimeException(error);
        }
        MismatchRecordException mismatch = (MismatchRecordException)((Object)error);
        LinkedList<SourceRecord> actualHistory = mismatch.getActualRecords();
        LinkedList<SourceRecord> expectedHistory = mismatch.getExpectedRecords();
        Testing.print((Object)"");
        Testing.print((Object)("FAILURE in connector integration test '" + spec.name() + "' in class " + String.valueOf(this.getClass())));
        // Either history can be empty (e.g. a record arrived when nothing was expected), so
        // guard getLast() to avoid hiding the mismatch behind a NoSuchElementException.
        Testing.print((Object)(" actual record:   " + (actualHistory.isEmpty() ? "<none>" : SchemaUtil.asString(actualHistory.getLast()))));
        Testing.print((Object)(" expected record: " + (expectedHistory.isEmpty() ? "<none>" : SchemaUtil.asString(expectedHistory.getLast()))));
        Testing.print((Object)mismatch.getMessage());
        Testing.print((Object)"");
        AssertionError cause = mismatch.getError();
        if (cause != null) {
            throw cause;
        }
        Assert.fail((String)problem.message());
    }

    /**
     * Applies a control document. Documents without a {@link #CONTROL_KEY} field, and commands
     * other than restart/stop, are ignored. For restart and stop the outcome is noted on
     * {@code result} and the engine is halted by throwing {@link StopEngineException}.
     *
     * @param record the expected-data document to inspect
     * @param result the completion tracker that receives the stop or restart request
     * @throws StopEngineException when the command is a restart or a stop
     */
    private void applyCommand(Document record, ConsumerCompletion result) {
        if (!this.isCommand(record)) {
            return;
        }
        Testing.debug((Object)("applying command: " + SchemaUtil.asString((Object)record)));
        final String requested = record.getString((CharSequence)CONTROL_KEY);
        final boolean restart = CONTROL_RESTART.equalsIgnoreCase(requested);
        if (!restart && !CONTROL_STOP.equalsIgnoreCase(requested)) {
            return;
        }
        if (restart) {
            result.restartRequested();
        } else {
            result.stop();
        }
        final String msg = "Stopping connector after record as requested";
        Testing.debug((Object)msg);
        throw new StopEngineException(msg);
    }

    /**
     * Tells whether the document is a control command, i.e. has a {@link #CONTROL_KEY} field.
     *
     * @param record the document to inspect
     * @return {@code true} if the control field is present
     */
    private boolean isCommand(Document record) {
        final CharSequence controlField = CONTROL_KEY;
        return record.has(controlField);
    }

    /**
     * Tells whether the document is the {@link #CONTROL_END} command (case-insensitive).
     *
     * @param record the document to inspect
     * @return {@code true} if the document is a command whose value is "end"
     */
    private boolean isEndCommand(Document record) {
        if (!this.isCommand(record)) {
            return false;
        }
        return CONTROL_END.equalsIgnoreCase(this.getCommand(record));
    }

    /**
     * Reads the string value stored under {@link #CONTROL_KEY}.
     *
     * @param record the document to read from
     * @return the command value as returned by the document
     */
    private String getCommand(Document record) {
        final CharSequence controlField = CONTROL_KEY;
        return record.getString(controlField);
    }

    /**
     * Rebuilds a {@link SourceRecord} from the JSON document layout produced by
     * {@code serializeSourceRecord}: the key and value are each re-wrapped as a
     * {@code schema}/{@code payload} envelope and passed through the matching converter.
     *
     * @param record the expected-record document
     * @param keyConverter converter used for the key envelope
     * @param valueConverter converter used for the value envelope
     * @return the reconstructed source record
     * @throws IOException if a converter fails to deserialize its envelope
     */
    private SourceRecord rehydrateSourceRecord(Document record, SchemaAndValueConverter keyConverter, SchemaAndValueConverter valueConverter) throws IOException {
        final Document partitionDoc = record.getDocument((CharSequence)"sourcePartition");
        final Document offsetDoc = record.getDocument((CharSequence)"sourceOffset");
        final String topic = record.getString((CharSequence)"topic");
        final Integer kafkaPartition = record.getInteger((CharSequence)"kafkaPartition");

        // Re-create the schema + payload envelopes the converters expect.
        final Document keyEnvelope = Document.create(
                (CharSequence)"schema", (Object)record.getDocument((CharSequence)"keySchema"),
                (CharSequence)"payload", (Object)record.getDocument((CharSequence)"key"));
        final Document valueEnvelope = Document.create(
                (CharSequence)"schema", (Object)record.getDocument((CharSequence)"valueSchema"),
                (CharSequence)"payload", (Object)record.getDocument((CharSequence)"value"));

        final SchemaAndValue key = keyConverter.deserialize(topic, keyEnvelope);
        final SchemaAndValue value = valueConverter.deserialize(topic, valueEnvelope);
        final Map<String, ?> partition = this.toMap(partitionDoc);
        final Map<String, ?> offset = this.toMap(offsetDoc);
        return new SourceRecord(partition, offset, topic, kafkaPartition, key.schema(), key.value(), value.schema(), value.value());
    }

    /**
     * Converts a {@link SourceRecord} into a JSON document with the fields
     * {@code sourcePartition}, {@code sourceOffset}, {@code topic}, {@code kafkaPartition},
     * {@code keySchema}, {@code key}, {@code valueSchema} and {@code value}.
     *
     * @param record the record to serialize
     * @param keyConverter converter producing the key's schema/payload envelope
     * @param valueConverter converter producing the value's schema/payload envelope
     * @return the document describing the record
     * @throws IOException if a converter fails to serialize
     */
    private Document serializeSourceRecord(SourceRecord record, SchemaAndValueConverter keyConverter, SchemaAndValueConverter valueConverter) throws IOException {
        final Document keyEnvelope = keyConverter.serialize(record.topic(), record.keySchema(), record.key());
        final Document valueEnvelope = valueConverter.serialize(record.topic(), record.valueSchema(), record.value());
        final Document partitionDoc = Document.create().putAll(record.sourcePartition());
        final Document offsetDoc = Document.create().putAll(record.sourceOffset());

        final Document serialized = Document.create();
        serialized.set((CharSequence)"sourcePartition", (Object)partitionDoc);
        serialized.set((CharSequence)"sourceOffset", (Object)offsetDoc);
        serialized.set((CharSequence)"topic", (Object)record.topic());
        serialized.set((CharSequence)"kafkaPartition", (Object)record.kafkaPartition());
        // Split each envelope into its schema and payload parts.
        serialized.set((CharSequence)"keySchema", (Object)keyEnvelope.getDocument((CharSequence)"schema"));
        serialized.set((CharSequence)"key", (Object)keyEnvelope.getDocument((CharSequence)"payload"));
        serialized.set((CharSequence)"valueSchema", (Object)valueEnvelope.getDocument((CharSequence)"schema"));
        serialized.set((CharSequence)"value", (Object)valueEnvelope.getDocument((CharSequence)"payload"));
        return serialized;
    }

    /**
     * Verifies that both records are valid and then that they are equal.
     * <p>
     * Validation failures are rethrown with a prefix saying which record was invalid; the
     * original {@link AssertionError} is kept as the cause so its stack trace is not lost.
     *
     * @param actual the record produced by the connector
     * @param expected the record rebuilt from the expected data
     * @param ignoreFields predicate selecting the fields to skip during comparison
     * @param comparatorsByName custom value comparators, passed through to {@code VerifyRecord.assertEquals}
     * @param comparatorsBySchemaName custom value comparators, passed through to {@code VerifyRecord.assertEquals}
     * @throws AssertionError if either record is invalid or the records differ
     */
    private void assertSourceRecordMatch(SourceRecord actual, SourceRecord expected, Predicate<String> ignoreFields, Map<String, VerifyRecord.RecordValueComparator> comparatorsByName, Map<String, VerifyRecord.RecordValueComparator> comparatorsBySchemaName) {
        try {
            VerifyRecord.isValid(actual);
        }
        catch (AssertionError e) {
            throw new AssertionError("Actual source record is not valid: " + e.getMessage(), e);
        }
        try {
            VerifyRecord.isValid(expected);
        }
        catch (AssertionError e) {
            throw new AssertionError("Expected source record is not valid: " + e.getMessage(), e);
        }
        VerifyRecord.assertEquals(actual, expected, ignoreFields, comparatorsByName, comparatorsBySchemaName);
    }

    /**
     * Copies every field of the document into a new map, keyed by the field name's string form
     * and holding each field value's {@code asObject()} result.
     *
     * @param doc the document to copy
     * @return a new map with one entry per field
     */
    private Map<String, ?> toMap(Document doc) {
        final Map<String, Object> fieldsByName = new HashMap<>();
        doc.forEach(field -> {
            final String fieldName = field.getName().toString();
            fieldsByName.put(fieldName, field.getValue().asObject());
        });
        return fieldsByName;
    }

    /**
     * Creates a queue whose {@code add} keeps only the newest {@code size} elements: after each
     * insertion, elements are removed from the head until the limit is respected.
     *
     * @param size the maximum number of elements retained after an {@code add}
     * @param <T> the element type
     * @return the bounded queue
     */
    private <T> Queue<T> fixedSizeQueue(final int size) {
        return new LinkedList<T>(){
            private static final long serialVersionUID = 1L;

            @Override
            public boolean add(T o) {
                super.add(o);
                // Evict from the head until we are back within the limit.
                for (int excess = this.size() - size; excess > 0; --excess) {
                    super.remove();
                }
                return true;
            }
        };
    }

    /**
     * Copies the stream into a new testing file, replacing variables in every line via
     * {@code Strings.replaceVariables}. Lines are read and written as UTF-8.
     * <p>
     * NOTE(review): each processed line is written without appending a line separator;
     * whether the lines handed over by {@code IoUtil.readLines} still carry one is not
     * visible here - confirm.
     *
     * @param stream the content to read
     * @param variables the source of replacement values
     * @return the temporary file holding the replaced content
     * @throws IOException if the file cannot be opened or the stream cannot be read
     */
    protected static File replaceVariables(InputStream stream, AvailableVariables variables) throws IOException {
        final File target = Testing.Files.createTestingFile();
        try (FileOutputStream out = new FileOutputStream(target)) {
            IoUtil.readLines(stream, line -> {
                final String replaced = Strings.replaceVariables(line, variables::variableForName);
                final byte[] bytes = replaced.getBytes(StandardCharsets.UTF_8);
                try {
                    out.write(bytes);
                }
                catch (IOException e) {
                    // The line callback cannot throw checked exceptions, so wrap.
                    throw new RuntimeException("Error writing to file '" + target + "'", e);
                }
            }, StandardCharsets.UTF_8);
        }
        return target;
    }

    public static class TestSpecification {
        private final String name;
        private final Configuration config;
        private final Configuration env;
        private final Function<TestSpecification, TestData> dataSupplier;
        private final AvailableVariables variables;
        private final AtomicReference<TestData> cachedData = new AtomicReference();

        public TestSpecification(String name, Configuration config, Configuration env, Function<TestSpecification, TestData> dataSupplier, AvailableVariables variables) {
            this.name = name != null ? name : "";
            this.variables = (variables != null ? variables : AvailableVariables.empty()).and(this.builtInVariables());
            Configuration configuration = config != null ? config.withReplacedVariables(this.variables::variableForName) : (this.config = Configuration.empty());
            this.env = env != null ? env.withReplacedVariables(this.variables::variableForName) : Configuration.empty();
            this.dataSupplier = dataSupplier != null ? dataSupplier : spec -> () -> Iterators.empty();
        }

        private AvailableVariables builtInVariables() {
            Map builtIns = Collect.hashMapOf((Object)"dbz.test.name", (Object)this.name());
            System.getProperties().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> builtIns.put(key.toString(), value.toString())));
            return builtIns::get;
        }

        public String name() {
            return this.name;
        }

        public Configuration config() {
            return this.config;
        }

        public Configuration environment() {
            return this.env;
        }

        public TestData testData() {
            if (this.cachedData.get() == null) {
                this.cachedData.compareAndSet(null, this.dataSupplier.apply(this));
            }
            return this.cachedData.get();
        }

        public AvailableVariables variables() {
            return this.variables;
        }

        public TestSpecification withName(String name) {
            return new TestSpecification(name, this.config, this.env, this.dataSupplier, this.variables);
        }

        public TestSpecification withConfiguration(Configuration config) {
            return new TestSpecification(this.name, config, this.env, this.dataSupplier, this.variables);
        }

        public TestSpecification withConfiguration(File file) {
            TestSpecification testSpecification;
            FileInputStream stream = new FileInputStream(file);
            try {
                testSpecification = this.withConfiguration(stream);
            }
            catch (Throwable throwable) {
                try {
                    try {
                        ((InputStream)stream).close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                    throw throwable;
                }
                catch (IOException e) {
                    Assert.fail((String)("Failed to read the configuration file '" + String.valueOf(file) + "': " + e.getMessage()));
                    return null;
                }
            }
            ((InputStream)stream).close();
            return testSpecification;
        }

        public TestSpecification withConfiguration(InputStream stream) {
            Properties props = new Properties();
            try {
                props.load(stream);
            }
            catch (IOException e) {
                Assert.fail((String)("Failed to read the configuration file from the input stream': " + e.getMessage()));
            }
            return this.withConfiguration(Configuration.from((Properties)props));
        }

        public TestSpecification withEnvironment(Configuration env) {
            return new TestSpecification(this.name, this.config, env, this.dataSupplier, this.variables);
        }

        public TestSpecification withEnvironment(File file) {
            TestSpecification testSpecification;
            FileInputStream stream = new FileInputStream(file);
            try {
                testSpecification = this.withEnvironment(stream);
            }
            catch (Throwable throwable) {
                try {
                    try {
                        ((InputStream)stream).close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                    throw throwable;
                }
                catch (IOException e) {
                    Assert.fail((String)("Failed to read the environment file '" + String.valueOf(file) + "': " + e.getMessage()));
                    return null;
                }
            }
            ((InputStream)stream).close();
            return testSpecification;
        }

        public TestSpecification withEnvironment(InputStream stream) {
            Properties props = new Properties();
            try {
                props.load(stream);
            }
            catch (IOException e) {
                Assert.fail((String)("Failed to read the environment input stream: " + e.getMessage()));
                return null;
            }
            return this.withEnvironment(Configuration.from((Properties)props));
        }

        public TestSpecification withTestData(Function<TestSpecification, TestData> dataSupplier) {
            return new TestSpecification(this.name, this.config, this.env, dataSupplier, this.variables);
        }

        public TestSpecification withReadOrWriteTestData(Path path) {
            return this.withReadOrWriteTestData(path.toFile());
        }

        public TestSpecification withReadOrWriteTestData(File file) {
            if (file.exists()) {
                return this.readJsonTestData(file);
            }
            return this.writeJsonTestData(file);
        }

        public TestSpecification readJsonTestData(Path path) {
            return this.readJsonTestData(path.toFile());
        }

        public TestSpecification readJsonTestData(File file) {
            return this.readJsonTestData(() -> new FileInputStream(file));
        }

        public TestSpecification readJsonTestData(InputStreamSupplier stream) {
            Function<TestSpecification, TestData> supplier = spec -> {
                AvailableVariables variables = spec.variables();
                return () -> {
                    File tmpFile = ConnectorOutputTest.replaceVariables(stream.get(), variables);
                    Array arrayOfDocuments = ArrayReader.defaultReader().readArray(tmpFile);
                    return Iterators.readOnly((Iterator)arrayOfDocuments.iterator(), entry -> {
                        Value value = entry.getValue();
                        return value.asDocument();
                    });
                };
            };
            return new TestSpecification(this.name, this.config, this.env, supplier, this.variables);
        }

        public TestSpecification writeJsonTestData(Path path) {
            return this.writeJsonTestData(path.toFile());
        }

        public TestSpecification writeJsonTestData(File file) {
            return this.writeJsonTestData(() -> new FileOutputStream(file));
        }

        /**
         * Creates a specification whose test data collects documents in memory and writes them
         * out as one array when it is closed.
         * <p>
         * The installed {@link TestData} behaves as follows:
         * <ul>
         * <li>{@code read()} returns {@code null};</li>
         * <li>{@code write(Document)} appends the document to an in-memory list;</li>
         * <li>{@code close()} turns the list into an {@link Array}, obtains a stream from the
         * supplier, writes the array with {@code ArrayWriter.prettyWriter()}, and closes the
         * stream.</li>
         * </ul>
         *
         * @param stream supplier of the output stream; it is only consulted when the test data is
         *        closed
         * @return a new specification built from this one's name, config, env and variables, with
         *         the recording data supplier installed
         */
        public TestSpecification writeJsonTestData(final OutputStreamSupplier stream) {
            Function<TestSpecification, TestData> recorderFactory = unusedSpec -> new TestData() {
                private final List<Document> recorded = new ArrayList<>();

                @Override
                public Iterator<Document> read() {
                    // This implementation only records; it has nothing to hand back.
                    return null;
                }

                @Override
                public void write(Document sourceRecord) {
                    this.recorded.add(sourceRecord);
                }

                @Override
                public void close() throws IOException {
                    // Build the array first so that a failure here never opens the output.
                    Array everythingRecorded = Array.create(this.recorded);
                    try (OutputStream out = stream.get()) {
                        ArrayWriter.prettyWriter().write(everythingRecorded, out);
                    }
                    TestData.super.close();
                }
            };
            return new TestSpecification(this.name, this.config, this.env, recorderFactory, this.variables);
        }

        /**
         * Returns a copy of this specification that uses the given variables.
         *
         * @param variables the variables the new specification should carry
         * @return a new specification built from this one's name, config, env and data supplier
         *         plus the supplied variables
         */
        public TestSpecification withVariables(AvailableVariables variables) {
            TestSpecification copy = new TestSpecification(this.name, this.config, this.env, this.dataSupplier, variables);
            return copy;
        }

        /**
         * Resolves variables from this specification's configuration and returns a copy of the
         * specification that looks variables up in the resulting map.
         *
         * @param variableSupplier produces the name-to-value map for this specification's config
         * @return a new specification whose variables are backed by the supplied map
         * @throws AssertionError if the supplier (or installing its result) fails; the original
         *         failure is attached as the cause so it appears in the test report
         */
        public TestSpecification withVariables(VariableSupplier variableSupplier) {
            try {
                Map<String, String> variables = variableSupplier.get(this.config);
                return this.withVariables(variables::get);
            }
            catch (Throwable t) {
                // Fail the test with the same message as before, but chain the cause instead of
                // only dumping it to stderr, so the failure report is self-contained.
                throw new AssertionError("Unable to read variables using configuration: " + this.config, t);
            }
        }
    }

    /**
     * Converts between {@link Document}s and schema/value pairs by round-tripping through JSON,
     * using a {@code JsonConverter}, a {@code JsonSerializer}/{@code JsonDeserializer} pair and a
     * Jackson {@link ObjectMapper}.
     * <p>
     * Closing this object closes the serializer and the deserializer.
     * NOTE(review): the {@code jsonConverter} is not closed here; confirm whether it needs to be.
     */
    private static class SchemaAndValueConverter
    implements AutoCloseable {
        private final JsonConverter jsonConverter = new JsonConverter();
        private final JsonSerializer jsonSerializer = new JsonSerializer();
        private final JsonDeserializer jsonDeserializer = new JsonDeserializer();
        private final ObjectMapper mapper = new ObjectMapper();
        private final DocumentReader jsonReader = DocumentReader.defaultReader();

        /**
         * Configures the converter, serializer and deserializer, each with a map view of the
         * given configuration and the same key/value flag.
         *
         * @param config configuration handed to all three components
         * @param isKey flag forwarded unchanged to each {@code configure} call
         */
        SchemaAndValueConverter(Configuration config, boolean isKey) {
            this.jsonConverter.configure(config.asMap(), isKey);
            this.jsonSerializer.configure(config.asMap(), isKey);
            this.jsonDeserializer.configure(config.asMap(), isKey);
        }

        /**
         * Turns a document into a schema/value pair.
         *
         * @param topic topic name passed to the serializer and the converter
         * @param doc document whose {@code toString()} text is parsed as a JSON tree
         * @return the result of {@code jsonConverter.toConnectData} on the serialized tree
         * @throws IOException if the document text cannot be parsed
         */
        public SchemaAndValue deserialize(String topic, Document doc) throws IOException {
            // Document text -> Jackson tree -> bytes -> schema/value pair.
            JsonNode tree = this.mapper.readTree(doc.toString());
            byte[] encoded = this.jsonSerializer.serialize(topic, tree);
            return this.jsonConverter.toConnectData(topic, encoded);
        }

        /**
         * Turns a schema/value pair into a document.
         *
         * @param topic topic name passed to the converter and the deserializer
         * @param schema schema handed to {@code jsonConverter.fromConnectData}
         * @param value value handed to {@code jsonConverter.fromConnectData}
         * @return the document read from the JSON text of the converted value
         * @throws IOException if the JSON text cannot be produced or read back
         */
        public Document serialize(String topic, Schema schema, Object value) throws IOException {
            // Schema/value pair -> bytes -> Jackson tree -> JSON text -> document.
            byte[] encoded = this.jsonConverter.fromConnectData(topic, schema, value);
            JsonNode tree = this.jsonDeserializer.deserialize(topic, encoded);
            return this.jsonReader.read(this.mapper.writeValueAsString(tree));
        }

        /**
         * Closes the serializer and then the deserializer; the deserializer is closed even when
         * closing the serializer throws.
         */
        @Override
        public void close() {
            try {
                this.jsonSerializer.close();
            }
            finally {
                this.jsonDeserializer.close();
            }
        }
    }

    /**
     * A source of expected documents that can optionally also record documents.
     * <p>
     * Only {@link #read()} is abstract, so the interface can be implemented with a lambda;
     * {@link #write(Document)} and {@link #close()} default to doing nothing.
     */
    @FunctionalInterface
    public interface TestData
    extends AutoCloseable {
        /**
         * Supplies the documents held by this test data.
         *
         * @return an iterator over the documents; the recording implementation in this file
         *         returns {@code null}
         * @throws IOException if the documents cannot be read
         */
        Iterator<Document> read() throws IOException;

        /**
         * Records a document. The default implementation ignores it.
         *
         * @param sourceRecord the document to record
         */
        default void write(Document sourceRecord) {
        }

        /**
         * Releases this test data. The default implementation does nothing.
         *
         * @throws IOException if an implementation fails while closing
         */
        @Override
        default void close() throws IOException {
        }
    }

    /**
     * Holds the outcome of a consumer run together with an optional one-shot callback.
     * <p>
     * The first outcome reported after construction or {@link #reset()} is the one retained;
     * later reports leave it unchanged. A registered callback is run at most once, by whichever
     * report takes it.
     */
    private static class ConsumerCompletion {
        private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
        private final AtomicReference<ExecutionResult> result = new AtomicReference<>();

        private ConsumerCompletion() {
        }

        /** Reports {@link ExecutionResult#STOPPED}. */
        public void stop() {
            this.setResult(ExecutionResult.STOPPED);
        }

        /** Reports {@link ExecutionResult#ERROR}. */
        public void error() {
            this.setResult(ExecutionResult.ERROR);
        }

        /** Reports {@link ExecutionResult#EXCEPTION}. */
        public void exception() {
            this.setResult(ExecutionResult.EXCEPTION);
        }

        /** Reports {@link ExecutionResult#RESTART_REQUESTED}. */
        public void restartRequested() {
            this.setResult(ExecutionResult.RESTART_REQUESTED);
        }

        /**
         * Takes and runs the pending callback, if any, then stores the outcome unless one is
         * already stored. The outcome is stored even when the callback throws.
         *
         * @param result the outcome to store if none has been stored yet
         */
        private void setResult(ExecutionResult result) {
            try {
                // Atomically take the callback so that it can only ever be run once.
                Runnable pending = this.uponCompletion.getAndSet(null);
                if (pending == null) {
                    return;
                }
                pending.run();
            }
            finally {
                this.result.compareAndSet(null, result);
            }
        }

        /**
         * Registers the callback to run on the next reported outcome, replacing any callback
         * registered earlier.
         *
         * @param r the callback
         */
        public void uponCompletion(Runnable r) {
            this.uponCompletion.set(r);
        }

        /**
         * @return the stored outcome, or {@code null} if none has been stored
         */
        public ExecutionResult get() {
            return this.result.get();
        }

        /** Clears the stored outcome; a registered callback is left untouched. */
        public void reset() {
            this.result.set(null);
        }
    }

    private static enum ExecutionResult {
        ERROR,
        EXCEPTION,
        STOPPED,
        RESTART_REQUESTED;

    }

    @FunctionalInterface
    protected static interface OutputStreamSupplier {
        public OutputStream get() throws IOException;
    }

    @FunctionalInterface
    protected static interface InputStreamSupplier {
        public InputStream get() throws IOException;
    }

    @FunctionalInterface
    protected static interface VariableSupplier {
        public Map<String, String> get(Configuration var1) throws Exception;
    }
}

