Skip to content

Commit

Permalink
upgrading to the latest fmodel: domain error, orchestrating ES aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jan 18, 2025
1 parent d34b087 commit 5633279
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 65 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ authors = ["Ivan Dugalic <[email protected]>"]


[dependencies]
fmodel-rust = "0.7.0"
# fmodel-rust = "0.7.0"
fmodel-rust = { git = "https://github.com/fraktalio/fmodel-rust.git" }
actix-cors = "0.7.0"
actix-web = "4.9.0"
actix-rt = "2.10.0"
Expand Down
16 changes: 9 additions & 7 deletions src/adapter/repository/event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use uuid::Uuid;

use crate::adapter::database::entity::{EventEntity, NewEventEntity};
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{append_event, list_events};
use crate::adapter::database::queries::{append_event, get_latest_event, list_events};
use crate::domain::api::{Command, Event, Identifier, OrderEvent, RestaurantEvent};
use crate::Database;

Expand Down Expand Up @@ -35,12 +35,9 @@ impl EventRepository<Command, Event, Uuid, ErrorMessage> for AggregateEventRepos
.collect()
}

async fn save(
&self,
events: &[Event],
latest_version: &Option<Uuid>,
) -> Result<Vec<(Event, Uuid)>, ErrorMessage> {
let mut latest_version = latest_version.to_owned();
async fn save(&self, events: &[Event]) -> Result<Vec<(Event, Uuid)>, ErrorMessage> {
//TODO implement this better by going throuh and calculating versions per ID/Stream
let mut latest_version = self.version_provider(events.first().unwrap()).await?;
let mut result = Vec::new();

for event in events {
Expand All @@ -52,6 +49,11 @@ impl EventRepository<Command, Event, Uuid, ErrorMessage> for AggregateEventRepos

Ok(result)
}
async fn version_provider(&self, event: &Event) -> Result<Option<Uuid>, ErrorMessage> {
get_latest_event(&event.identifier(), &self.database)
.await
.map(|event_entity| Some(event_entity.event_id))
}
}

/// Map the EventEntity into the domain events
Expand Down
16 changes: 9 additions & 7 deletions src/adapter/repository/order_event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use uuid::Uuid;

use crate::adapter::database::entity::{EventEntity, NewEventEntity};
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{append_event, list_events};
use crate::adapter::database::queries::{append_event, get_latest_event, list_events};
use crate::domain::api::{Identifier, OrderCommand, OrderEvent};
use crate::Database;

Expand Down Expand Up @@ -38,12 +38,9 @@ impl EventRepository<OrderCommand, OrderEvent, Uuid, ErrorMessage> for OrderEven
.collect()
}

