Skip to content

Commit 7d6ed70

Browse files
committed
Refactor writeNonNullToDB to use modifiableColumnTypes for SQL type determination
# Conflicts: # oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java
1 parent 133f878 commit 7d6ed70

1 file changed

Lines changed: 18 additions & 30 deletions

File tree

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,11 @@
1919
import com.google.common.io.ByteStreams;
2020
import io.cdap.cdap.api.common.Bytes;
2121
import io.cdap.cdap.api.data.format.StructuredRecord;
22-
import io.cdap.cdap.api.data.format.StructuredRecord.Builder;
2322
import io.cdap.cdap.api.data.schema.Schema;
24-
import io.cdap.cdap.api.data.schema.Schema.Field;
25-
import io.cdap.cdap.api.data.schema.Schema.LogicalType;
26-
import io.cdap.cdap.api.data.schema.Schema.Type;
2723
import io.cdap.cdap.etl.api.validation.InvalidStageException;
2824
import io.cdap.plugin.db.ColumnType;
2925
import io.cdap.plugin.db.DBRecord;
3026
import io.cdap.plugin.db.SchemaReader;
31-
import org.apache.hadoop.io.Writable;
32-
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3327

3428
import java.io.IOException;
3529
import java.io.InputStream;
@@ -51,8 +45,8 @@
5145
import java.util.List;
5246

