Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
21 changes: 21 additions & 0 deletions cpp/DataStorm/partialUpdate/AtmosphericConditions.ice
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) ZeroC, Inc.

module ClearSky
{

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

enum UpdateTag
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have doc comments on this enum.

{
TemperatureUpdated,
HumidityUpdated
}

/// Represents the atmospheric conditions measured by a sensor.
struct AtmosphericConditions
{
/// The temperature in degrees Celsius.
float temperature;

/// The humidity in percent.
float humidity;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
cmake_minimum_required(VERSION 3.21)

project(datastorm_stock CXX)
project(datastorm_sampleFilter CXX)

include(../../cmake/common.cmake)

# This demo requires C++20
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(reader Reader.cpp Stock.ice)
add_executable(reader Reader.cpp AtmosphericConditions.ice)
slice2cpp_generate(reader)
target_link_libraries(reader PRIVATE Ice::Ice Ice::DataStorm)
if(WIN32)
Expand All @@ -20,7 +16,7 @@ if(WIN32)
)
endif()

add_executable(writer Writer.cpp Stock.ice)
add_executable(writer Writer.cpp AtmosphericConditions.ice)
slice2cpp_generate(writer)
target_link_libraries(writer PRIVATE Ice::Ice Ice::DataStorm)
if(WIN32)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# DataStorm Stock
# DataStorm partialUpdate

This demo illustrates the use of partial updates. The writer adds stocks to the
topic and sends partial updates to update the stock price or volume. The reader
prints out the stock information and partial updates.
This demo illustrates the use of **partial updates** in DataStorm.

The demo uses Slice to define the `Demo::Stock` class in the `Stock.ice` file.
The writer publishes atmospheric conditions changes, when both temperature and humidity changes it publishes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The writer publishes atmospheric conditions changes, when both temperature and humidity changes it publishes
The writer publishes atmospheric conditions changes. When both temperature and humidity changes it publishes

a full update, when only one field changes it publishes a partial update for the given field.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
a full update, when only one field changes it publishes a partial update for the given field.
a full update, and when only one field changes it publishes a partial update for the given field.


The reader and writer register updaters with the topic to process partial update samples.

## Ice prerequisites

Expand Down Expand Up @@ -51,7 +52,4 @@ In a separate window, start the reader:
build\reader
```

You can start multiple writers and readers to publish or follow different
stocks.

[Ice for C++ installation]: https://github.com/zeroc-ice/ice/blob/main/NIGHTLY.md#ice-for-c
70 changes: 70 additions & 0 deletions cpp/DataStorm/partialUpdate/Reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) ZeroC, Inc.

#include <DataStorm/DataStorm.h>
#include <Ice/Ice.h>

#include <iostream>
#include <string>

#include "AtmosphericConditions.h"

using namespace std;

void
printSample(const DataStorm::Sample<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag>& sample)
{
cout << "sample: event=" << sample.getEvent() << " key=" << sample.getKey() << ", value=" << sample.getValue()
<< std::endl;
}

void
printSamples(const vector<DataStorm::Sample<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag>>& samples)
{
for (const auto& sample : samples)
{
printSample(sample);
}
}

int
main(int argc, char* argv[])
{
// CtrlCHandler is a helper class that handles Ctrl+C and similar signals. It must be constructed at the beginning
// of the program, before creating a DataStorm node or starting any thread.
Ice::CtrlCHandler ctrlCHandler;

// Instantiates DataStorm node.
DataStorm::Node node{argc, argv};

// Set Ctrl+C handler to shutdown the node.
ctrlCHandler.setCallback(
[&ctrlCHandler, &node](int)
{
std::cout << "Shutting down..." << std::endl;
node.shutdown();
});

// Instantiates the "atmospheric-conditions" topic.
// The topic uses strings for keys and AtmosphericConditions for values.
DataStorm::Topic<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag> topic{
node,
"atmospheric-conditions"};

// A partial updater for the temperature field.
topic.setUpdater<float>(
ClearSky::UpdateTag::TemperatureUpdated,
[](ClearSky::AtmosphericConditions& t, float temperature) { t.temperature = temperature; });

// A partial updater for the humidity field.
topic.setUpdater<float>(
ClearSky::UpdateTag::HumidityUpdated,
[](ClearSky::AtmosphericConditions& t, float humidity) { t.humidity = humidity; });

// Create an any key reader, that read all the samples published on the topic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Create an any key reader, that read all the samples published on the topic.
// Create an any key reader, that reads all the samples published on the topic.

auto reader = DataStorm::makeAnyKeyReader(topic, "atmospheric-conditions-reader");

reader.onSamples(printSamples, printSample);

node.waitForShutdown();
return 0;
}
134 changes: 134 additions & 0 deletions cpp/DataStorm/partialUpdate/Writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) ZeroC, Inc.

#include <DataStorm/DataStorm.h>
#include <Ice/Ice.h>

#include "AtmosphericConditions.h"

#include <iostream>
#include <random>
#include <string>

using namespace std;

int
main(int argc, char* argv[])
{
// CtrlCHandler is a helper class that handles Ctrl+C and similar signals. It must be constructed at the beginning
// of the program, before creating a DataStorm node or starting any thread.
Ice::CtrlCHandler ctrlCHandler;

// Instantiates DataStorm node.
DataStorm::Node node{argc, argv};

// Instantiates the "atmospheric-conditions" topic.
// The topic uses strings for keys and AtmosphericConditions for values.
DataStorm::Topic<string, ClearSky::AtmosphericConditions, ClearSky::UpdateTag> topic{
node,
"atmospheric-conditions"};

// A partial updater for the temperature field.
topic.setUpdater<float>(
ClearSky::UpdateTag::TemperatureUpdated,
[](ClearSky::AtmosphericConditions& t, float temperature) { t.temperature = temperature; });

// A partial updater for the humidity field.
topic.setUpdater<float>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if we, for example, set this float to a string and make the lambda no-op, or maybe convert the string to a float. Does writer.partialUpdate throw?

ClearSky::UpdateTag::HumidityUpdated,
[](ClearSky::AtmosphericConditions& t, float humidity) { t.humidity = humidity; });

// Create a single key writer that writes data for the "floor1/studio" key.
auto writer = DataStorm::makeSingleKeyWriter(topic, "floor1/studio", "atmospheric-conditions-writer");

// Wait for a reader to connect
topic.waitForReaders();

// Stop writing samples on Ctrl-C.
auto shutdownPromise = std::promise<void>();
auto shutdownFuture = shutdownPromise.get_future();
ctrlCHandler.setCallback(
[&ctrlCHandler, &shutdownPromise](int)
{
std::cout << "Shutting down..." << std::endl;
shutdownPromise.set_value();
// Reset the callback to nullptr to avoid calling it again.
ctrlCHandler.setCallback(nullptr);
});

// Initialize random number generators.
std::mt19937 gen{std::random_device{}()};
std::uniform_int_distribution<> atmosphericDist{0, 10};

ClearSky::AtmosphericConditions conditions{21.5f, 50.0f};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should comment these two lines as thy interact with datastorm. Are these some kind of initial conditions? This is a full update?

writer.update(conditions);

// Publish AtmosphericConditions readings.
float maxTemp = 23.0f;
float minTemp = 19.0f;
float maxHumidity = 55.0f;
float minHumidity = 45.0f;

bool humidityIncreasing = true;
bool tempIncreasing = true;

while (true)
{
// Generate random changes where either temperature or humidity changes about 50% of the time.
bool tempChanged = atmosphericDist(gen) > 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just generate a random temperate/humidity fluctuations between something like -1.0 and +1.0.

Then all we need to check is that it's not too low or high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the point of changed is that we use partial updates when only one of the conditions change.

bool humidityChanged = atmosphericDist(gen) > 5;

if (tempChanged)
{
if (tempIncreasing)
{
conditions.temperature += 0.5f;
}
else
{
conditions.temperature -= 0.5f;
}

tempIncreasing = (tempIncreasing && conditions.temperature < maxTemp) ||
(!tempIncreasing && conditions.temperature <= minTemp);
}

if (humidityChanged)
{
if (humidityIncreasing)
{
conditions.humidity += 1.0f;
}
else
{
conditions.humidity -= 1.0f;
}

humidityIncreasing = (humidityIncreasing && conditions.humidity < maxHumidity) ||
(!humidityIncreasing && conditions.humidity <= minHumidity);
}

// If both temperature and humidity changed, publish a full update. Otherwise, publish a partial update for the
// changed field.
if (tempChanged && humidityChanged)
{
cout << "Publishing full update... " << conditions << endl;
writer.update(conditions);
}
else if (humidityChanged)
{
cout << "Publishing humidity update... " << conditions.humidity << "%" << endl;
writer.partialUpdate<float>(ClearSky::UpdateTag::HumidityUpdated)(conditions.humidity);
}
else if (tempChanged)
{
cout << "Publishing temperature update... " << conditions.temperature << "°C" << endl;
writer.partialUpdate<float>(ClearSky::UpdateTag::TemperatureUpdated)(conditions.temperature);
}

// Wait for half a second or for the shutdown signal.
if (shutdownFuture.wait_for(std::chrono::milliseconds(500)) != std::future_status::timeout)
{
break;
}
}
}
Loading