diff --git a/environment/cmake/Lue.cmake b/environment/cmake/Lue.cmake index 2df9900c3..1b1c34305 100644 --- a/environment/cmake/Lue.cmake +++ b/environment/cmake/Lue.cmake @@ -110,6 +110,7 @@ message(STATUS "Build documentation : ${LUE_BUILD_DOCUMENTATION}") message(STATUS "Build quality assurance : ${LUE_BUILD_QUALITY_ASSURANCE}") message(STATUS "+ python api : ${LUE_QUALITY_ASSURANCE_WITH_PYTHON_API}") message(STATUS "+ tests : ${LUE_QUALITY_ASSURANCE_WITH_TESTS}") +message(STATUS "With examples : ${LUE_WITH_EXAMPLES}") message(STATUS "") message(STATUS "CMAKE_GENERATOR : ${CMAKE_GENERATOR}") message(STATUS "GENERATOR_IS_MULTI_CONFIG : ${GENERATOR_IS_MULTI_CONFIG}") diff --git a/environment/cmake/LueConfiguration.cmake b/environment/cmake/LueConfiguration.cmake index 01e3025d5..810490deb 100644 --- a/environment/cmake/LueConfiguration.cmake +++ b/environment/cmake/LueConfiguration.cmake @@ -38,6 +38,10 @@ option(LUE_BUILD_DOCUMENTATION "Build documentation" FALSE) +option(LUE_WITH_EXAMPLES + "Include examples" + FALSE) + option(LUE_BUILD_QUALITY_ASSURANCE "Include support for quality assurance" FALSE) diff --git a/source/framework/algorithm/CMakeLists.txt b/source/framework/algorithm/CMakeLists.txt index 68bb61490..7f54f8c50 100644 --- a/source/framework/algorithm/CMakeLists.txt +++ b/source/framework/algorithm/CMakeLists.txt @@ -1959,6 +1959,10 @@ lue_install_development_libraries( # ------------------------------------------------------------------------------ +if(LUE_WITH_EXAMPLES) + add_subdirectory(example) +endif() + if(LUE_BUILD_TESTS) add_subdirectory(test) endif() diff --git a/source/framework/algorithm/example/CMakeLists.txt b/source/framework/algorithm/example/CMakeLists.txt new file mode 100644 index 000000000..4aa5b31bf --- /dev/null +++ b/source/framework/algorithm/example/CMakeLists.txt @@ -0,0 +1,12 @@ +add_executable(lue.framework.algorithm.example.ndvi ndvi.cpp) + +target_link_libraries(lue.framework.algorithm.example.ndvi + PRIVATE + lue::framework_local_operation + lue::framework_miscellaneous_operation +) + +lue_install_executables( + TARGETS + lue.framework.algorithm.example.ndvi +) diff --git a/source/framework/algorithm/example/ndvi.cpp b/source/framework/algorithm/example/ndvi.cpp new file mode 100644 index 000000000..986d1634b --- /dev/null +++ b/source/framework/algorithm/example/ndvi.cpp @@ -0,0 +1,42 @@ +#include "lue/framework/algorithm/operator.hpp" +#include "lue/framework/algorithm/value_policies/add.hpp" +#include "lue/framework/algorithm/value_policies/divide.hpp" +#include "lue/framework/algorithm/value_policies/subtract.hpp" +#include "lue/framework/algorithm/value_policies/uniform.hpp" +#include "lue/framework/core/component.hpp" +#include + + +auto hpx_main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) -> int +{ + using namespace lue::value_policies; + + using Element = float; + lue::Rank const rank{2}; + using Array = lue::PartitionedArray; + using Shape = lue::ShapeT; + + Shape const array_shape{40000, 40000}; + Shape const partition_shape{2000, 2000}; + Element const min{0}; + Element const max{1}; + + auto const near_infrared = uniform(array_shape, partition_shape, min, max); + auto const red = uniform(array_shape, partition_shape, min, max); + auto const ndvi = (near_infrared - red) / (near_infrared + red); + + int hpx_status = hpx::finalize(); + + return hpx_status; +} + + +auto main(int argc, char* argv[]) -> int +{ + std::vector const cfg{}; + + hpx::init_params params{}; + params.cfg = {cfg}; + + return hpx::init(argc, argv, params); +} diff --git a/source/framework/core/include/lue/framework/core/assert.hpp b/source/framework/core/include/lue/framework/core/assert.hpp index 045c83d4f..a7e91c993 100644 --- a/source/framework/core/include/lue/framework/core/assert.hpp +++ b/source/framework/core/include/lue/framework/core/assert.hpp @@ -1,5 +1,5 @@ #pragma once -#include +#include #define lue_hpx_assert(...) HPX_ASSERT(__VA_ARGS__) diff --git a/source/framework/io/include/lue/framework/io/serializer.hpp b/source/framework/io/include/lue/framework/io/serializer.hpp new file mode 100644 index 000000000..79469265a --- /dev/null +++ b/source/framework/io/include/lue/framework/io/serializer.hpp @@ -0,0 +1,173 @@ +#pragma once +#include "lue/framework/core/assert.hpp" +#include +#include +#include + + +namespace lue { + + /*! + @brief Class for maintaining information that can be used to serialize concurrent tasks + @tparam Key Type for objects to group information by. In case of serializing access to a file, + this could be the type used to represent the name of the file, for example. + @tparam Generation Type to represent the generation / order / count + @warning This class is not thread-safe: call its member functions from a single thread + @warning There is no facility yet to allow instances of this class to shrink again + + An example use-case for this class is performing parallel I/O to an HDF5 file. When opening the same + file multiple times, the collective open calls must be serialized. An instance of this class can be + used to achieve this: + + - On the root locality, each call to the function that will result in the call to H5Open is associated + with a count, starting with 1 + - All tasks that will result in the call to H5Open on the localities are passed in the count. Note + that a task for a higher count can potentially be scheduled to run before a task for a lower count. + This is the problem that needs to be prevented. + - As long as an open call associated with a count before the current one has not finished yet, a task + must not try to open this file. This can be achieved by attaching a continuation to the future + associated with the open call that must finish first. + + In code: + + @code{.cpp} + Serializer open_file_serializer{}; + + void my_task(std::string const& pathname, Count const count) + { + // The promise is related to us / the current count + hpx::promise promise = open_file_serializer.promise_for(pathname, count); + + // The future is related to the one before us, with count count - 1 + hpx::future predecessor_future = open_file_serializer.when_predecessor_done( + pathname, count); + + hpx::future a_future = predecessor_future.then( + [](hpx::future const& future) + { + // Call H5Open + // ... + + // This will allow the next in line to call H5Open + promise.set_value(); + + // Return open dataset? + // ... + } + ); + + // a_future will become ready once the call to H5Open has finished + } + @endcode + */ + template + class Serializer + { + + public: + + /*! + @brief Request a promise associated by a future for the @a key and @a generation passed + in + @warning generation must be larger than zero + + The promise returned is related to the future which is related to the @a generation passed in. + It can only be obtained once. Calling this function multiple times for the same generation + will result in promises that are in a valid but unspecified state (they are useless). + + It is fine if this function is called for future generations first. That is the point of this + class. It allows to serialize code in a context where calls can not easily be serialized. + + The caller is responsible for setting the value of the promise (call set_value() + on it). Otherwise none of the tasks associated with a higher generation will ever be + scheduled. + */ + auto promise_for([[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation) + -> hpx::promise + { + lue_hpx_assert(generation > 0); + + // Map will be created if not present already + auto& map{_tuples[key]}; + + if (!map.contains(generation)) + { + // Function to add (promise, future) tuples by generation + auto add_tuple = [&map](Generation const generation) -> void + { + hpx::promise promise{}; + hpx::future future{promise.get_future()}; + map[generation] = std::make_tuple(std::move(promise), std::move(future)); + }; + + // If the map is empty, we add a first (promise, future) tuple for a first generation (0). + // The promise's value is set immediately so the future is already ready. This way, we can + // return a future in when_predecessor_done for the first generation (1). + if (map.empty()) + { + add_tuple(0); + hpx::promise& promise = std::get<0>(map[0]); + lue_hpx_assert(!std::get<1>(map[0]).is_ready()); + promise.set_value(); + lue_hpx_assert(std::get<1>(map[0]).is_ready()); + } + + // Add (promise, future) tuples for the current generation passed in and any generations + // missing between the last one added until the current one + for (Generation new_order = map.nth(map.size() - 1)->first + 1; new_order <= generation; + ++new_order) + { + add_tuple(new_order); + } + } + + lue_hpx_assert(map.contains(generation)); + lue_hpx_assert(map.size() > 1); + + hpx::promise& promise = std::get<0>(map[generation]); + + return std::move(promise); + } + + + /*! + @brief Return the future associated with the **predecessor** call for the @a key and + @a generation passed in + + Attach a continuation to the future returned to serialize access to some resource. + + This function can only be called once for a @a key and @a generation. The future returned is + the only one. Subsequent calls will return a future that is in a valid but unspecified state + (it is useless). + */ + auto when_predecessor_done( + [[maybe_unused]] Key const& key, [[maybe_unused]] Generation const generation) + -> hpx::future + { + lue_hpx_assert(_tuples.contains(key)); + auto& map{_tuples[key]}; + lue_hpx_assert(map.contains(generation)); + lue_hpx_assert(map.size() > 1); + + hpx::future& future = std::get<1>(map[generation - 1]); + lue_hpx_assert(future.valid()); + + return std::move(future); + } + + private: + + using Promise = hpx::promise; + + using Future = hpx::future; + + using FutureTuple = std::tuple; + + using TupleByGeneration = boost::container::flat_map; + + using TupleByGenerationByKey = std::map; + + TupleByGenerationByKey _tuples; + }; + +} // namespace lue diff --git a/source/framework/io/test/CMakeLists.txt b/source/framework/io/test/CMakeLists.txt index ab2346b67..dc1bfada2 100644 --- a/source/framework/io/test/CMakeLists.txt +++ b/source/framework/io/test/CMakeLists.txt @@ -2,6 +2,7 @@ set(scope lue_framework_io) set(names gdal lue + serializer ) foreach(name ${names}) diff --git a/source/framework/io/test/serializer_test.cpp b/source/framework/io/test/serializer_test.cpp new file mode 100644 index 000000000..941aaa272 --- /dev/null +++ b/source/framework/io/test/serializer_test.cpp @@ -0,0 +1,68 @@ +#define BOOST_TEST_MODULE lue framework io serializer +#include "lue/framework/io/serializer.hpp" +#include "lue/framework/test/hpx_unit_test.hpp" +#include +#include + + +BOOST_AUTO_TEST_CASE(variable_raster) +{ + // Create a number of tasks that need to be run in the order of their task ID, not the order in which they + // are created or some other order. + + using Key = std::string; + using TaskID = std::int32_t; + + lue::Serializer task_serializer{}; + + Key const key = "my_tasks"; + TaskID const min_task_id = 1; + TaskID const max_task_id = 100; + + std::random_device random_device{}; + std::mt19937 random_number_engine{random_device()}; + + // Create collection of randomly ordered task IDs + std::vector input_task_ids(max_task_id); + std::ranges::iota(input_task_ids, min_task_id); + std::ranges::shuffle(input_task_ids, random_number_engine); + + // Each task writes its ID into this collection. If all goes well, all task IDs should end up here, + // ordered from min_task_id to max_task_id. + std::vector output_task_ids{}; + + // Collection of futures to wait on before we can start testing + std::vector> futures{}; + + for (TaskID task_id : input_task_ids) + { + // This is how we can mark that we are done accessing some resource + hpx::promise promise = task_serializer.promise_for(key, task_id); + + // Create a task that will run after the one with task ID equal to task_id - 1 has finished + futures.push_back(task_serializer.when_predecessor_done(key, task_id) + .then( + [&output_task_ids, task_id, promise = std::move(promise)]( + hpx::future const& future) mutable -> auto + { + BOOST_REQUIRE(future.valid()); + BOOST_REQUIRE(future.is_ready()); + + // All threads access this same collection, but since these calls are + // serialized, these accesses won't happen at the same time + output_task_ids.push_back(task_id); + + // We are done, next in line can do its thing + promise.set_value(); + })); + } + + hpx::wait_all(futures.begin(), futures.end()); + + BOOST_REQUIRE(!output_task_ids.empty()); + BOOST_CHECK_EQUAL(output_task_ids.size(), max_task_id - min_task_id + 1); + BOOST_CHECK(std::ranges::is_sorted(output_task_ids)); + BOOST_CHECK(std::ranges::adjacent_find(output_task_ids) == output_task_ids.end()); + BOOST_CHECK_EQUAL(output_task_ids.front(), 1); + BOOST_CHECK_EQUAL(output_task_ids.back(), max_task_id); +} diff --git a/source/framework/python/source/algorithm/miscellaneous_operation/uniform-2.cpp b/source/framework/python/source/algorithm/miscellaneous_operation/uniform-2.cpp index 13b2a981b..dde8ec495 100644 --- a/source/framework/python/source/algorithm/miscellaneous_operation/uniform-2.cpp +++ b/source/framework/python/source/algorithm/miscellaneous_operation/uniform-2.cpp @@ -12,7 +12,21 @@ using namespace pybind11::literals; namespace lue::framework { namespace { - // Step 3: Call the algorithm + // Step 4: Release the GIL and call the algorithm + template + auto uniform( + StaticShape const& array_shape, + StaticShape const& partition_shape, + Element const& min_value, + Element const& max_value) -> PartitionedArray + { + pybind11::gil_scoped_release release{}; + + return value_policies::uniform(array_shape, partition_shape, min_value, max_value); + } + + + // Step 3: Convert between Python and C++ types template auto uniform( StaticShape const& array_shape, @@ -24,12 +38,11 @@ namespace lue::framework { if constexpr (arithmetic_element_supported) { - result = pybind11::cast( - value_policies::uniform( - array_shape, - partition_shape, - pybind11::cast(min_value), - pybind11::cast(max_value))); + result = pybind11::cast(value_policies::uniform( + array_shape, + partition_shape, + pybind11::cast(min_value), + pybind11::cast(max_value))); } return result; @@ -143,11 +156,10 @@ namespace lue::framework { if (dynamic_array_shape.size() != dynamic_partition_shape.size()) { - throw std::runtime_error( - std::format( - "Rank of array shape and partition shape must be equal ({} != {})", - dynamic_array_shape.size(), - dynamic_partition_shape.size())); + throw std::runtime_error(std::format( + "Rank of array shape and partition shape must be equal ({} != {})", + dynamic_array_shape.size(), + dynamic_partition_shape.size())); } } diff --git a/source/framework/python/source/command_line.cpp b/source/framework/python/source/command_line.cpp index cf8561a62..3e6597ca7 100644 --- a/source/framework/python/source/command_line.cpp +++ b/source/framework/python/source/command_line.cpp @@ -7,24 +7,23 @@ namespace lue { CommandLine::CommandLine() { - // The goal here is (only) to set _argc/_argv for the - // HPX runtime to use - - pybind11::gil_scoped_acquire acquire; - - pybind11::object sys{pybind11::module_::import("sys")}; - pybind11::list argv_py{sys.attr("argv")}; + // The goal here is (only) to set _argc/_argv for the HPX runtime to use + { + pybind11::gil_scoped_acquire acquire{}; + pybind11::object sys{pybind11::module_::import("sys")}; + pybind11::list argv_py{sys.attr("argv")}; - _argc = static_cast(argv_py.size()); + _argc = static_cast(argv_py.size()); - _argument_strings.resize(_argc); - _argument_pointers.resize(_argument_strings.size() + 1); + _argument_strings.resize(_argc); + _argument_pointers.resize(_argument_strings.size() + 1); - for (int i = 0; i < _argc; ++i) - { - pybind11::str arg_py{argv_py[i]}; - _argument_strings[i] = arg_py; - _argument_pointers[i] = _argument_strings[i].data(); + for (int i = 0; i < _argc; ++i) + { + pybind11::str arg_py{argv_py[i]}; + _argument_strings[i] = arg_py; + _argument_pointers[i] = _argument_strings[i].data(); + } } lue_assert(_argument_pointers.size() == static_cast(_argc + 1)); diff --git a/source/framework/python/source/command_line.hpp b/source/framework/python/source/command_line.hpp index f5ca222a4..ffbb9dd39 100644 --- a/source/framework/python/source/command_line.hpp +++ b/source/framework/python/source/command_line.hpp @@ -1,7 +1,4 @@ #pragma once -#include "lue/py/configure.hpp" -#include -#include #include #include diff --git a/source/framework/python/source/hpx_runtime.cpp b/source/framework/python/source/hpx_runtime.cpp index 2d5758717..e0b9a212b 100644 --- a/source/framework/python/source/hpx_runtime.cpp +++ b/source/framework/python/source/hpx_runtime.cpp @@ -1,5 +1,4 @@ #include "hpx_runtime.hpp" -#include "ostream.hpp" #include #include @@ -18,8 +17,7 @@ namespace lue { hpx::init_params params{}; params.cfg = _configuration; - params.mode = hpx::runtime_mode::default_; - // params.mode = hpx::runtime_mode::console; + params.mode = hpx::runtime_mode::console; if (!hpx::start(start_function, _command_line.argc(), _command_line.argv(), params)) { @@ -55,7 +53,7 @@ namespace lue { // Main HPX thread, does nothing but wait for the application to exit - int HPXRuntime::hpx_main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) + auto HPXRuntime::hpx_main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) -> int { // Store a pointer to the runtime here. _runtime = hpx::get_runtime_ptr(); @@ -68,14 +66,6 @@ namespace lue { _startup_condition_variable.notify_one(); - // Redirect all console output to Python's stdout - std::unique_ptr stream; - - { - pybind11::gil_scoped_acquire acquire; - stream = std::unique_ptr{}; - } - // Now, wait for destructor to be called. { std::unique_lock lock(_mutex); @@ -85,11 +75,6 @@ namespace lue { } } - { - pybind11::gil_scoped_acquire acquire; - stream.reset(); - } - // Tell the runtime it's OK to exit return hpx::finalize(); } diff --git a/source/framework/python/source/hpx_runtime.hpp b/source/framework/python/source/hpx_runtime.hpp index 22303d233..e165299be 100644 --- a/source/framework/python/source/hpx_runtime.hpp +++ b/source/framework/python/source/hpx_runtime.hpp @@ -13,11 +13,19 @@ namespace lue { HPXRuntime(std::vector const& configuration); + HPXRuntime(HPXRuntime const& other) = delete; + + HPXRuntime(HPXRuntime&& other) = delete; + ~HPXRuntime(); + auto operator=(HPXRuntime const& other) -> HPXRuntime& = delete; + + auto operator=(HPXRuntime&& other) -> HPXRuntime& = delete; + protected: - int hpx_main(int argc, char* argv[]); + auto hpx_main(int argc, char* argv[]) -> int; private: diff --git a/source/framework/python/source/ostream.hpp b/source/framework/python/source/ostream.hpp deleted file mode 100644 index 9fffec6cf..000000000 --- a/source/framework/python/source/ostream.hpp +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) 2016-2018 Hartmut Kaiser -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#if !defined(PHYLANX_BINDING_HELPERS_OSTREAM_HPP) -#define PHYLANX_BINDING_HELPERS_OSTREAM_HPP - -// #include - -#include - -#include -#include -#include - -namespace phylanx { - namespace bindings { - class pythonbuf: public std::streambuf - { - private: - - using traits_type = std::streambuf::traits_type; - - char d_buffer[1024]; - pybind11::object pywrite; - pybind11::object pyflush; - - int overflow(int c) override - { - if (!traits_type::eq_int_type(c, traits_type::eof())) - { - *pptr() = traits_type::to_char_type(c); - pbump(1); - } - return sync() ? traits_type::not_eof(c) : traits_type::eof(); - } - - int sync() override - { - if (pbase() != pptr()) - { - { - // acquire GIL to avoid multi-threading problems - pybind11::gil_scoped_acquire acquire; - - // This subtraction cannot be negative, so dropping the sign - pybind11::str line(pbase(), static_cast(pptr() - pbase())); - - pywrite(line); - pyflush(); - } - - setp(pbase(), epptr()); - } - return 0; - } - - public: - - pythonbuf(pybind11::object pyostream) - { - { - // acquire GIL to avoid multi-threading problems - pybind11::gil_scoped_acquire acquire; - pybind11::object tmp = std::move(pyostream); - pywrite = tmp.attr("write"); - pyflush = tmp.attr("flush"); - } - - setp(d_buffer, d_buffer + sizeof(d_buffer) - 1); - } - - // Sync before destroy - ~pythonbuf() - { - { - pybind11::gil_scoped_release release; - sync(); - } - - pywrite.release(); - pyflush.release(); - } - }; - - class scoped_ostream_redirect - { - protected: - - std::streambuf* old; - std::ostream& costream; - pythonbuf buffer; - - static pybind11::object import_stdout() - { - pybind11::gil_scoped_acquire acquire; - return pybind11::module::import("sys").attr("stdout"); - } - - public: - - scoped_ostream_redirect( - std::ostream& costream = std::cout, pybind11::object pyostream = import_stdout()): - old(nullptr), - costream(costream), - buffer(std::move(pyostream)) - { - old = costream.rdbuf(&buffer); - } - - ~scoped_ostream_redirect() - { - costream.rdbuf(old); - } - - scoped_ostream_redirect(const scoped_ostream_redirect&) = delete; - scoped_ostream_redirect(scoped_ostream_redirect&&) = delete; - - scoped_ostream_redirect& operator=(const scoped_ostream_redirect&) = delete; - scoped_ostream_redirect& operator=(scoped_ostream_redirect&&) = delete; - }; - } // namespace bindings -} // namespace phylanx - -#endif diff --git a/source/framework/python/source/submodule.cpp b/source/framework/python/source/submodule.cpp index 2d14e6dfb..559c3d6c9 100644 --- a/source/framework/python/source/submodule.cpp +++ b/source/framework/python/source/submodule.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace lue::framework { namespace { @@ -16,11 +18,10 @@ namespace lue::framework { void start_hpx_runtime(std::vector const& configuration) { - // Iff the pointer to the runtime is not pointing to an instance, - // instantiate one. This will start the HPX runtime. + // Iff the pointer to the runtime is not pointing to an instance, instantiate one. This will + // start the HPX runtime. if (runtime == nullptr) { - pybind11::gil_scoped_release release; runtime = new HPXRuntime{configuration}; } } @@ -34,7 +35,6 @@ namespace lue::framework { { HPXRuntime* r = runtime; runtime = nullptr; - pybind11::gil_scoped_release release; delete r; } } @@ -108,11 +108,12 @@ namespace lue::framework { gdal::register_gdal_drivers(); - submodule.def("start_hpx_runtime", &start_hpx_runtime); - - submodule.def("stop_hpx_runtime", &stop_hpx_runtime); - - submodule.def("on_root_locality", &on_root_locality); + submodule.def( + "start_hpx_runtime", &start_hpx_runtime, pybind11::call_guard()); + submodule.def( + "stop_hpx_runtime", &stop_hpx_runtime, pybind11::call_guard()); + submodule.def( + "on_root_locality", &on_root_locality, pybind11::call_guard()); bind_hpx(submodule);