GH-3350: Avoid flushing data to cloud when exception is thrown #3351
base: master
Conversation
This looks reasonable to me. WDYT? @gszadovszky @Fokko
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out, fileEncryptor, metadataConverter);
} catch (Exception e) {
  aborted = true;
We do not want to swallow the exception, just set the flag and re-throw.
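The set-the-flag-and-rethrow pattern suggested here could look like the following. This is a hypothetical sketch with illustrative names (`AbortableWriter`, `failOnEnd`), not the actual ParquetFileWriter code:

```java
import java.io.IOException;

// Hypothetical sketch of the reviewer's suggestion: on failure, record the
// aborted state and re-throw rather than swallowing the exception.
public class AbortableWriter {
  private boolean aborted = false;
  private final boolean failOnEnd; // simulated failure switch, for illustration only

  public AbortableWriter(boolean failOnEnd) {
    this.failOnEnd = failOnEnd;
  }

  public boolean isAborted() {
    return aborted;
  }

  public void end() throws IOException {
    try {
      serializeFooter();
    } catch (Exception e) {
      aborted = true; // remember the broken state ...
      throw e;        // ... but still propagate the exception to the caller
    }
  }

  private void serializeFooter() throws IOException {
    if (failOnEnd) {
      throw new IOException("simulated footer serialization failure");
    }
  }
}
```

Note that `catch (Exception e) { throw e; }` compiles against a `throws IOException` signature thanks to Java 7's precise rethrow analysis, so no wrapping is needed.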
We should probably do the same pattern for every public method that may throw an exception.
I’m not familiar with the direct buffer change, but in InternalParquetRecordWriter, there’s only one place where aborted is marked. Is that the only place that could cause an aborted write? If so, we don’t need to apply the same pattern to every public method in ParquetFileWriter.
It does look that way. The write function in InternalParquetRecordWriter is the only public function that can throw an exception (except close). So after we mark it as aborted there and abort the file write in the close call, we should cover all cases.
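The flow described here, where `write` is the only public method that sets the flag and `close` consults it, can be modeled with a small sketch. All names (`RecordWriterSketch`, `isCommitted`) are illustrative; this is not the real InternalParquetRecordWriter:

```java
// Hypothetical model of the abort flow: write() is the single place the
// aborted flag is set, and close() checks it to decide between committing
// the file and aborting without flushing anything.
public class RecordWriterSketch {
  private boolean aborted = false;
  private boolean committed = false;

  public boolean isCommitted() {
    return committed;
  }

  public void write(int record) {
    try {
      if (record < 0) { // stand-in for a real encoding failure
        throw new IllegalArgumentException("simulated write failure");
      }
      // a real writer would encode and buffer the record here
    } catch (RuntimeException e) {
      aborted = true; // the only place the flag is set
      throw e;
    }
  }

  public void close() {
    if (aborted) {
      return; // abort the file write: release resources, flush nothing
    }
    committed = true; // normal path: the footer would be serialized here
  }
}
```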
AutoCloseables.uncheckedClose(parquetFileWriter);
} finally {
  AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter);
  AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore);
Now that we have ParquetFileWriter to handle the "aborted" state, this change can be reverted.
Yes, I haven't finished my change.
Sorry, then. I was too fast. 😄
Ping me when you're ready.
Thanks for the quick review!
I’m not familiar with the direct buffer change, but in InternalParquetRecordWriter, there’s only one place where aborted is marked. Is that the only place that could cause an aborted write? If so, we don’t need to apply the same pattern to every public method in ParquetFileWriter.
This is true for your workflow, where ParquetFileWriter is only used via InternalParquetRecordWriter. But ParquetFileWriter is a public class and is used directly in other workflows. For a more complete fix, it would be nicer to handle this case as well.
}
}

/* Mark the writer as aborted to avoid flushing incomplete data to the cloud. */
nit: "to the cloud" is not required. That is only one use-case.
} catch (IOException e) {
  throw e;
This is not required.
Rationale for this change

Inside the InternalParquetRecordWriter::close finally block, we call close on parquetFileWriter, which may cause incomplete data to be flushed to the cloud if an exception is thrown during the close.

What changes are included in this PR?

Move parquetFileWriter.close out of the finally block, and add a unit test.

Are these changes tested?

Yes.

Are there any user-facing changes?

Users won't get incomplete Parquet files because of torn writes.
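The control flow of the fix can be modeled with a small sketch. The names (`CloseOrderSketch`, `flushRowGroup`, `events`) are hypothetical and only illustrate the ordering; this is not the real Parquet code:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the fix: the footer commit is no longer inside the
// finally block, so a failed flush never pushes an incomplete file to
// remote storage, while local stores are still released unconditionally.
public class CloseOrderSketch {
  final List<String> events = new ArrayList<>();
  private final boolean flushFails; // simulated failure switch

  public CloseOrderSketch(boolean flushFails) {
    this.flushFails = flushFails;
  }

  public void close() throws IOException {
    try {
      flushRowGroup();
      events.add("commitFooter"); // reached only on the success path
    } finally {
      events.add("closeStores"); // local cleanup always runs
    }
  }

  private void flushRowGroup() throws IOException {
    if (flushFails) {
      throw new IOException("simulated row group flush failure");
    }
  }
}
```

On failure, only the cleanup step runs and the exception propagates; on success, the commit happens before cleanup.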
Closes #3350