Skip to content

Commit

Permalink
Do not recalculate rules when the rule has not changed, and the value…
Browse files Browse the repository at this point in the history
…s for the same time period has not changed.
  • Loading branch information
piotrczarnas committed Oct 25, 2024
1 parent fea436e commit 9200e7c
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 93 deletions.
26 changes: 26 additions & 0 deletions dqops/src/main/java/com/dqops/checks/AbstractCheckSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import lombok.ToString;
import tech.tablesaw.api.IntColumn;

import java.time.Instant;
import java.util.Objects;

/**
Expand Down Expand Up @@ -121,6 +122,13 @@ public abstract class AbstractCheckSpec<S extends AbstractSensorParametersSpec,
@JsonIgnore
private boolean defaultCheck;

/**
* Special field filled with the timestamp of the last modification of the table-level or column-level policy YAML file, if this check
* was applied from a data quality policy. It is used to detect the modification time of a check to avoid recalculating rules.
*/
@JsonIgnore
private Instant policyLastModified;

/**
* Returns the schedule configuration for running the checks automatically.
* @return Schedule configuration.
Expand Down Expand Up @@ -310,6 +318,24 @@ public void setDefaultCheck(boolean defaultCheck) {
this.defaultCheck = defaultCheck;
}

/**
* Returns the timestamp of the YAML file modification date of the DQ policy file from which this check was copied.
* Filled only when this check was applied form a DQ policy.
* @return DQ policy file modification timestamp.
*/
public Instant getPolicyLastModified() {
return policyLastModified;
}

/**
* Sets the timestamp when the check was modified in the DQ policy (for DQ policy checks only).
* @param policyLastModified DQ policy YAML file modification date.
*/
public void setPolicyLastModified(Instant policyLastModified) {
this.setDirtyIf(!Objects.equals(this.policyLastModified, policyLastModified));
this.policyLastModified = policyLastModified;
}

/**
* Calls a visitor (using a visitor design pattern) that returns a result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.base.Strings;
import lombok.EqualsAndHashCode;

import java.time.Instant;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -168,11 +169,13 @@ public boolean hasAnyConfiguredChecks() {
* @param parentTableSpec The specification of the parent table. Used to verify if the timestamp columns are correctly configured for timeliness checks.
* @param columnDataTypeCategory Detected data type of a column, if we are applying it on a column.
* @param dialectSettings Dialect settings.
* @param policyLastModified The timestamp when the DQ policy file was last modified.
*/
public void copyChecksToContainer(AbstractRootChecksContainerSpec targetChecksContainer,
TableSpec parentTableSpec,
DataTypeCategory columnDataTypeCategory,
ProviderDialectSettings dialectSettings) {
ProviderDialectSettings dialectSettings,
Instant policyLastModified) {
if (this.isDefault()) {
return;
}
Expand Down Expand Up @@ -223,6 +226,7 @@ public void copyChecksToContainer(AbstractRootChecksContainerSpec targetChecksCo

AbstractCheckSpec<?,?,?,?> targetCheckCloned = defaultCheck.deepClone();
targetCheckCloned.setDefaultCheck(true);
targetCheckCloned.setPolicyLastModified(policyLastModified);
FieldInfo targetCategoryCheckFieldInfo = targetCategoryChildMap.getReflectionClassInfo().getFieldByYamlName(defaultChecksEntry.getChildName());
targetCategoryCheckFieldInfo.setFieldValue(targetCheckCloned, targetCategoryContainer);
}
Expand All @@ -244,6 +248,7 @@ public void copyChecksToContainer(AbstractRootChecksContainerSpec targetChecksCo

CustomCheckSpec clonedTargetCustomCheck = (CustomCheckSpec)defaultCategoryCustomCheckKeyValue.getValue().deepClone();
clonedTargetCustomCheck.setDefaultCheck(true);
clonedTargetCustomCheck.setPolicyLastModified(policyLastModified);
targetCategoryCustomChecks.put(customCheckName, clonedTargetCustomCheck);
}
}
Expand All @@ -266,6 +271,7 @@ public void copyChecksToContainer(AbstractRootChecksContainerSpec targetChecksCo

CustomCheckSpec clonedTargetCustomCheck = (CustomCheckSpec)defaultCustomCheckKeyValue.getValue().deepClone();
clonedTargetCustomCheck.setDefaultCheck(true);
clonedTargetCustomCheck.setPolicyLastModified(policyLastModified);
targetCustomChecks.put(customCheckName, clonedTargetCustomCheck);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void applyDefaultChecksOnTableOnly(ConnectionSpec connectionSpec,
continue;
}

defaultChecksPattern.applyOnTable(targetTableSpec, providerDialectSettings);
defaultChecksPattern.applyOnTable(targetTableSpec, providerDialectSettings, tableQualityPolicyWrapper.getLastModified());
}
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public void applyDefaultChecksOnColumn(ConnectionSpec connectionSpec,
continue;
}

