Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
21 changes: 21 additions & 0 deletions cpp/DataStorm/partialUpdate/AtmosphericConditions.ice
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) ZeroC, Inc.

module ClearSky
{

enum UpdateTag
{
TemperatureUpdated,
HumidityUpdated
}

/// Represents the atmospheric conditions measured by a sensor.
struct AtmosphericConditions
{
/// The temperature in degrees Celsius.
float temperature;

/// The humidity in percent.
float humidity;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
cmake_minimum_required(VERSION 3.21)

project(datastorm_stock CXX)
project(datastorm_sampleFilter CXX)

include(../../cmake/common.cmake)

# This demo requires C++20
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(reader Reader.cpp Stock.ice)
add_executable(reader Reader.cpp AtmosphericConditions.ice)
slice2cpp_generate(reader)
target_link_libraries(reader PRIVATE Ice::Ice Ice::DataStorm)
if(WIN32)
Expand All @@ -20,7 +16,7 @@ if(WIN32)
)
endif()

add_executable(writer Writer.cpp Stock.ice)
add_executable(writer Writer.cpp AtmosphericConditions.ice)
slice2cpp_generate(writer)
target_link_libraries(writer PRIVATE Ice::Ice Ice::DataStorm)
if(WIN32)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# DataStorm Stock
# DataStorm partialUpdate

This demo illustrates the use of partial updates. The writer adds stocks to the
topic and sends partial updates to update the stock price or volume. The reader
prints out the stock information and partial updates.
This demo illustrates the use of **partial updates** in DataStorm.

The demo uses Slice to define the `Demo::Stock` class in the `Stock.ice` file.
The writer publishes atmospheric conditions changes, when both temperature and humidity changes it publishes
a full update, when only one field changes it publishes a partial update for the given field.

The reader and writer register updaters with the topic to process partial update samples.

## Ice prerequisites

Expand Down Expand Up @@ -51,7 +52,4 @@ In a separate window, start the reader:
build\reader
```

You can start multiple writers and readers to publish or follow different
stocks.

[Ice for C++ installation]: https://github.com/zeroc-ice/ice/blob/main/NIGHTLY.md#ice-for-c
70 changes: 70 additions & 0 deletions cpp/DataStorm/partialUpdate/Reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) ZeroC, Inc.

#include <DataStorm/DataStorm.h>
#include <Ice/Ice.h>

#include <iostream>
#include <string>

#include "AtmosphericConditions.h"

using namespace std;

void
printSample(const DataStorm::Sample<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag>& sample)
{
cout << "sample: event=" << sample.getEvent() << " key=" << sample.getKey() << ", value=" << sample.getValue()
<< std::endl;
}

void
printSamples(const vector<DataStorm::Sample<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag>>& samples)
{
for (const auto& sample : samples)
{
printSample(sample);
}
}

int
main(int argc, char* argv[])
{
// CtrlCHandler is a helper class that handles Ctrl+C and similar signals. It must be constructed at the beginning
// of the program, before creating a DataStorm node or starting any thread.
Ice::CtrlCHandler ctrlCHandler;

// Instantiates DataStorm node.
DataStorm::Node node{argc, argv};

// Set Ctrl+C handler to shutdown the node.
ctrlCHandler.setCallback(
[&ctrlCHandler, &node](int)
{
std::cout << "Shutting down..." << std::endl;
node.shutdown();
});

// Instantiates the "atmospheric-conditions" topic.
// The topic uses strings for keys and AtmosphericConditions for values.
DataStorm::Topic<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag> topic{
node,
"atmospheric-conditions"};

// A partial updater for the temperature field.
topic.setUpdater<float>(
ClearSky::UpdateTag::TemperatureUpdated,
[](ClearSky::AtmosphericConditions& t, float temperature) { t.temperature = temperature; });

// A partial updater for the humidity field.
topic.setUpdater<float>(
ClearSky::UpdateTag::HumidityUpdated,
[](ClearSky::AtmosphericConditions& t, float humidity) { t.humidity = humidity; });

// Create an any key reader, that read all the samples published on the topic.
auto reader = DataStorm::makeAnyKeyReader(topic, "atmospheric-conditions-reader");

reader.onSamples(printSamples, printSample);

node.waitForShutdown();
return 0;
}
134 changes: 134 additions & 0 deletions cpp/DataStorm/partialUpdate/Writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) ZeroC, Inc.

#include <DataStorm/DataStorm.h>
#include <Ice/Ice.h>

#include "AtmosphericConditions.h"

#include <iostream>
#include <random>
#include <string>

using namespace std;

int
main(int argc, char* argv[])
{
// CtrlCHandler is a helper class that handles Ctrl+C and similar signals. It must be constructed at the beginning
// of the program, before creating a DataStorm node or starting any thread.
Ice::CtrlCHandler ctrlCHandler;

// Instantiates DataStorm node.
DataStorm::Node node{argc, argv};

// Instantiates the "atmospheric-conditions" topic.
// The topic uses strings for keys and AtmosphericConditions for values.
DataStorm::Topic<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag> topic{
node,
"atmospheric-conditions"};

// A partial updater for the temperature field.
topic.setUpdater<float>(
ClearSky::UpdateTag::TemperatureUpdated,
[](ClearSky::AtmosphericConditions& t, float temperature) { t.temperature = temperature; });

// A partial updater for the humidity field.
topic.setUpdater<float>(
ClearSky::UpdateTag::HumidityUpdated,
[](ClearSky::AtmosphericConditions& t, float humidity) { t.humidity = humidity; });

// Create a single key writer that writes data for the "floor1/studio" key.
auto writer = DataStorm::makeSingleKeyWriter(topic, "floor1/studio", "atmospheric-conditions-writer");

// Wait for a reader to connect
topic.waitForReaders();

// Stop writing samples on Ctrl-C.
auto shutdownPromise = std::promise<void>();
auto shutdownFuture = shutdownPromise.get_future();
ctrlCHandler.setCallback(
[&ctrlCHandler, &shutdownPromise](int)
{
std::cout << "Shutting down..." << std::endl;
shutdownPromise.set_value();
// Reset the callback to nullptr to avoid calling it again.
ctrlCHandler.setCallback(nullptr);
});

// Initialize random number generators.
std::mt19937 gen{std::random_device{}()};
std::uniform_int_distribution<> atmosphericDist{0, 10};

ClearSky::AtmosphericConditions conditions{21.5f, 50.0f};
writer.update(conditions);

// Publish AtmosphericConditions readings.
float maxTemp = 23.0f;
float minTemp = 19.0f;
float maxHumidity = 55.0f;
float minHumidity = 45.0f;

bool humidityIncreasing = true;
bool tempIncreasing = true;

while (true)
{
// Generate random changes where either temperature or humidity changes about 50% of the time.
bool tempChanged = atmosphericDist(gen) > 5;
bool humidityChanged = atmosphericDist(gen) > 5;

if (tempChanged)
{
if (tempIncreasing)
{
conditions.temperature += 0.5f;
}
else
{
conditions.temperature -= 0.5f;
}

tempIncreasing = (tempIncreasing && conditions.temperature < maxTemp) ||
(!tempIncreasing && conditions.temperature <= minTemp);
}

if (humidityChanged)
{
if (humidityIncreasing)
{
conditions.humidity += 1.0f;
}
else
{
conditions.humidity -= 1.0f;
}

humidityIncreasing = (humidityIncreasing && conditions.humidity < maxHumidity) ||
(!humidityIncreasing && conditions.humidity <= minHumidity);
}

// If both temperature and humidity changed, publish a full update. Otherwise, publish a partial update for the
// changed field.
if (tempChanged && humidityChanged)
{
cout << "Publishing full update... " << conditions << endl;
writer.update(conditions);
}
else if (humidityChanged)
{
cout << "Publishing humidity update... " << conditions.humidity << "%" << endl;
writer.partialUpdate<float>(ClearSky::UpdateTag::HumidityUpdated)(conditions.humidity);
}
else if (tempChanged)
{
cout << "Publishing temperature update... " << conditions.temperature << "°C" << endl;
writer.partialUpdate<float>(ClearSky::UpdateTag::TemperatureUpdated)(conditions.temperature);
}

// Wait for half a second or for the shutdown signal.
if (shutdownFuture.wait_for(std::chrono::milliseconds(500)) != std::future_status::timeout)
{
break;
}
}
}
Loading