@@ -18,9 +18,8 @@ package za.co.absa.hermes.datasetComparison

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.commons.spark.SchemaUtils
import za.co.absa.hermes.datasetComparison.config.{DatasetComparisonConfig, TypesafeConfig}
import za.co.absa.hermes.utils.HelperFunctions

/**
Expand All @@ -36,14 +35,12 @@ import za.co.absa.hermes.utils.HelperFunctions
* @param optionalSchema Optional schema to cherry-pick columns from the two dataframes to compare. For example, if you
* have a timestamp column that will never be the same, you provide a schema without that timestamp
* and it will not be compared.
* @param sparkSession Implicit spark session.
*/
class DatasetComparator(dataFrameReference: DataFrame,
dataFrameActual: DataFrame,
keys: Set[String] = Set.empty[String],
config: DatasetComparisonConfig = new TypesafeConfig(None),
optionalSchema: Option[StructType] = None)
(implicit sparkSession: SparkSession) {
config: DatasetComparisonConfig = DatasetComparisonConfig.default,
optionalSchema: Option[StructType] = None){

/**
* Case class created for the single purpose of holding a pair of reference and tested data in any form together.
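For reviewers skimming the diff: a minimal usage sketch of the refactored constructor, assuming two already-loaded DataFrames and a hypothetical key column. The argument order and the compare call follow the test suite further down.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import za.co.absa.hermes.datasetComparison.{DatasetComparator, DatasetComparisonConfig}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Hypothetical reference and actual data standing in for real inputs.
    val reference: DataFrame = spark.read.option("header", "true").csv("reference.csv")
    val actual: DataFrame    = spark.read.option("header", "true").csv("actual.csv")

    // The implicit SparkSession parameter is gone; the config is now the case class.
    val result = new DatasetComparator(
      reference,
      actual,
      keys = Set("id"),                        // hypothetical join key
      config = DatasetComparisonConfig.default // defaults read via Typesafe config
    ).compare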
@@ -14,15 +14,15 @@
* limitations under the License.
*/

package za.co.absa.hermes.datasetComparison.config
package za.co.absa.hermes.datasetComparison

import com.typesafe.config.{Config, ConfigFactory}
import scala.util.{Failure, Success, Try}

abstract class DatasetComparisonConfig {
val errorColumnName: String
val actualPrefix: String
val expectedPrefix: String
val allowDuplicates: Boolean
case class DatasetComparisonConfig(errorColumnName: String,
actualPrefix: String,
expectedPrefix: String,
allowDuplicates: Boolean) {

def validate(): Try[DatasetComparisonConfig] = {

@@ -38,9 +38,9 @@ abstract class DatasetComparisonConfig
}

for {
_errColumnName <- validateColumnName(errorColumnName, "errorColumnName")
_actualPrefix <- validateColumnName(actualPrefix, "actualPrefix")
_expectedPrefix <- validateColumnName(expectedPrefix, "expectedPrefix")
_ <- validateColumnName(errorColumnName, "errorColumnName")
_ <- validateColumnName(actualPrefix, "actualPrefix")
_ <- validateColumnName(expectedPrefix, "expectedPrefix")
} yield this
}

@@ -52,3 +52,21 @@
| Allow duplicities in dataframes (allowDuplicates) -> "$allowDuplicates"""".stripMargin
}
}

object DatasetComparisonConfig {
def fromTypeSafeConfig(path: Option[String] = None): DatasetComparisonConfig = {
val conf: Config = path match {
case Some(x) => ConfigFactory.load(x)
case None => ConfigFactory.load()
}

val errorColumnName: String = conf.getString("dataset-comparison.errColumn")
val actualPrefix: String = conf.getString("dataset-comparison.actualPrefix")
val expectedPrefix: String = conf.getString("dataset-comparison.expectedPrefix")
val allowDuplicates: Boolean = conf.getBoolean("dataset-comparison.allowDuplicates")

DatasetComparisonConfig(errorColumnName, actualPrefix, expectedPrefix, allowDuplicates)
}

def default: DatasetComparisonConfig = fromTypeSafeConfig()
}
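A short sketch of how the new companion-object factory composes with validate(). The application.conf content below is illustrative and the resource path is hypothetical, but the keys match the ones read above, and the logging mirrors DatasetComparisonJob.getConfig.

    // Illustrative application.conf (hypothetical values):
    //   dataset-comparison.errColumn       = "errCol"
    //   dataset-comparison.actualPrefix    = "actual"
    //   dataset-comparison.expectedPrefix  = "expected"
    //   dataset-comparison.allowDuplicates = false

    import scala.util.{Failure, Success}
    import za.co.absa.hermes.datasetComparison.DatasetComparisonConfig

    DatasetComparisonConfig.fromTypeSafeConfig(Some("application.conf")).validate() match {
      case Success(config) => scribe.info(config.getLoggableString)
      case Failure(error)  => scribe.error(s"Invalid dataset-comparison configuration: ${error.getMessage}")
    }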
@@ -22,7 +22,6 @@ import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}
import za.co.absa.hermes.datasetComparison.cliUtils.{CliParameters, CliParametersParser}
import za.co.absa.hermes.datasetComparison.config.{DatasetComparisonConfig, TypesafeConfig}
import za.co.absa.hermes.datasetComparison.dataFrame.Utils
import za.co.absa.hermes.utils.SparkCompatibility

@@ -89,7 +88,7 @@ object DatasetComparisonJob {
}

def getConfig(configPath: Option[String]): DatasetComparisonConfig = {
val config = new TypesafeConfig(configPath).validate().get
val config = DatasetComparisonConfig.fromTypeSafeConfig(configPath).validate().get
scribe.info(config.getLoggableString)
config
}

This file was deleted.

This file was deleted.

@@ -20,11 +20,12 @@ import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StringType, StructType}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import za.co.absa.hermes.datasetComparison.cliUtils.CliParameters
import za.co.absa.hermes.datasetComparison.config.ManualConfig
import za.co.absa.hermes.datasetComparison.dataFrame.{Parameters, Utils}
import za.co.absa.hermes.utils.SparkTestBase

class DatasetComparatorSuite extends FunSuite with SparkTestBase with BeforeAndAfterAll {
private val manualConfig = DatasetComparisonConfig("errCol", "actual", "expected", true)

test("Test a positive comparison") {
val cliOptions = new CliParameters(
Parameters("csv", Map("delimiter" -> ","), getClass.getResource("/dataSample2.csv").toString),
@@ -37,13 +38,6 @@ class DatasetComparatorSuite extends FunSuite with SparkTestBase with BeforeAndA
val df1 = Utils.loadDataFrame(cliOptions.referenceDataParameters)
val df2 = Utils.loadDataFrame(cliOptions.actualDataParameters)

val manualConfig = new ManualConfig(
"errCol",
"actual",
"expected",
true
)

val expectedResult = ComparisonResult(
10, 10, 0, 0, 10,
List(
@@ -75,13 +69,6 @@ class DatasetComparatorSuite extends FunSuite with SparkTestBase with BeforeAndA
val df1 = Utils.loadDataFrame(cliOptions.referenceDataParameters)
val df2 = Utils.loadDataFrame(cliOptions.actualDataParameters)

val manualConfig = new ManualConfig(
"errCol",
"actual",
"expected",
true
)

val expectedResult = ComparisonResult(
10, 10, 0, 0, 10,
List(
@@ -117,13 +104,6 @@ class DatasetComparatorSuite extends FunSuite with SparkTestBase with BeforeAndA
val df1 = Utils.loadDataFrame(cliOptions.referenceDataParameters)
val df2 = Utils.loadDataFrame(cliOptions.actualDataParameters)

val manualConfig = new ManualConfig(
"errCol",
"actual",
"expected",
true
)

val schema = new StructType()
.add("_c01", StringType, true)
.add("_c1", StringType, true)
@@ -148,13 +128,6 @@ class DatasetComparatorSuite extends FunSuite with SparkTestBase with BeforeAndA
val df1 = Utils.loadDataFrame(cliOptions.referenceDataParameters)
val df2 = Utils.loadDataFrame(cliOptions.actualDataParameters)

val manualConfig = new ManualConfig(
"errCol",
"actual",
"expected",
true
)

val result = new DatasetComparator(df1, df2, cliOptions.keys, manualConfig).compare
assert(9 == result.refRowCount)
assert(10 == result.newRowCount)
@@ -14,21 +14,35 @@
* limitations under the License.
*/

package za.co.absa.hermes.datasetComparison.config
package za.co.absa.hermes.datasetComparison

import org.scalatest.FunSuite

class TypeSafeConfigSuite extends FunSuite {
class DatasetComparisonConfigSuite extends FunSuite {
test("Manual Config Correct") {
val conf = DatasetComparisonConfig("errCol", "_actual", "_expected", false)
assert(conf.validate().isSuccess)
assert("errCol" == conf.errorColumnName)
assert("_actual" == conf.actualPrefix)
assert("_expected" == conf.expectedPrefix)
assert(!conf.allowDuplicates)
Comment on lines +25 to +28

Checking values of fields set directly on a case class is not wrong per se, but unnecessary in my opinion.

The validation check makes sense.

}
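A sketch of the reduced test the comment above hints at (editorial illustration, not part of this PR):

    test("Manual Config Correct") {
      assert(DatasetComparisonConfig("errCol", "_actual", "_expected", false).validate().isSuccess)
    }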

test("Manual Config Bad Column Name") {
val conf = DatasetComparisonConfig("errCol", "_actua l", "_expected", false)
assert(conf.validate().isFailure)
Comment on lines +32 to +33

@dk1844 dk1844 May 6, 2021

Perhaps the check that validation failure occurs as expected could be done for all the fields:

    assert(DatasetComparisonConfig("errCol", "_actua l", "_expected", false).validate().isFailure)
    assert(DatasetComparisonConfig("er{r}Col", "_actual", "_expected", false).validate().isFailure)
    assert(DatasetComparisonConfig("errCol", "_actual", "_expe=cted", false).validate().isFailure)

}

test("Default Config Loaded") {
val conf = new TypesafeConfig(None)
val conf = DatasetComparisonConfig.default
assert("errCol" == conf.errorColumnName)
assert("actual" == conf.actualPrefix)
assert("expected" == conf.expectedPrefix)
Comment on lines 38 to 40

I know we have already discussed this before, but I still find this suboptimal. Sorry.

Suggested change, from:

    assert("errCol" == conf.errorColumnName)
    assert("actual" == conf.actualPrefix)
    assert("expected" == conf.expectedPrefix)

to:

    assert(conf.errorColumnName == "errCol")
    assert(conf.actualPrefix == "actual")
    assert(conf.expectedPrefix == "expected")

Diverging from the conventional order results in misleading ScalaTest messages. E.g.:

assert("expectedValue" == "actualValue")

will generate output:

"[expected]Value" did not equal "[actual]Value"
ScalaTestFailureLocation: za.co.absa.hermes.datasetComparison.DatasetComparisonConfigSuite$$anonfun$1 at (DatasetComparisonConfigSuite.scala:25)
Expected :"[actual]Value"
Actual   :"[expected]Value"

This results in misleading error messages and can confuse the reader.

(this issue appears elsewhere in the code, too)

Collaborator Author

It's okay, I will switch it. It is just hard to kick the habit 👍

assert(!conf.allowDuplicates)
}

test("Config with provided path loaded") {
val conf = new TypesafeConfig(Some("confData/application.conf"))
val conf = DatasetComparisonConfig.fromTypeSafeConfig(Some("confData/application.conf"))

assert("errCol2" == conf.errorColumnName)
assert("actual2" == conf.actualPrefix)

This file was deleted.

@@ -35,12 +35,12 @@ object InfoFileComparisonConfig {
case None => ConfigFactory.load()
}

import collection.JavaConversions._
import collection.JavaConverters._

val versionMetaKeys = conf.getStringList("info-file-comparison.atum-models.versionMetaKeys").toList
val keysToIgnore = conf.getStringList("info-file-comparison.atum-models.ignoredMetaKeys").toList
val versionMetaKeys = conf.getStringList("info-file-comparison.atum-models.versionMetaKeys").asScala.toList
val keysToIgnore = conf.getStringList("info-file-comparison.atum-models.ignoredMetaKeys").asScala.toList
InfoFileComparisonConfig(versionMetaKeys, keysToIgnore)
}

def empty: InfoFileComparisonConfig = InfoFileComparisonConfig(List.empty[String], List.empty[String])
val empty: InfoFileComparisonConfig = InfoFileComparisonConfig(List.empty[String], List.empty[String])
}
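For context on the import swap above: scala.collection.JavaConversions converted Java collections implicitly and has been deprecated since Scala 2.12, while JavaConverters makes the conversion explicit through asScala. A minimal sketch of the pattern, with a hypothetical Java list standing in for conf.getStringList(...):

    import java.util.{Arrays => JArrays}
    import scala.collection.JavaConverters._

    // Hypothetical java.util.List, standing in for conf.getStringList(...)
    val javaKeys: java.util.List[String] = JArrays.asList("version", "checkpoint")

    // Explicit, deprecation-free conversion to an immutable Scala List.
    val scalaKeys: List[String] = javaKeys.asScala.toList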