Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment/cmake/Lue.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ message(STATUS "Build documentation : ${LUE_BUILD_DOCUMENTATION}")
message(STATUS "Build quality assurance : ${LUE_BUILD_QUALITY_ASSURANCE}")
message(STATUS "+ python api : ${LUE_QUALITY_ASSURANCE_WITH_PYTHON_API}")
message(STATUS "+ tests : ${LUE_QUALITY_ASSURANCE_WITH_TESTS}")
message(STATUS "With examples : ${LUE_WITH_EXAMPLES}")
message(STATUS "")
message(STATUS "CMAKE_GENERATOR : ${CMAKE_GENERATOR}")
message(STATUS "GENERATOR_IS_MULTI_CONFIG : ${GENERATOR_IS_MULTI_CONFIG}")
Expand Down
4 changes: 4 additions & 0 deletions environment/cmake/LueConfiguration.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ option(LUE_BUILD_DOCUMENTATION
"Build documentation"
FALSE)

option(LUE_WITH_EXAMPLES
"Include examples"
FALSE)

option(LUE_BUILD_QUALITY_ASSURANCE
"Include support for quality assurance"
FALSE)
Expand Down
4 changes: 4 additions & 0 deletions source/framework/algorithm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,10 @@ lue_install_development_libraries(

# ------------------------------------------------------------------------------

if(LUE_WITH_EXAMPLES)
add_subdirectory(example)
endif()

if(LUE_BUILD_TESTS)
add_subdirectory(test)
endif()
12 changes: 12 additions & 0 deletions source/framework/algorithm/example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
add_executable(lue.framework.algorithm.example.ndvi ndvi.cpp)

target_link_libraries(lue.framework.algorithm.example.ndvi
PRIVATE
lue::framework_local_operation
lue::framework_miscellaneous_operation
)

lue_install_executables(
TARGETS
lue.framework.algorithm.example.ndvi
)
42 changes: 42 additions & 0 deletions source/framework/algorithm/example/ndvi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "lue/framework/algorithm/operator.hpp"
#include "lue/framework/algorithm/value_policies/add.hpp"
#include "lue/framework/algorithm/value_policies/divide.hpp"
#include "lue/framework/algorithm/value_policies/subtract.hpp"
#include "lue/framework/algorithm/value_policies/uniform.hpp"
#include "lue/framework/core/component.hpp"
#include <hpx/hpx_init.hpp>


auto hpx_main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) -> int
{
using namespace lue::value_policies;

using Element = float;
lue::Rank const rank{2};
using Array = lue::PartitionedArray<Element, rank>;
using Shape = lue::ShapeT<Array>;

Shape const array_shape{40000, 40000};
Shape const partition_shape{2000, 2000};
Element const min{0};
Element const max{1};

auto const near_infrared = uniform(array_shape, partition_shape, min, max);
auto const red = uniform(array_shape, partition_shape, min, max);
auto const ndvi = (near_infrared - red) / (near_infrared + red);

int hpx_status = hpx::finalize();

return hpx_status;
}


