Skip to content

Commit 989de07

Browse files
committed
[FLINK-37959] Support all PostgreSQL 14 field types.
1 parent 42f91a8 commit 989de07

File tree

5 files changed

+130
-10
lines changed

5 files changed

+130
-10
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,19 @@ limitations under the License.
3939
<version>${project.version}</version>
4040
</dependency>
4141

42+
<!-- geometry dependencies -->
43+
<dependency>
44+
<groupId>com.esri.geometry</groupId>
45+
<artifactId>esri-geometry-api</artifactId>
46+
<version>${geometry.version}</version>
47+
<exclusions>
48+
<exclusion>
49+
<groupId>com.fasterxml.jackson.core</groupId>
50+
<artifactId>jackson-core</artifactId>
51+
</exclusion>
52+
</exclusions>
53+
</dependency>
54+
4255
<dependency>
4356
<groupId>org.apache.flink</groupId>
4457
<artifactId>flink-connector-postgres-cdc</artifactId>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@
2626
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
2727
import org.apache.flink.table.data.TimestampData;
2828

29+
import com.esri.core.geometry.ogc.OGCGeometry;
30+
import com.fasterxml.jackson.databind.JsonNode;
2931
import com.fasterxml.jackson.databind.ObjectMapper;
3032
import io.debezium.data.Envelope;
3133
import io.debezium.data.geometry.Geography;
3234
import io.debezium.data.geometry.Geometry;
33-
import io.debezium.util.HexConverter;
35+
import io.debezium.data.geometry.Point;
3436
import org.apache.kafka.connect.data.Schema;
3537
import org.apache.kafka.connect.data.Struct;
3638
import org.apache.kafka.connect.source.SourceRecord;
3739

