/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.api.impl;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockMode;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RuleAlteredJobAPIImpl
extends AbstractPipelineJobAPIImpl
implements RuleAlteredJobAPI {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobAPIImpl.class);

    public List<JobInfo> list() {
        this.checkModeConfig();
        return this.getJobBriefInfos().map(each -> this.getJobInfo(each.getJobName())).collect(Collectors.toList());
    }

    private void checkModeConfig() {
        ModeConfiguration modeConfig = PipelineContext.getModeConfig();
        Preconditions.checkNotNull((Object)modeConfig, (Object)"Mode configuration is required.");
        Preconditions.checkArgument((boolean)"Cluster".equalsIgnoreCase(modeConfig.getType()), (Object)"Mode must be `Cluster`.");
    }

    private Stream<JobBriefInfo> getJobBriefInfos() {
        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
    }

    private JobInfo getJobInfo(String jobName) {
        JobInfo result = new JobInfo(jobName);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(result.getJobId());
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        result.setActive(!jobConfigPOJO.isDisabled());
        result.setShardingTotalCount(jobConfig.getJobShardingCount());
        result.setTables(jobConfig.getLogicTables());
        result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
        result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
        result.setJobParameter(jobConfigPOJO.getJobParameter());
        return result;
    }

    public Optional<String> start(RuleAlteredJobConfiguration jobConfig) {
        if (0 == jobConfig.getJobShardingCount()) {
            log.warn("Invalid scaling job config!");
            throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
        }
        log.info("Start scaling job by {}", (Object)jobConfig);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        String jobId = jobConfig.getJobId();
        String jobConfigKey = PipelineMetaDataNode.getScalingJobConfigPath(jobId);
        if (repositoryAPI.isExisted(jobConfigKey)) {
            log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", (Object)jobConfigKey);
            return Optional.of(jobId);
        }
        repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), RuleAlteredJob.class.getName());
        repositoryAPI.persist(jobConfigKey, this.createJobConfigText(jobConfig));
        return Optional.of(jobId);
    }

    private String createJobConfigText(RuleAlteredJobConfiguration jobConfig) {
        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
        jobConfigPOJO.setJobName(jobConfig.getJobId());
        jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
        jobConfigPOJO.setJobParameter(YamlEngine.marshal((Object)new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
        jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
        return YamlEngine.marshal((Object)jobConfigPOJO);
    }

    public Map<Integer, JobProgress> getProgress(String jobId) {
        this.checkModeConfig();
        return this.getProgress(this.getJobConfig(jobId));
    }

    public Map<Integer, JobProgress> getProgress(RuleAlteredJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
            JobProgress jobProgress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, (int)each);
            if (null != jobProgress) {
                jobProgress.setActive(!jobConfigPOJO.isDisabled());
            }
            map.put(each, jobProgress);
        }, HashMap::putAll);
    }

    private void verifyManualMode(RuleAlteredJobConfiguration jobConfig) {
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
            throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
        }
    }

    private void verifyJobNotCompleted(RuleAlteredJobConfiguration jobConfig) {
        if (RuleAlteredJobProgressDetector.isJobCompleted(jobConfig.getJobShardingCount(), this.getProgress(jobConfig).values())) {
            throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
        }
    }

    public void stopClusterWriteDB(String jobId) {
        this.checkModeConfig();
        log.info("stopClusterWriteDB for job {}", (Object)jobId);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        this.verifyManualMode(jobConfig);
        this.verifyJobNotStopped(jobConfigPOJO);
        this.verifyJobNotCompleted(jobConfig);
        String databaseName = jobConfig.getDatabaseName();
        this.stopClusterWriteDB(databaseName, jobId);
    }

    public void stopClusterWriteDB(String databaseName, String jobId) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (lockContext.isLocked(databaseName)) {
            log.info("stopClusterWriteDB, already stopped");
            return;
        }
        if (lockContext.tryLock(databaseName, LockMode.READ)) {
            log.info("stopClusterWriteDB, tryLockSuccess=true");
            return;
        }
        throw new RuntimeException("Stop source writing failed");
    }

    public void restoreClusterWriteDB(String jobId) {
        this.checkModeConfig();
        log.info("restoreClusterWriteDB for job {}", (Object)jobId);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        this.verifyManualMode(jobConfig);
        String databaseName = jobConfig.getDatabaseName();
        this.restoreClusterWriteDB(databaseName, jobId);
    }

    public void restoreClusterWriteDB(String databaseName, String jobId) {
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (lockContext.isLocked(databaseName)) {
            log.info("restoreClusterWriteDB, before releaseLock, databaseName={}, jobId={}", (Object)databaseName, (Object)jobId);
            lockContext.releaseLock(databaseName);
            return;
        }
        log.info("restoreClusterWriteDB, isLocked false, databaseName={}", (Object)databaseName);
    }

    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
        this.checkModeConfig();
        return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
            DataConsistencyCheckAlgorithmInfo result = new DataConsistencyCheckAlgorithmInfo();
            result.setType(each.getType());
            result.setDescription(each.getDescription());
            result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
            return result;
        }).collect(Collectors.toList());
    }

    public boolean isDataConsistencyCheckNeeded(String jobId) {
        log.info("isDataConsistencyCheckNeeded for job {}", (Object)jobId);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(jobId);
        return this.isDataConsistencyCheckNeeded(jobConfig);
    }

    public boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration jobConfig) {
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        return this.isDataConsistencyCheckNeeded(ruleAlteredContext);
    }

    private boolean isDataConsistencyCheckNeeded(RuleAlteredContext ruleAlteredContext) {
        return null != ruleAlteredContext.getDataConsistencyCalculateAlgorithm();
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId) {
        this.checkModeConfig();
        log.info("Data consistency check for job {}", (Object)jobId);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(this.getElasticJobConfigPOJO(jobId));
        this.verifyDataConsistencyCheck(jobConfig);
        return this.dataConsistencyCheck(jobConfig);
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig) {
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        if (!this.isDataConsistencyCheckNeeded(ruleAlteredContext)) {
            log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
            return Collections.emptyMap();
        }
        return this.dataConsistencyCheck(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
    }

    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps) {
        this.checkModeConfig();
        log.info("Data consistency check for job {}, algorithmType: {}", (Object)jobId, (Object)algorithmType);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(this.getElasticJobConfigPOJO(jobId));
        this.verifyDataConsistencyCheck(jobConfig);
        return this.dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
    }

    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig, DataConsistencyCalculateAlgorithm calculator) {
        String jobId = jobConfig.getJobId();
        Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", new Object[]{jobId, calculator.getType(), result});
        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, this.aggregateDataConsistencyCheckResults(jobId, result));
        return result;
    }

    private void verifyDataConsistencyCheck(RuleAlteredJobConfiguration jobConfig) {
        this.verifyManualMode(jobConfig);
    }

    public boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults) {
        if (checkResults.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
            DataConsistencyCheckResult checkResult = entry.getValue();
            boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
            boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
            if (isCountMatched && isContentMatched) continue;
            log.error("Scaling job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", new Object[]{jobId, entry.getKey(), isCountMatched, isContentMatched});
            return false;
        }
        return true;
    }

    public void switchClusterConfiguration(String jobId) {
        this.checkModeConfig();
        log.info("Switch cluster configuration for job {}", (Object)jobId);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        RuleAlteredJobConfiguration jobConfig = this.getJobConfig(jobConfigPOJO);
        this.verifyManualMode(jobConfig);
        this.verifyJobNotStopped(jobConfigPOJO);
        this.verifyJobNotCompleted(jobConfig);
        this.switchClusterConfiguration(jobConfig);
    }

    public void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig) {
        Optional<Boolean> checkResult;
        String jobId = jobConfig.getJobId();
        RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
        if (!(!this.isDataConsistencyCheckNeeded(ruleAlteredContext) || (checkResult = repositoryAPI.getJobCheckResult(jobId)).isPresent() && checkResult.get().booleanValue())) {
            throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
        }
        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), jobConfig.getActiveVersion().intValue(), jobConfig.getNewVersion().intValue());
        ShardingSphereEventBus.getInstance().post((Object)taskFinishedEvent);
        RuleAlteredJobSchedulerCenter.updateJobStatus(jobId, JobStatus.FINISHED);
        for (int each : repositoryAPI.getShardingItems(jobId)) {
            repositoryAPI.updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
        }
        RuleAlteredJobCenter.stop(jobId);
        this.stop(jobId);
    }

    public void reset(String jobId) {
        this.checkModeConfig();
        log.info("Scaling job {} reset target table", (Object)jobId);
        JobConfigurationPOJO jobConfigPOJO = this.getElasticJobConfigPOJO(jobId);
        this.verifyJobStopped(jobConfigPOJO);
        try {
            new ScalingEnvironmentManager().cleanupTargetTables(this.getJobConfig(jobConfigPOJO));
        }
        catch (SQLException ex) {
            throw new PipelineJobExecutionException("Reset target table failed for job " + jobId, ex);
        }
    }

    public RuleAlteredJobConfiguration getJobConfig(String jobId) {
        return this.getJobConfig(this.getElasticJobConfigPOJO(jobId));
    }

    private RuleAlteredJobConfiguration getJobConfig(JobConfigurationPOJO jobConfigPOJO) {
        return RuleAlteredJobConfigurationSwapper.swapToObject((String)jobConfigPOJO.getJobParameter());
    }
}

