feat: add error and success handlers to BulkWriter#458
feat: add error and success handlers to BulkWriter#458thebrianchen wants to merge 13 commits intomasterfrom
Conversation
843cd2e to
d92d1da
Compare
| * | ||
| * <p>The writes in the batch are not applied atomically and can be applied out of order. | ||
| */ | ||
| ApiFuture<List<BatchWriteResult>> bulkCommit() { |
There was a problem hiding this comment.
I tried extracting a commitHelper() from bulkCommit() and commit(), but couldn't figure out how to properly parameterize everything.
Aside from reducing code duplication, the main reason I want to do this is because moving this over requires a getter for writes and exposing committed as protected. If we could move the logic for these parts into UpdateBuilder, we could make both of those variables private.
There was a problem hiding this comment.
I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit() method that returns a list of Writes and sets committed to true. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.
There was a problem hiding this comment.
Ok, if that's the case I think we should just not try and refactor then.
dceab85 to
906165a
Compare
Codecov Report
@@ Coverage Diff @@
## master #458 +/- ##
============================================
+ Coverage 73.40% 73.71% +0.31%
- Complexity 1086 1091 +5
============================================
Files 65 66 +1
Lines 5786 5828 +42
Branches 723 724 +1
============================================
+ Hits 4247 4296 +49
+ Misses 1313 1306 -7
Partials 226 226
Continue to review full report at Codecov.
|
schmidt-sebastian
left a comment
There was a problem hiding this comment.
This looks really good!
| * | ||
| * <p>The writes in the batch are not applied atomically and can be applied out of order. | ||
| */ | ||
| ApiFuture<List<BatchWriteResult>> bulkCommit() { |
There was a problem hiding this comment.
I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit() method that returns a list of Writes and sets committed to true. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.
| * Resolves the individual operations in the batch with the results and removes the entry from the | ||
| * pendingOperations map if the result is not retryable. | ||
| */ | ||
| void newProcessResults(List<BatchWriteResult> results) { |
There was a problem hiding this comment.
Is this the final name for this function? :)
| * @param error The error object from the failed BulkWriter operation attempt. | ||
| * @return Whether to retry the operation or not. | ||
| */ | ||
| boolean shouldRetryListener(BulkWriterError error); |
There was a problem hiding this comment.
This is not a listener, this is the function that is invoked by the listener. I think this should be onError, shouldRetry, or shouldRetryOnError.
| operationType, | ||
| failedAttempts); | ||
|
|
||
| boolean shouldRetry = errorListener.shouldRetryListener(bulkWriterError); |
There was a problem hiding this comment.
This should be executed on the User-specified executor (via FirestoreOptions - the same as
)There was a problem hiding this comment.
Done here and in the success callback.
| return ApiFutures.transformAsync( | ||
| ApiFutures.catchingAsync( | ||
| batch.bulkCommit(), | ||
| Exception.class, |
There was a problem hiding this comment.
This should probably be a Throwable so we can also catch RuntimeException.
There was a problem hiding this comment.
Based on Java docs, RunTimeException extends Exception. The main issue with using Throwable is that BatchWriteResult takes in Exception, and I would have to plumb that through.
| bulkWriter.addWriteErrorListener( | ||
| new WriteErrorCallback() { | ||
| public boolean shouldRetryListener(BulkWriterError error) { | ||
| errorListenerCalled[0] = true; |
There was a problem hiding this comment.
Maybe assert that we received an INTERNAL error here.
| bulkWriter.addWriteErrorListener( | ||
| new WriteErrorCallback() { | ||
| public boolean shouldRetryListener(BulkWriterError error) { | ||
| throw new NullPointerException("Test code threw NullPointerException"); |
There was a problem hiding this comment.
This is probably not a good exception type since this is likely thrown by our code as well. I would advise to use something different - maybe UnsupportedOperationException?
| bulkWriter.addWriteResultListener( | ||
| new WriteResultCallback() { | ||
| public void onResult(DocumentReference documentReference, WriteResult result) { | ||
| throw new NullPointerException("Test code threw NullPointerException"); |
There was a problem hiding this comment.
Same comment as above.
| * bulkWriter.addWriteResultListener( | ||
| * new WriteResultCallback() { | ||
| * public void onResult(DocumentReference documentReference, WriteResult result) { | ||
| * System.out.println( | ||
| * "Successfully executed write on document: " | ||
| * + documentReference | ||
| * + " at: " | ||
| * + result.getUpdateTime()); | ||
| * } | ||
| * }); |
There was a problem hiding this comment.
You should probably just use Lamdba syntax here.
| * System.out.println("Failed write at document: " + error.getDocumentReference()); | ||
| * return false; | ||
| * } | ||
| * }); |
There was a problem hiding this comment.
You should probably use Lambda syntax here. If you stick with the current example, you need to add the function signature.
| * @return Whether to retry the operation or not. | ||
| */ | ||
| boolean shouldRetryListener(BulkWriterError error); | ||
| boolean shouldRetryError(BulkWriterError error); |
There was a problem hiding this comment.
I am thinking more and more that this should be onError. Otherwise, users are going to ask us how they can get the error result and why we don't expose it in WriteResultCallback. Note also that the term result is a bit ambiguous as it doesn't define whether "error" is a "result".
We should maybe also ask for a third opinion of whether the classes are "callbacks". On Android, we seem to call the outermost class "Listener". The method inside is prefixed with "on" (see EventListener).
There was a problem hiding this comment.
Changed to onError. As for WriteResult, we discussed naming for the node implementation and decided on WriteResult over WriteSuccess since a WriteResult is returned from the operation.
Would it be too sadistic to ask Ben to take a look at this PR for Java code style/convention aftewards?
There was a problem hiding this comment.
We should definitely ask Ben :)
| successListener.onResult(documentReference, result); | ||
| public ApiFuture<WriteResult> apply(WriteResult result) | ||
| throws ExecutionException, InterruptedException { | ||
| invokeUserSuccessCallback(documentReference, result).get(); |
There was a problem hiding this comment.
This looks like it may block the GRPC thread while the user code is running. Instead, you should do one of the following:
- Run your code
- Run user code in user thread, followed by a continuation that dispatches back on your old exectur
- Finish running your own code
Realistically, though, no one is going to get mad if you run a little bit of your own code on the use executor (as long as it is non-blocking and doesn't throw). For that reason, you can likely just use the user executor when the callback is first invoked (e.g. replace the directExecutor call in executeWrite() if deemed safe).
There was a problem hiding this comment.
The main issue with dispatching the code onto the user executor is that we're no longer guaranteed that the write is enqueued onto the BatchQueue before flush() is called. Instead, after our discussion, I exposed two user executors that the user could pass in to run their callbacks on. We're blocking on the user-callbacks, but not on the user or grpc threads.
There was a problem hiding this comment.
Summary of conversation w/ Ben: We should not block grpc or gax threads. Instead, we should be creating a thread on each firestore instance that awaits user callbacks.
Solution: Created a separate thread for waiting on the callback. Fixed async issue by fixing bug in performFlush(), where we were incorrectly resolving the flushComplete future early if no retries were scheduled.
| * bulkWriter.addWriteResultListener( | ||
| * new WriteResultCallback() { | ||
| * public void onResult(DocumentReference documentReference, WriteResult result) { | ||
| * (DocumentReference documentReference, WriteResult result -> { |
There was a problem hiding this comment.
Missing ")".
We should make sure these compile as users are going to copy them.
There was a problem hiding this comment.
Added. Checked that code compiles.
|
Continued in #483. |
Porting of several node PRs: 1, 2, 3.
Another fix I made was reducing the
MAX_BATCH_SIZEfrom 500 to 20. I had to do this after refactoring UpdateBuilder, since the previous code imposed the same limits on WriteBatch/Transactions, causing the system tests to fail.I've tried to order the commits to make it more reviewable, but after finishing it up the first time, I went back and did more cleanup.