/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.check.consistency;

import com.google.common.base.Preconditions;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.mode.manager.ContextManager;

public final class DataConsistencyChecker {
    private final RuleAlteredJobConfiguration jobConfig;
    private final Collection<String> logicTableNames;
    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;

    public DataConsistencyChecker(RuleAlteredJobConfiguration jobConfig) {
        this.jobConfig = jobConfig;
        this.logicTableNames = jobConfig.splitLogicTableNames();
        ShardingSphereDatabase database = (ShardingSphereDatabase)PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabases().get(jobConfig.getDatabaseName());
        this.tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert((Map)database.getSchemas()));
    }

    public Map<String, DataConsistencyCheckResult> check(DataConsistencyCalculateAlgorithm calculator) {
        Map<String, DataConsistencyCountCheckResult> countCheckResult = this.checkCount();
        Map contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched) ? this.checkData(calculator) : Collections.emptyMap();
        LinkedHashMap<String, DataConsistencyCheckResult> result = new LinkedHashMap<String, DataConsistencyCheckResult>(countCheckResult.size());
        for (Map.Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
        }
        return result;
    }

    /*
     * Exception decompiling
     */
    private Map<String, DataConsistencyCountCheckResult> checkCount() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private DataConsistencyCountCheckResult checkCount(String table, PipelineDataSourceWrapper sourceDataSource, PipelineDataSourceWrapper targetDataSource, ThreadPoolExecutor executor) {
        try {
            Future<Long> sourceFuture = executor.submit(() -> this.count((DataSource)sourceDataSource, table, sourceDataSource.getDatabaseType()));
            Future<Long> targetFuture = executor.submit(() -> this.count((DataSource)targetDataSource, table, targetDataSource.getDatabaseType()));
            long sourceCount = sourceFuture.get();
            long targetCount = targetFuture.get();
            return new DataConsistencyCountCheckResult(sourceCount, targetCount);
        }
        catch (InterruptedException | ExecutionException ex) {
            throw new PipelineDataConsistencyCheckFailedException(String.format("Count check failed for table '%s'", table), ex);
        }
    }

    private String getJobIdDigest(String jobId) {
        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
    }

    /*
     * Exception decompiling
     */
    private long count(DataSource dataSource, String tableName, DatabaseType databaseType) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private Map<String, DataConsistencyContentCheckResult> checkData(DataConsistencyCalculateAlgorithm calculator) {
        this.decoratePipelineDataSourceConfiguration(calculator, this.jobConfig.getSource());
        PipelineDataSourceConfiguration sourceDataSourceConfig = this.jobConfig.getSource();
        this.decoratePipelineDataSourceConfiguration(calculator, this.jobConfig.getTarget());
        PipelineDataSourceConfiguration targetDataSourceConfig = this.jobConfig.getTarget();
        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build((String)("job-" + this.getJobIdDigest(this.jobConfig.getJobId()) + "-data-check-%d"));
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory);
        JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(this.jobConfig).getInputRateLimitAlgorithm();
        HashMap<String, DataConsistencyContentCheckResult> result = new HashMap<String, DataConsistencyContentCheckResult>(this.logicTableNames.size(), 1.0f);
        try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
             PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig);){
            String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
            String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
            for (String each : this.logicTableNames) {
                ShardingSphereTable table = this.getTableMetaData(this.jobConfig.getDatabaseName(), each);
                if (null == table) {
                    throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
                }
                Set<String> columnNames = table.getColumns().keySet();
                String uniqueKey = (String)table.getPrimaryKeyColumns().get(0);
                DataConsistencyCalculateParameter sourceParameter = this.buildParameter(sourceDataSource, this.tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
                DataConsistencyCalculateParameter targetParameter = this.buildParameter(targetDataSource, this.tableNameSchemaNameMapping, each, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
                Iterator sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
                Iterator targetCalculatedResults = calculator.calculate(targetParameter).iterator();
                boolean contentMatched = true;
                while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
                    Object targetCalculatedResult;
                    if (null != inputRateLimitAlgorithm) {
                        inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, (Number)1);
                    }
                    Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
                    Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
                    Object sourceCalculatedResult = sourceFuture.get();
                    contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult = targetFuture.get());
                    if (contentMatched) continue;
                    break;
                }
                result.put(each, new DataConsistencyContentCheckResult(contentMatched));
            }
        }
        catch (InterruptedException | SQLException | ExecutionException ex) {
            throw new PipelineDataConsistencyCheckFailedException("Data check failed", ex);
        }
        finally {
            executor.shutdown();
            executor.shutdownNow();
        }
        return result;
    }

    private void decoratePipelineDataSourceConfiguration(DataConsistencyCalculateAlgorithm calculator, PipelineDataSourceConfiguration dataSourceConfig) {
        this.checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), dataSourceConfig.getDatabaseType().getType());
    }

    private void checkDatabaseTypeSupported(Collection<String> supportedDatabaseTypes, String databaseType) {
        if (!supportedDatabaseTypes.contains(databaseType)) {
            throw new PipelineDataConsistencyCheckFailedException("Database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
        }
    }

    private ShardingSphereTable getTableMetaData(String databaseName, String logicTableName) {
        ContextManager contextManager = PipelineContext.getContextManager();
        Preconditions.checkNotNull((Object)contextManager, (Object)"ContextManager null");
        ShardingSphereDatabase database = (ShardingSphereDatabase)contextManager.getMetaDataContexts().getMetaData().getDatabases().get(databaseName);
        if (null == database) {
            throw new RuntimeException("Can not get meta data by database name " + databaseName);
        }
        String schemaName = this.tableNameSchemaNameMapping.getSchemaName(logicTableName);
        ShardingSphereSchema schema = (ShardingSphereSchema)database.getSchemas().get(schemaName);
        if (null == schema) {
            throw new RuntimeException("Can not get schema by schema name " + schemaName + ", logicTableName=" + logicTableName);
        }
        return schema.get(logicTableName);
    }

    private DataConsistencyCalculateParameter buildParameter(PipelineDataSourceWrapper sourceDataSource, TableNameSchemaNameMapping tableNameSchemaNameMapping, String tableName, Collection<String> columnNames, String sourceDatabaseType, String targetDatabaseType, String uniqueKey) {
        return new DataConsistencyCalculateParameter(sourceDataSource, tableNameSchemaNameMapping, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
    }
}

