Skip to content

Commit

Permalink
using single/general event repository for all aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jan 19, 2025
1 parent 5633279 commit 8ad35dc
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 460 deletions.
4 changes: 2 additions & 2 deletions src/adapter/database/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ pub async fn list_events(
pub async fn get_latest_event(
decider_id: &String,
app: &Database,
) -> Result<EventEntity, ErrorMessage> {
) -> Result<Option<EventEntity>, ErrorMessage> {
sqlx::query_as!(
EventEntity,
"SELECT * FROM events WHERE decider_id = $1 ORDER BY events.offset DESC LIMIT 1",
decider_id
)
.fetch_one(&app.db)
.fetch_optional(&app.db)
.await
.map_err(|e| ErrorMessage {
message: e.to_string(),
Expand Down
37 changes: 16 additions & 21 deletions src/adapter/event_stream/saga_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{ack_event, nack_event, stream_events};
use crate::adapter::publisher::order_action_publisher::OrderActionPublisher;
use crate::adapter::repository::restaurant_event_repository::ToRestaurantEvent;
use crate::adapter::repository::event_repository::ToEvent;
use crate::application::api::OrderSagaManager;
use crate::Database;
use log::{debug, error, warn};
Expand All @@ -19,30 +19,25 @@ pub async fn stream_events_to_saga(
Ok(Some(event_entity)) => {
debug!("Processing Event in Saga: {:?}", event_entity);
match event_entity.decider.as_str() {
"Restaurant" => {
match order_saga_manager
.handle(&event_entity.to_restaurant_event()?)
"Restaurant" => match order_saga_manager.handle(&event_entity.to_event()?).await {
Ok(_) => {
debug!("Order Saga executed successfully");
ack_event(
&event_entity.offset,
&"saga".to_string(),
&event_entity.decider_id,
db,
)
.await
{
Ok(_) => {
debug!("Order Saga executed successfully");
ack_event(
&event_entity.offset,
&"saga".to_string(),
&event_entity.decider_id,
db,
)
.map(drop)
}
Err(error) => {
error!("Order Saga failed: {}", error.message);
nack_event(&"saga".to_string(), &event_entity.decider_id, db)
.await
.map(drop)
}
Err(error) => {
error!("Order Saga failed: {}", error.message);
nack_event(&"saga".to_string(), &event_entity.decider_id, db)
.await
.map(drop)
}
}
}
},
_ => {
warn!("Unknown event type: {}", event_entity.event);
ack_event(
Expand Down
7 changes: 3 additions & 4 deletions src/adapter/event_stream/view_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::sync::Arc;

use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{ack_event, nack_event, stream_events};
use crate::adapter::repository::order_event_repository::ToOrderEvent;
use crate::adapter::repository::event_repository::ToEvent;
use crate::adapter::repository::order_view_state_repository::OrderViewStateRepository;
use crate::adapter::repository::restaurant_event_repository::ToRestaurantEvent;
use crate::adapter::repository::restaurant_view_state_repository::RestaurantViewStateRepository;
use crate::application::api::{OrderMaterializedView, RestaurantMaterializedView};
use crate::Database;
Expand All @@ -25,7 +24,7 @@ pub async fn stream_events_to_view(
match event_entity.decider.as_str() {
"Restaurant" => {
match restaurant_materialized_view
.handle(&event_entity.to_restaurant_event()?)
.handle(&event_entity.to_event()?)
.await
{
Ok(_) => {
Expand All @@ -52,7 +51,7 @@ pub async fn stream_events_to_view(
}
"Order" => {
match order_materialized_view
.handle(&event_entity.to_order_event()?)
.handle(&event_entity.to_event()?)
.await
{
Ok(_) => {
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/publisher/order_action_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::repository::order_event_repository::OrderEventRepository;
use crate::adapter::repository::event_repository::AggregateEventRepository;
use crate::application::api::OrderAggregate;
use crate::domain::api::OrderCommand;
use fmodel_rust::saga_manager::ActionPublisher;
use std::sync::Arc;

/// Order action publisher - used by the Saga Manager to publish actions/commands
pub struct OrderActionPublisher<'a> {
pub order_aggregate: Arc<OrderAggregate<'a, OrderEventRepository>>,
pub order_aggregate: Arc<OrderAggregate<'a, AggregateEventRepository>>,
}

/// Fmodel action publisher implementation fot the OrderActionPublisher
Expand Down
173 changes: 49 additions & 124 deletions src/adapter/repository/event_repository.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
use crate::adapter::repository::ToNewEventEntity;
use fmodel_rust::aggregate::EventRepository;
use log::{info, warn};
use uuid::Uuid;

use crate::adapter::database::entity::{EventEntity, NewEventEntity};
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{append_event, get_latest_event, list_events};
use crate::domain::api::{Command, Event, Identifier, OrderEvent, RestaurantEvent};
use crate::Database;

use crate::domain::api::{DeciderName, EventName, Identifier};
use crate::{adapter, Database};
/// EventRepository struct
pub struct AggregateEventRepository {
database: Database,
}

// General Event repository
impl AggregateEventRepository {
/// Create a new EventRepository
pub fn new(database: Database) -> Self {
AggregateEventRepository { database }
}
}

/// EventRepository - implementation of Fmodel EventRepository for Command, Event, Uuid, ErrorMessage
impl EventRepository<Command, Event, Uuid, ErrorMessage> for AggregateEventRepository {
async fn fetch_events(&self, command: &Command) -> Result<Vec<(Event, Uuid)>, ErrorMessage> {
/// EventRepository - implementation of Fmodel EventRepository for C, E, Uuid, ErrorMessage, where C and E are constrained with specific traits
impl<C, E> EventRepository<C, E, Uuid, ErrorMessage> for AggregateEventRepository
where
C: Identifier + Sync,
E: Identifier
+ std::fmt::Debug
+ Sync
+ Send
+ serde::de::DeserializeOwned
+ Clone
+ ToEventEntity,
{
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Uuid)>, ErrorMessage> {
// https://doc.rust-lang.org/rust-by-example/error/iter_result.html#fail-the-entire-operation-with-collect
list_events(&command.identifier(), &self.database)
.await?
Expand All @@ -35,153 +45,68 @@ impl EventRepository<Command, Event, Uuid, ErrorMessage> for AggregateEventRepos
.collect()
}

async fn save(&self, events: &[Event]) -> Result<Vec<(Event, Uuid)>, ErrorMessage> {
//TODO implement this better by going throuh and calculating versions per ID/Stream
let mut latest_version = self.version_provider(events.first().unwrap()).await?;
async fn save(&self, events: &[E]) -> Result<Vec<(E, Uuid)>, ErrorMessage> {
let first_event: &E = events.first().unwrap();
let mut latest_version: Option<Uuid> = <adapter::repository::event_repository::AggregateEventRepository as fmodel_rust::aggregate::EventRepository<C, E, uuid::Uuid, adapter::database::error::ErrorMessage>>::version_provider(self,first_event).await?;
//let mut latest_version: Option<Uuid> = self.version_provider(first_event).await?;
let mut result = Vec::new();

for event in events {
let event_request = event.to_new_event_entity(latest_version)?;
let event_request = event.to_event_entity(latest_version)?;
append_event(&event_request, &self.database).await?;
latest_version = Some(event_request.event_id);
result.push(((*event).to_owned(), event_request.event_id));
}

Ok(result)
}
async fn version_provider(&self, event: &Event) -> Result<Option<Uuid>, ErrorMessage> {
async fn version_provider(&self, event: &E) -> Result<Option<Uuid>, ErrorMessage> {
get_latest_event(&event.identifier(), &self.database)
.await
.map(|event_entity| Some(event_entity.event_id))
.map(|event_entity| event_entity.map(|e| e.event_id))
}
}

/// Map the EventEntity into the domain events
pub trait ToEvent {
fn to_event(&self) -> Result<Event, ErrorMessage>;
pub trait ToEvent<E> {
fn to_event(&self) -> Result<E, ErrorMessage>;
}

/// Map the EventEntity into the domain events
impl ToEvent for EventEntity {
fn to_event(&self) -> Result<Event, ErrorMessage> {
impl<E> ToEvent<E> for EventEntity
where
E: serde::de::DeserializeOwned,
{
fn to_event(&self) -> Result<E, ErrorMessage> {
let value = self.data.clone();
serde_json::from_value(value).map_err(|err| ErrorMessage {
message: err.to_string(),
})
}
}

trait ToEventEntity {
fn to_event_entity(&self, version: Option<Uuid>) -> Result<NewEventEntity, ErrorMessage>;
}
/// Map from domain events of type OrderEvent to EventEntity
impl ToNewEventEntity for Event {
fn to_new_event_entity(&self, version: Option<Uuid>) -> Result<NewEventEntity, ErrorMessage> {
impl<E> ToEventEntity for E
where
E: Identifier + EventName + DeciderName + serde::ser::Serialize,
{
fn to_event_entity(&self, version: Option<Uuid>) -> Result<NewEventEntity, ErrorMessage> {
let data = serde_json::to_value(self).map_err(|err| ErrorMessage {
message: err.to_string(),
})?;
Ok(match self {
Event::Second(event) => match event {
OrderEvent::Created(event) => NewEventEntity {
event: "OrderCreated".to_string(),
event_id: Uuid::new_v4(),
decider: "Order".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
OrderEvent::Prepared(event) => NewEventEntity {
event: "OrderPrepared".to_string(),
event_id: Uuid::new_v4(),
decider: "Order".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
OrderEvent::NotCreated(event) => NewEventEntity {
event: "OrderNotCreated".to_string(),
event_id: Uuid::new_v4(),
decider: "Order".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
OrderEvent::NotPrepared(event) => NewEventEntity {
event: "OrderNotPrepared".to_string(),
event_id: Uuid::new_v4(),
decider: "Order".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
},
Event::First(event) => match event {
RestaurantEvent::Created(event) => NewEventEntity {
event: "RestaurantCreated".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
RestaurantEvent::NotCreated(event) => NewEventEntity {
event: "RestaurantNotCreated".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
RestaurantEvent::MenuChanged(event) => NewEventEntity {
event: "RestaurantMenuChanged".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
RestaurantEvent::MenuNotChanged(event) => NewEventEntity {
event: "RestaurantMenuNotChanged".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
RestaurantEvent::OrderPlaced(event) => NewEventEntity {
event: "RestaurantOrderPlaced".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
RestaurantEvent::OrderNotPlaced(event) => NewEventEntity {
event: "RestaurantOrderNotPlaced".to_string(),
event_id: Uuid::new_v4(),
decider: "Restaurant".to_string(),
decider_id: event.identifier.0.to_string(),
data,
command_id: None,
previous_id: version,
r#final: false,
},
},

Ok(NewEventEntity {
event: self.event_name(),
event_id: Uuid::new_v4(),
decider: self.decider_name(),
decider_id: self.identifier(),
data,
command_id: None,
previous_id: version,
r#final: false,
})
}
}
11 changes: 0 additions & 11 deletions src/adapter/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
use crate::adapter::database::entity::NewEventEntity;
use crate::adapter::database::error::ErrorMessage;
use uuid::Uuid;

pub mod event_repository;
pub mod order_event_repository;
pub mod order_view_state_repository;
pub mod restaurant_event_repository;
pub mod restaurant_view_state_repository;

/// Map the domain events into the EventEntity
trait ToNewEventEntity {
fn to_new_event_entity(&self, version: Option<Uuid>) -> Result<NewEventEntity, ErrorMessage>;
}
Loading

0 comments on commit 8ad35dc

Please sign in to comment.