Skip to content

Commit

Permalink
fix: formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
vbhagwat committed Jan 21, 2025
1 parent 79a7222 commit 0c353ba
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
23 changes: 14 additions & 9 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,16 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode):

# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def online_write_with_connector(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
spark: SparkSession,
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[
EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]
]
],
progress: Optional[Callable[[int], Any]],
spark: SparkSession,
) -> None:
"""
Write a batch of features of several entities to the database.
Expand All @@ -303,6 +305,7 @@ def online_write_with_connector(
project = config.project
keyspace: str = self._keyspace
fqtable = f"{project}_{table.name}"

def prepare_rows() -> List[Row]:
"""
Transform data into a list of Spark Row objects for insertion.
Expand Down Expand Up @@ -348,7 +351,9 @@ def prepare_rows() -> List[Row]:
if progress:
progress(1)

def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark_session):
def batch_write_pandas_df(
iterator, spark_serialized_artifacts, join_keys, spark_session
):
for pdf in iterator:
(
feature_view,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel

# Error messages
E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS = (
"Unexpected configuration object (not a CassandraOnlineStoreConfig instance)"
Expand Down Expand Up @@ -326,13 +327,13 @@ def __del__(self):
pass

def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
"""
Write a batch of features of several entities to the database.
Expand Down Expand Up @@ -381,6 +382,7 @@ def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]:
# correction for the last missing call to `progress`:
if progress:
progress(1)

def online_read(
self,
config: RepoConfig,
Expand Down

0 comments on commit 0c353ba

Please sign in to comment.