auto main(int argc, char* argv[]) -> int
{
std::vector<std::string> const cfg{};

hpx::init_params params{};
params.cfg = {cfg};

return hpx::init(argc, argv, params);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#pragma once
#include <hpx/modules/assertion.hpp>
#include <hpx/assert.hpp>


#define lue_hpx_assert(...) HPX_ASSERT(__VA_ARGS__)
173 changes: 173 additions & 0 deletions source/framework/io/include/lue/framework/io/serializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#pragma once
#include "lue/framework/core/assert.hpp"
#include <hpx/future.hpp>
#include <boost/container/flat_map.hpp>
#include <concepts>


namespace lue {

/*!
@brief Class for maintaining information that can be used to serialize concurrent tasks
@tparam Key Type for objects to group information by. In case of serializing access to a file,
this could be the type used to represent the name of the file, for example.
@tparam Generation Type to represent the generation / order / count
@warning This class is not thread-safe: call its member functions from a single thread
@warning There is no facility yet to allow instances of this class to shrink again

An example use-case for this class is performing parallel I/O to an HDF5 file. When opening the same
file multiple times, the collective open calls must be serialized. An instance of this class can be
used to achieve this:

- On the root locality, each call to the function that will result in the call to H5Open is associated
with a count, starting with 1
- All tasks that will result in the call to H5Open on the localities are passed in the count. Note
that a task for a higher count can potentially be scheduled to run before a task for a lower count.
This is the problem that needs to be prevented.
- As long as an open call associated with a count before the current one has not finished yet, a task
must not try to open this file. This can be achieved by attaching a continuation to the future
associated with the open call that must finish first.

In code:

@code{.cpp}
Serializer<std::string, Count> open_file_serializer{};

void my_task(std::string const& pathname, Count const count)
{
// The promise is related to us / the current count
hpx::promise<void> promise = open_file_serializer.promise_for(pathname, count);

// The future is related to the one before us, with count count - 1
hpx::future<void> predecessor_future = open_file_serializer.when_predecessor_done(
pathname, count);

hpx::future<Dataset> a_future = predecessor_future.then(
[](hpx::future<void> const& future)
{
// Call H5Open
// ...

// This will allow the next in line to call H5Open
promise.set_value();

// Return open dataset?
// ...
}
);

// a_future will become ready once the call to H5Open has finished
}
@endcode
*/
template<std::equality_comparable Key, std::totally_ordered Generation>
class Serializer
{

public:

/*!
@brief Request a promise associated by a future for the @a key and @a generation passed
in
@warning generation must be larger than zero

The promise returned is related to the future which is related to the @a generation passed in.
It can only be obtained once. Calling this function multiple times for the same generation
will result in promises that are in a valid but unspecified state (they are useless).

It is fine if this function is called for future generations first. That is the point of this
class. It allows to serialize code in a context where calls can not easily be serialized.

The caller is responsible for setting the value of the promise (call set_value()
on it). Otherwise none of the tasks associated with a higher generation will ever be
scheduled.
*/
auto promise_for([[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation)
-> hpx::promise<void>
{
lue_hpx_assert(generation > 0);

// Map will be created if not present already
auto& map{_tuples[key]};

if (!map.contains(generation))
{
// Function to add (promise, future) tuples by generation
auto add_tuple = [&map](Generation const generation) -> void
{
hpx::promise<void> promise{};
hpx::future<void> future{promise.get_future()};
map[generation] = std::make_tuple(std::move(promise), std::move(future));
};

// If the map is empty, we add a first (promise, future) tuple for a first generation (0).
// The promise's value is set immediately so the future is already ready. This way, we can
// return a future in when_predecessor_done for the first generation (1).
if (map.empty())
{
add_tuple(0);
hpx::promise<void>& promise = std::get<0>(map[0]);
lue_hpx_assert(!std::get<1>(map[0]).is_ready());
promise.set_value();
lue_hpx_assert(std::get<1>(map[0]).is_ready());
}

// Add (promise, future) tuples for the current generation passed in and any generations
// missing between the last one added until the current one
for (Generation new_order = map.nth(map.size() - 1)->first + 1; new_order <= generation;
++new_order)
{
add_tuple(new_order);
}
}

lue_hpx_assert(map.contains(generation));
lue_hpx_assert(map.size() > 1);

hpx::promise<void>& promise = std::get<0>(map[generation]);

return std::move(promise);
}


/*!
@brief Return the future associated with the **predecessor** call for the @a key and
@a generation passed in

Attach a continuation to the future returned to serialize access to some resource.

This function can only be called once for a @a key and @a generation. The future returned is
the only one. Subsequent calls will return a future that is in a valid but unspecified state
(it is useless).
*/
auto when_predecessor_done(
[[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation)
-> hpx::future<void>
{
lue_hpx_assert(_tuples.contains(key));
auto& map{_tuples[key]};
lue_hpx_assert(map.contains(generation));
lue_hpx_assert(map.size() > 1);

hpx::future<void>& future = std::get<1>(map[generation - 1]);
lue_hpx_assert(future.valid());

return std::move(future);
}

private:

using Promise = hpx::promise<void>;

using Future = hpx::future<void>;

using FutureTuple = std::tuple<Promise, Future>;

using TupleByGeneration = boost::container::flat_map<Generation, FutureTuple>;

using TupleByGenerationByKey = std::map<Key, TupleByGeneration>;

TupleByGenerationByKey _tuples;
};

} // namespace lue
1 change: 1 addition & 0 deletions source/framework/io/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ set(scope lue_framework_io)
set(names
gdal
lue
serializer
)

foreach(name ${names})
Expand Down
68 changes: 68 additions & 0 deletions source/framework/io/test/serializer_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#define BOOST_TEST_MODULE lue framework io serializer
#include "lue/framework/io/serializer.hpp"
#include "lue/framework/test/hpx_unit_test.hpp"
#include <hpx/algorithm.hpp>
#include <random>


BOOST_AUTO_TEST_CASE(variable_raster)
{
// Create a number of tasks that need to be run in the order of their task ID, not the order in which they
// are created or some other order.

using Key = std::string;
using TaskID = std::int32_t;

lue::Serializer<Key, TaskID> task_serializer{};

Key const key = "my_tasks";
TaskID const min_task_id = 1;
TaskID const max_task_id = 100;

std::random_device random_device{};
std::mt19937 random_number_engine{random_device()};

// Create collection of randomly ordered task IDs
std::vector<TaskID> input_task_ids(max_task_id);
std::ranges::iota(input_task_ids, min_task_id);
std::ranges::shuffle(input_task_ids, random_number_engine);

// Each task writes its ID into this collection. If all goes well, all task IDs should end up here,
// ordered from min_task_id to max_task_id.
std::vector<TaskID> output_task_ids{};

// Collection of futures to wait on before we can start testing
std::vector<hpx::future<void>> futures{};

for (TaskID task_id : input_task_ids)
{
// This is how we can mark that we are done accessing some resource
hpx::promise<void> promise = task_serializer.promise_for(key, task_id);

// Create a task that will run after the one with task ID equal to task_id - 1 has finished
futures.push_back(task_serializer.when_predecessor_done(key, task_id)
.then(
[&output_task_ids, task_id, promise = std::move(promise)](
hpx::future<void> const& future) mutable -> auto
{
BOOST_REQUIRE(future.valid());
BOOST_REQUIRE(future.is_ready());

// All threads access this same collection, but since these calls are
// serialized, these accesses won't happen at the same time
output_task_ids.push_back(task_id);

// We are done, next in line can do its thing
promise.set_value();
}));
}

hpx::wait_all(futures.begin(), futures.end());

BOOST_REQUIRE(!output_task_ids.empty());
BOOST_CHECK_EQUAL(output_task_ids.size(), max_task_id - min_task_id + 1);
BOOST_CHECK(std::ranges::is_sorted(output_task_ids));
BOOST_CHECK(std::ranges::adjacent_find(output_task_ids) == output_task_ids.end());
BOOST_CHECK_EQUAL(output_task_ids.front(), 1);
BOOST_CHECK_EQUAL(output_task_ids.back(), max_task_id);
}
Loading
Loading