Skip to content

Commit

Permalink
Add update_job_instance client method (#67)
Browse files Browse the repository at this point in the history
* add update_job_instance client method

* update docstring

* explicitly register callback

* update changelog

* Bump version: 0.0.14 → 0.0.15

* fix changelog
  • Loading branch information
rohitsanj authored Nov 19, 2022
1 parent 2938dbb commit 781fa83
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.14
current_version = 0.0.15
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
serialize =
{major}.{minor}.{patch}
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.15] - 2022-11-18
### Added
- Added `update_job_instance` client method

### Changed
- Refactor create_parameterized_notebook to include job instance attempt information

## [0.0.13] - 2022-11-03
### Added
- Use `backend_path` config value when making API requests to Noteable
Expand Down
2 changes: 1 addition & 1 deletion origami/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.14"
version = "0.0.15"
47 changes: 43 additions & 4 deletions origami/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import functools
import os
import uuid
from asyncio import Future
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -30,6 +31,8 @@
CustomerJobInstanceReference,
CustomerJobInstanceReferenceInput,
JobInstanceAttempt,
JobInstanceAttemptRequest,
JobInstanceAttemptUpdate,
)
from .defs.kernels import SessionRequestDetails
from .defs.rtu import (
Expand Down Expand Up @@ -337,7 +340,7 @@ async def delete_kernel_session(self, file: Union[UUID, NotebookFile], timeout:
async def create_parameterized_notebook(
self,
notebook_id: UUID,
job_instance_attempt: JobInstanceAttempt = None,
job_instance_attempt: JobInstanceAttemptRequest = None,
timeout: float = None,
) -> CreateParameterizedNotebookResponse:
"""
Expand All @@ -362,7 +365,11 @@ async def create_parameterized_notebook(
parameterized_notebook.content = httpx.get(
parameterized_notebook.presigned_download_url
).content.decode("utf-8")
job_instance_attempt = JobInstanceAttempt.parse_obj(resp_data['job_instance_attempt'])
job_instance_attempt = (
JobInstanceAttempt.parse_obj(resp_data['job_instance_attempt'])
if resp_data['job_instance_attempt']
else None
)

return CreateParameterizedNotebookResponse(
parameterized_notebook=parameterized_notebook, job_instance_attempt=job_instance_attempt
Expand All @@ -385,6 +392,22 @@ async def create_job_instance(
resp.raise_for_status()
return CustomerJobInstanceReference.parse_obj(resp.json())

@_default_timeout_arg
async def update_job_instance(
self,
job_instance_attempt_id: uuid.UUID,
job_instance_attempt_update: JobInstanceAttemptUpdate,
timeout: float = None,
) -> JobInstanceAttempt:
"""Update the status of a job instance attempt by id"""
resp = await self.patch(
f"{self.api_server_uri}/v1/job-instance-attempts/{job_instance_attempt_id}",
content=job_instance_attempt_update.json(exclude_unset=True),
timeout=timeout,
)
resp.raise_for_status()
return JobInstanceAttempt.parse_obj(resp.json())

@property
def in_context(self):
"""Indicates if the client is within an async context generation loop or not."""
Expand Down Expand Up @@ -784,7 +807,18 @@ async def check_success(resp: GenericRTUReplySchema[TopicActionReplyData]):
cell_id,
properties=metadata_update_properties,
)
tracker = FileDeltaReply.register_callback(self, req, check_success)
# Explicitly register the message callback here against the message_type of new_delta_reply
# since using FileDeltaReply.register_callback will register against the transaction_id
# which fails because there is a `new_delta_event` message type that is sent before the
# `new_delta_reply` message type from the same transaction_id.
# This breaks the schema validation because the `new_delta_event` message type does not
# have a `data.success` field.
tracker = self.register_message_callback(
check_success,
channel=self.files_channel(file.id),
message_type="new_delta_reply",
response_schema=FileDeltaReply,
)
await self.send_rtu_request(req)
return await asyncio.wait_for(tracker.next_trigger, timeout)

Expand All @@ -806,7 +840,12 @@ async def check_success(resp: GenericRTUReplySchema[TopicActionReplyData]):
FileDeltaAction.update,
properties=metadata_update_properties,
)
tracker = FileDeltaReply.register_callback(self, req, check_success)
tracker = self.register_message_callback(
check_success,
channel=self.files_channel(file.id),
message_type="new_delta_reply",
response_schema=FileDeltaReply,
)
await self.send_rtu_request(req)
return await asyncio.wait_for(tracker.next_trigger, timeout)

Expand Down
21 changes: 19 additions & 2 deletions origami/defs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __str__(self):
return self.value


class JobInstanceAttempt(BaseModel):
class JobInstanceAttemptRequest(BaseModel):
"""
Represents an attempt to execute a job.
Expand Down Expand Up @@ -162,7 +162,20 @@ class CreateParameterizedNotebookRequest(BaseModel):
"""

notebook_version_id: Optional[uuid.UUID]
job_instance_attempt: Optional[JobInstanceAttempt]
job_instance_attempt: Optional[JobInstanceAttemptRequest]


class JobInstanceAttempt(NoteableAPIModel):
"""Represents a job instance attempt returned by the Noteable API."""

noteable_job_instance_id: Optional[uuid.UUID]
customer_job_instance_reference_id: Optional[uuid.UUID]
status: JobInstanceAttemptStatus
attempt_number: int
parameterized_notebook_id: uuid.UUID

class Config:
orm_mode = True


class CreateParameterizedNotebookResponse(BaseModel):
Expand All @@ -176,3 +189,7 @@ class CreateParameterizedNotebookResponse(BaseModel):
job_instance_attempt: Optional[JobInstanceAttempt] = Field(
description="The job instance attempt associated with the parameterized notebook."
)


class JobInstanceAttemptUpdate(BaseModel):
status: JobInstanceAttemptStatus = Field(description="The status of the job instance attempt.")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[tool.poetry]
name = "noteable-origami"
version = "0.0.14"
version = "0.0.15"
description = "The Noteable API interface"
authors = ["Matt Seal <[email protected]>"]
maintainers = ["Matt Seal <[email protected]>"]
Expand Down

0 comments on commit 781fa83

Please sign in to comment.