Skip to content

Commit

Permalink
Fix: code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Ohashiro committed Jan 9, 2025
1 parent 07e57a4 commit 6e3ed2c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,19 @@ def execute(self, context: Context):
)

def get_refresh_status(self, context: Context, event: dict[str, str] | None = None):
"""Push the refresh Id to XCom then runs the Triggers to wait for refresh completion."""
"""Push the refresh Id to XCom then runs the Trigger to wait for refresh completion."""
if event:
if event["status"] == "error":
raise AirflowException(event["message"])

dataset_refresh_id = event["dataset_refresh_id"]

if dataset_refresh_id:
self.xcom_push(
context=context,
key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id",
value=event["dataset_refresh_id"],
value=dataset_refresh_id,
)

dataset_refresh_id = self.xcom_pull(
context=context, key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id"
)
if dataset_refresh_id:
self.defer(
trigger=PowerBITrigger(
conn_id=self.conn_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PowerBITrigger(BaseTrigger):
You can pass an enum named APIVersion which has 2 possible members v1 and beta,
or you can pass a string as `v1.0` or `beta`.
:param dataset_id: The dataset Id to refresh.
:param dataset_refresh_id: The dataset refresh Id
:param dataset_refresh_id: The dataset refresh Id to poll for the status, if not provided a new refresh will be triggered.
:param group_id: The workspace Id where dataset is located.
:param end_time: Time in seconds when trigger should stop polling.
:param check_interval: Time in seconds to wait between each poll.
Expand Down Expand Up @@ -109,12 +109,14 @@ def api_version(self) -> APIVersion | str:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to the PowerBI and polls for the dataset refresh status."""
if not self.dataset_refresh_id:
# Trigger the dataset refresh
dataset_refresh_id = await self.hook.trigger_dataset_refresh(
dataset_id=self.dataset_id,
group_id=self.group_id,
)
self.log.info("Triggered dataset refresh %s", dataset_refresh_id)

if dataset_refresh_id:
self.log.info("Triggered dataset refresh %s", dataset_refresh_id)
yield TriggerEvent(
{
"status": "success",
Expand All @@ -135,6 +137,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
)
return

# The dataset refresh is already triggered. Poll for the dataset refresh status.
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(min=5, multiplier=2),
Expand Down
5 changes: 2 additions & 3 deletions providers/tests/microsoft/azure/operators/test_powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def test_execute_wait_for_termination_with_deferrable(self, connection):
operator.execute(context)

assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.dataset_refresh_id is None

@mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection)
def test_powerbi_operator_async_get_refresh_status_success(self, connection):
Expand All @@ -104,7 +105,6 @@ def test_powerbi_operator_async_get_refresh_status_success(self, connection):
)
context = {"ti": MagicMock()}
context["ti"].task_id = TASK_ID
context["ti"].xcom_pull = MagicMock(return_value=NEW_REFRESH_REQUEST_ID)

with pytest.raises(TaskDeferred) as exc:
operator.get_refresh_status(
Expand All @@ -113,9 +113,8 @@ def test_powerbi_operator_async_get_refresh_status_success(self, connection):
)

assert isinstance(exc.value.trigger, PowerBITrigger)

assert exc.value.trigger.dataset_refresh_id is NEW_REFRESH_REQUEST_ID
assert context["ti"].xcom_push.call_count == 1
assert context["ti"].xcom_pull.call_count == 1

def test_powerbi_operator_async_execute_complete_success(self):
"""Assert that execute_complete log success message"""
Expand Down

0 comments on commit 6e3ed2c

Please sign in to comment.