From 56332799866a7aa35619b04d222741bf7b4b8c55 Mon Sep 17 00:00:00 2001 From: Ivan Dugalic Date: Sat, 18 Jan 2025 18:57:58 +0100 Subject: [PATCH] upgrading to the latest fmodel: domain error, orchestrating ES aggregate --- Cargo.lock | 5 +- Cargo.toml | 3 +- src/adapter/repository/event_repository.rs | 16 ++++--- .../repository/order_event_repository.rs | 16 ++++--- .../repository/restaurant_event_repository.rs | 14 ++++-- src/application/api.rs | 4 +- src/domain/order_decider.rs | 32 ++++++------- src/domain/restaurant_decider.rs | 48 ++++++++++--------- src/main.rs | 21 ++++++-- 9 files changed, 94 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd09abb..6a3a57f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,9 +741,8 @@ dependencies = [ [[package]] name = "fmodel-rust" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d283d663418830a965e432be915b0a106f1282215e4aeeb677d52b4ceacabb7f" +version = "0.7.1" +source = "git+https://github.com/fraktalio/fmodel-rust.git#33613d740739ad64b1a2bc40447be09d59e510da" dependencies = [ "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 91ec3aa..5e6c199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ authors = ["Ivan Dugalic "] [dependencies] -fmodel-rust = "0.7.0" +# fmodel-rust = "0.7.0" +fmodel-rust = { git = "https://github.com/fraktalio/fmodel-rust.git" } actix-cors = "0.7.0" actix-web = "4.9.0" actix-rt = "2.10.0" diff --git a/src/adapter/repository/event_repository.rs b/src/adapter/repository/event_repository.rs index 887e1c1..3b3e5d2 100644 --- a/src/adapter/repository/event_repository.rs +++ b/src/adapter/repository/event_repository.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::adapter::database::entity::{EventEntity, NewEventEntity}; use crate::adapter::database::error::ErrorMessage; -use crate::adapter::database::queries::{append_event, list_events}; +use crate::adapter::database::queries::{append_event, get_latest_event, list_events}; use crate::domain::api::{Command, Event, Identifier, OrderEvent, RestaurantEvent}; use crate::Database; @@ -35,12 +35,9 @@ impl EventRepository for AggregateEventRepos .collect() } - async fn save( - &self, - events: &[Event], - latest_version: &Option, - ) -> Result, ErrorMessage> { - let mut latest_version = latest_version.to_owned(); + async fn save(&self, events: &[Event]) -> Result, ErrorMessage> { + //TODO implement this better by going throuh and calculating versions per ID/Stream + let mut latest_version = self.version_provider(events.first().unwrap()).await?; let mut result = Vec::new(); for event in events { @@ -52,6 +49,11 @@ impl EventRepository for AggregateEventRepos Ok(result) } + async fn version_provider(&self, event: &Event) -> Result, ErrorMessage> { + get_latest_event(&event.identifier(), &self.database) + .await + .map(|event_entity| Some(event_entity.event_id)) + } } /// Map the EventEntity into the domain events diff --git a/src/adapter/repository/order_event_repository.rs b/src/adapter/repository/order_event_repository.rs index 9e44108..a5b00e7 100644 --- a/src/adapter/repository/order_event_repository.rs +++ b/src/adapter/repository/order_event_repository.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::adapter::database::entity::{EventEntity, NewEventEntity}; use crate::adapter::database::error::ErrorMessage; -use crate::adapter::database::queries::{append_event, list_events}; +use crate::adapter::database::queries::{append_event, get_latest_event, list_events}; use crate::domain::api::{Identifier, OrderCommand, OrderEvent}; use crate::Database; @@ -38,12 +38,9 @@ impl EventRepository for OrderEven .collect() } - async fn save( - &self, - events: &[OrderEvent], - latest_version: &Option, - ) -> Result, ErrorMessage> { - let mut latest_version = latest_version.to_owned(); + async fn save(&self, events: &[OrderEvent]) -> Result, ErrorMessage> { + //TODO implement this better by going throuh and calculating versions per ID/Stream + let mut latest_version = self.version_provider(events.first().unwrap()).await?; let mut result = Vec::new(); for event in events { @@ -55,6 +52,11 @@ impl EventRepository for OrderEven Ok(result) } + async fn version_provider(&self, event: &OrderEvent) -> Result, ErrorMessage> { + get_latest_event(&event.identifier(), &self.database) + .await + .map(|event_entity| Some(event_entity.event_id)) + } } /// Map the EventEntity into the Order domain events diff --git a/src/adapter/repository/restaurant_event_repository.rs b/src/adapter/repository/restaurant_event_repository.rs index 16d1ffa..504ccd4 100644 --- a/src/adapter/repository/restaurant_event_repository.rs +++ b/src/adapter/repository/restaurant_event_repository.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::adapter::database::entity::{EventEntity, NewEventEntity}; use crate::adapter::database::error::ErrorMessage; -use crate::adapter::database::queries::{append_event, list_events}; +use crate::adapter::database::queries::{append_event, get_latest_event, list_events}; use crate::domain::api::{Identifier, RestaurantCommand, RestaurantEvent}; use crate::Database; @@ -43,9 +43,9 @@ impl EventRepository async fn save( &self, events: &[RestaurantEvent], - latest_version: &Option, ) -> Result, ErrorMessage> { - let mut latest_version = latest_version.to_owned(); + //TODO implement this better by going throuh and calculating versions per ID/Stream + let mut latest_version = self.version_provider(events.first().unwrap()).await?; let mut result = Vec::new(); for event in events { @@ -57,6 +57,14 @@ impl EventRepository Ok(result) } + async fn version_provider( + &self, + event: &RestaurantEvent, + ) -> Result, ErrorMessage> { + get_latest_event(&event.identifier(), &self.database) + .await + .map(|event_entity| Some(event_entity.event_id)) + } } /// Map the EventEntity into the Restaurant domain events diff --git a/src/application/api.rs b/src/application/api.rs index df54186..c182010 100644 --- a/src/application/api.rs +++ b/src/application/api.rs @@ -20,7 +20,7 @@ pub type OrderAggregate<'a, R> = EventSourcedAggregate< Option, OrderEvent, R, - Decider<'a, OrderCommand, Option, OrderEvent>, + Decider<'a, OrderCommand, Option, OrderEvent, ErrorMessage>, Uuid, ErrorMessage, >; @@ -31,7 +31,7 @@ pub type RestaurantAggregate<'a, R> = EventSourcedAggregate< Option, RestaurantEvent, R, - Decider<'a, RestaurantCommand, Option, RestaurantEvent>, + Decider<'a, RestaurantCommand, Option, RestaurantEvent, ErrorMessage>, Uuid, ErrorMessage, >; diff --git a/src/domain/order_decider.rs b/src/domain/order_decider.rs index 75b9688..523108d 100644 --- a/src/domain/order_decider.rs +++ b/src/domain/order_decider.rs @@ -25,19 +25,19 @@ pub fn order_decider<'a>() -> OrderDecider<'a> { decide: Box::new(|command, state| match command { OrderCommand::Create(command) => { if state.is_some() { - vec![OrderEvent::NotCreated(OrderNotCreated { + Ok(vec![OrderEvent::NotCreated(OrderNotCreated { identifier: command.identifier.to_owned(), restaurant_identifier: command.restaurant_identifier.to_owned(), line_items: command.line_items.to_owned(), reason: Reason("Order already exists".to_string()), - })] + })]) } else { - vec![OrderEvent::Created(OrderCreated { + Ok(vec![OrderEvent::Created(OrderCreated { identifier: command.identifier.to_owned(), restaurant_identifier: command.restaurant_identifier.to_owned(), status: OrderStatus::Created, line_items: command.line_items.to_owned(), - })] + })]) } } OrderCommand::MarkAsPrepared(command) => { @@ -45,15 +45,15 @@ pub fn order_decider<'a>() -> OrderDecider<'a> { .clone() .is_some_and(|s| OrderStatus::Created == s.status) { - vec![OrderEvent::Prepared(OrderPrepared { + Ok(vec![OrderEvent::Prepared(OrderPrepared { identifier: command.identifier.to_owned(), status: OrderStatus::Prepared, - })] + })]) } else { - vec![OrderEvent::NotPrepared(OrderNotPrepared { + Ok(vec![OrderEvent::NotPrepared(OrderNotPrepared { identifier: command.identifier.to_owned(), reason: Reason("Order in the wrong status previously".to_string()), - })] + })]) } } }), @@ -125,23 +125,23 @@ mod order_decider_tests { let new_events = decider.compute_new_events(&[], &create_order_command); assert_eq!( new_events, - [OrderEvent::Created(OrderCreated { + Ok(vec![OrderEvent::Created(OrderCreated { identifier: identifier.clone(), restaurant_identifier: restaurant_identifier.clone(), status: OrderStatus::Created, line_items: line_items.clone(), - })] + })]) ); // ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE let new_state = decider.compute_new_state(None, &create_order_command); assert_eq!( new_state, - Some(Order { + Ok(Some(Order { identifier: identifier.clone(), restaurant_identifier: restaurant_identifier.clone(), status: OrderStatus::Created, line_items: line_items.clone(), - }) + })) ); // The command to create an order - MarkOrderAsPrepared @@ -159,10 +159,10 @@ mod order_decider_tests { let new_events = decider.compute_new_events(&old_events, &mark_order_as_prepared); assert_eq!( new_events, - [OrderEvent::Prepared(OrderPrepared { + Ok(vec![OrderEvent::Prepared(OrderPrepared { identifier: identifier.clone(), status: OrderStatus::Prepared, - })] + })]) ); // ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE @@ -175,12 +175,12 @@ mod order_decider_tests { let new_state = decider.compute_new_state(Some(old_state), &mark_order_as_prepared); assert_eq!( new_state, - Some(Order { + Ok(Some(Order { identifier: identifier.clone(), restaurant_identifier: restaurant_identifier.clone(), status: OrderStatus::Prepared, line_items: line_items.clone(), - }) + })) ); } } diff --git a/src/domain/restaurant_decider.rs b/src/domain/restaurant_decider.rs index 6bb64f7..5737a61 100644 --- a/src/domain/restaurant_decider.rs +++ b/src/domain/restaurant_decider.rs @@ -26,48 +26,50 @@ pub fn restaurant_decider<'a>() -> RestaurantDecider<'a> { decide: Box::new(|command, state| match command { RestaurantCommand::CreateRestaurant(command) => { if state.is_some() { - vec![RestaurantEvent::NotCreated(RestaurantNotCreated { + Ok(vec![RestaurantEvent::NotCreated(RestaurantNotCreated { identifier: command.identifier.to_owned(), name: command.name.to_owned(), menu: command.menu.to_owned(), reason: Reason("Restaurant already exists".to_string()), - })] + })]) } else { - vec![RestaurantEvent::Created(RestaurantCreated { + Ok(vec![RestaurantEvent::Created(RestaurantCreated { identifier: command.identifier.to_owned(), name: command.name.to_owned(), menu: command.menu.to_owned(), - })] + })]) } } RestaurantCommand::ChangeMenu(command) => { if state.is_some() { - vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged { + Ok(vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged { identifier: command.identifier.to_owned(), menu: command.menu.to_owned(), - })] + })]) } else { - vec![RestaurantEvent::MenuNotChanged(RestaurantMenuNotChanged { - identifier: command.identifier.to_owned(), - menu: command.menu.to_owned(), - reason: Reason("Restaurant does not exist".to_string()), - })] + Ok(vec![RestaurantEvent::MenuNotChanged( + RestaurantMenuNotChanged { + identifier: command.identifier.to_owned(), + menu: command.menu.to_owned(), + reason: Reason("Restaurant does not exist".to_string()), + }, + )]) } } RestaurantCommand::PlaceOrder(command) => { if state.is_some() { - vec![RestaurantEvent::OrderPlaced(OrderPlaced { + Ok(vec![RestaurantEvent::OrderPlaced(OrderPlaced { identifier: command.identifier.to_owned(), order_identifier: command.order_identifier.to_owned(), line_items: command.line_items.to_owned(), - })] + })]) } else { - vec![RestaurantEvent::OrderNotPlaced(OrderNotPlaced { + Ok(vec![RestaurantEvent::OrderNotPlaced(OrderNotPlaced { identifier: command.identifier.to_owned(), order_identifier: command.order_identifier.to_owned(), line_items: command.line_items.to_owned(), reason: Reason("Restaurant does not exist".to_string()), - })] + })]) } } }), @@ -148,7 +150,7 @@ mod restaurant_decider_tests { let new_events = decider.compute_new_events(&[], &create_restaurant_command); assert_eq!( new_events, - [RestaurantEvent::Created(RestaurantCreated { + Ok(vec![RestaurantEvent::Created(RestaurantCreated { identifier: restaurant_identifier.clone(), name: RestaurantName("Restaurant 1".to_string()), menu: RestaurantMenu { @@ -156,13 +158,13 @@ mod restaurant_decider_tests { items: menu_items.clone(), cuisine: RestaurantMenuCuisine::Vietnamese, }, - })] + })]) ); // ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE let new_state = decider.compute_new_state(None, &create_restaurant_command); assert_eq!( new_state, - Some(Restaurant { + Ok(Some(Restaurant { identifier: restaurant_identifier.clone(), name: RestaurantName("Restaurant 1".to_string()), menu: RestaurantMenu { @@ -170,7 +172,7 @@ mod restaurant_decider_tests { items: menu_items.clone(), cuisine: RestaurantMenuCuisine::Vietnamese, }, - }) + })) ); // The command to create an order - MarkOrderAsPrepared @@ -196,14 +198,14 @@ mod restaurant_decider_tests { let new_events = decider.compute_new_events(&old_events, &change_restaurant_menu); assert_eq!( new_events, - [RestaurantEvent::MenuChanged(RestaurantMenuChanged { + Ok(vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged { identifier: restaurant_identifier.clone(), menu: RestaurantMenu { menu_id: menu_id.clone(), items: menu_items.clone(), cuisine: RestaurantMenuCuisine::Japanese, }, - })] + })]) ); // ### StateStored flavour ### - Test the decider: given STATE, when COMMAND, then NEW STATE @@ -219,7 +221,7 @@ mod restaurant_decider_tests { let new_state = decider.compute_new_state(Some(old_state), &change_restaurant_menu); assert_eq!( new_state, - Some(Restaurant { + Ok(Some(Restaurant { identifier: restaurant_identifier.clone(), name: RestaurantName("Restaurant 1".to_string()), menu: RestaurantMenu { @@ -227,7 +229,7 @@ mod restaurant_decider_tests { items: menu_items.clone(), cuisine: RestaurantMenuCuisine::Japanese, }, - }) + })) ); } } diff --git a/src/main.rs b/src/main.rs index e43e549..96954c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ use crate::domain::restaurant_view::restaurant_view; use actix_cors::Cors; use actix_web::middleware::Logger; use actix_web::{http::header, web, App, HttpServer}; +use adapter::database::error::ErrorMessage; use dotenv::dotenv; use env_logger::{init_from_env, Env}; use fmodel_rust::aggregate::EventSourcedAggregate; @@ -95,12 +96,20 @@ async fn main() -> std::io::Result<()> { // Create the restaurant aggregate - command side let restaurant_aggregate = Arc::new(EventSourcedAggregate::new( restaurant_event_repository, - restaurant_decider(), + // Decider + // Error type needs to match the error type of the aggregate + restaurant_decider().map_error(&|_| ErrorMessage { + message: "Restaurant decider error".to_string(), + }), )); // Create the order aggregate - command side let order_aggregate = Arc::new(EventSourcedAggregate::new( order_event_repository, - order_decider(), + // Decider + // Error type needs to match the error type of the aggregate + order_decider().map_error(&|_| ErrorMessage { + message: "Order decider error".to_string(), + }), )); // ##### COMMAND SIDE - create one aggregate that combines all deciders - monolithic scenario ##### @@ -109,7 +118,13 @@ async fn main() -> std::io::Result<()> { // Combined aggregate - command side let _combined_aggregate = Arc::new(EventSourcedAggregate::new( event_repository, - restaurant_decider().combine(order_decider()), + // Decider + // Error type needs to match the error type of the aggregate + restaurant_decider() + .combine(order_decider()) + .map_error(&|_| ErrorMessage { + message: "Decider error".to_string(), + }), )); // ###### QUERY SIDE ######