From 003faf1f9108507d3bd9861d0c5b29dbe86feee4 Mon Sep 17 00:00:00 2001 From: Jakub Rozmiarek Date: Fri, 9 May 2025 13:07:42 +0200 Subject: [PATCH 1/2] Introduced state projectors responsible for building process state from events. - improved responsibility segregation: state management logic moved to the projectors from process managers - ProcessState definitions moved to the projectors which also simplified with_state method --- .../determine_vat_rates_on_order_placed.rb | 25 ++--------- .../processes/order_item_invoicing_process.rb | 30 ++----------- .../lib/processes/release_payment_process.rb | 33 ++------------ .../lib/processes/reservation_process.rb | 18 ++------ .../lib/processes/shipment_process.rb | 19 ++------ .../determine_vat_rates_on_order_placed.rb | 30 +++++++++++++ .../order_item_invoicing_process.rb | 35 +++++++++++++++ .../release_payment_process.rb | 37 ++++++++++++++++ .../state_projectors/reservation_process.rb | 35 +++++++++++++++ .../state_projectors/shipment_process.rb | 24 +++++++++++ .../state_projectors/three_plus_one_free.rb | 41 ++++++++++++++++++ .../lib/processes/three_plus_one_free.rb | 36 ++-------------- infra/lib/infra/process_manager.rb | 43 +++++++++---------- 13 files changed, 241 insertions(+), 165 deletions(-) create mode 100644 ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb create mode 100644 ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb create mode 100644 ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb create mode 100644 ecommerce/processes/lib/processes/state_projectors/reservation_process.rb create mode 100644 ecommerce/processes/lib/processes/state_projectors/shipment_process.rb create mode 100644 ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb diff --git a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb index 18d697ec..884f754f 100644 --- a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb +++ b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb @@ -1,6 +1,8 @@ +require_relative 'state_projectors/determine_vat_rates_on_order_placed' + module Processes class DetermineVatRatesOnOrderPlaced - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state(StateProjectors::DetermineVatRatesOnOrderPlaced) subscribes_to( Pricing::OfferAccepted, @@ -21,29 +23,8 @@ def determine_vat_rates end end - def apply(event) - case event - when Pricing::OfferAccepted - state.with( - offer_accepted: true, - order_lines: event.data.fetch(:order_lines), - order_id: event.data.fetch(:order_id) - ) - when Fulfillment::OrderRegistered - state.with(order_placed: true) - end - end - def fetch_id(event) event.data.fetch(:order_id) end - - ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do - def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) - super - end - - def placed? = offer_accepted && order_placed - end end end diff --git a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb index 410a8a9a..d3638bd8 100644 --- a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb +++ b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb @@ -1,6 +1,8 @@ +require_relative 'state_projectors/order_item_invoicing_process' + module Processes class OrderItemInvoicingProcess - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state(StateProjectors::OrderItemInvoicingProcess) subscribes_to( Pricing::PriceItemValueCalculated, @@ -26,35 +28,9 @@ def act end end - def apply(event) - case event - when Pricing::PriceItemValueCalculated - state.with( - order_id: event.data.fetch(:order_id), - product_id: event.data.fetch(:product_id), - quantity: event.data.fetch(:quantity), - discounted_amount: event.data.fetch(:discounted_amount) - ) - when Taxes::VatRateDetermined - state.with( - vat_rate: event.data.fetch(:vat_rate) - ) - end - end - def fetch_id(event) "#{event.data.fetch(:order_id)}$#{event.data.fetch(:product_id)}" end - - ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do - def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) - super - end - - def can_create_invoice_item? - order_id && product_id && quantity && vat_rate && discounted_amount - end - end end class MoneySplitter diff --git a/ecommerce/processes/lib/processes/release_payment_process.rb b/ecommerce/processes/lib/processes/release_payment_process.rb index ee126665..b18fe989 100644 --- a/ecommerce/processes/lib/processes/release_payment_process.rb +++ b/ecommerce/processes/lib/processes/release_payment_process.rb @@ -1,7 +1,8 @@ +require_relative 'state_projectors/release_payment_process' + module Processes class ReleasePaymentProcess - include Infra::ProcessManager.with_state { ProcessState } - + include Infra::ProcessManager.with_state(StateProjectors::ReleasePaymentProcess) subscribes_to( Payments::PaymentAuthorized, Payments::PaymentReleased, @@ -16,24 +17,6 @@ def act release_payment if state.release? end - def apply(event) - case event - when Payments::PaymentAuthorized - state.with(payment: :authorized) - when Payments::PaymentReleased - state.with(payment: :released) - when Fulfillment::OrderRegistered - state.with( - order: :placed, - order_id: event.data.fetch(:order_id) - ) - when Pricing::OfferExpired - state.with(order: :expired) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def release_payment command_bus.call(Payments::ReleasePayment.new(order_id: state.order_id)) end @@ -41,15 +24,5 @@ def release_payment def fetch_id(event) event.data.fetch(:order_id) end - - ProcessState = Data.define(:order, :payment, :order_id) do - def initialize(order: :draft, payment: :none, order_id: nil) - super - end - - def release? - payment.eql?(:authorized) && order.eql?(:expired) - end - end end end diff --git a/ecommerce/processes/lib/processes/reservation_process.rb b/ecommerce/processes/lib/processes/reservation_process.rb index 402c1876..a532ae90 100644 --- a/ecommerce/processes/lib/processes/reservation_process.rb +++ b/ecommerce/processes/lib/processes/reservation_process.rb @@ -1,6 +1,8 @@ +require_relative 'state_projectors/reservation_process' + module Processes class ReservationProcess - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state(StateProjectors::ReservationProcess) subscribes_to( Pricing::OfferAccepted, @@ -28,20 +30,6 @@ def act end end - def apply(event) - case event - when Pricing::OfferAccepted - state.with( - order: :accepted, - order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h - ) - when Fulfillment::OrderCancelled - state.with(order: :cancelled) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def reserve_stock unavailable_products = [] reserved_products = [] diff --git a/ecommerce/processes/lib/processes/shipment_process.rb b/ecommerce/processes/lib/processes/shipment_process.rb index 67f0d432..d1255db7 100644 --- a/ecommerce/processes/lib/processes/shipment_process.rb +++ b/ecommerce/processes/lib/processes/shipment_process.rb @@ -1,6 +1,8 @@ +require_relative 'state_projectors/shipment_process' + module Processes class ShipmentProcess - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state(StateProjectors::ShipmentProcess) subscribes_to( Shipping::ShippingAddressAddedToShipment, @@ -21,17 +23,6 @@ def act end end - def apply(event) - case event - when Shipping::ShippingAddressAddedToShipment - state.with(shipment: :address_set) - when Fulfillment::OrderRegistered - state.with(order: :placed) - when Fulfillment::OrderConfirmed - state.with(order: :confirmed) - end - end - def submit_shipment command_bus.call(Shipping::SubmitShipment.new(order_id: id)) end @@ -43,9 +34,5 @@ def authorize_shipment def fetch_id(event) event.data.fetch(:order_id) end - - ProcessState = Data.define(:order, :shipment) do - def initialize(order: nil, shipment: nil) = super - end end end diff --git a/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb b/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb new file mode 100644 index 00000000..316f48be --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb @@ -0,0 +1,30 @@ +module Processes + module StateProjectors + class DetermineVatRatesOnOrderPlaced + ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do + def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) + super + end + + def placed? = offer_accepted && order_placed + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + offer_accepted: true, + order_lines: event.data.fetch(:order_lines), + order_id: event.data.fetch(:order_id) + ) + when Fulfillment::OrderRegistered + state_instance.with(order_placed: true) + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb new file mode 100644 index 00000000..8bdc9604 --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb @@ -0,0 +1,35 @@ +module Processes + module StateProjectors + class OrderItemInvoicingProcess + ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do + def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) + super + end + + def can_create_invoice_item? + order_id && product_id && quantity && vat_rate && discounted_amount + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::PriceItemValueCalculated + state_instance.with( + order_id: event.data.fetch(:order_id), + product_id: event.data.fetch(:product_id), + quantity: event.data.fetch(:quantity), + discounted_amount: event.data.fetch(:discounted_amount) + ) + when Taxes::VatRateDetermined + state_instance.with( + vat_rate: event.data.fetch(:vat_rate) + ) + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb b/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb new file mode 100644 index 00000000..04accf41 --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb @@ -0,0 +1,37 @@ +module Processes + module StateProjectors + class ReleasePaymentProcess + ProcessState = Data.define(:order, :payment, :order_id) do + def initialize(order: :draft, payment: :none, order_id: nil) + super + end + + def release? + payment.eql?(:authorized) && order.eql?(:expired) + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Payments::PaymentAuthorized + state_instance.with(payment: :authorized) + when Payments::PaymentReleased + state_instance.with(payment: :released) + when Fulfillment::OrderRegistered + state_instance.with( + order: :placed, + order_id: event.data.fetch(:order_id) + ) + when Pricing::OfferExpired + state_instance.with(order: :expired) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb b/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb new file mode 100644 index 00000000..55c54f6b --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb @@ -0,0 +1,35 @@ +module Processes + module StateProjectors + class ReservationProcess + ProcessState = Data.define(:order, :order_lines) do + def initialize(order: nil, order_lines: []) + super(order:, order_lines: order_lines.freeze) + end + + def reserved_product_ids + order_lines.keys + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + order: :accepted, + order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h + ) + when Fulfillment::OrderCancelled + state_instance.with(order: :cancelled) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + else + state_instance + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb b/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb new file mode 100644 index 00000000..bdae8de4 --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb @@ -0,0 +1,24 @@ +module Processes + module StateProjectors + class ShipmentProcess + ProcessState = Data.define(:order, :shipment) do + def initialize(order: nil, shipment: nil) = super + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Shipping::ShippingAddressAddedToShipment + state_instance.with(shipment: :address_set) + when Fulfillment::OrderRegistered + state_instance.with(order: :placed) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb b/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb new file mode 100644 index 00000000..47869e25 --- /dev/null +++ b/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb @@ -0,0 +1,41 @@ +module Processes + module StateProjectors + class ThreePlusOneFree + ProcessState = Data.define(:lines, :free_product) do + def initialize(lines: [], free_product: nil) + super(lines: lines.freeze, free_product:) + end + + MIN_ORDER_LINES_QUANTITY = 4 + + def eligible_free_product + if lines.size >= MIN_ORDER_LINES_QUANTITY + lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) + end + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + product_id = event.data.fetch(:product_id) + case event + when Pricing::PriceItemAdded + lines = (state_instance.lines + [{ product_id:, price: event.data.fetch(:price) }]) + state_instance.with(lines:) + when Pricing::PriceItemRemoved + lines = state_instance.lines.dup + index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } + lines.delete_at(index_of_line_to_remove) + state_instance.with(lines:) + when Pricing::ProductMadeFreeForOrder + state_instance.with(free_product: product_id) + when Pricing::FreeProductRemovedFromOrder + state_instance.with(free_product: nil) + end + end + end + end +end diff --git a/ecommerce/processes/lib/processes/three_plus_one_free.rb b/ecommerce/processes/lib/processes/three_plus_one_free.rb index 4a8c9b92..d776e606 100644 --- a/ecommerce/processes/lib/processes/three_plus_one_free.rb +++ b/ecommerce/processes/lib/processes/three_plus_one_free.rb @@ -1,6 +1,8 @@ +require_relative 'state_projectors/three_plus_one_free' + module Processes class ThreePlusOneFree - include Infra::ProcessManager.with_state { ProcessState } + include Infra::ProcessManager.with_state(StateProjectors::ThreePlusOneFree) subscribes_to( Pricing::PriceItemAdded, @@ -22,24 +24,6 @@ def act end end - def apply(event) - product_id = event.data.fetch(:product_id) - case event - when Pricing::PriceItemAdded - lines = (state.lines + [{ product_id:, price: event.data.fetch(:price) }]) - state.with(lines:) - when Pricing::PriceItemRemoved - lines = state.lines.dup - index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } - lines.delete_at(index_of_line_to_remove) - state.with(lines:) - when Pricing::ProductMadeFreeForOrder - state.with(free_product: product_id) - when Pricing::FreeProductRemovedFromOrder - state.with(free_product: nil) - end - end - def remove_old_free_product(product_id) command_bus.call(Pricing::RemoveFreeProductFromOrder.new(order_id: id, product_id:)) end @@ -51,19 +35,5 @@ def make_new_product_for_free(product_id) def fetch_id(event) event.data.fetch(:order_id) end - - ProcessState = Data.define(:lines, :free_product) do - def initialize(lines: [], free_product: nil) - super(lines: lines.freeze, free_product:) - end - - MIN_ORDER_LINES_QUANTITY = 4 - - def eligible_free_product - if lines.size >= MIN_ORDER_LINES_QUANTITY - lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) - end - end - end end end diff --git a/infra/lib/infra/process_manager.rb b/infra/lib/infra/process_manager.rb index 0b211a58..8e34d2d3 100644 --- a/infra/lib/infra/process_manager.rb +++ b/infra/lib/infra/process_manager.rb @@ -15,14 +15,22 @@ def call(event) private - attr_reader :event_store, :command_bus, :id + attr_reader :event_store, :command_bus, :id, :state def build_state(event) + projector_class = self.class.instance_variable_get(:@projector_class) + raise "State projector class not found/configured for #{self.class}" unless projector_class with_retry do past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name:, expected_version: last_stored) - (past_events + [event]).each { |ev| @state = apply(ev) } + last_stored_idx = past_events.empty? ? -1 : past_events.size - 1 + event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored_idx) + + current_projected_state = projector_class.initial_state_instance + all_events_to_apply = past_events + [event] + all_events_to_apply.uniq(&:event_id).each do |ev| + current_projected_state = projector_class.apply(current_projected_state, ev) + end + @state = current_projected_state end end @@ -43,31 +51,22 @@ def subscribes_to(*events) attr_reader :subscribed_events end - def self.with_state(&state_class_block) - unless block_given? - raise ArgumentError, "A block returning the state class is required." + def self.with_state(projector_class) + unless projector_class && projector_class.respond_to?(:apply) && projector_class.respond_to?(:initial_state_instance) + raise ArgumentError, "Projector class must be valid and respond to :apply and :initial_state_instance." end Module.new do - @state_definition_block = state_class_block - + @projector_class_config = projector_class define_method(:initial_state) do - block = self.class.instance_variable_get(:@state_definition_block) - raise "State definition block not found on #{self.class}" unless block - - state_class = block.call - raise "State definition block did not return a Class" unless state_class.is_a?(Class) - - state_class.new - end - - define_method(:state) do - @state ||= initial_state + configured_projector = self.class.instance_variable_get(:@projector_class) + raise "Projector class not found on #{self.class}" unless configured_projector + configured_projector.initial_state_instance end def self.included(host_class) - host_class.instance_variable_set(:@state_definition_block, @state_definition_block) - + projector_to_set = @projector_class_config + host_class.instance_variable_set(:@projector_class, projector_to_set) host_class.include(ProcessMethods) host_class.include(Infra::Retry) host_class.extend(Subscriptions) From 86fb96c5b2f2e1469e8a40774de5ba8bb5f2e4a8 Mon Sep 17 00:00:00 2001 From: Jakub Rozmiarek Date: Mon, 12 May 2025 11:13:39 +0200 Subject: [PATCH 2/2] State projectors moved to the same file where process manager class is defined. - state projector class is now nested inside each process manager class defintion to emphasize it's a part of the process - Infra::Process manager with_state definition uses block again to allow state projector class to be defined below --- .../determine_vat_rates_on_order_placed.rb | 31 +++++++++-- .../processes/order_item_invoicing_process.rb | 37 ++++++++++++-- .../lib/processes/release_payment_process.rb | 38 ++++++++++++-- .../lib/processes/reservation_process.rb | 35 ++++++++++--- .../lib/processes/shipment_process.rb | 25 +++++++-- .../determine_vat_rates_on_order_placed.rb | 30 ----------- .../order_item_invoicing_process.rb | 35 ------------- .../release_payment_process.rb | 37 -------------- .../state_projectors/reservation_process.rb | 35 ------------- .../state_projectors/shipment_process.rb | 24 --------- .../state_projectors/three_plus_one_free.rb | 41 --------------- .../lib/processes/three_plus_one_free.rb | 42 +++++++++++++-- infra/lib/infra/process_manager.rb | 51 ++++++++++++++----- 13 files changed, 224 insertions(+), 237 deletions(-) delete mode 100644 ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb delete mode 100644 ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb delete mode 100644 ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb delete mode 100644 ecommerce/processes/lib/processes/state_projectors/reservation_process.rb delete mode 100644 ecommerce/processes/lib/processes/state_projectors/shipment_process.rb delete mode 100644 ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb diff --git a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb index 884f754f..2760b3ce 100644 --- a/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb +++ b/ecommerce/processes/lib/processes/determine_vat_rates_on_order_placed.rb @@ -1,8 +1,6 @@ -require_relative 'state_projectors/determine_vat_rates_on_order_placed' - module Processes class DetermineVatRatesOnOrderPlaced - include Infra::ProcessManager.with_state(StateProjectors::DetermineVatRatesOnOrderPlaced) + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::OfferAccepted, @@ -26,5 +24,32 @@ def determine_vat_rates def fetch_id(event) event.data.fetch(:order_id) end + + class StateProjector + ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do + def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) + super + end + + def placed? = offer_accepted && order_placed + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + offer_accepted: true, + order_lines: event.data.fetch(:order_lines), + order_id: event.data.fetch(:order_id) + ) + when Fulfillment::OrderRegistered + state_instance.with(order_placed: true) + end + end + end end end diff --git a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb index d3638bd8..82c36fcd 100644 --- a/ecommerce/processes/lib/processes/order_item_invoicing_process.rb +++ b/ecommerce/processes/lib/processes/order_item_invoicing_process.rb @@ -1,9 +1,6 @@ -require_relative 'state_projectors/order_item_invoicing_process' - module Processes class OrderItemInvoicingProcess - include Infra::ProcessManager.with_state(StateProjectors::OrderItemInvoicingProcess) - + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::PriceItemValueCalculated, Taxes::VatRateDetermined @@ -58,4 +55,36 @@ def call distributed_amounts end end + + class StateProjector + ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do + def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) + super + end + + def can_create_invoice_item? + order_id && product_id && quantity && vat_rate && discounted_amount + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::PriceItemValueCalculated + state_instance.with( + order_id: event.data.fetch(:order_id), + product_id: event.data.fetch(:product_id), + quantity: event.data.fetch(:quantity), + discounted_amount: event.data.fetch(:discounted_amount) + ) + when Taxes::VatRateDetermined + state_instance.with( + vat_rate: event.data.fetch(:vat_rate) + ) + end + end + end end diff --git a/ecommerce/processes/lib/processes/release_payment_process.rb b/ecommerce/processes/lib/processes/release_payment_process.rb index b18fe989..e0f4d095 100644 --- a/ecommerce/processes/lib/processes/release_payment_process.rb +++ b/ecommerce/processes/lib/processes/release_payment_process.rb @@ -1,8 +1,6 @@ -require_relative 'state_projectors/release_payment_process' - module Processes class ReleasePaymentProcess - include Infra::ProcessManager.with_state(StateProjectors::ReleasePaymentProcess) + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Payments::PaymentAuthorized, Payments::PaymentReleased, @@ -24,5 +22,39 @@ def release_payment def fetch_id(event) event.data.fetch(:order_id) end + + class StateProjector + ProcessState = Data.define(:order, :payment, :order_id) do + def initialize(order: :draft, payment: :none, order_id: nil) + super + end + + def release? + payment.eql?(:authorized) && order.eql?(:expired) + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Payments::PaymentAuthorized + state_instance.with(payment: :authorized) + when Payments::PaymentReleased + state_instance.with(payment: :released) + when Fulfillment::OrderRegistered + state_instance.with( + order: :placed, + order_id: event.data.fetch(:order_id) + ) + when Pricing::OfferExpired + state_instance.with(order: :expired) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end + end end end diff --git a/ecommerce/processes/lib/processes/reservation_process.rb b/ecommerce/processes/lib/processes/reservation_process.rb index a532ae90..c657e986 100644 --- a/ecommerce/processes/lib/processes/reservation_process.rb +++ b/ecommerce/processes/lib/processes/reservation_process.rb @@ -1,8 +1,6 @@ -require_relative 'state_projectors/reservation_process' - module Processes class ReservationProcess - include Infra::ProcessManager.with_state(StateProjectors::ReservationProcess) + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::OfferAccepted, @@ -72,12 +70,34 @@ def fetch_id(event) event.data.fetch(:order_id) end - ProcessState = Data.define(:order, :order_lines) do - def initialize(order: nil, order_lines: []) - super(order:, order_lines: order_lines.freeze) + class StateProjector + ProcessState = Data.define(:order, :order_lines) do + def initialize(order: nil, order_lines: []) + super(order: order, order_lines: order_lines.freeze) + end + + def reserved_product_ids + order_lines.keys + end end - def reserved_product_ids = order_lines.keys + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Pricing::OfferAccepted + state_instance.with( + order: :accepted, + order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h + ) + when Fulfillment::OrderCancelled + state_instance.with(order: :cancelled) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end end class SomeInventoryNotAvailable < StandardError @@ -87,5 +107,6 @@ def initialize(unavailable_products) @unavailable_products = unavailable_products end end + end end diff --git a/ecommerce/processes/lib/processes/shipment_process.rb b/ecommerce/processes/lib/processes/shipment_process.rb index d1255db7..b658bcb7 100644 --- a/ecommerce/processes/lib/processes/shipment_process.rb +++ b/ecommerce/processes/lib/processes/shipment_process.rb @@ -1,8 +1,6 @@ -require_relative 'state_projectors/shipment_process' - module Processes class ShipmentProcess - include Infra::ProcessManager.with_state(StateProjectors::ShipmentProcess) + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Shipping::ShippingAddressAddedToShipment, @@ -34,5 +32,26 @@ def authorize_shipment def fetch_id(event) event.data.fetch(:order_id) end + + class StateProjector + ProcessState = Data.define(:order, :shipment) do + def initialize(order: nil, shipment: nil) = super + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + case event + when Shipping::ShippingAddressAddedToShipment + state_instance.with(shipment: :address_set) + when Fulfillment::OrderRegistered + state_instance.with(order: :placed) + when Fulfillment::OrderConfirmed + state_instance.with(order: :confirmed) + end + end + end end end diff --git a/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb b/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb deleted file mode 100644 index 316f48be..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/determine_vat_rates_on_order_placed.rb +++ /dev/null @@ -1,30 +0,0 @@ -module Processes - module StateProjectors - class DetermineVatRatesOnOrderPlaced - ProcessState = Data.define(:offer_accepted, :order_placed, :order_id, :order_lines) do - def initialize(offer_accepted: false, order_placed: false, order_id: nil, order_lines: []) - super - end - - def placed? = offer_accepted && order_placed - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - case event - when Pricing::OfferAccepted - state_instance.with( - offer_accepted: true, - order_lines: event.data.fetch(:order_lines), - order_id: event.data.fetch(:order_id) - ) - when Fulfillment::OrderRegistered - state_instance.with(order_placed: true) - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb b/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb deleted file mode 100644 index 8bdc9604..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/order_item_invoicing_process.rb +++ /dev/null @@ -1,35 +0,0 @@ -module Processes - module StateProjectors - class OrderItemInvoicingProcess - ProcessState = Data.define(:order_id, :product_id, :quantity, :vat_rate, :discounted_amount) do - def initialize(order_id: nil, product_id: nil, quantity: nil, vat_rate: nil, discounted_amount: nil) - super - end - - def can_create_invoice_item? - order_id && product_id && quantity && vat_rate && discounted_amount - end - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - case event - when Pricing::PriceItemValueCalculated - state_instance.with( - order_id: event.data.fetch(:order_id), - product_id: event.data.fetch(:product_id), - quantity: event.data.fetch(:quantity), - discounted_amount: event.data.fetch(:discounted_amount) - ) - when Taxes::VatRateDetermined - state_instance.with( - vat_rate: event.data.fetch(:vat_rate) - ) - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb b/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb deleted file mode 100644 index 04accf41..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/release_payment_process.rb +++ /dev/null @@ -1,37 +0,0 @@ -module Processes - module StateProjectors - class ReleasePaymentProcess - ProcessState = Data.define(:order, :payment, :order_id) do - def initialize(order: :draft, payment: :none, order_id: nil) - super - end - - def release? - payment.eql?(:authorized) && order.eql?(:expired) - end - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - case event - when Payments::PaymentAuthorized - state_instance.with(payment: :authorized) - when Payments::PaymentReleased - state_instance.with(payment: :released) - when Fulfillment::OrderRegistered - state_instance.with( - order: :placed, - order_id: event.data.fetch(:order_id) - ) - when Pricing::OfferExpired - state_instance.with(order: :expired) - when Fulfillment::OrderConfirmed - state_instance.with(order: :confirmed) - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb b/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb deleted file mode 100644 index 55c54f6b..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/reservation_process.rb +++ /dev/null @@ -1,35 +0,0 @@ -module Processes - module StateProjectors - class ReservationProcess - ProcessState = Data.define(:order, :order_lines) do - def initialize(order: nil, order_lines: []) - super(order:, order_lines: order_lines.freeze) - end - - def reserved_product_ids - order_lines.keys - end - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - case event - when Pricing::OfferAccepted - state_instance.with( - order: :accepted, - order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h - ) - when Fulfillment::OrderCancelled - state_instance.with(order: :cancelled) - when Fulfillment::OrderConfirmed - state_instance.with(order: :confirmed) - else - state_instance - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb b/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb deleted file mode 100644 index bdae8de4..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/shipment_process.rb +++ /dev/null @@ -1,24 +0,0 @@ -module Processes - module StateProjectors - class ShipmentProcess - ProcessState = Data.define(:order, :shipment) do - def initialize(order: nil, shipment: nil) = super - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - case event - when Shipping::ShippingAddressAddedToShipment - state_instance.with(shipment: :address_set) - when Fulfillment::OrderRegistered - state_instance.with(order: :placed) - when Fulfillment::OrderConfirmed - state_instance.with(order: :confirmed) - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb b/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb deleted file mode 100644 index 47869e25..00000000 --- a/ecommerce/processes/lib/processes/state_projectors/three_plus_one_free.rb +++ /dev/null @@ -1,41 +0,0 @@ -module Processes - module StateProjectors - class ThreePlusOneFree - ProcessState = Data.define(:lines, :free_product) do - def initialize(lines: [], free_product: nil) - super(lines: lines.freeze, free_product:) - end - - MIN_ORDER_LINES_QUANTITY = 4 - - def eligible_free_product - if lines.size >= MIN_ORDER_LINES_QUANTITY - lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) - end - end - end - - def self.initial_state_instance - ProcessState.new - end - - def self.apply(state_instance, event) - product_id = event.data.fetch(:product_id) - case event - when Pricing::PriceItemAdded - lines = (state_instance.lines + [{ product_id:, price: event.data.fetch(:price) }]) - state_instance.with(lines:) - when Pricing::PriceItemRemoved - lines = state_instance.lines.dup - index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } - lines.delete_at(index_of_line_to_remove) - state_instance.with(lines:) - when Pricing::ProductMadeFreeForOrder - state_instance.with(free_product: product_id) - when Pricing::FreeProductRemovedFromOrder - state_instance.with(free_product: nil) - end - end - end - end -end diff --git a/ecommerce/processes/lib/processes/three_plus_one_free.rb b/ecommerce/processes/lib/processes/three_plus_one_free.rb index d776e606..670d5fd9 100644 --- a/ecommerce/processes/lib/processes/three_plus_one_free.rb +++ b/ecommerce/processes/lib/processes/three_plus_one_free.rb @@ -1,8 +1,6 @@ -require_relative 'state_projectors/three_plus_one_free' - module Processes class ThreePlusOneFree - include Infra::ProcessManager.with_state(StateProjectors::ThreePlusOneFree) + include Infra::ProcessManager.with_state { StateProjector } subscribes_to( Pricing::PriceItemAdded, @@ -35,5 +33,43 @@ def make_new_product_for_free(product_id) def fetch_id(event) event.data.fetch(:order_id) end + + class StateProjector + ProcessState = Data.define(:lines, :free_product) do + def initialize(lines: [], free_product: nil) + super(lines: lines.freeze, free_product:) + end + + MIN_ORDER_LINES_QUANTITY = 4 + + def eligible_free_product + if lines.size >= MIN_ORDER_LINES_QUANTITY + lines.sort_by { _1.fetch(:price) }.first.fetch(:product_id) + end + end + end + + def self.initial_state_instance + ProcessState.new + end + + def self.apply(state_instance, event) + product_id = event.data.fetch(:product_id) + case event + when Pricing::PriceItemAdded + lines = (state_instance.lines + [{ product_id:, price: event.data.fetch(:price) }]) + state_instance.with(lines:) + when Pricing::PriceItemRemoved + lines = state_instance.lines.dup + index_of_line_to_remove = lines.index { |line| line.fetch(:product_id) == product_id } + lines.delete_at(index_of_line_to_remove) + state_instance.with(lines:) + when Pricing::ProductMadeFreeForOrder + state_instance.with(free_product: product_id) + when Pricing::FreeProductRemovedFromOrder + state_instance.with(free_product: nil) + end + end + end end end diff --git a/infra/lib/infra/process_manager.rb b/infra/lib/infra/process_manager.rb index 8e34d2d3..90fd3121 100644 --- a/infra/lib/infra/process_manager.rb +++ b/infra/lib/infra/process_manager.rb @@ -18,8 +18,21 @@ def call(event) attr_reader :event_store, :command_bus, :id, :state def build_state(event) - projector_class = self.class.instance_variable_get(:@projector_class) - raise "State projector class not found/configured for #{self.class}" unless projector_class + projector_class_block = self.class.instance_variable_get(:@projector_class_definition_block) + unless projector_class_block + raise "State projector class definition block not found for #{self.class}. "\ + "Ensure it's configured via Infra::ProcessManager.with_state { YourProjectorClass }." + end + + projector_class = projector_class_block.call + unless projector_class.is_a?(Class) && + projector_class.respond_to?(:apply) && + projector_class.respond_to?(:initial_state_instance) + raise ArgumentError, + "The block provided to with_state must return a valid Projector class " \ + "that responds to :apply and :initial_state_instance. Got: #{projector_class.inspect}" + end + with_retry do past_events = event_store.read.stream(stream_name).to_a last_stored_idx = past_events.empty? ? -1 : past_events.size - 1 @@ -27,7 +40,10 @@ def build_state(event) current_projected_state = projector_class.initial_state_instance all_events_to_apply = past_events + [event] - all_events_to_apply.uniq(&:event_id).each do |ev| + + unique_events = all_events_to_apply.uniq(&:event_id) + + unique_events.each do |ev| current_projected_state = projector_class.apply(current_projected_state, ev) end @state = current_projected_state @@ -51,22 +67,33 @@ def subscribes_to(*events) attr_reader :subscribed_events end - def self.with_state(projector_class) - unless projector_class && projector_class.respond_to?(:apply) && projector_class.respond_to?(:initial_state_instance) - raise ArgumentError, "Projector class must be valid and respond to :apply and :initial_state_instance." + def self.with_state(&projector_class_block) + unless block_given? + raise ArgumentError, "A block returning the projector class is required for with_state." end Module.new do - @projector_class_config = projector_class + @projector_class_definition_block_config = projector_class_block + define_method(:initial_state) do - configured_projector = self.class.instance_variable_get(:@projector_class) - raise "Projector class not found on #{self.class}" unless configured_projector - configured_projector.initial_state_instance + block = self.class.instance_variable_get(:@projector_class_definition_block) + unless block + raise "Projector class definition block not found on #{self.class}. " \ + "Was Infra::ProcessManager.with_state called with a block?" + end + + projector_class = block.call + unless projector_class.is_a?(Class) && projector_class.respond_to?(:initial_state_instance) + raise "The block provided to with_state did not return a Class responding to :initial_state_instance. " \ + "Got: #{projector_class.inspect}" + end + projector_class.initial_state_instance end def self.included(host_class) - projector_to_set = @projector_class_config - host_class.instance_variable_set(:@projector_class, projector_to_set) + projector_block_to_set = @projector_class_definition_block_config + host_class.instance_variable_set(:@projector_class_definition_block, projector_block_to_set) + host_class.include(ProcessMethods) host_class.include(Infra::Retry) host_class.extend(Subscriptions)