/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental task.
 *
 * <p>Connects one incremental {@link Dumper} to {@code concurrency} {@link Importer}s through a single
 * {@link PipelineChannel} and keeps the latest reported position in an {@link IncrementalTaskProgress}.
 * The task id is the data source name taken from the dumper configuration.</p>
 *
 * <p>{@link #close()} only closes the channel; stopping the dumper and importers is done by {@code doStop()}.</p>
 */
public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(IncrementalTask.class);

    private final String taskId;

    private final ExecuteEngine incrementalDumperExecuteEngine;

    private final PipelineChannel channel;

    private final Dumper dumper;

    private final Collection<Importer> importers;

    private final IncrementalTaskProgress progress;

    /**
     * Builds the channel, the incremental dumper and the importers for one data source.
     *
     * @param concurrency number of importers to create; also passed to the channel creator
     * @param dumperConfig dumper configuration; supplies the task id (data source name) and the start position
     * @param importerConfig importer configuration handed to every importer
     * @param pipelineChannelCreator factory used to create the channel shared by dumper and importers
     * @param dataSourceManager data source manager handed to every importer
     * @param sourceMetaDataLoader table meta data loader handed to the dumper
     * @param incrementalDumperExecuteEngine engine the importers are submitted to on start
     */
    public IncrementalTask(int concurrency, DumperConfiguration dumperConfig, ImporterConfiguration importerConfig, PipelineChannelCreator pipelineChannelCreator, PipelineDataSourceManager dataSourceManager, PipelineTableMetaDataLoader sourceMetaDataLoader, ExecuteEngine incrementalDumperExecuteEngine) {
        this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
        taskId = dumperConfig.getDataSourceName();
        progress = new IncrementalTaskProgress();
        IngestPosition position = dumperConfig.getPosition();
        // Seed the progress with the configured start position before any record is reported.
        progress.setPosition(position);
        // The channel must exist before the dumper and the importers, which all receive it.
        channel = createChannel(concurrency, pipelineChannelCreator, progress);
        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
    }

    /**
     * Submits all importers to the execute engine, starts the dumper on the calling thread and then
     * waits for the importers' future to complete.
     *
     * <p>NOTE(review): presumably overrides the start hook of {@code AbstractLifecycleExecutor}; the
     * superclass is not visible here, so no {@code @Override} is declared.</p>
     *
     * @throws PipelineJobExecutionException if the submitted importers complete exceptionally
     */
    protected void doStart() {
        progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        Future<?> future = incrementalDumperExecuteEngine.submitAll(importers, getExecuteCallback());
        dumper.start();
        waitForResult(future);
    }

    /**
     * Creates {@code concurrency} importers that all share the given channel.
     *
     * @return the created importers; empty when {@code concurrency} is not positive
     */
    private Collection<Importer> createImporters(int concurrency, ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager, PipelineChannel channel) {
        Collection<Importer> result = new LinkedList<>();
        for (int i = 0; i < concurrency; i++) {
            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel));
        }
        return result;
    }

    /**
     * Creates the channel and registers a callback that copies the last record of each reported batch
     * into {@code progress}.
     *
     * <p>Records whose position is a {@link PlaceholderPosition} do not move the stored position or the
     * last event timestamp; the latest-active time is refreshed on every callback invocation.</p>
     */
    private PipelineChannel createChannel(int concurrency, PipelineChannelCreator pipelineChannelCreator, IncrementalTaskProgress progress) {
        return pipelineChannelCreator.createPipelineChannel(concurrency, records -> {
            int lastIndex = records.size() - 1;
            // An empty batch has no last record; skip the position update instead of calling get(-1).
            if (lastIndex >= 0) {
                Record lastHandledRecord = (Record) records.get(lastIndex);
                if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
                    progress.setPosition(lastHandledRecord.getPosition());
                    progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
                }
            }
            progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
        });
    }

    /**
     * Creates the callback passed to the execute engine together with the importers: success is only
     * logged, failure is logged and stops this task.
     */
    private ExecuteCallback getExecuteCallback() {
        return new ExecuteCallback() {

            @Override
            public void onSuccess() {
                log.info("importer onSuccess, taskId={}", IncrementalTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
                log.error("importer onFailure, taskId={}", IncrementalTask.this.taskId, throwable);
                IncrementalTask.this.stop();
            }
        };
    }

    /**
     * Blocks until the given future completes.
     *
     * <p>If the waiting thread is interrupted, the method returns without a result and re-asserts the
     * thread's interrupt status so that callers can still observe the interruption.</p>
     *
     * @throws PipelineJobExecutionException wrapping the cause of an {@link ExecutionException}
     */
    private void waitForResult(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException ex) {
            // Future.get() clears the interrupt status when it throws; restore it rather than swallow it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new PipelineJobExecutionException(String.format("Task %s execute failed ", taskId), ex.getCause());
        }
    }

    /**
     * Stops the dumper first and then every importer.
     *
     * <p>NOTE(review): presumably overrides the stop hook of {@code AbstractLifecycleExecutor}; confirm
     * against the superclass.</p>
     */
    protected void doStop() {
        dumper.stop();
        for (Importer each : importers) {
            each.stop();
        }
    }

    /**
     * Closes the channel shared by the dumper and the importers.
     */
    @Override
    public void close() {
        channel.close();
    }

    @Generated
    public String toString() {
        return "IncrementalTask(taskId=" + this.getTaskId() + ")";
    }

    /**
     * Gets the task id, which is the data source name of the dumper configuration.
     *
     * @return task id
     */
    @Override
    @Generated
    public String getTaskId() {
        return taskId;
    }

    /**
     * Gets the progress object updated by the channel callback.
     *
     * @return incremental task progress
     */
    @Generated
    public IncrementalTaskProgress getProgress() {
        return progress;
    }
}