defaultChecksPattern.applyOnColumn(targetTableSpec, targetColumnSpec, providerDialectSettings);
defaultChecksPattern.applyOnColumn(targetTableSpec, targetColumnSpec, providerDialectSettings, columnQualityPolicyWrapper.getLastModified());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import tech.tablesaw.api.*;
import tech.tablesaw.columns.Column;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand Down Expand Up @@ -249,13 +250,13 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont
.collect(Collectors.toList());
executeSingleTableChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken,
checkExecutionSummary, singleTableChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, checksForErrorSampling, collectErrorSamples);
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, checksForErrorSampling, collectErrorSamples, targetTable.getLastModified());

List<AbstractCheckSpec<?, ?, ?, ?>> tableComparisonChecks = checks.stream().filter(c -> c.isTableComparisonCheck())
.collect(Collectors.toList());
executeTableComparisonChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken,
checkExecutionSummary, tableComparisonChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics);
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, targetTable.getLastModified());

if (outputSensorReadoutsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) {
allNormalizedSensorResultsTable.removeColumns(severityColumnTemporary); // removed, it was temporary
Expand Down Expand Up @@ -326,14 +327,15 @@ public void executeSingleTableChecks(
Table allErrorsTable,
TableChecksExecutionStatistics executionStatistics,
List<AbstractCheckSpec<?, ?, ?, ?>> checksNotPassedForErrorCollection,
Boolean collectErrorSamples) {
Boolean collectErrorSamples,
Instant tableYamlLastModified) {
if (checks.isEmpty()) {
return;
}

List<SensorPrepareResult> allPreparedSensors = this.prepareSensors(
checks, executionContext, userHome, tableSpec, userTimeWindowFilters, progressListener,
allErrorsTable, checkExecutionSummary, executionStatistics, jobCancellationToken);
allErrorsTable, checkExecutionSummary, executionStatistics, jobCancellationToken, tableYamlLastModified);

GroupedSensorsCollection groupedSensorsCollection = new GroupedSensorsCollection(this.dqoSensorLimitsConfigurationProperties.getMaxMergedQueries());
groupedSensorsCollection.addAllPreparedSensors(allPreparedSensors);
Expand Down Expand Up @@ -487,7 +489,8 @@ public void executeTableComparisonChecks(
CheckResultsSnapshot checkResultsSnapshot,
Table allRuleEvaluationResultsTable,
Table allErrorsTable,
TableChecksExecutionStatistics executionStatistics) {
TableChecksExecutionStatistics executionStatistics,
Instant tableYamlLastModified) {
if (checks.isEmpty()) {
return;
}
Expand All @@ -498,7 +501,7 @@ public void executeTableComparisonChecks(

List<SensorPrepareResult> allPreparedSensorsOnComparedTable = this.prepareSensors(
checks, executionContext, userHome, tableSpec, userTimeWindowFilters, progressListener,
allErrorsTable, checkExecutionSummary, executionStatistics, jobCancellationToken);
allErrorsTable, checkExecutionSummary, executionStatistics, jobCancellationToken, tableYamlLastModified);

GroupedSensorsCollection groupedSensorsCollectionOnComparedTable = new GroupedSensorsCollection(this.dqoSensorLimitsConfigurationProperties.getMaxMergedQueries());
groupedSensorsCollectionOnComparedTable.addAllPreparedSensors(allPreparedSensorsOnComparedTable);
Expand All @@ -509,7 +512,7 @@ public void executeTableComparisonChecks(

List<SensorPrepareResult> allPreparedSensorsOnReferenceTables = this.prepareComparisonSensorsOnReferenceTable(
checks, executionContext, userHome, userTimeWindowFilters, progressListener,
checkExecutionSummary, executionStatistics, jobCancellationToken);
checkExecutionSummary, executionStatistics, jobCancellationToken, tableYamlLastModified);

GroupedSensorsCollection groupedSensorsCollectionOnReferenceTables = new GroupedSensorsCollection(this.dqoSensorLimitsConfigurationProperties.getMaxMergedQueries());
groupedSensorsCollectionOnReferenceTables.addAllPreparedSensors(allPreparedSensorsOnReferenceTables);
Expand Down Expand Up @@ -634,6 +637,7 @@ public void executeTableComparisonChecks(
* @param checkExecutionSummary Check execution summary where results are added.
* @param executionStatistics Execution statistics - counts of checks and errors.
* @param jobCancellationToken Job cancellation token - to cancel the preparation by the user.
* @param tableYamlLastModified The timestamp when the table YAML was last modified.
* @return List of prepared sensors.
*/
public List<SensorPrepareResult> prepareSensors(Collection<AbstractCheckSpec<?, ?, ?, ?>> checks,
Expand All @@ -645,7 +649,8 @@ public List<SensorPrepareResult> prepareSensors(Collection<AbstractCheckSpec<?,
Table allErrorsTable,
CheckExecutionSummary checkExecutionSummary,
TableChecksExecutionStatistics executionStatistics,
JobCancellationToken jobCancellationToken) {
JobCancellationToken jobCancellationToken,
Instant tableYamlLastModified) {
List<SensorPrepareResult> sensorPrepareResults = new ArrayList<>();
int sensorResultId = 0;

Expand All @@ -657,7 +662,7 @@ public List<SensorPrepareResult> prepareSensors(Collection<AbstractCheckSpec<?,
executionStatistics.incrementExecutedChecksCount(1);

try {
SensorExecutionRunParameters sensorRunParameters = createSensorRunParameters(userHome, targetTableSpec, checkSpec, userTimeWindowFilters);
SensorExecutionRunParameters sensorRunParameters = createSensorRunParameters(userHome, targetTableSpec, checkSpec, userTimeWindowFilters, tableYamlLastModified);
if (!sensorRunParameters.isSuccess()) {
this.userErrorLogger.logCheck(sensorRunParameters.toString() + " failed to capture the initial configuration, error: " +
(sensorRunParameters.getSensorConfigurationException() != null ?
Expand Down Expand Up @@ -747,6 +752,7 @@ public List<SensorPrepareResult> prepareSensors(Collection<AbstractCheckSpec<?,
* @param checkExecutionSummary Check execution summary where results are added.
* @param executionStatistics Execution statistics - counts of checks and errors.
* @param jobCancellationToken Job cancellation token - to cancel the preparation by the user.
* @param tableYamlLastModified The timestamp when teh table YAML was last modified.
* @return List of prepared sensors.
*/
public List<SensorPrepareResult> prepareComparisonSensorsOnReferenceTable(Collection<AbstractCheckSpec<?, ?, ?, ?>> checks,
Expand All @@ -756,7 +762,8 @@ public List<SensorPrepareResult> prepareComparisonSensorsOnReferenceTable(Collec
CheckExecutionProgressListener progressListener,
CheckExecutionSummary checkExecutionSummary,
TableChecksExecutionStatistics executionStatistics,
JobCancellationToken jobCancellationToken) {
JobCancellationToken jobCancellationToken,
Instant tableYamlLastModified) {
List<SensorPrepareResult> sensorPrepareResults = new ArrayList<>();
int sensorResultId = 0;

Expand All @@ -766,7 +773,7 @@ public List<SensorPrepareResult> prepareComparisonSensorsOnReferenceTable(Collec
}

try {
SensorExecutionRunParameters sensorRunParameters = createSensorRunParametersToReferenceTable(userHome, checkSpec, userTimeWindowFilters);
SensorExecutionRunParameters sensorRunParameters = createSensorRunParametersToReferenceTable(userHome, checkSpec, userTimeWindowFilters, tableYamlLastModified);
if (!sensorRunParameters.isSuccess()) {
this.userErrorLogger.logCheck(sensorRunParameters.toString() + " failed to capture the initial configuration, error: " +
(sensorRunParameters.getSensorConfigurationException() != null ?
Expand Down Expand Up @@ -950,12 +957,14 @@ public List<SensorExecutionResult> executeSensors(GroupedSensorsCollection group
* @param tableSpec Target table to query.
* @param checkSpec Check specification.
* @param userTimeWindowFilters Optional user time window filters, to run the checks for a given time period.
* @param tableYamlLastModified The timestamp when the table YAML was last modified.
* @return Sensor run parameters.
*/
public SensorExecutionRunParameters createSensorRunParameters(UserHome userHome,
TableSpec tableSpec,
AbstractCheckSpec<?,?,?,?> checkSpec,
TimeWindowFilterParameters userTimeWindowFilters) {
TimeWindowFilterParameters userTimeWindowFilters,
Instant tableYamlLastModified) {
try {
HierarchyId checkHierarchyId = checkSpec.getHierarchyId();
ConnectionWrapper connectionWrapper = userHome.findConnectionFor(checkHierarchyId);
Expand Down Expand Up @@ -1027,7 +1036,7 @@ public SensorExecutionRunParameters createSensorRunParameters(UserHome userHome,

SensorExecutionRunParameters sensorRunParameters = this.sensorExecutionRunParametersFactory.createSensorParameters(
userHome, connectionSpec, tableSpec, columnSpec, checkSpec, customCheckDefinitionSpec, checkType, dataGroupingConfigurationForComparison,
tableComparisonConfigurationSpec, timeSeriesConfigurationSpec, userTimeWindowFilters, dialectSettings);
tableComparisonConfigurationSpec, timeSeriesConfigurationSpec, userTimeWindowFilters, dialectSettings, tableYamlLastModified);
sensorRunParameters.appendAdditionalFilter(extraComparisonFilter);
return sensorRunParameters;
}
Expand All @@ -1044,11 +1053,13 @@ public SensorExecutionRunParameters createSensorRunParameters(UserHome userHome,
* @param userHome User home with the metadata.
* @param checkSpec Table comparison check specification on the compared table.
* @param userTimeWindowFilters Optional user time window filters, to run the checks for a given time period.
* @param tableYamlLastModified The timestamp when the table YAML as last modified.
* @return Sensor run parameters.
*/
public SensorExecutionRunParameters createSensorRunParametersToReferenceTable(UserHome userHome,
AbstractCheckSpec<?,?,?,?> checkSpec,
TimeWindowFilterParameters userTimeWindowFilters) {
TimeWindowFilterParameters userTimeWindowFilters,
Instant tableYamlLastModified) {
try {
HierarchyId checkHierarchyId = checkSpec.getHierarchyId();
TableWrapper comparedTableWrapper = userHome.findTableFor(checkHierarchyId);
Expand Down Expand Up @@ -1145,7 +1156,7 @@ public SensorExecutionRunParameters createSensorRunParametersToReferenceTable(Us

SensorExecutionRunParameters sensorRunParameters = this.sensorExecutionRunParametersFactory.createSensorParameters(
userHome, referencedConnectionSpec, referencedTableSpec, referencedColumnSpec, checkSpec, customCheckDefinitionSpec, checkType, referencedTableGroupingConfiguration,
tableComparisonConfigurationSpec, timeSeriesConfigurationSpec, timeWindowConfigurationFromComparedTable, referencedDialectSettings);
tableComparisonConfigurationSpec, timeSeriesConfigurationSpec, timeWindowConfigurationFromComparedTable, referencedDialectSettings, tableYamlLastModified);
sensorRunParameters.setTableComparisonConfiguration(tableComparisonConfigurationSpec);
sensorRunParameters.setReferenceColumnName(referencedColumnName);
sensorRunParameters.appendAdditionalFilter(tableComparisonConfigurationSpec.getReferenceTableFilter());
Expand Down
Loading

0 comments on commit 9200e7c

Please sign in to comment.