Skip to content

Commit bb0a467

Browse files
committed
feat(iam-setup): additional support for iam
1 parent eeb2c88 commit bb0a467

File tree

29 files changed

+2691
-348
lines changed

29 files changed

+2691
-348
lines changed

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/sqlsetup/CreateUsersStep.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,11 @@ SqlSetupResult createUsers(SqlSetupArgs args) throws SQLException {
8080

8181
void createIamUser(SqlSetupArgs args, SqlSetupResult result) throws SQLException {
8282
log.info("Creating IAM-authenticated user: {}", args.getCreateUserUsername());
83+
// Note: The IAM role parameter is null - MySQL uses 'RDS' constant, PostgreSQL uses rds_iam
84+
// role
8385

8486
try (Connection connection = server.dataSource().getConnection()) {
85-
String createUserSql =
86-
dbOps.createIamUserSql(args.getCreateUserUsername(), args.getCreateUserIamRole());
87+
String createUserSql = dbOps.createIamUserSql(args.getCreateUserUsername(), null);
8788
try (PreparedStatement stmt = connection.prepareStatement(createUserSql)) {
8889
stmt.executeUpdate();
8990
}
@@ -94,6 +95,13 @@ void createIamUser(SqlSetupArgs args, SqlSetupResult result) throws SQLException
9495
grantStmt.executeUpdate();
9596
}
9697

98+
// Flush privileges for MySQL to ensure grants take effect immediately
99+
if (setupArgs.getDbType() == DatabaseType.MYSQL) {
100+
try (PreparedStatement flushStmt = connection.prepareStatement("FLUSH PRIVILEGES")) {
101+
flushStmt.executeUpdate();
102+
}
103+
}
104+
97105
result.setUsersCreated(1);
98106
log.info("IAM user '{}' created successfully", args.getCreateUserUsername());
99107
}
@@ -116,6 +124,13 @@ void createTraditionalUser(SqlSetupArgs args, SqlSetupResult result) throws SQLE
116124
grantStmt.executeUpdate();
117125
}
118126

