Skip to content
This repository was archived by the owner on Apr 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
**/.DS_Store
.DS_Store
HELP.md
.gradle
build/
Expand Down
6 changes: 3 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
kotlin("jvm") version "1.6.21" apply false
kotlin("plugin.serialization") version "1.6.21" apply false
kotlin("jvm") version "1.8.21" apply false
kotlin("plugin.serialization") version "1.8.21" apply false
id("com.bmuschko.docker-remote-api") version "7.4.0" apply false
}

Expand Down Expand Up @@ -42,7 +42,7 @@ allprojects {
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict", "-opt-in=kotlin.RequiresOptIn")
jvmTarget = "8"
jvmTarget = "1.8"
apiVersion = "1.6"
languageVersion = "1.6"
}
Expand Down
36 changes: 9 additions & 27 deletions catalog-management/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# Managing Catalog Resources

This example is a CLI application that uses a secondary configuration, the `app-config.json`, to define a number of
catalog entities to be managed during execution. The Connections/Data Sources/Profile Schema in the app-config.json
are created with the attributes defined in the configuration and are then available to use through the Profiles SDK
This example is a CLI application that uses a secondary configuration file, `app-conf.json`, to define a number of configurable steps during execution.
Connections, Data Sources, and Profile Schemas are defined in the `../main-app/src/main/resources/spec` folder and are made available for use through the Profiles SDK.
This builds off of the [Local Clients](../local-clients/README.md) example for its initial setup.

(See [RailedCommand.java](./src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java) for config driven Catalog management.)
Expand Down Expand Up @@ -82,27 +81,8 @@ To recreate resources that already exist (used in case of spec configuration cha
}
```

The `resources.specs` section defines the specification for any number of Connections, Data Sources, and Profile Schemas.
Each resource specification is deserialized into a Local Catalog entity.

```json
{
"resources": {
"specs": {
"connections": [
...
],
"dataSources": [
...
],
"profileSchemas": [
...
]
}
}
}
```

The specification for any number of Connections, Data Sources, and Profile Schemas may be defined in any number of YAML
files under the `spec` path (set with the `--spec` option).

## Run Locally

Expand All @@ -113,7 +93,7 @@ To run this example locally with local Cortex clients (from the parent directory
```
2. Run the application with Gradle.
```
./gradlew main-app:run --args="catalog-management -p local --config ../catalog-management/src/main/resources/conf/app-conf.json"
./gradlew main-app:run --args="catalog-management -p local --config ../catalog-management/src/main/resources/conf/app-conf.json --spec src/main/resources/spec"
```

