|
23 | 23 | import com.dtstack.chunjun.converter.IDeserializationConverter; |
24 | 24 | import com.dtstack.chunjun.converter.ISerializationConverter; |
25 | 25 | import com.dtstack.chunjun.throwable.UnsupportedTypeException; |
| 26 | +import com.dtstack.chunjun.util.DateUtil; |
26 | 27 |
|
| 28 | +import org.apache.flink.table.data.DecimalData; |
27 | 29 | import org.apache.flink.table.data.GenericRowData; |
28 | 30 | import org.apache.flink.table.data.RowData; |
29 | 31 | import org.apache.flink.table.data.StringData; |
| 32 | +import org.apache.flink.table.data.TimestampData; |
| 33 | +import org.apache.flink.table.types.logical.DecimalType; |
30 | 34 | import org.apache.flink.table.types.logical.LogicalType; |
31 | 35 | import org.apache.flink.table.types.logical.RowType; |
| 36 | +import org.apache.flink.table.types.logical.TimestampType; |
32 | 37 |
|
| 38 | +import org.apache.commons.lang3.StringUtils; |
| 39 | + |
| 40 | +import java.math.BigDecimal; |
| 41 | +import java.math.BigInteger; |
33 | 42 | import java.sql.Date; |
34 | 43 | import java.sql.Time; |
| 44 | +import java.sql.Timestamp; |
35 | 45 | import java.time.LocalDate; |
| 46 | +import java.time.LocalDateTime; |
36 | 47 | import java.time.LocalTime; |
37 | 48 |
|
38 | 49 | public class S3SqlConverter extends AbstractRowConverter<String[], RowData, String[], LogicalType> { |
@@ -82,12 +93,41 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { |
82 | 93 | return val -> Float.valueOf((String) val); |
83 | 94 | case DOUBLE: |
84 | 95 | return val -> Double.valueOf((String) val); |
| 96 | + case DECIMAL: |
| 97 | + final int precision = ((DecimalType) type).getPrecision(); |
| 98 | + final int scale = ((DecimalType) type).getScale(); |
| 99 | + return val -> |
| 100 | + val instanceof BigInteger |
| 101 | + ? DecimalData.fromBigDecimal( |
| 102 | + new BigDecimal((BigInteger) val, 0), precision, scale) |
| 103 | + : DecimalData.fromBigDecimal( |
| 104 | + StringUtils.isNotEmpty(String.valueOf(val)) |
| 105 | + ? new BigDecimal(String.valueOf(val)) |
| 106 | + : BigDecimal.ZERO, |
| 107 | + precision, |
| 108 | + scale); |
| 109 | + case TIMESTAMP_WITH_TIME_ZONE: |
| 110 | + case TIMESTAMP_WITHOUT_TIME_ZONE: |
| 111 | + return val -> { |
| 112 | + if (val instanceof String) { |
| 113 | + return TimestampData.fromTimestamp(Timestamp.valueOf((String) val)); |
| 114 | + } else if (val instanceof LocalDateTime) { |
| 115 | + return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val)); |
| 116 | + } else { |
| 117 | + return TimestampData.fromTimestamp(((Timestamp) val)); |
| 118 | + } |
| 119 | + }; |
85 | 120 | case CHAR: |
86 | 121 | case VARCHAR: |
87 | 122 | return val -> StringData.fromString((String) val); |
88 | 123 | case DATE: |
89 | | - return val -> |
90 | | - (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); |
| 124 | + return val -> { |
| 125 | + if (StringUtils.isEmpty(String.valueOf(val))) { |
| 126 | + return null; |
| 127 | + } |
| 128 | + Date date = new Date(DateUtil.stringToDate(String.valueOf(val)).getTime()); |
| 129 | + return (int) date.toLocalDate().toEpochDay(); |
| 130 | + }; |
91 | 131 | case TIME_WITHOUT_TIME_ZONE: |
92 | 132 | return val -> |
93 | 133 | (int) |
@@ -124,6 +164,22 @@ protected ISerializationConverter<String[]> createExternalConverter(LogicalType |
124 | 164 | output[index] = |
125 | 165 | Time.valueOf(LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)) |
126 | 166 | .toString(); |
| 167 | + case DECIMAL: |
| 168 | + return (rowData, index, data) -> |
| 169 | + data[index] = |
| 170 | + String.valueOf( |
| 171 | + rowData.getDecimal( |
| 172 | + index, |
| 173 | + ((DecimalType) type).getPrecision(), |
| 174 | + ((DecimalType) type).getScale())); |
| 175 | + case TIMESTAMP_WITHOUT_TIME_ZONE: |
| 176 | + return (rowData, index, data) -> |
| 177 | + data[index] = |
| 178 | + String.valueOf( |
| 179 | + rowData.getTimestamp( |
| 180 | + index, |
| 181 | + ((TimestampType) type).getPrecision()) |
| 182 | + .toTimestamp()); |
127 | 183 | default: |
128 | 184 | throw new UnsupportedTypeException(type.toString()); |
129 | 185 | } |
|
0 commit comments