Optimize deferrable execution mode for DbtCloudRunJobOperator#31188
Optimize deferrable execution mode for DbtCloudRunJobOperator#31188josh-fell merged 1 commit intoapache:mainfrom
DbtCloudRunJobOperator#31188Conversation
| self.log.info("Job run %s has completed successfully.", str(self.run_id)) | ||
| return self.run_id | ||
| elif job_run_status in ( | ||
| DbtCloudJobRunStatus.CANCELLED.value, |
There was a problem hiding this comment.
How do we want to handle the cancelled state? If the user has manually cancelled the job and does not want further processing, we should not retry such cancelled jobs. By default, Airflow will retry such tasks. We could raise AirflowFailException in such cases so that Airflow does not retry those tasks.
There was a problem hiding this comment.
Great question. I think this could be handled in a separate PR though.
I could see hard-failing the task on user cancellation being unexpected or expected. Perhaps this could be a new parameter to control how cancelled runs are handled?
| method_name="execute_complete", | ||
| ) | ||
| job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id) | ||
| job_run_status = self.hook.get_job_run_status(**job_run_info) |
There was a problem hiding this comment.
should we log the state here?
There was a problem hiding this comment.
This hook method does have logging lines of
self.log.info("Getting the status of job run %s.", str(run_id))and
self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)which should handle the state logging.
| method_name="execute_complete", | ||
| ) | ||
| job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id) | ||
| job_run_status = self.hook.get_job_run_status(**job_run_info) |
There was a problem hiding this comment.
This hook method does have logging lines of
self.log.info("Getting the status of job run %s.", str(run_id))and
self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)which should handle the state logging.
| self.log.info("Job run %s has completed successfully.", str(self.run_id)) | ||
| return self.run_id | ||
| elif job_run_status in ( | ||
| DbtCloudJobRunStatus.CANCELLED.value, |
There was a problem hiding this comment.
Great question. I think this could be handled in a separate PR though.
I could see hard-failing the task on user cancellation being unexpected or expected. Perhaps this could be a new parameter to control how cancelled runs are handled?
In deferrable mode for DbtCloudRunJobOperator, we should first check if job is in terminal state or not in the execute method and only defer if that is not in terminal state. This way we don’t run an unnecessary deferral cycle if the condition is already true.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.