Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added analytics-core/lib/cloud-store-sdk_2.12-1.4.6.jar
Binary file not shown.
39 changes: 36 additions & 3 deletions analytics-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,17 @@
</dependency> -->
<!-- https://mvnrepository.com/artifact/org.sunbird/cloud-store-sdk -->
<dependency>
<groupId>${CLOUD_STORE_GROUP_ID}</groupId>
<artifactId>${CLOUD_STORE_ARTIFACT_ID}</artifactId>
<version>${CLOUD_STORE_VERSION}</version>
<!-- <groupId>${CLOUD_STORE_GROUP_ID}</groupId>-->
<!-- <artifactId>${CLOUD_STORE_ARTIFACT_ID}</artifactId>-->
<!-- <version>${CLOUD_STORE_VERSION}</version>-->
<!-- <groupId>org.sunbird</groupId>-->
<!-- <artifactId>cloud-store-sdk_2.12</artifactId>-->
<!-- <version>1.4.6</version>-->
<groupId>org.sunbird</groupId>
<artifactId>cloud-store-sdk_2.12</artifactId>
<version>1.4.6</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/cloud-store-sdk_2.12-1.4.6.jar</systemPath>
<exclusions>
<exclusion>
<groupId>com.microsoft.azure</groupId>
Expand All @@ -245,6 +253,31 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.jclouds</groupId>
<artifactId>jclouds-all</artifactId>
<version>2.2.1</version>
<exclusions>
<exclusion>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object AzureDataFetcher {

val keys = for(query <- queries) yield {
val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) {
Array("wasb://"+getBucket(query.bucket) + "@" + AppConf.getStorageKey("azure") + ".blob.core.windows.net" + "/" + getPrefix(query.prefix) + query.endDate.getOrElse(""))
Array("wasb://"+getBucket(query.bucket) + "@" + AppConf.getStorageKey + ".blob.core.windows.net" + "/" + getPrefix(query.prefix) + query.endDate.getOrElse(""))
} else {
getKeys(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object CephS3DataFetcher {

val keys = for(query <- queries) yield {
val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) {
Array("s3n://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get)
Array("s3a://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get)
} else {
getKeys(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object OCIDataFetcher {

val keys = for(query <- queries) yield {
val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) {
Array("s3n://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get)
Array("s3a://" + getBucket(query.bucket) + "/" + getPrefix(query.prefix) + query.endDate.get)
} else {
getKeys(query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object S3DataFetcher {

val keys = for(query <- queries) yield {
val paths = if(query.folder.isDefined && query.endDate.isDefined && query.folder.getOrElse("false").equals("true")) {
Array("s3n://"+getBucket(query.bucket)+"/"+getPrefix(query.prefix) + query.endDate.get)
Array("s3a://"+getBucket(query.bucket)+"/"+getPrefix(query.prefix) + query.endDate.get)
} else {
getKeys(query);
}
Expand All @@ -36,7 +36,7 @@ object S3DataFetcher {
}

private def getKeys(query: Query)(implicit fc: FrameworkContext) : Array[String] = {
val storageService = fc.getStorageService("aws");
val storageService = fc.getStorageService("aws", "aws_storage_key", "aws_storage_secret");
val keys = storageService.searchObjects(getBucket(query.bucket), getPrefix(query.prefix), query.startDate, query.endDate, query.delta, query.datePattern.getOrElse("yyyy-MM-dd"))
storageService.getPaths(getBucket(query.bucket), keys).toArray
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ class CustomOCIStorageService(config: StorageConfig) extends BaseStorageService
var blobStore: BlobStore = context.getBlobStore

override def getPaths(container: String, objects: List[Blob]): List[String] = {
objects.map{f => "s3n://" + container + "/" + f.key}
objects.map{f => "s3a://" + container + "/" + f.key}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ class CustomS3StorageService(config: StorageConfig) extends BaseStorageService {
var blobStore: BlobStore = context.getBlobStore

override def getPaths(container: String, objects: List[Blob]): List[String] = {
objects.map{f => "s3n://" + container + "/" + f.key}
objects.map{f => "s3a://" + container + "/" + f.key}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ object S3Provider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsKey())
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsSecret())
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret)
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey)
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret)
sc.hadoopConfiguration.set("fs.s3a.access.key", key)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secret)
val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint")
if (storageEndpoint.nonEmpty) {
sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint)
sc.hadoopConfiguration.set("fs.s3a.endpoint", storageEndpoint)
}
}
}
Expand All @@ -36,8 +36,8 @@ object AzureProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("azure"))
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("azure"))
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey)
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret)
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret)
sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
Expand All @@ -47,8 +47,8 @@ object GcpProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext")
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("gcloud"))
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("gcloud"))
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey)
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret)
sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key)
Expand All @@ -60,8 +60,8 @@ object GcpProvider extends ICloudStorageProvider {
object OCIProvider extends ICloudStorageProvider {
implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider"
override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = {
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("oci"))
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("oci"))
val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey)
val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret)
JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key);
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ object CommonUtil {
}

def getS3File(bucket: String, file: String): String = {
"s3n://" + bucket + "/" + file;
"s3a://" + bucket + "/" + file;
}

