/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;

import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RuleAlteredJobWorker {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RuleAlteredJobWorker.class);
    private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();
    private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);

    public static boolean isOnRuleAlteredActionEnabled(RuleConfiguration ruleConfig) {
        if (null == ruleConfig) {
            return false;
        }
        Optional detector = RuleAlteredDetectorFactory.findInstance((RuleConfiguration)ruleConfig);
        return detector.isPresent() && ((RuleAlteredDetector)detector.get()).getOnRuleAlteredActionConfig(ruleConfig).isPresent();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void initWorkerIfNecessary() {
        if (WORKER_INITIALIZED.get()) {
            return;
        }
        AtomicBoolean atomicBoolean = WORKER_INITIALIZED;
        synchronized (atomicBoolean) {
            if (WORKER_INITIALIZED.get()) {
                return;
            }
            log.info("start worker initialization");
            ShardingSphereEventBus.getInstance().register((Object)INSTANCE);
            new FinishedCheckJobExecutor().start();
            new PipelineJobExecutor().start();
            WORKER_INITIALIZED.set(true);
            log.info("worker initialization done");
        }
    }

    public static RuleAlteredContext createRuleAlteredContext(RuleAlteredJobConfiguration jobConfig) {
        YamlRootConfiguration targetRootConfig = RuleAlteredJobWorker.getYamlRootConfig(jobConfig);
        YamlRuleConfiguration yamlRuleConfig = null;
        for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
            if (!jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) continue;
            yamlRuleConfig = each;
            break;
        }
        if (null == yamlRuleConfig) {
            throw new PipelineJobCreationException("could not find altered rule");
        }
        RuleConfiguration ruleConfig = SWAPPER_ENGINE.swapToRuleConfiguration(yamlRuleConfig);
        Optional detector = RuleAlteredDetectorFactory.findInstance((RuleConfiguration)ruleConfig);
        Preconditions.checkState((boolean)detector.isPresent());
        Optional onRuleAlteredActionConfig = ((RuleAlteredDetector)detector.get()).getOnRuleAlteredActionConfig(ruleConfig);
        if (!onRuleAlteredActionConfig.isPresent()) {
            log.error("rule altered action enabled but actor is not configured, ignored, ruleConfig={}", (Object)ruleConfig);
            throw new PipelineJobCreationException("rule altered actor not configured");
        }
        return new RuleAlteredContext(jobConfig.getJobId(), (OnRuleAlteredActionConfiguration)onRuleAlteredActionConfig.get());
    }

    private static YamlRootConfiguration getYamlRootConfig(RuleAlteredJobConfiguration jobConfig) {
        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance((String)jobConfig.getTarget().getType(), (String)jobConfig.getTarget().getParameter());
        if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
            return ((ShardingSpherePipelineDataSourceConfiguration)targetDataSourceConfig).getRootConfig();
        }
        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance((String)jobConfig.getSource().getType(), (String)jobConfig.getSource().getParameter());
        return ((ShardingSpherePipelineDataSourceConfiguration)sourceDataSourceConfig).getRootConfig();
    }

    @Subscribe
    public void start(StartScalingEvent event) {
        log.info("Start scaling job by {}", (Object)event);
        if (this.hasUncompletedJobOfSameDatabaseName(event.getDatabaseName())) {
            log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
            return;
        }
        Optional<RuleAlteredJobConfiguration> jobConfig = this.createJobConfig(event);
        if (jobConfig.isPresent()) {
            RuleAlteredJobAPIFactory.getInstance().start(jobConfig.get());
        } else {
            log.info("Switch rule configuration immediately.");
            ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
            ShardingSphereEventBus.getInstance().post((Object)taskFinishedEvent);
            ShardingSphereEventBus.getInstance().post((Object)new ScalingReleaseDatabaseLevelLockEvent(event.getDatabaseName()));
        }
    }

    private Optional<RuleAlteredJobConfiguration> createJobConfig(StartScalingEvent event) {
        YamlRootConfiguration sourceRootConfig = this.getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
        YamlRootConfiguration targetRootConfig = this.getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
        HashMap<String, List> alteredRuleYamlClassNameTablesMap = new HashMap<String, List>();
        for (Pair<YamlRuleConfiguration, YamlRuleConfiguration> each : this.groupSourceTargetRuleConfigsByType(sourceRootConfig.getRules(), targetRootConfig.getRules())) {
            YamlRuleConfiguration yamlRuleConfig = null == each.getLeft() ? (YamlRuleConfiguration)each.getRight() : (YamlRuleConfiguration)each.getLeft();
            Optional detector = RuleAlteredDetectorFactory.findInstance((YamlRuleConfiguration)yamlRuleConfig);
            if (!detector.isPresent()) continue;
            List ruleAlteredLogicTables = ((RuleAlteredDetector)detector.get()).findRuleAlteredLogicTables((YamlRuleConfiguration)each.getLeft(), (YamlRuleConfiguration)each.getRight(), sourceRootConfig.getDataSources(), targetRootConfig.getDataSources());
            log.info("type={}, ruleAlteredLogicTables={}", (Object)yamlRuleConfig.getClass().getName(), (Object)ruleAlteredLogicTables);
            if (ruleAlteredLogicTables.isEmpty()) continue;
            alteredRuleYamlClassNameTablesMap.put(yamlRuleConfig.getClass().getName(), ruleAlteredLogicTables);
        }
        if (alteredRuleYamlClassNameTablesMap.isEmpty()) {
            log.error("no altered rule");
            throw new PipelineJobCreationException("no altered rule");
        }
        if (alteredRuleYamlClassNameTablesMap.size() > 1) {
            log.error("more than 1 rule altered");
            throw new PipelineJobCreationException("more than 1 rule altered");
        }
        YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
        result.setDatabaseName(event.getDatabaseName());
        result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
        result.setActiveVersion(Integer.valueOf(event.getActiveVersion()));
        result.setNewVersion(Integer.valueOf(event.getNewVersion()));
        result.setSource(this.createYamlPipelineDataSourceConfiguration(sourceRootConfig));
        result.setTarget(this.createYamlPipelineDataSourceConfiguration(targetRootConfig));
        result.extendConfiguration();
        return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result));
    }

    private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(Collection<YamlRuleConfiguration> sourceRules, Collection<YamlRuleConfiguration> targetRules) {
        Map sourceRulesMap = sourceRules.stream().collect(Collectors.toMap(Object::getClass, Function.identity()));
        Map targetRulesMap = targetRules.stream().collect(Collectors.toMap(Object::getClass, Function.identity()));
        LinkedList<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> result = new LinkedList<Pair<YamlRuleConfiguration, YamlRuleConfiguration>>();
        for (Map.Entry entry : sourceRulesMap.entrySet()) {
            YamlRuleConfiguration targetRule = (YamlRuleConfiguration)targetRulesMap.get(entry.getKey());
            result.add((Pair<YamlRuleConfiguration, YamlRuleConfiguration>)Pair.of((Object)((YamlRuleConfiguration)entry.getValue()), (Object)targetRule));
        }
        for (Map.Entry entry : targetRulesMap.entrySet()) {
            if (sourceRulesMap.containsKey(entry.getKey())) continue;
            result.add((Pair<YamlRuleConfiguration, YamlRuleConfiguration>)Pair.of(null, (Object)((YamlRuleConfiguration)entry.getValue())));
        }
        return result;
    }

    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(YamlRootConfiguration yamlConfig) {
        ShardingSpherePipelineDataSourceConfiguration config = new ShardingSpherePipelineDataSourceConfiguration(yamlConfig);
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(config.getType());
        result.setParameter(config.getParameter());
        return result;
    }

    private YamlRootConfiguration getYamlRootConfiguration(String databaseName, String dataSources, String rules) {
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        Map yamlDataSources = (Map)YamlEngine.unmarshal((String)dataSources, Map.class);
        result.setDataSources(yamlDataSources);
        Collection yamlRuleConfigs = (Collection)YamlEngine.unmarshal((String)rules, Collection.class, (boolean)true);
        result.setRules(yamlRuleConfigs);
        return result;
    }

    public static TaskConfiguration buildTaskConfig(RuleAlteredJobConfiguration jobConfig, int jobShardingItem, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, onRuleAlteredActionConfig);
    }

    private boolean hasUncompletedJobOfSameDatabaseName(String databaseName) {
        boolean result = false;
        for (JobInfo each : RuleAlteredJobAPIFactory.getInstance().list()) {
            RuleAlteredJobConfiguration jobConfig;
            if (RuleAlteredJobAPIFactory.getInstance().getProgress(each.getJobId()).values().stream().allMatch(progress -> null != progress && progress.getStatus().equals((Object)JobStatus.FINISHED)) || !databaseName.equals((jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject((String)each.getJobParameter())).getDatabaseName())) continue;
            result = true;
            break;
        }
        return result;
    }

    @Subscribe
    public void scalingReleaseDatabaseLevelLock(ScalingReleaseDatabaseLevelLockEvent event) {
        String databaseName = event.getDatabaseName();
        try {
            this.restoreSourceWriting(databaseName);
        }
        catch (RuntimeException ex) {
            log.error("restore source writing failed, databaseName={}", (Object)databaseName, (Object)ex);
        }
        PipelineSimpleLock.getInstance().releaseLock(event.getDatabaseName());
    }

    private void restoreSourceWriting(String databaseName) {
        log.info("restoreSourceWriting, databaseName={}", (Object)databaseName);
        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
        if (lockContext.isLocked(databaseName)) {
            log.info("Source writing is still stopped on database '{}', restore it now", (Object)databaseName);
            lockContext.releaseLock(databaseName);
        }
    }
}

