diff --git a/analytics-core/lib/cloud-store-sdk_2.12-1.4.6.jar b/analytics-core/lib/cloud-store-sdk_2.12-1.4.6.jar new file mode 100644 index 0000000..725eedf Binary files /dev/null and b/analytics-core/lib/cloud-store-sdk_2.12-1.4.6.jar differ diff --git a/analytics-core/pom.xml b/analytics-core/pom.xml index fc6a228..c1b5f33 100644 --- a/analytics-core/pom.xml +++ b/analytics-core/pom.xml @@ -219,9 +219,17 @@ --> - ${CLOUD_STORE_GROUP_ID} - ${CLOUD_STORE_ARTIFACT_ID} - ${CLOUD_STORE_VERSION} + + + + + + + org.sunbird + cloud-store-sdk_2.12 + 1.4.6 + system + ${project.basedir}/lib/cloud-store-sdk_2.12-1.4.6.jar com.microsoft.azure @@ -245,6 +253,31 @@ + + org.apache.jclouds + jclouds-all + 2.2.1 + + + com.jamesmurty.utils + java-xmlbuilder + + + org.yaml + snakeyaml + + + + + org.apache.tika + tika-core + 1.18 + + + org.apache.hadoop + hadoop-azure + 2.7.3 + com.microsoft.azure azure-storage diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/AzureDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/AzureDataFetcher.scala index d46a49d..307bd64 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/AzureDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/AzureDataFetcher.scala @@ -14,7 +14,7 @@ object AzureDataFetcher { val keys = for(query <- queries) yield { val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) { - Array("wasb://"+getBucket(query.bucket) + "@" + AppConf.getStorageKey("azure") + ".blob.core.windows.net" + "/" + getPrefix(query.prefix) + query.endDate.getOrElse("")) + Array("wasb://"+getBucket(query.bucket) + "@" + AppConf.getStorageKey + ".blob.core.windows.net" + "/" + getPrefix(query.prefix) + query.endDate.getOrElse("")) } else { getKeys(query); } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/CephS3DataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/CephS3DataFetcher.scala index 190f3f3..dc77dc2 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/CephS3DataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/CephS3DataFetcher.scala @@ -16,7 +16,7 @@ object CephS3DataFetcher { val keys = for(query <- queries) yield { val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) { - Array("s3n://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get) + Array("s3a://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get) } else { getKeys(query); } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/OCIDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/OCIDataFetcher.scala index 9133534..a42218a 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/OCIDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/OCIDataFetcher.scala @@ -14,7 +14,7 @@ object OCIDataFetcher { val keys = for(query <- queries) yield { val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) { - Array("s3n://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get) + Array("s3a://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get) } else { getKeys(query); } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/S3DataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/S3DataFetcher.scala index 51db71e..acacdb4 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/S3DataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/S3DataFetcher.scala @@ -21,7 +21,7 @@ object S3DataFetcher { val keys = for(query <- queries) yield { val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) { - Array("s3n://"+getBucket(query.bucket)+"/"+getPrefix(query.prefix) + query.endDate.get) + Array("s3a://"+getBucket(query.bucket)+"/"+getPrefix(query.prefix) + query.endDate.get) } else { getKeys(query); } @@ -36,7 +36,7 @@ object S3DataFetcher { } private def getKeys(query: Query)(implicit fc: FrameworkContext) : Array[String] = { - val storageService = fc.getStorageService("aws"); + val storageService = fc.getStorageService("aws", "aws_storage_key", "aws_storage_secret"); val keys = storageService.searchObjects(getBucket(query.bucket), getPrefix(query.prefix), query.startDate, query.endDate, query.delta, query.datePattern.getOrElse("yyyy-MM-dd")) storageService.getPaths(getBucket(query.bucket), keys).toArray } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomOCIStorageService.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomOCIStorageService.scala index a306197..c14a970 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomOCIStorageService.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomOCIStorageService.scala @@ -27,6 +27,6 @@ class CustomOCIStorageService(config: StorageConfig) extends BaseStorageService var blobStore: BlobStore = context.getBlobStore override def getPaths(container: String, objects: List[Blob]): List[String] = { - objects.map{f => "s3n://" + container + "/" + f.key} + objects.map{f => "s3a://" + container + "/" + f.key} } } \ No newline at end of file diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomS3StorageService.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomS3StorageService.scala index 976baaf..95a358d 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomS3StorageService.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/storage/CustomS3StorageService.scala @@ -12,6 +12,6 @@ class CustomS3StorageService(config: StorageConfig) extends BaseStorageService { var blobStore: BlobStore = context.getBlobStore override def getPaths(container: String, objects: List[Blob]): List[String] = { - objects.map{f => "s3n://" + container + "/" + f.key} + objects.map{f => "s3a://" + container + "/" + f.key} } } \ No newline at end of file diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index db02ac5..628978c 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -21,13 +21,13 @@ object S3Provider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsKey()) - val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsSecret()) - sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key) - sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret) + sc.hadoopConfiguration.set("fs.s3a.access.key", key) + sc.hadoopConfiguration.set("fs.s3a.secret.key", secret) val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint") if (storageEndpoint.nonEmpty) { - sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint) + sc.hadoopConfiguration.set("fs.s3a.endpoint", storageEndpoint) } } } @@ -36,8 +36,8 @@ object AzureProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("azure")) - val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("azure")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret) sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret) sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") @@ -47,8 +47,8 @@ object GcpProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("gcloud")) - val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("gcloud")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret) sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key) @@ -60,8 +60,8 @@ object GcpProvider extends ICloudStorageProvider { object OCIProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { - val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("oci")) - val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("oci")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret) JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key); sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret); diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index c692742..2221fef 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -700,7 +700,7 @@ object CommonUtil { } def getS3File(bucket: String, file: String): String = { - "s3n://" + bucket + "/" + file; + "s3a://" + bucket + "/" + file; } def getS3FileWithoutPrefix(bucket: String, file: String): String = { @@ -726,21 +726,21 @@ object CommonUtil { def setStorageConf(store: String, accountKey: Option[String], accountSecret: Option[String])(implicit sc: SparkContext): Configuration = { store.toLowerCase() match { case "s3" => - sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(accountKey.getOrElse("aws_storage_key"))); - sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret"))); + sc.hadoopConfiguration.set("fs.s3a.access.key", AppConf.getConfig(accountKey.getOrElse("aws_storage_key"))); + sc.hadoopConfiguration.set("fs.s3a.secret.key", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret"))); case "azure" => sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.azure.account.key." + AppConf.getConfig(accountKey.getOrElse("azure_storage_key")) + ".blob.core.windows.net", AppConf.getConfig(accountSecret.getOrElse("azure_storage_secret"))) case "gcloud" => sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") - sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud")) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret) sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) case "oci" => - sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(accountKey.getOrElse("aws_storage_key"))); - sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret"))); - // sc.hadoopConfiguration.set("fs.s3n.endpoint", AppConf.getConfig(accountSecret.getOrElse("cloud_storage_endpoint_with_protocol"))); + sc.hadoopConfiguration.set("fs.s3a.access.key", AppConf.getConfig(accountKey.getOrElse("aws_storage_key"))); + sc.hadoopConfiguration.set("fs.s3a.secret.key", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret"))); + sc.hadoopConfiguration.set("fs.s3a.endpoint", AppConf.getConfig(accountSecret.getOrElse("cloud_storage_endpoint_with_protocol"))); case _ => // Do nothing } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/DatasetUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/DatasetUtil.scala index c6646e3..8aa8c83 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/DatasetUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/DatasetUtil.scala @@ -57,13 +57,13 @@ class DatasetExt(df: Dataset[Row]) { val filePrefix = storageConfig.store.toLowerCase() match { case "s3" => - "s3n://" + "s3a://" case "azure" => "wasb://" case "gcloud" => "gs://" case "oci" => - "s3n://" + "s3a://" case _ => "" } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/MergeUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/MergeUtil.scala index 8013d23..1b415e3 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/MergeUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/MergeUtil.scala @@ -30,8 +30,8 @@ class MergeUtil { }) mergeConfig.merge.files.foreach(filePaths => { val isPrivate = mergeConfig.reportFileAccess.getOrElse(true) - val storageKey= if(isPrivate) "azure_storage_key" else "druid_storage_account_key" - val storageSecret= if(isPrivate) "azure_storage_secret" else "druid_storage_account_secret" + val storageKey= if(isPrivate) "cloud_storage_key" else "druid_storage_account_key" + val storageSecret= if(isPrivate) "cloud_storage_secret" else "druid_storage_account_secret" val metricLabels= mergeConfig.metricLabels.getOrElse(List()) val path = new Path(filePaths("reportPath")) val postContainer= mergeConfig.postContainer.getOrElse(AppConf.getConfig("druid.report.default.container")) @@ -191,7 +191,7 @@ class MergeUtil { def fetchBlobFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = { val storageService = if (isPrivate) - fc.getStorageService("azure", "azure_storage_key", "azure_storage_secret") + fc.getStorageService("azure", "cloud_storage_key", "cloud_storage_secret") else { fc.getStorageService("azure", "druid_storage_account_key", "druid_storage_account_secret") } @@ -205,7 +205,7 @@ class MergeUtil { def fetchOSSFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = { val storageService = if (isPrivate) - fc.getStorageService("oci", "aws_storage_key", "aws_storage_secret") + fc.getStorageService("oci", "cloud_storage_key", "cloud_storage_secret") else { fc.getStorageService("oci", "druid_storage_account_key", "druid_storage_account_secret") } diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestDataFetcher.scala index c7974d8..c56d4c2 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestDataFetcher.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestDataFetcher.scala @@ -119,7 +119,7 @@ class TestDataFetcher extends SparkSpec with Matchers with MockFactory { it should "cover the missing branches in S3DataFetcher, AzureDataFetcher and DruidDataFetcher" in { implicit val fc = new FrameworkContext(); var query = JSONUtils.deserialize[Query]("""{"bucket":"test-container","prefix":"test/","folder":"true","endDate":"2020-01-10"}""") - S3DataFetcher.getObjectKeys(Array(query)).head should be ("s3n://test-container/test/2020-01-10") + S3DataFetcher.getObjectKeys(Array(query)).head should be ("s3a://test-container/test/2020-01-10") AzureDataFetcher.getObjectKeys(Array(query)).head should be ("wasb://test-container@azure-test-key.blob.core.windows.net/test/2020-01-10") query = JSONUtils.deserialize[Query]("""{"bucket":"test-container","prefix":"test/","folder":"true","endDate":"2020-01-10","excludePrefix":"test"}""") diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala index ef89e47..ab1772e 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala @@ -317,8 +317,8 @@ class TestCommonUtil extends BaseSpec { azureStorageConf.get("fs.azure.account.key.azure-test-key.blob.core.windows.net") should be("azure-test-secret") val s3StorageConf = CommonUtil.setStorageConf("s3", Option("aws_storage_key"), Option("aws_storage_secret")) - s3StorageConf.get("fs.s3n.awsAccessKeyId") should be("aws-test-key") - s3StorageConf.get("fs.s3n.awsSecretAccessKey") should be("aws-test-secret") + s3StorageConf.get("fs.s3a.access.key") should be("aws-test-key") + s3StorageConf.get("fs.s3a.secret.key") should be("aws-test-secret") val fileUtil = new HadoopFileUtil; val copiedFile = fileUtil.copy("src/test/resources/sample_telemetry.log", "src/test/resources/sample_telemetry.json", sc.hadoopConfiguration) @@ -332,7 +332,7 @@ class TestCommonUtil extends BaseSpec { azureUrl should be ("wasb://telemetry-data-store@azure-test-key.blob.core.windows.net/report/archival-data/batch-001/2021-21-*.csv.gz") val s3Url = CommonUtil.getBlobUrl("s3", "report/archival-data/batch-001/2021-21-*.csv.gz", "telemetry-data-store") - s3Url should be ("s3n://telemetry-data-store/report/archival-data/batch-001/2021-21-*.csv.gz") + s3Url should be ("s3a://telemetry-data-store/report/archival-data/batch-001/2021-21-*.csv.gz") sc.stop() }