5347
/**
54-
* Oracle Source implementation {@link DBWritable} and
55-
* {@link Writable}.
48+
* Oracle Source implementation {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} and
49+
* {@link org.apache.hadoop.io.Writable}.
5650
*/
5751
public class OracleSourceDBRecord extends DBRecord {
5852

@@ -83,11 +77,11 @@ protected SchemaReader getSchemaReader() {
8377
public void readFields(ResultSet resultSet) throws SQLException {
8478
Schema schema = getSchema();
8579
ResultSetMetaData metadata = resultSet.getMetaData();
86-
Builder recordBuilder = StructuredRecord.builder(schema);
80+
StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
8781

8882
// All LONG or LONG RAW columns have to be retrieved from the ResultSet prior to all the other columns.
8983
// Otherwise, we will face java.sql.SQLException: Stream has already been closed
90-
for (Field field : schema.getFields()) {
84+
for (Schema.Field field : schema.getFields()) {
9185
// Index of a field in the schema may not be same in the ResultSet,
9286
// hence find the field by name in the given resultSet
9387
int columnIndex = resultSet.findColumn(field.getName());
@@ -97,7 +91,7 @@ public void readFields(ResultSet resultSet) throws SQLException {
9791
}
9892

9993
// Read fields of other types
100-
for (Field field : schema.getFields()) {
94+
for (Schema.Field field : schema.getFields()) {
10195
// Index of a field in the schema may not be same in the ResultSet,
10296
// hence find the field by name in the given resultSet
10397
int columnIndex = resultSet.findColumn(field.getName());
@@ -110,7 +104,7 @@ record = recordBuilder.build();
110104
}
111105

112106
@Override
113-
protected void handleField(ResultSet resultSet, Builder recordBuilder, Field field,
107+
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
114108
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
115109
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) {
116110
handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
@@ -123,13 +117,7 @@ protected void handleField(ResultSet resultSet, Builder recordBuilder, Field fie
123117
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
124118
String fieldName, int fieldIndex) throws SQLException {
125119
int sqlIndex = fieldIndex + 1;
126-
ColumnType foundColumnType = columnTypes.stream()
127-
.filter(columnType -> columnType.getName().equals(fieldName))
128-
.findFirst() // Efficiently gets the first match wrapped in an Optional
129-
.orElseThrow(() -> new IllegalArgumentException(
130-
String.format("Unable to find the column type for field '%s'", fieldName)));
131-
132-
int sqlType = foundColumnType.getType();
120+
int sqlType = modifiableColumnTypes.get(fieldIndex).getType();
133121

134122
// TIMESTAMP and TIMESTAMPTZ types needs to be handled using the specific oracle types to ensure that the data
135123
// inserted matches with the provided value. As Oracle driver internally alters the values provided
@@ -138,7 +126,7 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
138126
// More details here : https://docs.oracle.com/cd/E13222_01/wls/docs91/jdbc_drivers/oracle.html
139127
// Handle the case when TimestampTZ type is set to CDAP String type or Timestamp type
140128
if (sqlType == OracleSourceSchemaReader.TIMESTAMP_TZ) {
141-
if (Type.STRING.equals(fieldSchema.getType())) {
129+
if (Schema.Type.STRING.equals(fieldSchema.getType())) {
142130
// Deprecated: Handle the case when the TimestampTZ is mapped to CDAP String type
143131
String timestampString = record.get(fieldName);
144132
Object timestampTZ = createOracleTimestampWithTimeZone(stmt.getConnection(), timestampString);
@@ -152,21 +140,21 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
152140
stmt.setObject(sqlIndex, timestampWithTimeZone);
153141
}
154142
} else if (sqlType == OracleSourceSchemaReader.TIMESTAMP_LTZ) {
155-
if (LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) {
143+
if (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) {
156144
// Deprecated: Handle the case when the TimestampLTZ is mapped to CDAP Timestamp type
157145
ZonedDateTime timestamp = record.getTimestamp(fieldName);
158146
String timestampString = Timestamp.valueOf(timestamp.toLocalDateTime()).toString();
159147
Object timestampWithTimeZone = createOracleTimestampWithLocalTimeZone(stmt.getConnection(), timestampString);
160148
stmt.setObject(sqlIndex, timestampWithTimeZone);
161-
} else if (LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
149+
} else if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
162150
// Handle the case when the TimestampLTZ is mapped to CDAP Datetime type
163151
LocalDateTime localDateTime = record.getDateTime(fieldName);
164152
String timestampString = Timestamp.valueOf(localDateTime).toString();
165153
Object timestampWithTimeZone = createOracleTimestampWithLocalTimeZone(stmt.getConnection(), timestampString);
166154
stmt.setObject(sqlIndex, timestampWithTimeZone);
167155
}
168156
} else if (sqlType == Types.TIMESTAMP) {
169-
if (LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
157+
if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
170158
// Handle the case when Timestamp is mapped to CDAP Datetime type.
171159
LocalDateTime localDateTime = record.getDateTime(fieldName);
172160
String timestampString = Timestamp.valueOf(localDateTime).toString();
@@ -269,7 +257,7 @@ private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLE
269257
}
270258
}
271259

272-
private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder, Field field,
260+
private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
273261
int columnIndex, int sqlType, int precision, int scale)
274262
throws SQLException {
275263
Schema nonNullSchema = field.getSchema().isNullable() ?
@@ -282,7 +270,7 @@ private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder
282270
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
283271
break;
284272
case OracleSourceSchemaReader.TIMESTAMP_TZ:
285-
if (Type.STRING.equals(nonNullSchema.getType())) {
273+
if (Schema.Type.STRING.equals(nonNullSchema.getType())) {
286274
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
287275
} else {
288276
// In case of TimestampTZ datatype the getTimestamp(index, Calendar) method call does not
@@ -310,7 +298,7 @@ private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder
310298
case Types.TIMESTAMP:
311299
// Since Oracle Timestamp type does not have any timezone information, it should be converted into the
312300
// CDAP Datetime type.
313-
if (LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) {
301+
if (Schema.LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) {
314302
Timestamp timestamp = resultSet.getTimestamp(columnIndex);
315303
if (timestamp != null) {
316304
recordBuilder.setDateTime(field.getName(), timestamp.toLocalDateTime());
@@ -327,7 +315,7 @@ private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder
327315
// super.setField sets this '0000-12-31 09:00:00.000Z[UTC]' in the recordBuilder which is incorrect and the
328316
// correct value should be '0001-01-01 09:00:00.000Z[UTC]'.
329317
Object timeStampObj = resultSet.getObject(columnIndex);
330-
if (LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) {
318+
if (Schema.LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) {
331319
Timestamp timestampLTZ = resultSet.getTimestamp(columnIndex);
332320
if (timestampLTZ != null) {
333321
recordBuilder.setDateTime(field.getName(),
@@ -363,7 +351,7 @@ private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder
363351
if (precision == 0) {
364352
Schema nonNullableSchema = field.getSchema().isNullable() ?
365353
field.getSchema().getNonNullable() : field.getSchema();
366-
if (LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
354+
if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
367355
// Handle the field using the schema set in the output schema
368356
BigDecimal decimal = resultSet.getBigDecimal(columnIndex, getScale(field.getSchema()));
369357
recordBuilder.setDecimal(field.getName(), decimal);
@@ -394,8 +382,8 @@ private boolean isLongOrLongRaw(int columnType) {
394382
return columnType == OracleSourceSchemaReader.LONG || columnType == OracleSourceSchemaReader.LONG_RAW;
395383
}
396384

397-
private void readField(int columnIndex, ResultSetMetaData metadata, ResultSet resultSet, Field field,
398-
Builder recordBuilder) throws SQLException {
385+
private void readField(int columnIndex, ResultSetMetaData metadata, ResultSet resultSet, Schema.Field field,
386+
StructuredRecord.Builder recordBuilder) throws SQLException {
399387
int sqlType = metadata.getColumnType(columnIndex);
400388
int sqlPrecision = metadata.getPrecision(columnIndex);
401389
int sqlScale = metadata.getScale(columnIndex);

0 commit comments

Comments
 (0)