From 123641f3a9b3fdd4abe2316faecdf4c46e979ba9 Mon Sep 17 00:00:00 2001 From: Robert B Hamilton Date: Mon, 21 Jan 2019 20:17:11 -0500 Subject: [PATCH 1/3] Changes for Postgresql Copy Export: 1. Support for empty null string --null-string '' 2. Support for non-xml delim --fields-terminated-by'\0x1c' 3. Added line buffering perf --batch 4. optional TEXT mode instead of CSV -Dpostgresql.format.text=true 5. Support for postgres Version 8 -Dpostgresql.targetdb.ver=8 6. Optional disable escape sequences -Dpostgresql.input.israw=true --- .../postgresql/PostgreSQLCopyExportJob.java | 38 +++- .../PostgreSQLCopyExportMapper.java | 189 +++++++++++++----- .../org/apache/sqoop/util/LineBuffer.java | 33 +++ 3 files changed, 206 insertions(+), 54 deletions(-) create mode 100644 src/java/org/apache/sqoop/util/LineBuffer.java diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java index ea2b064e0..13f3b6d11 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java @@ -79,16 +79,46 @@ protected void configureMapper(Job job, String tableName, job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(NullWritable.class); } +/* list of chars that cannot be passed via jobconf */ + final static String badXmlString = "\u0000\u0001\u0002\u0003\u0004\u0005" + + "\u0006\u0007\u0008\u000B\u000C\u000E\u000F\u0010\u0011\u0012" + + "\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001A\u001B\u001C" + + "\u001D\u001E\u001F\uFFFE\uFFFF"; + +/* true if the char is ok to pass via Configuration */ + public static boolean validXml(char s){ + return (badXmlString.indexOf(s)<0); + } + protected void propagateOptionsToJob(Job job) { super.propagateOptionsToJob(job); SqoopOptions opts = context.getOptions(); Configuration conf = job.getConfiguration(); - if (opts.getNullStringValue() != null) { - conf.set("postgresql.null.string", opts.getNullStringValue()); + + /* empty string needs to be passed as a flag */ + if (opts.getNullStringValue().equals("")) { + conf.set("postgresql.null.emptystring","true"); } - setDelimiter("postgresql.input.field.delim", - opts.getInputFieldDelim(), conf); + + /* valid delimiters may not be valid xml chars, so the hadoop conf will fail. + * but we still want to support them so we base64 encode it in that case + * */ + char delim= opts.getInputFieldDelim(); + String delimString=Character.toString(delim); + if(validXml(delim)){ + setDelimiter("postgresql.input.field.delim",delim,conf); + }else{ + conf.set("postgresql.input.field.delim.base64", + java.util.Base64.getEncoder().encodeToString(delimString.getBytes())); + } + + /* use the --batch switch to enable line buffering */ + if (opts.isBatchMode()){ + conf.set("postgresql.export.batchmode","true"); + } + +/* todo: there may still be some case where user wants an invalid xml char for record delim */ setDelimiter("postgresql.input.record.delim", opts.getInputRecordDelim(), conf); setDelimiter("postgresql.input.enclosedby", diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java index cf9a3cd11..82e26b535 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java @@ -36,6 +36,7 @@ import org.postgresql.PGConnection; import org.postgresql.copy.CopyManager; import org.postgresql.copy.CopyIn; +import org.apache.sqoop.util.LineBuffer; /** @@ -52,6 +53,10 @@ public class PostgreSQLCopyExportMapper public static final Log LOG = LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName()); + private boolean bufferMode=false; /* whether or not to use the line buffer */ + private LineBuffer lineBuffer; /* batch up the lines before sending to copy */ + private boolean isRaw=false; /* if isRaw then we won't interprete escapes */ + private Configuration conf; private DBConfiguration dbConf; private Connection conn = null; @@ -63,6 +68,17 @@ public class PostgreSQLCopyExportMapper public PostgreSQLCopyExportMapper() { } + /* Text mode normally interprets escape sequences. Optionally + * turn that off by escaping the escapes + * */ + public String fixEscapes(String s){ + if (isRaw){ + return s.replace("\\","\\\\"); + } else { + return s; + } + } + @Override protected void setup(Context context) @@ -71,6 +87,7 @@ protected void setup(Context context) super.setup(context); conf = context.getConfiguration(); dbConf = new DBConfiguration(conf); + lineBuffer=new LineBuffer(); CopyManager cm = null; try { conn = dbConf.getConnection(); @@ -83,69 +100,141 @@ protected void setup(Context context) throw new IOException(ex); } try { - StringBuilder sql = new StringBuilder(); - sql.append("COPY "); - sql.append(dbConf.getOutputTableName()); - sql.append(" FROM STDIN WITH ("); - sql.append(" ENCODING 'UTF-8' "); - sql.append(", FORMAT csv "); - sql.append(", DELIMITER "); - sql.append("'"); - sql.append(conf.get("postgresql.input.field.delim", ",")); - sql.append("'"); - sql.append(", QUOTE "); - sql.append("'"); - sql.append(conf.get("postgresql.input.enclosedby", "\"")); - sql.append("'"); - sql.append(", ESCAPE "); - sql.append("'"); - sql.append(conf.get("postgresql.input.escapedby", "\"")); - sql.append("'"); - if (conf.get("postgresql.null.string") != null) { - sql.append(", NULL "); - sql.append("'"); - sql.append(conf.get("postgresql.null.string")); - sql.append("'"); - } - sql.append(")"); + /* Set if buffering mode is requested */ + this.bufferMode=("true".equals(conf.get("postgresql.export.batchmode"))); + + /* isRaw means escapes are NOT to be interpreted */ + this.isRaw=("true".equals(conf.get("postgresql.input.israw"))); + + /* add support for delims which are not valid xml. We have base64 encoded them */ + String delimBase64=conf.get("postgresql.input.field.delim.base64"); + String delim=null; + if (delimBase64!=null){ + delim=new String(java.util.Base64.getDecoder().decode(delimBase64)); + } else { + delim=conf.get("postgresql.input.field.delim",","); + } + + /* Some postgres instances out there still using version 8.x */ + StringBuilder sql = new StringBuilder(); + String ver=conf.get("postgresql.targetdb.ver", "9"); + if (ver.equals("8")){ + sql.append("COPY "); + sql.append(dbConf.getOutputTableName()); + sql.append(" FROM STDIN WITH "); + sql.append(" DELIMITER "); + sql.append("'"); + sql.append(delim); + sql.append("'"); + if (! "true".equals(conf.get("postgresql.format.text"))){ + sql.append(" CSV "); + sql.append(" QUOTE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.enclosedby", "\"")); + sql.append("'"); + sql.append(" ESCAPE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.escapedby", "\"")); + sql.append("'"); + } + /* Hadoop config does not permit empty string so we use special switch to designate that */ + if (conf.get("postgresql.null.emptystring")!=null){ + sql.append(" NULL ''"); + }else + if (conf.get("postgresql.null.string") != null) { + sql.append(" NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } + } else { /* intended for version 9.x This has not been fixed for buffering */ + sql.append("COPY "); + sql.append(dbConf.getOutputTableName()); + sql.append(" FROM STDIN WITH ("); + sql.append(" ENCODING 'UTF-8' "); + sql.append(", FORMAT csv "); + sql.append(", DELIMITER "); + sql.append("'"); + sql.append(conf.get("postgresql.input.field.delim", ",")); + sql.append("'"); + sql.append(", QUOTE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.enclosedby", "\"")); + sql.append("'"); + sql.append(", ESCAPE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.escapedby", "\"")); + sql.append("'"); + if (conf.get("postgresql.null.string") != null) { + sql.append(", NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } + sql.append(")"); + } LOG.debug("Starting export with copy: " + sql); copyin = cm.copyIn(sql.toString()); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); - close(); - throw new IOException(ex); - } + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); + close(); + throw new IOException(ex); + } } @Override public void map(LongWritable key, Writable value, Context context) throws IOException, InterruptedException { - line.setLength(0); - line.append(value.toString()); - if (value instanceof Text) { - line.append(System.getProperty("line.separator")); - } - try { - byte[]data = line.toString().getBytes("UTF-8"); - copyin.writeToCopy(data, 0, data.length); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to execute copy", ex); - close(); - throw new IOException(ex); - } + if (bufferMode){ + if (lineBuffer.append(fixEscapes(value.toString()))){ + return; + } + /* else buffer is full lets write out */ + try { + byte[]data=lineBuffer.getBytes(); + copyin.writeToCopy(data,0,data.length); + lineBuffer.clear(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute copy", ex); + close(); + throw new IOException(ex); + } + + /* now write the new line that could not be appended because the buffer was full */ + lineBuffer.append(fixEscapes(value.toString())); + } else { /* original unbuffered method */ + line.setLength(0); + line.append(value.toString()); + if (value instanceof Text) { + line.append(System.getProperty("line.separator")); + } + try { + byte[]data = line.toString().getBytes("UTF-8"); + copyin.writeToCopy(data, 0, data.length); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute copy", ex); + close(); + throw new IOException(ex); + } + } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { - try { - copyin.endCopy(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to finalize copy", ex); - throw new IOException(ex); + try { /* write out the final fragment in the buffer */ + if (bufferMode){ + byte[]data=lineBuffer.getBytes(); + copyin.writeToCopy(data,0,data.length); + lineBuffer.clear(); + } + copyin.endCopy(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to finalize copy", ex); + throw new IOException(ex); + } + close(); } - close(); - } void close() throws IOException { if (conn != null) { diff --git a/src/java/org/apache/sqoop/util/LineBuffer.java b/src/java/org/apache/sqoop/util/LineBuffer.java new file mode 100644 index 000000000..474ac552e --- /dev/null +++ b/src/java/org/apache/sqoop/util/LineBuffer.java @@ -0,0 +1,33 @@ +package org.apache.sqoop.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* Buffer up lines of text until buffer is full. + * This helper class is just to support a test to + * see if the copyIn mapper for postgres is unbuffered + * + * */ +public class LineBuffer{ + public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); + + private StringBuilder sb=new StringBuilder(); + private static int MAXLEN=100000000; + //private static int MAXLEN=50000000; + + public void clear(){ sb.setLength(0); } + public int length(){return sb.length();} + public boolean append(String s){ +// LOG.debug(s); + if (sb.length()+s.length()+1>MAXLEN){return false;} + sb.append(s); + sb.append("\n"); + return true; + } + public String toString(){return sb.toString();} + public byte[] getBytes() { + try { + //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); + return sb.toString().getBytes("UTF-8"); + }catch(Exception e){e.printStackTrace();return null;} + } +} From 4916a0ed2620a54dd29c57cbb166d0343d3e5b0f Mon Sep 17 00:00:00 2001 From: Robert B Hamilton Date: Mon, 21 Jan 2019 22:18:01 -0500 Subject: [PATCH 2/3] Fixed null pointer error in testExportDirect --- .../sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java index 13f3b6d11..3ae0a59b1 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java @@ -97,7 +97,7 @@ protected void propagateOptionsToJob(Job job) { Configuration conf = job.getConfiguration(); /* empty string needs to be passed as a flag */ - if (opts.getNullStringValue().equals("")) { + if ("".equals(opts.getNullStringValue())) { conf.set("postgresql.null.emptystring","true"); } From be539f183b6ac24c5314c9bee95ce57c2c96803c Mon Sep 17 00:00:00 2001 From: Robert B Hamilton Date: Tue, 22 Jan 2019 12:06:01 -0500 Subject: [PATCH 3/3] cleanups: 1. moved LineBuffer to inner class of the Export Mapper 2. extended support to 9.x direct copy 3. Added test case --- .../PostgreSQLCopyExportMapper.java | 49 ++++++++++++++++--- .../org/apache/sqoop/util/LineBuffer.java | 33 ------------- .../postgresql/PostgresqlExportTest.java | 14 ++++++ 3 files changed, 56 insertions(+), 40 deletions(-) delete mode 100644 src/java/org/apache/sqoop/util/LineBuffer.java diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java index 82e26b535..100d78756 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java @@ -36,7 +36,7 @@ import org.postgresql.PGConnection; import org.postgresql.copy.CopyManager; import org.postgresql.copy.CopyIn; -import org.apache.sqoop.util.LineBuffer; +//import org.apache.sqoop.util.LineBuffer; /** @@ -66,6 +66,36 @@ public class PostgreSQLCopyExportMapper new DelimiterSet(',', '\n', DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false); +/* Buffer up lines of text until buffer is full. + * This helper class is just to support a test to + * see if the copyIn mapper for postgres is unbuffered + * + * */ +static class LineBuffer{ + public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); + + private StringBuilder sb=new StringBuilder(); + private static int MAXLEN=100000000; + //private static int MAXLEN=50000000; + + public void clear(){ sb.setLength(0); } + public int length(){return sb.length();} + public boolean append(String s){ +// LOG.debug(s); + if (sb.length()+s.length()+1>MAXLEN){return false;} + sb.append(s); + sb.append("\n"); + return true; + } + public String toString(){return sb.toString();} + public byte[] getBytes() { + try { + //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); + return sb.toString().getBytes("UTF-8"); + }catch(Exception e){e.printStackTrace();return null;} + } +} + public PostgreSQLCopyExportMapper() { } /* Text mode normally interprets escape sequences. Optionally @@ -155,7 +185,7 @@ protected void setup(Context context) sql.append(", FORMAT csv "); sql.append(", DELIMITER "); sql.append("'"); - sql.append(conf.get("postgresql.input.field.delim", ",")); + sql.append(delim); sql.append("'"); sql.append(", QUOTE "); sql.append("'"); @@ -165,11 +195,16 @@ protected void setup(Context context) sql.append("'"); sql.append(conf.get("postgresql.input.escapedby", "\"")); sql.append("'"); - if (conf.get("postgresql.null.string") != null) { - sql.append(", NULL "); - sql.append("'"); - sql.append(conf.get("postgresql.null.string")); - sql.append("'"); + /* Hadoop config does not permit empty string so we use special switch to designate that */ + if (conf.get("postgresql.null.emptystring")!=null){ + sql.append(", NULL ''"); + }else { + if (conf.get("postgresql.null.string") != null) { + sql.append(", NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } } sql.append(")"); } diff --git a/src/java/org/apache/sqoop/util/LineBuffer.java b/src/java/org/apache/sqoop/util/LineBuffer.java deleted file mode 100644 index 474ac552e..000000000 --- a/src/java/org/apache/sqoop/util/LineBuffer.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.sqoop.util; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/* Buffer up lines of text until buffer is full. - * This helper class is just to support a test to - * see if the copyIn mapper for postgres is unbuffered - * - * */ -public class LineBuffer{ - public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); - - private StringBuilder sb=new StringBuilder(); - private static int MAXLEN=100000000; - //private static int MAXLEN=50000000; - - public void clear(){ sb.setLength(0); } - public int length(){return sb.length();} - public boolean append(String s){ -// LOG.debug(s); - if (sb.length()+s.length()+1>MAXLEN){return false;} - sb.append(s); - sb.append("\n"); - return true; - } - public String toString(){return sb.toString();} - public byte[] getBytes() { - try { - //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); - return sb.toString().getBytes("UTF-8"); - }catch(Exception e){e.printStackTrace();return null;} - } -} diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java index 457b398aa..5361a427d 100644 --- a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java +++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java @@ -352,6 +352,19 @@ public void testExportDirect() throws IOException, SQLException { "3,Fred,2009-01-23,15,marketing", }); + String[] extra = new String[] {"--direct","--batch"}; + + runExport(getArgv(true, extra)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + @Test + public void testExportDirectBatch() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + String[] extra = new String[] {"--direct"}; runExport(getArgv(true, extra)); @@ -359,6 +372,7 @@ public void testExportDirect() throws IOException, SQLException { assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); } + @Test public void testExportCustomSchema() throws IOException, SQLException { createTestFile("inputFile", new String[] {