The end of the log output should be similar to:
Expand Down Expand Up @@ -255,7 +235,7 @@ Exit Code: 0
- Use the [Remote Catalog](../docs/catalog.md#remote-catalog) implementation by setting the Cortex URL (`spark.cortex.client.phoenix.url`) to the in-cluster GraphQL API endpoint (`"http://cortex-api.cortex.svc.cluster.local:8080"`) and removing the Local Catalog implementation (`spark.cortex.catalog.impl`).
- Use the [remote storage client](../docs/backendstorage.md#remote-storage-client) implementation by setting the Cortex URL (`spark.cortex.client.phoenix.url`) to the GraphQL API endpoint, and remove the local storage client implementation (`spark.cortex.client.storage.impl`).
- Remove the local Secret client implementation (`spark.cortex.client.secrets.impl`).
- Update the `app_command` arguments to match your Cortex Project and App Config location (`--project`, `--config`).
- Update the `app_command` arguments to match your Cortex Project, App Config location, and catalog spec path (`--project`, `--config`, `--spec`).

To Build and run the skill:
1. Run the following make commands:
Expand All @@ -274,7 +254,9 @@ make build create-app-image deploy-skill invoke
"--project",
"mytest",
"--config",
"/app/conf/app-conf.json"
"/app/conf/app-conf.json",
"--spec",
"/opt/spark/work-dir/src/main/resources/spec"
],
"app_location": "local:///app/libs/app.jar",
"options": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
package com.c12e.cortex.examples.catalog;

import com.c12e.cortex.examples.local.SessionExample;
import com.c12e.cortex.phoenix.Connection;
import com.c12e.cortex.phoenix.DataSource;
import com.c12e.cortex.phoenix.NotFoundException;
import com.c12e.cortex.phoenix.ProfileSchema;
import com.c12e.cortex.phoenix.*;
import com.c12e.cortex.phoenix.spec.*;
import com.c12e.cortex.profiles.CortexSession;
import com.c12e.shadow.com.fasterxml.jackson.core.JsonParser;
Expand Down Expand Up @@ -61,6 +58,9 @@ public abstract class RailedCommand implements Runnable {
@CommandLine.Option(names = {"-c", "--config"}, description = "Ingestion config file path", required = true)
protected String configFilePath;

@CommandLine.Option(names = {"-s", "--spec"}, description = "Ingestion catalog spec path", required = true)
protected String specPath;

@CommandLine.Spec
protected CommandLine.Model.CommandSpec cmdSpec;

Expand Down Expand Up @@ -115,89 +115,16 @@ protected void safeDelete(Supplier<Boolean> deleteFunction) {
}
}

/**
 * Builds a Jackson module carrying the custom {@link ProfileSchema} deserializer and
 * installs Jackson-backed providers as the JsonPath defaults through
 * {@code Configuration.setDefaults}.
 */
public RailedCommand() {
    // Jackson module that carries the custom ProfileSchema deserializer
    SimpleModule profileSchemaModule = new SimpleModule();
    profileSchemaModule.addDeserializer(ProfileSchema.class, new ProfileSchemaDeserializer());

    // JsonPath defaults: each provider gets its own mapper with the Kotlin module and the custom module
    com.c12e.shadow.com.jayway.jsonpath.Configuration.Defaults jacksonDefaults =
            new com.c12e.shadow.com.jayway.jsonpath.Configuration.Defaults() {
                private final JsonProvider jacksonJsonProvider = new JacksonJsonProvider(
                        JsonMapper.builder()
                                .addModules(new KotlinModule.Builder().build(), profileSchemaModule)
                                .build());
                private final MappingProvider jacksonMappingProvider = new JacksonMappingProvider(
                        JsonMapper.builder()
                                .addModules(new KotlinModule.Builder().build(), profileSchemaModule)
                                .build());

                @Override
                public JsonProvider jsonProvider() {
                    return jacksonJsonProvider;
                }

                @Override
                public MappingProvider mappingProvider() {
                    return jacksonMappingProvider;
                }

                @Override
                public Set<Option> options() {
                    // No JsonPath options are enabled
                    return EnumSet.noneOf(Option.class);
                }
            };

    com.c12e.shadow.com.jayway.jsonpath.Configuration.setDefaults(jacksonDefaults);
}

/**
 * Custom Profile Schema deserializer to convert from app config to Cortex type.
 *
 * <p>Reads one profile schema entry as a JSON tree and builds a {@link ProfileSchema}
 * from its {@code project}, {@code name}, {@code title}, {@code description},
 * {@code names}, {@code primarySource}, {@code joins}, {@code userId},
 * {@code customAttributes}, {@code bucketAttributes} and {@code attributeTags} fields.
 * The custom and bucket attributes are merged into a single attribute list, custom first.
 */
public class ProfileSchemaDeserializer extends StdDeserializer<ProfileSchema> {

    public ProfileSchemaDeserializer() {
        this(null);
    }

    public ProfileSchemaDeserializer(Class<?> vc) {
        super(vc);
    }

    /**
     * Deserializes a single profile schema entry.
     *
     * @param jp   parser positioned at the profile schema entry
     * @param ctxt deserialization context used to map the nested nodes
     * @return the profile schema built from the entry
     * @throws IOException if the JSON tree cannot be read or a nested node cannot be mapped
     */
    @Override
    public ProfileSchema deserialize(JsonParser jp, DeserializationContext ctxt)
            throws IOException, JsonProcessingException {
        JsonNode node = jp.getCodec().readTree(jp);

        // Arrays.asList returns a fixed-size list, so copy into a growable list first;
        // otherwise addAll throws UnsupportedOperationException when bucket attributes are present.
        List<AttributeSpec> attributes = new java.util.ArrayList<>(
                Arrays.asList(ctxt.readTreeAsValue(node.get("customAttributes"), CustomAttributeSpec[].class)));
        attributes.addAll(
                Arrays.asList(ctxt.readTreeAsValue(node.get("bucketAttributes"), BucketAttributeSpec[].class)));

        return new ProfileSchema(
                node.get("project").asText(),
                node.get("name").asText(),
                // title, description and userId are optional in the config
                node.has("title") ? node.get("title").asText(null) : null,
                node.has("description") ? node.get("description").asText(null) : null,
                ctxt.readTreeAsValue(node.get("names"), ProfileNames.class),
                ctxt.readTreeAsValue(node.get("primarySource"), DataSourceSelection.class),
                Arrays.asList(ctxt.readTreeAsValue(node.get("joins"), JoinSourceSelection[].class)),
                node.has("userId") ? node.get("userId").asText(null) : null,
                attributes,
                Arrays.asList(ctxt.readTreeAsValue(node.get("attributeTags"), AttributeTag[].class))
        );
    }
}

/**
* Manage catalog entities defined in the app configuration
* @param cortexSession - the Cortex session
* @param config - the app config
* @param project - the project name
*/
public void handleResourceEntities(CortexSession cortexSession, DocumentContext config, String project) {
config.put(CONNECTIONS_PATH + "[*]", "project", project);
config.put(DATA_SOURCES_PATH + "[*]", "project", project);
config.put(PROFILE_SCHEMAS_PATH + "[*]", "project", project);

List<Connection> connections = config.read(CONNECTIONS_PATH, new TypeRef<List<Connection>>() {
});
List<DataSource> dataSources = config.read(DATA_SOURCES_PATH, new TypeRef<List<DataSource>>() {
});
List<ProfileSchema> profileSchemas = config.read(PROFILE_SCHEMAS_PATH, new TypeRef<List<ProfileSchema>>() {
});
public void handleResourceEntities(CortexSession cortexSession, DocumentContext config, LocalCatalog localCatalog, String project) {
Iterable<Connection> connections = localCatalog.listConnections(project);
Iterable<DataSource> dataSources = localCatalog.listDataSources(project);
Iterable<ProfileSchema> profileSchemas = localCatalog.listProfileSchemas(project);

Boolean recreate = config.read("resources.recreate");
//Boolean recreate = true;
Expand Down Expand Up @@ -289,14 +216,16 @@ public final void run() {
SessionExample example = new SessionExample();
CortexSession cortexSession = example.getCortexSession();
DocumentContext config;
LocalCatalog localCatalog;
try {
localCatalog = new LocalCatalog(specPath);
Comment on lines +219 to +221
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not the same thing if you use the Catalog implementation from the CortexSession ? Catalog localCatalog = cortexSession.catalog()? If it is, then it might be preferable since the entrypoint for the user/developer is still the CortexSession

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is actually a separate instance of the LocalCatalog, the instance will act as a source. The example is all about updating/maintaining the resource entities before doing some work on them. The catalog in the CortexSession is a singleton and is our target catalog (may be pointing at a local or remote catalog)

config = JsonPath.parse(Paths.get(configFilePath).toFile());
} catch (IOException e) {
throw new RuntimeException(e);
}

//catalog management
handleResourceEntities(cortexSession, config, project);
handleResourceEntities(cortexSession, config, localCatalog, project);

//set listener for streaming sources
SingleLoopQueryListener queryListener = new SingleLoopQueryListener(cortexSession.spark());
Expand Down
102 changes: 1 addition & 101 deletions catalog-management/src/main/resources/conf/app-conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,106 +10,6 @@
]
},
"resources": {
"recreate": true,
"specs": {
"connections": [
{
"name": "member-base-file",
"title": "Member Base File",
"connectionType": "file",
"contentType": "csv",
"allowRead": true,
"allowWrite": false,
"params": [
{
"name": "uri",
"value": "./src/main/resources/data/members_100_v14.csv"
},
{
"name": "csv/header",
"value": "true"
}
]
}
],
"dataSources": [
{
"attributes": [
"member_id",
"state_code",
"city",
"state",
"zip_code",
"gender",
"segment",
"member_health_plan",
"is_PCP_auto_assigned",
"pcp_tax_id",
"phone",
"do_not_call",
"channel_pref",
"age",
"last_flu_shot_date",
"pcp_name"
],
"connection": {
"name": "member-base-file"
},
"kind": "batch",
"name": "member-base-ds",
"primaryKey": "member_id",
"title": "Member Base Datasource"
}
],
"profileSchemas": [
{
"title": "Member",
"name": "member-profile",
"names": {
"categories": [
"customer"
],
"plural": "members",
"singular": "member",
"title": "Member"
},
"attributeTags": [],
"joins": [],
"primarySource": {
"attributes": [
"state_code",
"city",
"state",
"zip_code",
"gender",
"email",
"segment",
"member_health_plan",
"is_PCP_auto_assigned",
"pcp_tax_id",
"address",
"phone",
"do_not_call",
"channel_pref",
"age",
"last_flu_shot_date",
"pcp_name",
"pcp_address"
],
"name": "member-base-ds",
"profileGroup": "Demographics",
"timestamp": {
"auto": true,
"field": null,
"format": null,
"fixed": null
},
"profileKey": "member_id"
},
"bucketAttributes": [],
"customAttributes": []
}
]
}
"recreate": true
}
}
8 changes: 6 additions & 2 deletions catalog-management/src/main/resources/conf/spark-conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"--project",
"local",
"--config",
"/app/conf/app-conf.json"
"/app/conf/app-conf.json",
"--spec",
"/opt/spark/work-dir/src/main/resources/spec"
],
"app_location": "local:///app/libs/app.jar",
"options": {
Expand All @@ -22,7 +24,9 @@
"spark.cortex.client.secrets.impl": "com.c12e.cortex.examples.local.CustomSecretsClient",
"spark.cortex.client.storage.impl": "com.c12e.cortex.profiles.client.LocalRemoteStorageClient",
"spark.cortex.storage.storageType": "file",
"spark.cortex.storage.file.baseDir": "src/main/resources/data"
"spark.cortex.storage.file.baseDir": "src/main/resources/data",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
}
Expand Down
Empty file.
18 changes: 1 addition & 17 deletions main-app/src/main/resources/spec/datasources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,4 @@ spec:
attributes:
- flu_risk_score
- date
- member_id
---
apiVersion: cognitivescale.io/v1
kind: DataSource
metadata:
name: KPI 1
spec:
kind: batch
primaryKey: timeOfExecution
connection:
name: KPI
attributes:
- timeOfExecution
- value
- startDate
- endDate
- windowDuration
- member_id