def getS3FileWithoutPrefix(bucket: String, file: String): String = {
Expand All @@ -726,21 +726,21 @@ object CommonUtil {
def setStorageConf(store: String, accountKey: Option[String], accountSecret: Option[String])(implicit sc: SparkContext): Configuration = {
store.toLowerCase() match {
case "s3" =>
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(accountKey.getOrElse("aws_storage_key")));
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret")));
sc.hadoopConfiguration.set("fs.s3a.access.key", AppConf.getConfig(accountKey.getOrElse("aws_storage_key")));
sc.hadoopConfiguration.set("fs.s3a.secret.key", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret")));
case "azure" =>
sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.azure.account.key." + AppConf.getConfig(accountKey.getOrElse("azure_storage_key")) + ".blob.core.windows.net", AppConf.getConfig(accountSecret.getOrElse("azure_storage_secret")))
case "gcloud" =>
sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud"))
sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey)
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret)
sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id"))
case "oci" =>
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(accountKey.getOrElse("aws_storage_key")));
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret")));
// sc.hadoopConfiguration.set("fs.s3n.endpoint", AppConf.getConfig(accountSecret.getOrElse("cloud_storage_endpoint_with_protocol")));
sc.hadoopConfiguration.set("fs.s3a.access.key", AppConf.getConfig(accountKey.getOrElse("aws_storage_key")));
sc.hadoopConfiguration.set("fs.s3a.secret.key", AppConf.getConfig(accountSecret.getOrElse("aws_storage_secret")));
sc.hadoopConfiguration.set("fs.s3a.endpoint", AppConf.getConfig(accountSecret.getOrElse("cloud_storage_endpoint_with_protocol")));
case _ =>
// Do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ class DatasetExt(df: Dataset[Row]) {

val filePrefix = storageConfig.store.toLowerCase() match {
case "s3" =>
"s3n://"
"s3a://"
case "azure" =>
"wasb://"
case "gcloud" =>
"gs://"
case "oci" =>
"s3n://"
"s3a://"
case _ =>
""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class MergeUtil {
})
mergeConfig.merge.files.foreach(filePaths => {
val isPrivate = mergeConfig.reportFileAccess.getOrElse(true)
val storageKey= if(isPrivate) "azure_storage_key" else "druid_storage_account_key"
val storageSecret= if(isPrivate) "azure_storage_secret" else "druid_storage_account_secret"
val storageKey= if(isPrivate) "cloud_storage_key" else "druid_storage_account_key"
val storageSecret= if(isPrivate) "cloud_storage_secret" else "druid_storage_account_secret"
val metricLabels= mergeConfig.metricLabels.getOrElse(List())
val path = new Path(filePaths("reportPath"))
val postContainer= mergeConfig.postContainer.getOrElse(AppConf.getConfig("druid.report.default.container"))
Expand Down Expand Up @@ -191,7 +191,7 @@ class MergeUtil {
def fetchBlobFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = {

val storageService = if (isPrivate)
fc.getStorageService("azure", "azure_storage_key", "azure_storage_secret")
fc.getStorageService("azure", "cloud_storage_key", "cloud_storage_secret")
else {
fc.getStorageService("azure", "druid_storage_account_key", "druid_storage_account_secret")
}
Expand All @@ -205,7 +205,7 @@ class MergeUtil {
def fetchOSSFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = {

val storageService = if (isPrivate)
fc.getStorageService("oci", "aws_storage_key", "aws_storage_secret")
fc.getStorageService("oci", "cloud_storage_key", "cloud_storage_secret")
else {
fc.getStorageService("oci", "druid_storage_account_key", "druid_storage_account_secret")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class TestDataFetcher extends SparkSpec with Matchers with MockFactory {
it should "cover the missing branches in S3DataFetcher, AzureDataFetcher and DruidDataFetcher" in {
implicit val fc = new FrameworkContext();
var query = JSONUtils.deserialize[Query]("""{"bucket":"test-container","prefix":"test/","folder":"true","endDate":"2020-01-10"}""")
S3DataFetcher.getObjectKeys(Array(query)).head should be ("s3n://test-container/test/2020-01-10")
S3DataFetcher.getObjectKeys(Array(query)).head should be ("s3a://test-container/test/2020-01-10")
AzureDataFetcher.getObjectKeys(Array(query)).head should be ("wasb://test-container@azure-test-key.blob.core.windows.net/test/2020-01-10")

query = JSONUtils.deserialize[Query]("""{"bucket":"test-container","prefix":"test/","folder":"true","endDate":"2020-01-10","excludePrefix":"test"}""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ class TestCommonUtil extends BaseSpec {
azureStorageConf.get("fs.azure.account.key.azure-test-key.blob.core.windows.net") should be("azure-test-secret")

val s3StorageConf = CommonUtil.setStorageConf("s3", Option("aws_storage_key"), Option("aws_storage_secret"))
s3StorageConf.get("fs.s3n.awsAccessKeyId") should be("aws-test-key")
s3StorageConf.get("fs.s3n.awsSecretAccessKey") should be("aws-test-secret")
s3StorageConf.get("fs.s3a.access.key") should be("aws-test-key")
s3StorageConf.get("fs.s3a.secret.key") should be("aws-test-secret")

val fileUtil = new HadoopFileUtil;
val copiedFile = fileUtil.copy("src/test/resources/sample_telemetry.log", "src/test/resources/sample_telemetry.json", sc.hadoopConfiguration)
Expand All @@ -332,7 +332,7 @@ class TestCommonUtil extends BaseSpec {
azureUrl should be ("wasb://telemetry-data-store@azure-test-key.blob.core.windows.net/report/archival-data/batch-001/2021-21-*.csv.gz")

val s3Url = CommonUtil.getBlobUrl("s3", "report/archival-data/batch-001/2021-21-*.csv.gz", "telemetry-data-store")
s3Url should be ("s3n://telemetry-data-store/report/archival-data/batch-001/2021-21-*.csv.gz")
s3Url should be ("s3a://telemetry-data-store/report/archival-data/batch-001/2021-21-*.csv.gz")

sc.stop()
}
Expand Down