async fn save(
&self,
events: &[OrderEvent],
latest_version: &Option<Uuid>,
) -> Result<Vec<(OrderEvent, Uuid)>, ErrorMessage> {
let mut latest_version = latest_version.to_owned();
async fn save(&self, events: &[OrderEvent]) -> Result<Vec<(OrderEvent, Uuid)>, ErrorMessage> {
//TODO implement this better by going throuh and calculating versions per ID/Stream
let mut latest_version = self.version_provider(events.first().unwrap()).await?;
let mut result = Vec::new();

for event in events {
Expand All @@ -55,6 +52,11 @@ impl EventRepository<OrderCommand, OrderEvent, Uuid, ErrorMessage> for OrderEven

Ok(result)
}
async fn version_provider(&self, event: &OrderEvent) -> Result<Option<Uuid>, ErrorMessage> {
get_latest_event(&event.identifier(), &self.database)
.await
.map(|event_entity| Some(event_entity.event_id))
}
}

/// Map the EventEntity into the Order domain events
Expand Down
14 changes: 11 additions & 3 deletions src/adapter/repository/restaurant_event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use uuid::Uuid;

use crate::adapter::database::entity::{EventEntity, NewEventEntity};
use crate::adapter::database::error::ErrorMessage;
use crate::adapter::database::queries::{append_event, list_events};
use crate::adapter::database::queries::{append_event, get_latest_event, list_events};
use crate::domain::api::{Identifier, RestaurantCommand, RestaurantEvent};
use crate::Database;

Expand Down Expand Up @@ -43,9 +43,9 @@ impl EventRepository<RestaurantCommand, RestaurantEvent, Uuid, ErrorMessage>
async fn save(
&self,
events: &[RestaurantEvent],
latest_version: &Option<Uuid>,
) -> Result<Vec<(RestaurantEvent, Uuid)>, ErrorMessage> {
let mut latest_version = latest_version.to_owned();
//TODO implement this better by going throuh and calculating versions per ID/Stream
let mut latest_version = self.version_provider(events.first().unwrap()).await?;
let mut result = Vec::new();

for event in events {
Expand All @@ -57,6 +57,14 @@ impl EventRepository<RestaurantCommand, RestaurantEvent, Uuid, ErrorMessage>

Ok(result)
}
async fn version_provider(
&self,
event: &RestaurantEvent,
) -> Result<Option<Uuid>, ErrorMessage> {
get_latest_event(&event.identifier(), &self.database)
.await
.map(|event_entity| Some(event_entity.event_id))
}
}

/// Map the EventEntity into the Restaurant domain events
Expand Down
4 changes: 2 additions & 2 deletions src/application/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub type OrderAggregate<'a, R> = EventSourcedAggregate<
Option<Order>,
OrderEvent,
R,
Decider<'a, OrderCommand, Option<Order>, OrderEvent>,
Decider<'a, OrderCommand, Option<Order>, OrderEvent, ErrorMessage>,
Uuid,
ErrorMessage,
>;
Expand All @@ -31,7 +31,7 @@ pub type RestaurantAggregate<'a, R> = EventSourcedAggregate<
Option<Restaurant>,
RestaurantEvent,
R,
Decider<'a, RestaurantCommand, Option<Restaurant>, RestaurantEvent>,
Decider<'a, RestaurantCommand, Option<Restaurant>, RestaurantEvent, ErrorMessage>,
Uuid,
ErrorMessage,
>;
Expand Down
32 changes: 16 additions & 16 deletions src/domain/order_decider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,35 @@ pub fn order_decider<'a>() -> OrderDecider<'a> {
decide: Box::new(|command, state| match command {
OrderCommand::Create(command) => {
if state.is_some() {
vec![OrderEvent::NotCreated(OrderNotCreated {
Ok(vec![OrderEvent::NotCreated(OrderNotCreated {
identifier: command.identifier.to_owned(),
restaurant_identifier: command.restaurant_identifier.to_owned(),
line_items: command.line_items.to_owned(),
reason: Reason("Order already exists".to_string()),
})]
})])
} else {
vec![OrderEvent::Created(OrderCreated {
Ok(vec![OrderEvent::Created(OrderCreated {
identifier: command.identifier.to_owned(),
restaurant_identifier: command.restaurant_identifier.to_owned(),
status: OrderStatus::Created,
line_items: command.line_items.to_owned(),
})]
})])
}
}
OrderCommand::MarkAsPrepared(command) => {
if state
.clone()
.is_some_and(|s| OrderStatus::Created == s.status)
{
vec![OrderEvent::Prepared(OrderPrepared {
Ok(vec![OrderEvent::Prepared(OrderPrepared {
identifier: command.identifier.to_owned(),
status: OrderStatus::Prepared,
})]
})])
} else {
vec![OrderEvent::NotPrepared(OrderNotPrepared {
Ok(vec![OrderEvent::NotPrepared(OrderNotPrepared {
identifier: command.identifier.to_owned(),
reason: Reason("Order in the wrong status previously".to_string()),
})]
})])
}
}
}),
Expand Down Expand Up @@ -125,23 +125,23 @@ mod order_decider_tests {
let new_events = decider.compute_new_events(&[], &create_order_command);
assert_eq!(
new_events,
[OrderEvent::Created(OrderCreated {
Ok(vec![OrderEvent::Created(OrderCreated {
identifier: identifier.clone(),
restaurant_identifier: restaurant_identifier.clone(),
status: OrderStatus::Created,
line_items: line_items.clone(),
})]
})])
);
// ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE
let new_state = decider.compute_new_state(None, &create_order_command);
assert_eq!(
new_state,
Some(Order {
Ok(Some(Order {
identifier: identifier.clone(),
restaurant_identifier: restaurant_identifier.clone(),
status: OrderStatus::Created,
line_items: line_items.clone(),
})
}))
);

// The command to create an order - MarkOrderAsPrepared
Expand All @@ -159,10 +159,10 @@ mod order_decider_tests {
let new_events = decider.compute_new_events(&old_events, &mark_order_as_prepared);
assert_eq!(
new_events,
[OrderEvent::Prepared(OrderPrepared {
Ok(vec![OrderEvent::Prepared(OrderPrepared {
identifier: identifier.clone(),
status: OrderStatus::Prepared,
})]
})])
);

// ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE
Expand All @@ -175,12 +175,12 @@ mod order_decider_tests {
let new_state = decider.compute_new_state(Some(old_state), &mark_order_as_prepared);
assert_eq!(
new_state,
Some(Order {
Ok(Some(Order {
identifier: identifier.clone(),
restaurant_identifier: restaurant_identifier.clone(),
status: OrderStatus::Prepared,
line_items: line_items.clone(),
})
}))
);
}
}
48 changes: 25 additions & 23 deletions src/domain/restaurant_decider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,50 @@ pub fn restaurant_decider<'a>() -> RestaurantDecider<'a> {
decide: Box::new(|command, state| match command {
RestaurantCommand::CreateRestaurant(command) => {
if state.is_some() {
vec![RestaurantEvent::NotCreated(RestaurantNotCreated {
Ok(vec![RestaurantEvent::NotCreated(RestaurantNotCreated {
identifier: command.identifier.to_owned(),
name: command.name.to_owned(),
menu: command.menu.to_owned(),
reason: Reason("Restaurant already exists".to_string()),
})]
})])
} else {
vec![RestaurantEvent::Created(RestaurantCreated {
Ok(vec![RestaurantEvent::Created(RestaurantCreated {
identifier: command.identifier.to_owned(),
name: command.name.to_owned(),
menu: command.menu.to_owned(),
})]
})])
}
}
RestaurantCommand::ChangeMenu(command) => {
if state.is_some() {
vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged {
Ok(vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged {
identifier: command.identifier.to_owned(),
menu: command.menu.to_owned(),
})]
})])
} else {
vec![RestaurantEvent::MenuNotChanged(RestaurantMenuNotChanged {
identifier: command.identifier.to_owned(),
menu: command.menu.to_owned(),
reason: Reason("Restaurant does not exist".to_string()),
})]
Ok(vec![RestaurantEvent::MenuNotChanged(
RestaurantMenuNotChanged {
identifier: command.identifier.to_owned(),
menu: command.menu.to_owned(),
reason: Reason("Restaurant does not exist".to_string()),
},
)])
}
}
RestaurantCommand::PlaceOrder(command) => {
if state.is_some() {
vec![RestaurantEvent::OrderPlaced(OrderPlaced {
Ok(vec![RestaurantEvent::OrderPlaced(OrderPlaced {
identifier: command.identifier.to_owned(),
order_identifier: command.order_identifier.to_owned(),
line_items: command.line_items.to_owned(),
})]
})])
} else {
vec![RestaurantEvent::OrderNotPlaced(OrderNotPlaced {
Ok(vec![RestaurantEvent::OrderNotPlaced(OrderNotPlaced {
identifier: command.identifier.to_owned(),
order_identifier: command.order_identifier.to_owned(),
line_items: command.line_items.to_owned(),
reason: Reason("Restaurant does not exist".to_string()),
})]
})])
}
}
}),
Expand Down Expand Up @@ -148,29 +150,29 @@ mod restaurant_decider_tests {
let new_events = decider.compute_new_events(&[], &create_restaurant_command);
assert_eq!(
new_events,
[RestaurantEvent::Created(RestaurantCreated {
Ok(vec![RestaurantEvent::Created(RestaurantCreated {
identifier: restaurant_identifier.clone(),
name: RestaurantName("Restaurant 1".to_string()),
menu: RestaurantMenu {
menu_id: menu_id.clone(),
items: menu_items.clone(),
cuisine: RestaurantMenuCuisine::Vietnamese,
},
})]
})])
);
// ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE
let new_state = decider.compute_new_state(None, &create_restaurant_command);
assert_eq!(
new_state,
Some(Restaurant {
Ok(Some(Restaurant {
identifier: restaurant_identifier.clone(),
name: RestaurantName("Restaurant 1".to_string()),
menu: RestaurantMenu {
menu_id: menu_id.clone(),
items: menu_items.clone(),
cuisine: RestaurantMenuCuisine::Vietnamese,
},
})
}))
);

// The command to create an order - MarkOrderAsPrepared
Expand All @@ -196,14 +198,14 @@ mod restaurant_decider_tests {
let new_events = decider.compute_new_events(&old_events, &change_restaurant_menu);
assert_eq!(
new_events,
[RestaurantEvent::MenuChanged(RestaurantMenuChanged {
Ok(vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged {
identifier: restaurant_identifier.clone(),
menu: RestaurantMenu {
menu_id: menu_id.clone(),
items: menu_items.clone(),
cuisine: RestaurantMenuCuisine::Japanese,
},
})]
})])
);

// ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE
Expand All @@ -219,15 +221,15 @@ mod restaurant_decider_tests {
let new_state = decider.compute_new_state(Some(old_state), &change_restaurant_menu);
assert_eq!(
new_state,
Some(Restaurant {
Ok(Some(Restaurant {
identifier: restaurant_identifier.clone(),
name: RestaurantName("Restaurant 1".to_string()),
menu: RestaurantMenu {
menu_id: menu_id.clone(),
items: menu_items.clone(),
cuisine: RestaurantMenuCuisine::Japanese,
},
})
}))
);
}
}
Loading

0 comments on commit 5633279

Please sign in to comment.