40+
import java.nio.ByteBuffer;
3841
import java.util.Collections;
3942
import java.util.HashMap;
4043
import java.util.List;
@@ -111,15 +114,24 @@ protected Map<String, String> getMetadata(SourceRecord record) {
111114
protected Object convertToString(Object dbzObj, Schema schema) {
112115
// the Geometry datatype in PostgreSQL will be converted to
113116
// a String with Json format
114-
if (Geometry.LOGICAL_NAME.equals(schema.name())
117+
if (Point.LOGICAL_NAME.equals(schema.name())
118+
|| Geometry.LOGICAL_NAME.equals(schema.name())
115119
|| Geography.LOGICAL_NAME.equals(schema.name())) {
116120
try {
117121
Struct geometryStruct = (Struct) dbzObj;
118122
byte[] wkb = geometryStruct.getBytes("wkb");
119-
Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32(SRID));
120-
Map<String, Object> geometryInfo = new HashMap<>(2);
121-
geometryInfo.put(HEXEWKB, HexConverter.convertToHexString(wkb));
122-
geometryInfo.put(SRID, srid.orElse(0));
123+
String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
124+
JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
125+
Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
126+
Map<String, Object> geometryInfo = new HashMap<>();
127+
String geometryType = originGeoNode.get("type").asText();
128+
geometryInfo.put("type", geometryType);
129+
if (geometryType.equals("GeometryCollection")) {
130+
geometryInfo.put("geometries", originGeoNode.get("geometries"));
131+
} else {
132+
geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
133+
}
134+
geometryInfo.put("srid", srid.orElse(0));
123135
return BinaryStringData.fromString(OBJECT_MAPPER.writeValueAsString(geometryInfo));
124136
} catch (Exception e) {
125137
throw new IllegalArgumentException(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,37 @@
2626

2727
/** A utility class for converting Postgres types to Flink types. */
2828
public class PostgresTypeUtils {
29+
private static final String PG_BIT = "bit";
30+
private static final String PG_BIT_ARRAY = "_bit";
31+
32+
private static final String PG_VARBIT = "varbit";
33+
private static final String PG_VARBIT_ARRAY = "_varbit";
34+
35+
private static final String PG_OID = "OID";
36+
37+
private static final String PG_CHAR = "char";
38+
private static final String PG_CHAR_ARRAY = "_char";
39+
40+
private static final String PG_TIMETZ = "timetz";
41+
private static final String PG_TIMETZ_ARRAY = "_timetz";
42+
43+
private static final String PG_INTERVAL = "interval";
44+
private static final String PG_INTERVAL_ARRAY = "_interval";
45+
46+
private static final String PG_JSON = "json";
47+
private static final String PG_JSONB = "jsonb";
48+
private static final String PG_XML = "xml";
49+
private static final String PG_POINT = "point";
50+
private static final String PG_LTREE = "ltree";
51+
private static final String PG_CITEXT = "citext";
52+
private static final String PG_INET = "inet";
53+
private static final String PG_INT4RANGE = "int4range";
54+
private static final String PG_INT8RANGE = "int8range";
55+
private static final String PG_NUMRANGE = "numrange";
56+
private static final String PG_TSTZRANGE = "tstzrange";
57+
private static final String PG_DATERANGE = "daterange";
58+
private static final String PG_ENUM = "enum";
59+
2960
private static final String PG_SMALLSERIAL = "smallserial";
3061
private static final String PG_SERIAL = "serial";
3162
private static final String PG_BIGSERIAL = "bigserial";
@@ -55,8 +86,8 @@ public class PostgresTypeUtils {
5586
private static final String PG_TIME_ARRAY = "_time";
5687
private static final String PG_TEXT = "text";
5788
private static final String PG_TEXT_ARRAY = "_text";
58-
private static final String PG_CHAR = "bpchar";
59-
private static final String PG_CHAR_ARRAY = "_bpchar";
89+
private static final String PG_BPCHAR = "bpchar";
90+
private static final String PG_BPCHAR_ARRAY = "_bpchar";
6091
private static final String PG_CHARACTER = "character";
6192
private static final String PG_CHARACTER_ARRAY = "_character";
6293
private static final String PG_CHARACTER_VARYING = "varchar";
@@ -88,6 +119,13 @@ private static DataType convertFromColumn(Column column) {
88119
switch (typeName) {
89120
case PG_BOOLEAN:
90121
return DataTypes.BOOLEAN();
122+
case PG_BIT:
123+
case PG_VARBIT:
124+
if (precision == 1) {
125+
return DataTypes.BOOLEAN();
126+
} else {
127+
return DataTypes.BINARY(precision);
128+
}
91129
case PG_BOOLEAN_ARRAY:
92130
return DataTypes.ARRAY(DataTypes.BOOLEAN());
93131
case PG_BYTEA:
@@ -106,8 +144,11 @@ private static DataType convertFromColumn(Column column) {
106144
return DataTypes.ARRAY(DataTypes.INT());
107145
case PG_BIGINT:
108146
case PG_BIGSERIAL:
147+
case PG_OID:
148+
case PG_INTERVAL:
109149
return DataTypes.BIGINT();
110150
case PG_BIGINT_ARRAY:
151+
case PG_INTERVAL_ARRAY:
111152
return DataTypes.ARRAY(DataTypes.BIGINT());
112153
case PG_REAL:
113154
return DataTypes.FLOAT();
@@ -130,9 +171,11 @@ private static DataType convertFromColumn(Column column) {
130171
}
131172
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0));
132173
case PG_CHAR:
174+
case PG_BPCHAR:
133175
case PG_CHARACTER:
134176
return DataTypes.CHAR(precision);
135177
case PG_CHAR_ARRAY:
178+
case PG_BPCHAR_ARRAY:
136179
case PG_CHARACTER_ARRAY:
137180
return DataTypes.ARRAY(DataTypes.CHAR(precision));
138181
case PG_CHARACTER_VARYING:
@@ -143,6 +186,10 @@ private static DataType convertFromColumn(Column column) {
143186
case PG_GEOMETRY:
144187
case PG_GEOGRAPHY:
145188
case PG_UUID:
189+
case PG_JSON:
190+
case PG_JSONB:
191+
case PG_XML:
192+
case PG_POINT:
146193
return DataTypes.STRING();
147194
case PG_TEXT_ARRAY:
148195
return DataTypes.ARRAY(DataTypes.STRING());
@@ -155,8 +202,10 @@ private static DataType convertFromColumn(Column column) {
155202
case PG_TIMESTAMPTZ_ARRAY:
156203
return DataTypes.ARRAY(new ZonedTimestampType(scale));
157204
case PG_TIME:
205+
case PG_TIMETZ:
158206
return DataTypes.TIME(scale);
159207
case PG_TIME_ARRAY:
208+
case PG_TIMETZ_ARRAY:
160209
return DataTypes.ARRAY(DataTypes.TIME(scale));
161210
case PG_DATE:
162211
return DataTypes.DATE();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,27 @@ public void testFullTypes() throws Exception {
195195
BinaryStringData.fromString(
196196
"{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187}"),
197197
BinaryStringData.fromString(
198-
"{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}")
198+
"{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326}"),
199+
true,
200+
new byte[] {10},
201+
new byte[] {42},
202+
BinaryStringData.fromString("abc"),
203+
1209600000000L,
204+
BinaryStringData.fromString(
205+
"{\"order_id\": 10248, \"product\": \"Notebook\", \"quantity\": 5}"),
206+
BinaryStringData.fromString(
207+
"{\"product\": \"Pen\", \"order_id\": 10249, \"quantity\": 10}"),
208+
BinaryStringData.fromString(
209+
"<user>\n"
210+
+ " <id>123</id>\n"
211+
+ " <name>Alice</name>\n"
212+
+ " <email>alice@example.com</email>\n"
213+
+ " <preferences>\n"
214+
+ " <theme>dark</theme>\n"
215+
+ " <notifications>true</notifications>\n"
216+
+ " </preferences>\n"
217+
+ " </user>"),
218+
BinaryStringData.fromString("(3.456,7.890)")
199219
};
200220

201221
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -259,5 +279,14 @@ private Object[] recordFields(RecordData record, RowType rowType) {
259279
DataTypes.TIME(0),
260280
DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE),
261281
DataTypes.STRING(),
282+
DataTypes.STRING(),
283+
DataTypes.BOOLEAN(),
284+
DataTypes.BINARY(8),
285+
DataTypes.BINARY(20),
286+
DataTypes.CHAR(3),
287+
DataTypes.BIGINT(),
288+
DataTypes.STRING(),
289+
DataTypes.STRING(),
290+
DataTypes.STRING(),
262291
DataTypes.STRING());
263292
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ CREATE TABLE full_types
4646
default_numeric_c NUMERIC,
4747
geometry_c GEOMETRY(POINT, 3187),
4848
geography_c GEOGRAPHY(MULTILINESTRING),
49+
bit_c BIT(1),
50+
bit_fixed_c BIT(8),
51+
bit_varying_c BIT VARYING(20),
52+
bpchar_c BPCHAR(3),
53+
duration_c INTERVAL,
54+
json_c JSON,
55+
jsonb_c JSONB,
56+
xml_C XML,
57+
location POINT,
4958
PRIMARY KEY (id)
5059
);
5160

@@ -56,4 +65,12 @@ INSERT INTO inventory.full_types
5665
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
5766
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
5867
'2020-07-17', '18:00:22', 500,'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
59-
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
68+
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography,B'1',B'00001010',B'00101010','abc','2 weeks','{"order_id": 10248, "product": "Notebook", "quantity": 5}','{"order_id": 10249, "product": "Pen", "quantity": 10}'::jsonb,'<user>
69+
<id>123</id>
70+
<name>Alice</name>
71+
<email>alice@example.com</email>
72+
<preferences>
73+
<theme>dark</theme>
74+
<notifications>true</notifications>
75+
</preferences>
76+
</user>','(3.456,7.890)'::point);

0 commit comments

Comments
 (0)