127+
// Flush privileges for MySQL to ensure grants take effect immediately
128+
if (setupArgs.getDbType() == DatabaseType.MYSQL) {
129+
try (PreparedStatement flushStmt = connection.prepareStatement("FLUSH PRIVILEGES")) {
130+
flushStmt.executeUpdate();
131+
}
132+
}
133+
119134
result.setUsersCreated(1);
120135
log.info("Traditional user '{}' created successfully", args.getCreateUserUsername());
121136
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/sqlsetup/MySqlDatabaseOperations.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,24 @@ public class MySqlDatabaseOperations implements DatabaseOperations {
3535

3636
@Override
3737
public String createIamUserSql(String username, String iamRole) {
38-
// MySQL - IAM authentication with configurable role
38+
// The iamRole parameter is not used for MySQL (unlike PostgreSQL)
39+
// The actual IAM permissions are managed by AWS IAM policies, not stored in MySQL
3940
String escapedUser = escapeMysqlStringLiteral(username);
40-
String escapedRole = escapeMysqlStringLiteral(iamRole);
41-
return "CREATE USER "
41+
return "CREATE USER IF NOT EXISTS "
4242
+ escapedUser
43-
+ "@'%' IDENTIFIED WITH AWSAuthenticationPlugin AS "
44-
+ escapedRole
45-
+ ";";
43+
+ "@'%' IDENTIFIED WITH AWSAuthenticationPlugin AS 'RDS';";
4644
}
4745

4846
@Override
4947
public String createTraditionalUserSql(String username, String password) {
5048
// MySQL - traditional authentication
5149
String escapedUser = escapeMysqlStringLiteral(username);
5250
String escapedPassword = escapeMysqlStringLiteral(password);
53-
return "CREATE USER " + escapedUser + "@'%' IDENTIFIED BY " + escapedPassword + ";";
51+
return "CREATE USER IF NOT EXISTS "
52+
+ escapedUser
53+
+ "@'%' IDENTIFIED BY "
54+
+ escapedPassword
55+
+ ";";
5456
}
5557

5658
@Override

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/sqlsetup/PostgresDatabaseOperations.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,41 @@ public class PostgresDatabaseOperations implements DatabaseOperations {
3636

3737
@Override
3838
public String createIamUserSql(String username, String iamRole) {
39-
// PostgreSQL - IAM authentication (requires additional setup)
39+
// PostgreSQL RDS - IAM authentication uses the rds_iam role
40+
// The iamRole parameter is not used for PostgreSQL (IAM permissions are managed by AWS IAM)
41+
// The actual IAM permissions are managed by AWS IAM policies, not stored in PostgreSQL
4042
String escapedUser = escapePostgresIdentifier(username);
41-
return "CREATE USER " + escapedUser + " WITH LOGIN;";
43+
return String.format(
44+
"""
45+
DO
46+
$$
47+
BEGIN
48+
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = %s) THEN
49+
CREATE USER %s WITH LOGIN;
50+
END IF;
51+
END
52+
$$;
53+
GRANT rds_iam TO %s;
54+
""",
55+
escapedUser, escapedUser, escapedUser);
4256
}
4357

4458
@Override
4559
public String createTraditionalUserSql(String username, String password) {
4660
String escapedUser = escapePostgresIdentifier(username);
4761
String escapedPassword = escapePostgresStringLiteral(password);
48-
return "CREATE USER " + escapedUser + " WITH PASSWORD " + escapedPassword + ";";
62+
return String.format(
63+
"""
64+
DO
65+
$$
66+
BEGIN
67+
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = %s) THEN
68+
CREATE USER %s WITH PASSWORD %s;
69+
END IF;
70+
END
71+
$$;
72+
""",
73+
escapedUser, escapedUser, escapedPassword);
4974
}
5075

5176
@Override
@@ -61,6 +86,7 @@ public String createCdcUserSql(String cdcUser, String cdcPassword) {
6186
// Properly escape identifiers and string literals to prevent SQL injection
6287
String escapedUser = escapePostgresIdentifier(cdcUser);
6388
String escapedPassword = escapePostgresStringLiteral(cdcPassword);
89+
String escapedUserLiteral = escapePostgresStringLiteral(cdcUser);
6490

6591
return String.format(
6692
"""
@@ -74,7 +100,7 @@ IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = %s) THEN
74100
$$;
75101
ALTER USER %s WITH REPLICATION;
76102
""",
77-
escapedPassword, escapedUser, escapedPassword, escapedUser);
103+
escapedUserLiteral, escapedUser, escapedPassword, escapedUser);
78104
}
79105

80106
@Override

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/sqlsetup/SqlSetupArgs.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ public class SqlSetupArgs {
1717
String cdcUser;
1818
String cdcPassword;
1919
String createUserUsername;
20-
String createUserPassword;
20+
String createUserPassword; // If null, IAM authentication is used
2121
String host;
2222
int port;
2323
String databaseName;
24-
String createUserIamRole; // IAM role for new user creation (required if IAM auth enabled)
2524
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/sqlsetup/config/SqlSetupConfig.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ public SqlSetupArgs createSetupArgs() {
6060
boolean createTables = EnvironmentUtils.getBoolean("CREATE_TABLES", true);
6161
boolean createDatabase = EnvironmentUtils.getBoolean("CREATE_DB", true);
6262
boolean createUser = EnvironmentUtils.getBoolean("CREATE_USER", false);
63-
String createUserIamRole = EnvironmentUtils.getString("IAM_ROLE");
64-
boolean iamAuthEnabled = createUserIamRole != null && !createUserIamRole.trim().isEmpty();
6563
boolean cdcEnabled = EnvironmentUtils.getBoolean("CDC_MCL_PROCESSING_ENABLED", false);
6664
String cdcUser = EnvironmentUtils.getString("CDC_USER", "datahub_cdc");
6765
String cdcPassword = EnvironmentUtils.getString("CDC_PASSWORD", "datahub_cdc");
@@ -78,17 +76,22 @@ public SqlSetupArgs createSetupArgs() {
7876
int port = jdbcInfo.port;
7977

8078
// Set user creation credentials based on CREATE_USER setting
79+
// If CREATE_USER_PASSWORD is not provided, assume IAM authentication
8180
String createUserUsername;
8281
String createUserPassword;
82+
boolean iamAuthEnabled = false;
8383
if (createUser) {
84-
if (iamAuthEnabled) {
85-
// IAM authentication: only set username, no password
86-
createUserUsername = EnvironmentUtils.getString("CREATE_USER_USERNAME");
84+
createUserUsername = EnvironmentUtils.getString("CREATE_USER_USERNAME");
85+
createUserPassword = EnvironmentUtils.getString("CREATE_USER_PASSWORD");
86+
87+
// If password is not provided, use IAM authentication
88+
if (createUserPassword == null || createUserPassword.trim().isEmpty()) {
89+
iamAuthEnabled = true;
8790
createUserPassword = null; // No password for IAM auth
91+
log.info("User creation with IAM authentication detected (no password provided)");
8892
} else {
89-
// Traditional authentication: set both username and password
90-
createUserUsername = EnvironmentUtils.getString("CREATE_USER_USERNAME");
91-
createUserPassword = EnvironmentUtils.getString("CREATE_USER_PASSWORD");
93+
iamAuthEnabled = false;
94+
log.info("User creation with traditional password authentication");
9295
}
9396
} else {
9497
// When CREATE_USER is disabled, these fields are not used
@@ -110,8 +113,7 @@ public SqlSetupArgs createSetupArgs() {
110113
createUserPassword,
111114
host,
112115
port,
113-
databaseName,
114-
createUserIamRole);
116+
databaseName);
115117

116118
// Validate authentication configuration
117119
validateAuthenticationConfig(args);
@@ -173,12 +175,10 @@ void validateAuthenticationConfig(SqlSetupArgs args) {
173175
}
174176

175177
if (args.isIamAuthEnabled()) {
176-
// IAM authentication enabled - validate IAM role is provided
177-
if (args.getCreateUserIamRole() == null || args.getCreateUserIamRole().trim().isEmpty()) {
178-
throw new IllegalStateException(
179-
"IAM user creation is enabled but IAM_ROLE is not specified. "
180-
+ "Either set IAM_ROLE environment variable or disable IAM authentication.");
181-
}
178+
// For both MySQL and PostgreSQL RDS IAM authentication:
179+
// - MySQL: Uses constant 'RDS' in CREATE USER statement
180+
// - PostgreSQL: Uses rds_iam role grant
181+
// The actual IAM permissions are managed by AWS IAM policies, not stored in the database
182182

183183
// Validate username is provided for IAM
184184
if (args.getCreateUserUsername() == null || args.getCreateUserUsername().trim().isEmpty()) {
@@ -188,8 +188,8 @@ void validateAuthenticationConfig(SqlSetupArgs args) {
188188
}
189189

190190
log.info(
191-
"IAM user creation validated: role='{}', username='{}'",
192-
args.getCreateUserIamRole(),
191+
"IAM user creation validated for {}: username='{}' (IAM permissions managed by AWS IAM policies)",
192+
args.getDbType().getValue(),
193193
args.getCreateUserUsername());
194194
} else {
195195
// Traditional authentication - validate username and password are provided

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/BuildIndices.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesPreStep;
99
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesStep;
1010
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.CreateUsageEventIndicesStep;
11+
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.CreateUserStep;
1112
import com.linkedin.gms.factory.config.ConfigurationProvider;
1213
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
1314
import com.linkedin.metadata.entity.AspectDao;
@@ -70,6 +71,8 @@ private List<UpgradeStep> buildSteps(
7071
}
7172

7273
final List<UpgradeStep> steps = new ArrayList<>();
74+
// Setup Elasticsearch users and roles (if enabled)
75+
steps.add(new CreateUserStep(baseElasticSearchComponents, configurationProvider));
7376
// Setup usage event indices and policies
7477
steps.add(new CreateUsageEventIndicesStep(baseElasticSearchComponents, configurationProvider));
7578
// Disable ES write mode/change refresh rate and clone indices
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.linkedin.datahub.upgrade.system.elasticsearch.steps;
2+
3+
import com.linkedin.datahub.upgrade.UpgradeContext;
4+
import com.linkedin.datahub.upgrade.UpgradeStep;
5+
import com.linkedin.datahub.upgrade.UpgradeStepResult;
6+
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
7+
import com.linkedin.datahub.upgrade.system.elasticsearch.util.IndexRoleUtils;
8+
import com.linkedin.datahub.upgrade.system.elasticsearch.util.IndexUtils;
9+
import com.linkedin.gms.factory.config.ConfigurationProvider;
10+
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
11+
import com.linkedin.metadata.utils.EnvironmentUtils;
12+
import com.linkedin.upgrade.DataHubUpgradeState;
13+
import io.datahubproject.metadata.context.OperationContext;
14+
import java.util.function.Function;
15+
import lombok.RequiredArgsConstructor;
16+
import lombok.extern.slf4j.Slf4j;
17+
18+
@RequiredArgsConstructor
19+
@Slf4j
20+
public class CreateUserStep implements UpgradeStep {
21+
private final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents;
22+
private final ConfigurationProvider configurationProvider;
23+
24+
@Override
25+
public String id() {
26+
return "CreateElasticsearchUserStep";
27+
}
28+
29+
@Override
30+
public int retryCount() {
31+
return 3;
32+
}
33+
34+
@Override
35+
public boolean skip(UpgradeContext context) {
36+
boolean createUser = EnvironmentUtils.getBoolean("CREATE_USER_ES", false);
37+
if (!createUser) {
38+
log.info("Elasticsearch user creation is disabled, skipping user setup");
39+
}
40+
return !createUser;
41+
}
42+
43+
@Override
44+
public Function<UpgradeContext, UpgradeStepResult> executable() {
45+
return (context) -> {
46+
try {
47+
final String indexPrefix =
48+
configurationProvider.getElasticSearch().getIndex().getFinalPrefix();
49+
50+
// Check for CREATE_USER_ES_USERNAME and CREATE_USER_ES_PASSWORD environment variables first
51+
String username = EnvironmentUtils.getString("CREATE_USER_ES_USERNAME");
52+
String password = EnvironmentUtils.getString("CREATE_USER_ES_PASSWORD");
53+
String iamRoleArn = EnvironmentUtils.getString("CREATE_USER_ES_IAM_ROLE_ARN");
54+
55+
// Determine the authentication mode
56+
boolean usingIam = iamRoleArn != null && !iamRoleArn.isEmpty();
57+
boolean usingUserPassword =
58+
username != null && !username.isEmpty() && password != null && !password.isEmpty();
59+
60+
// Validate that at least one authentication method is configured
61+
if (!usingIam && !usingUserPassword) {
62+
log.warn(
63+
"Either CREATE_USER_ES_IAM_ROLE_ARN or CREATE_USER_ES_USERNAME/CREATE_USER_ES_PASSWORD must be configured");
64+
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
65+
}
66+
67+
String roleName = indexPrefix + "access";
68+
69+
if (esComponents.getSearchClient().getEngineType().isOpenSearch()) {
70+
setupOpenSearchUser(
71+
indexPrefix,
72+
roleName,
73+
username,
74+
password,
75+
iamRoleArn,
76+
usingIam,
77+
usingUserPassword,
78+
context.opContext());
79+
} else {
80+
// Elasticsearch Cloud doesn't support IAM authentication
81+
if (usingIam) {
82+
log.warn("IAM authentication is only supported for AWS OpenSearch Service");
83+
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
84+
}
85+
setupElasticsearchCloudUser(indexPrefix, roleName, username, password);
86+
}
87+
88+
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
89+
} catch (Exception e) {
90+
log.error("CreateElasticsearchUserStep failed.", e);
91+
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
92+
}
93+
};
94+
}
95+
96+
private void setupElasticsearchCloudUser(
97+
String prefix, String roleName, String username, String password) throws Exception {
98+
log.info("Creating Elasticsearch Cloud user and role");
99+
IndexRoleUtils.createElasticsearchCloudUser(esComponents, roleName, username, password, prefix);
100+
}
101+
102+
private void setupOpenSearchUser(
103+
String prefix,
104+
String roleName,
105+
String username,
106+
String password,
107+
String iamRoleArn,
108+
boolean usingIam,
109+
boolean usingUserPassword,
110+
OperationContext operationContext)
111+
throws Exception {
112+
// Check if this is AWS OpenSearch Service
113+
boolean isAwsOpenSearch = IndexUtils.isAwsOpenSearchService(esComponents);
114+
115+
if (isAwsOpenSearch) {
116+
log.info("Detected AWS OpenSearch Service. Creating AWS-specific role.");
117+
118+
// Create the role first (required for both modes)
119+
IndexRoleUtils.createAwsOpenSearchRole(esComponents, roleName, prefix);
120+
121+
if (usingIam) {
122+
// IAM authentication: create role mapping to IAM role
123+
log.info("IAM mode: Creating role mapping for IAM role: {}", iamRoleArn);
124+
IndexRoleUtils.createAwsOpenSearchRoleMapping(esComponents, roleName, iamRoleArn);
125+
}
126+
127+
if (usingUserPassword) {
128+
// Internal user authentication: create internal user
129+
log.info("Internal user mode: Creating internal user: {}", username);
130+
IndexRoleUtils.createAwsOpenSearchUser(
131+
esComponents, username, password, roleName, null, operationContext);
132+
}
133+
} else {
134+
log.warn("Detected self-hosted OpenSearch. Creating user and role not supported.");
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)