Sensor for Databricks partition and table changes #28950

harishkrao wants to merge 59 commits into apache:main
Conversation
@alexott it would be great if you could review the PR and share feedback. Thank you for your time.
alexott left a comment:
Thank you for your contribution!
I would make the DatabricksSqlSensor really generic by allowing an arbitrary SQL expression to be passed that triggers the sensor when it returns a non-empty result. Then, on top of it, we can build Partition & History change sensors.
Also we need:
- Documentation
- The sensor should be declared in airflow/providers/databricks/provider.yaml
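To illustrate the shape of such a generic sensor, here is a minimal sketch (not the PR's actual code; the parameter names mirror the existing DatabricksSqlOperator, and DatabricksSqlHook.run with fetch_all_handler is assumed):

```python
from __future__ import annotations

from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator


class GenericDatabricksSqlSensor(BaseSensorOperator):
    """Pokes until the supplied SQL statement returns a non-empty result."""

    template_fields = ("sql",)

    def __init__(
        self,
        *,
        sql: str,
        databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
        sql_endpoint_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.databricks_conn_id = databricks_conn_id
        self.sql_endpoint_name = sql_endpoint_name

    def poke(self, context) -> bool:
        hook = DatabricksSqlHook(
            databricks_conn_id=self.databricks_conn_id,
            sql_endpoint_name=self.sql_endpoint_name,
        )
        rows = hook.run(self.sql, handler=fetch_all_handler)
        # Fire as soon as the query yields at least one row.
        return bool(rows)
```

A partition sensor or a history-change sensor could then simply build the appropriate SQL string and delegate to such a class.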
You don't need to include types in the docstrings; just use :param {argname}: {description}. All information about expected types is taken from the annotations.
Including types may cause issues when the documentation is generated.
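For reference, a minimal reST-style docstring of the kind being asked for might look like this (class and argument names are illustrative, not from the PR):

```python
class ExampleSensor:
    def __init__(self, *, table_name: str, catalog: str = "hive_metastore") -> None:
        """
        :param table_name: Name of the Delta table to monitor.
        :param catalog: Catalog that contains the table.
        """
        # Expected types come from the annotations above, not from the docstring.
        self.table_name = table_name
        self.catalog = catalog
```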
Removed the type. Thanks for the feedback.
We don't use Google style in docstrings; please use reStructuredText.
I think we do not need any default value for timestamp.
If it is a mandatory field, just make it mandatory and the user should provide the actual value here.
The reason I thought it needs a default value is to extract history for a time period irrespective of a custom value provided by the user, for example the past 7 days.
Also, it makes sense to declare some of the properties as templated. For example, the partitions mapping, etc.
Also, for the partition sensor it would make sense to allow specifying operations on the partitions, like, allow not only
@alexott agree, that would be good to implement. Do you have an example to follow for this one?
Yes, just look into DatabricksSqlOperator - it allows templatizing
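For context, templating in an operator or sensor boils down to listing the attribute names in template_fields; roughly like the following sketch (class and field names here are illustrative, not the PR's final code):

```python
from __future__ import annotations

from typing import Any, Sequence

from airflow.sensors.base import BaseSensorOperator


class ExamplePartitionSensor(BaseSensorOperator):
    # Attributes listed here are rendered with Jinja before poke() runs,
    # so users can pass values such as {"date": "{{ ds }}"} for the partitions.
    template_fields: Sequence[str] = ("catalog", "schema", "table_name", "partitions")

    def __init__(
        self,
        *,
        table_name: str,
        catalog: str = "",
        schema: str = "default",
        partitions: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.table_name = table_name
        self.catalog = catalog
        self.schema = schema
        self.partitions = partitions or {}

    def poke(self, context) -> bool:
        # The partition-existence check would go here.
        return False
```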
@alexott @Taragolis Thank you for taking the time to review the PR and for giving valuable feedback. I have addressed all of the comments. It would be great if you could review the changes. Thank you again!
We can combine all of them under the Databricks SQL umbrella
Let's hardcode the caller instead of passing it as an argument.
Can we inherit most of the parameters from DatabricksSqlSensor?
If we pass all parameters to the DatabricksSqlSensor, then we can simply inherit that hook from it.
Good point, implemented the change.
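For illustration, the inheritance idea amounts to something like the following sketch (the subclass name and partition-specific parameters are hypothetical; the parent is the PR's DatabricksSqlSensor, and remaining constructor arguments are simply forwarded to it):

```python
from airflow.providers.databricks.sensors.sql import DatabricksSqlSensor


class ExamplePartitionSensor(DatabricksSqlSensor):
    # Connection id, endpoint/http_path, catalog and schema handling are all
    # inherited from DatabricksSqlSensor; only partition options are added here.
    def __init__(self, *, partitions: dict, partition_operator: str = "=", **kwargs):
        super().__init__(**kwargs)
        self.partitions = partitions
        self.partition_operator = partition_operator
```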
It will always be executed this way because we're passing an empty string as the default. Also, we can continue to rely on two-level and even one-level naming, relying on the default catalog & default schema...
this is really just return len(result) > 0
Same comments as for partition sensor
what about selecting max(version) instead of count?
Agree, will change it.
if time_range doesn't change between calls, we may return true all the time. Why not compare with the latest version instead?
Does it make sense to report only data changes? Otherwise we'll get unnecessary changes, like after VACUUM, OPTIMIZE, ...
I am working on this, will push the changes soon.
Added these changes in the recent push.
Same comment as for partition sensor.
@alexott FYI: the Escaper class was not available on the main branch, so I added it as part of this PR to be used in our provider.
I also made some minor changes to exception handling in the Escaper class because it was looking for some user-defined exception classes from pyhive.
Hmmm, it should be a part of databricks-sql-connector: https://github.com/databricks/databricks-sql-python/blob/main/src/databricks/sql/utils.py#L121
How do I use the Escaper class in databricks-sql-python within Airflow? Do I add it to setup.py to be installed when Airflow starts?
Thanks for the help, I imported it via from databricks.sql.utils import ParamEscaper
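For reference, a small sketch of how ParamEscaper from databricks-sql-connector can be used (the values are placeholders; escape_item is the same helper the PR calls when building partition filters):

```python
from databricks.sql.utils import ParamEscaper

escaper = ParamEscaper()
# String values come back as single-quoted SQL literals.
print(escaper.escape_item("2023-01-01"))
# Numeric values come back unquoted.
print(escaper.escape_item(42))
```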
alexott left a comment:
There are some minor changes required, but otherwise - looks good
why starting with _? It then doesn't match other operators
Changed it and made it consistent with other operators.
what about timestamps aka datetime.datetime ?
We do not need it, removed it.
Not all parameters for __init__ are documented.
this handles lack of results differently than the generic sensor.
Unified the handling across classes.
same here - unify names with base operator
Could it be simpler to do operation NOT IN ('CONVERT', 'OPTIMIZE', ...)?
Also, need to add FSCK. See full list here: https://docs.delta.io/latest/delta-utility.html#retrieve-delta-table-history
Yes, added FSCK and changed it to a NOT IN filter. Additionally, it is interesting to note that, on running a few commands like FSCK and OPTIMIZE, they are not recorded in the history of the Delta table.
Regarding VACUUM START, ... let's try to use just a single % -> VACUUM% to avoid searching by substring.
Good point, changed it.
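Put together, the resulting history query might look roughly like this (a sketch only; the table name and timestamp are placeholders, and the exact operation list follows the Delta documentation linked above):

```python
table_name = "my_catalog.my_schema.my_table"  # placeholder
timestamp = "2023-01-01 00:00:00"  # placeholder

# Only data-changing commits should trigger the sensor, so maintenance
# operations are filtered out with NOT IN plus a single LIKE for VACUUM%.
sql = (
    f"SELECT MAX(version) FROM (DESCRIBE HISTORY {table_name}) "
    f"WHERE timestamp >= '{timestamp}' "
    "AND operation NOT IN ('CONVERT', 'OPTIMIZE', 'CLONE', 'RESTORE', 'FSCK') "
    "AND operation NOT LIKE 'VACUUM%'"
)
```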
What about making this optional? For example, I want to check changes without taking the timestamp into account.
@alexott thank you for taking the time to review! I resolved all the comments (except the escaper class).
@harishkrao does this PR solve #21381?
alexott left a comment:
See my comment about returning false vs. throwing an exception when there are no results.
But the primary request for changes is about adding the missing pieces:
- We need documentation to be added as well
- Documentation should include examples - add a sensor example to tests/system/providers/databricks - it will be used for integration tests
@alexott just pushed an example DAG file, similar to the ones for Operators.
eladkal left a comment:
Apparently I missed stuff because I reviewed from my phone.
I think the code requires some more work. I'm not familiar with Databricks, but the sensors seem very complex, and I wonder for all of them whether the logic shouldn't be in the hook?
@o-nikolas @josh-fell I appreciate another eye here.
```python
def get_previous_version(context: Context, lookup_key):
    return context["ti"].xcom_pull(key=lookup_key, include_prior_dates=True)
```
I don't understand the XCom part.
Why does the sensor push and pull from XCom on every poke?
We store metadata about the most recently queried version for that table and send/receive it using XCom. On querying the current version from Databricks, we compare it with the one stored in the metadata and take action accordingly.
I don't believe there is a guarantee that the most recent XCom will be pulled here. Behind the scenes XCom.get_many() is called and just retrieves the first record, so you are at the mercy of the metadatabase being used there.
Also, what happens in a mapped-operator situation? If the XCom key is always the same, it seems possible this can pull an XCom key for an entirely different task, since task_ids and map_index are not specified in the xcom_pull() call.
Another question then would be: what if the input args are the same (i.e. checking for changes in the same table) but a user simply updates the task_id? Would this sensor yield a false positive that there was indeed a change?
I don't necessarily have answers to these questions off the top of my head, but they are some things to think about when using XComs in this way.
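One way to make the pull less ambiguous, sketched under the assumption that the sensor's own task pushed the value, is to pin the pull to the current task_id (this still does not address the mapped-operator case):

```python
def get_previous_version(context, lookup_key):
    ti = context["ti"]
    # Restricting the pull to this task's own id avoids silently picking up an
    # XCom written under the same key by a different task.
    return ti.xcom_pull(task_ids=ti.task_id, key=lookup_key, include_prior_dates=True)
```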
```python
def _get_results_table_changes(self, context) -> bool:
    complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
    self.log.debug("Table name generated from arguments: %s", complete_table_name)

    prev_version = -1
    if context is not None:
        lookup_key = complete_table_name
        prev_data = self.get_previous_version(lookup_key=lookup_key, context=context)
        self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data))
        if isinstance(prev_data, int):
            prev_version = prev_data
        elif prev_data is not None:
            raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data))
        version = self.get_current_table_version(table_name=complete_table_name)
        self.log.debug("Current version: %s", version)
        if version is None:
            return False
        if prev_version < version:
            result = True
        else:
            return False
        if prev_version != version:
            self.set_version(lookup_key=lookup_key, version=version, context=context)
        self.log.debug("Result: %s", result)
        return result
    return False

def poke(self, context: Context) -> bool:
    return self._get_results_table_changes(context=context)
```
This looks very complicated.
A sensor should ask a simple question. Most of the logic for the operations should be functions in the hook (so they can also be utilized by other sensors / custom sensors that users can create).
@alexott can you please weigh in on the design decisions we made to arrive at this pattern?
+1. Some of these functions look like they could be handy if made generally available as part of a hook.
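As an illustration of what "move it into the hook" could look like, here is a sketch of a helper that could live on DatabricksSqlHook (shown as a free function for brevity; the DESCRIBE HISTORY query mirrors the approach discussed above and is an assumption, not the PR's final code):

```python
from __future__ import annotations

from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook


def get_current_table_version(hook: DatabricksSqlHook, table_name: str) -> int | None:
    """Return the latest Delta table version, or None if no history is available."""
    rows = hook.run(
        f"SELECT MAX(version) FROM (DESCRIBE HISTORY {table_name})",
        handler=fetch_all_handler,
    )
    return rows[0][0] if rows else None
```

With such a helper, the sensor's poke() reduces to fetching the current version and comparing it with the previously seen one.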
```python
output_list.append(
    f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
# TODO: Check date types.
```
I can remove this.
```python
from airflow.utils.context import Context


class DatabricksSqlSensor(BaseSensorOperator):
```
I'm missing something here.
If this sensor leverages DbApiHook why doesn't it subclass SqlSensor?
I can change it to inherit from SqlSensor.
@alexott can you please elaborate on the reasoning for why we wrote the sensors with this design?
```python
) as dag:
    # [docs]
    connection_id = "databricks_default"
    sql_endpoint_name = "Starter Warehouse"
```
We usually add test code to set up the resource under test, or at least make it configurable (OS env var etc.) so that users can set up their own and supply the correct config to test against it.
```python
from airflow.providers.databricks.sensors.sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.table_changes import DatabricksTableChangesSensor


# [docs]
```
What is the purpose of these tags?
| self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data)) | ||
| if isinstance(prev_data, int): | ||
| prev_version = prev_data | ||
| elif prev_data is not None: | ||
| raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data)) |
There was a problem hiding this comment.
IMHO all this logic should be inside get_previous_version() rather than here.
```python
version = self.get_current_table_version(table_name=complete_table_name)
self.log.debug("Current version: %s", version)
if version is None:
    return False
if prev_version < version:
    result = True
else:
    return False
if prev_version != version:
    self.set_version(lookup_key=lookup_key, version=version, context=context)
```
I don't think I fully understand this logic. The two False cases can certainly be collapsed to be more compact, but also, shouldn't the false case be setting result rather than returning? If they return then the code to store the version in xcom is not executed. You're basically always comparing version with the prev_version default value of -1 from what I can tell.
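For comparison, a flattened version of the logic under discussion might read like this (names follow the quoted snippet; this is a sketch, not the PR's code):

```python
def _get_results_table_changes(self, context) -> bool:
    complete_table_name = f"{self.catalog}.{self.schema}.{self.table_name}"
    lookup_key = complete_table_name

    prev_data = self.get_previous_version(lookup_key=lookup_key, context=context)
    prev_version = prev_data if isinstance(prev_data, int) else -1

    version = self.get_current_table_version(table_name=complete_table_name)
    if version is None or version <= prev_version:
        return False

    # Only reached when the table advanced: persist the new version, then fire.
    self.set_version(lookup_key=lookup_key, version=version, context=context)
    return True
```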
| self.log.debug("Table name generated from arguments: %s", complete_table_name) | ||
|
|
||
| prev_version = -1 | ||
| if context is not None: |
There was a problem hiding this comment.
Are we really worried about context being missing?
If so, then just add a statement like:
```python
if not context:
    return False
```
This way the whole main block of code doesn't have to be indented.
Also, if context is really missing you may want to throw an exception instead of just returning False.
```python
def set_version(context: Context, lookup_key, version):
    context["ti"].xcom_push(key=lookup_key, value=version)


def get_current_table_version(self, table_name):
```
Can any or all of this be pushed into the hook? Validating things like operators doesn't seem like the right thing
```python
if len(partition_columns) < 1:
    raise AirflowException("Table %s does not have partitions", table_name)
formatted_opts = ""
if opts is not None and len(opts) > 0:
```
Suggested change:
```python
if opts:
```
```python
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
```
Since this import is only used for typing it should be put behind typing.TYPE_CHECKING. One fewer import at runtime. Applicable to all of the other net-new modules in this PR too.
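Concretely, that pattern (already used across Airflow modules) looks like this:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only for type checking; the module is not loaded at runtime.
    from airflow.utils.context import Context
```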
| if len(result) < 1: | ||
| return False | ||
| return True |
There was a problem hiding this comment.
| if len(result) < 1: | |
| return False | |
| return True | |
| return bool(result) |
Small optimization.
| """Sensor to execute SQL statements on a Delta table via Databricks. | ||
|
|
||
| :param databricks_conn_id: Reference to :ref:`Databricks | ||
| connection id<howto/connection:databricks>` (templated), defaults to | ||
| DatabricksSqlHook.default_conn_name | ||
| :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster. | ||
| If not specified, it should be either specified in the Databricks connection's | ||
| extra parameters, or ``sql_endpoint_name`` must be specified. | ||
| :param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path`` | ||
| must be provided as described above, defaults to None | ||
| :param session_configuration: An optional dictionary of Spark session parameters. If not specified, | ||
| it could be specified in the Databricks connection's extra parameters., defaults to None | ||
| :param http_headers: An optional list of (k, v) pairs | ||
| that will be set as HTTP headers on every request. (templated). | ||
| :param catalog: An optional initial catalog to use. | ||
| Requires DBR version 9.0+ (templated), defaults to "" | ||
| :param schema: An optional initial schema to use. | ||
| Requires DBR version 9.0+ (templated), defaults to "default" | ||
| :param sql: SQL statement to be executed. | ||
| :param handler: Handler for DbApiHook.run() to return results, defaults to fetch_all_handler | ||
| :param client_parameters: Additional parameters internal to Databricks SQL Connector parameters. | ||
| """ |
There was a problem hiding this comment.
There are two sets of docstrings for this sensor's construction. Can you consolidate please?
| defaults to >=. | ||
| """ | ||
|
|
||
| template_fields: Sequence[str] = ("databricks_conn_id", "catalog", "schema", "table_name") |
There was a problem hiding this comment.
IMO it would be useful to have timestamp as a template field too. I could foresee users wanting to use one of the built-in Jinja variables for this so that the task is idempotent (like {{ data_interval_start }} for example), or so that it can be used as a dynamic input from a previous task.
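As a hypothetical usage, if timestamp were templated, a DAG could pass a Jinja expression so each run checks its own data interval (the sensor and parameter names follow this PR and may differ from any released provider; connection and warehouse values are placeholders):

```python
from airflow.providers.databricks.sensors.table_changes import DatabricksTableChangesSensor

check_table_changes = DatabricksTableChangesSensor(
    task_id="check_table_changes",
    databricks_conn_id="databricks_default",
    sql_endpoint_name="Starter Warehouse",
    schema="my_schema",
    table_name="my_table",
    # Rendered per run, keeping the task idempotent across backfills.
    timestamp="{{ data_interval_start }}",
)
```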
```python
def _sql_sensor(self, sql):
    hook = self._get_hook()
    sql_result = hook.run(
        sql,
        handler=self.handler if self.do_xcom_push else None,
    )
    return sql_result
```
Suggested change: remove this method. Same idea here - this method already exists in DatabricksSqlSensor.
```python
def poke(self, context: Context) -> bool:
    return self._get_results()
```
Technically could remove this too.
```python
)


class TestDatabricksPartitionSensor(unittest.TestCase):
```
There is an ongoing effort to move away from unittest in favor of pytest. Since these tests are net-new, could you change this and the other tests in the PR to pytest please?
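A pytest-style equivalent might look roughly like this (illustrative only: the mocked _sql_sensor helper and the constructor arguments are taken from the code quoted elsewhere in this review, not from a released provider):

```python
from unittest import mock

import pytest

from airflow.providers.databricks.sensors.sql import DatabricksSqlSensor


@pytest.fixture
def sensor():
    return DatabricksSqlSensor(
        task_id="sql_sensor_task",
        databricks_conn_id="databricks_default",
        sql_endpoint_name="Starter Warehouse",
        sql="SELECT 1",
    )


def test_poke_returns_true_when_rows_exist(sensor):
    # Bypass the real connection by stubbing the internal query helper.
    with mock.patch.object(DatabricksSqlSensor, "_sql_sensor", return_value=[(1,)]):
        assert sensor.poke(context={}) is True
```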
```python
connection_id = "databricks_default"
sql_endpoint_name = "Starter Warehouse"


# [START howto_sensor_databricks_sql]
```
These START/END markers are used (generally) to include code snippets in guides. It would be great if there was accompanying documentation for these new sensors that takes advantage of the snippets outlined in this DAG. There are a lot of examples in the operator guides on how this is done throughout the providers.
```python
partition_columns = self._sql_sensor(f"DESCRIBE DETAIL {table_name}")[0][7]
self.log.info("table_info: %s", partition_columns)
if len(partition_columns) < 1:
    raise AirflowException("Table %s does not have partitions", table_name)
```
Suggested change:
```python
raise AirflowException(f"Table {table_name} does not have partitions")
```
Otherwise the message logged will not be what you expect.
```python
# TODO: Check date types.
else:
    raise AirflowException(
        "Column %s not part of table partitions: %s", partition_col, partition_columns
```
Suggested change:
```python
f"Column {partition_col} not part of table partitions: {partition_columns}"
```
Same here.
@eladkal @o-nikolas @josh-fell @alexott thanks for taking the time to review my code and provide feedback, I appreciate it. To incorporate the changes for the 3 sensors, I will break these down into 3 separate PRs so that they are easy to manage and test individually, rather than a bulk of changes in a single PR.
Closes: #21381
Sensors for Databricks SQL to detect table partitions and new table events.