Skip to content

Commit 1f89b02

Browse files
TaskMemoryManager
1 parent ea314d0 commit 1f89b02

File tree

4 files changed

+151
-117
lines changed

4 files changed

+151
-117
lines changed

docs/memory/MemoryAllocator.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# MemoryAllocator
2+
3+
`MemoryAllocator` is...FIXME

docs/memory/TaskMemoryManager.md

Lines changed: 145 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
`TaskMemoryManager` manages the memory allocated to a [single task](#taskAttemptId) (using [MemoryManager](#memoryManager)).
44

5+
`TaskMemoryManager` assumes that:
6+
7+
1. <span id="PAGE_NUMBER_BITS"> The number of bits to address pages is `13`
8+
1. <span id="OFFSET_BITS"> The number of bits to encode offsets in pages is `51` (64 bits - [13 bits](#PAGE_NUMBER_BITS))
9+
1. <span id="PAGE_TABLE_SIZE"> Number of pages in the [page table](#pageTable) and [to be allocated](#allocatedPages) is `8192` (`1 <<` [13](#PAGE_NUMBER_BITS))
10+
1. <span id="MAXIMUM_PAGE_SIZE_BYTES"> The maximum page size is `15GB` (`((1L << 31) - 1) * 8L`)
11+
512
## Creating Instance
613

714
`TaskMemoryManager` takes the following to be created:
@@ -28,6 +35,21 @@
2835
* [freePage](#freePage)
2936
* [getMemoryConsumptionForThisTask](#getMemoryConsumptionForThisTask)
3037

38+
## <span id="pageTable"> Page Table (MemoryBlocks)
39+
40+
`TaskMemoryManager` uses an array of `MemoryBlock`s (to mimic an operating system's page table).
41+
42+
The page table uses 13 bits for addressing pages.
43+
44+
A page is "stored" in [allocatePage](#allocatePage) and "removed" in [freePage](#freePage).
45+
46+
All pages are released (_removed_) in [cleanUpAllAllocatedMemory](#cleanUpAllAllocatedMemory).
47+
48+
`TaskMemoryManager` uses the page table when requested to:
49+
50+
* [getPage](#getPage)
51+
* [getOffsetInPage](#getOffsetInPage)
52+
3153
## <span id="consumers"> Spillable Memory Consumers
3254

3355
```java
@@ -42,6 +64,44 @@ HashSet<MemoryConsumer> consumers
4264

4365
Memory consumers are used to report memory usage when `TaskMemoryManager` is requested to [show memory usage](#showMemoryUsage).
4466

67+
## <span id="acquiredButNotUsed"> Memory Acquired But Not Used
68+
69+
`TaskMemoryManager` tracks the size of memory [allocated](#allocatePage) but not used (by any of the [MemoryConsumer](#consumers)s due to a `OutOfMemoryError` upon trying to use it).
70+
71+
`TaskMemoryManager` releases the memory when [cleaning up all the allocated memory](#cleanUpAllAllocatedMemory).
72+
73+
## <span id="allocatedPages"> Allocated Pages
74+
75+
```java
76+
BitSet allocatedPages
77+
```
78+
79+
`TaskMemoryManager` uses a `BitSet` ([Java]({{ java.api }}/java.base.java/util/BitSet.html)) to track [allocated pages](#allocatePage).
80+
81+
The size is exactly the number of entries in the [page table](#pageTable) ([8192](#PAGE_TABLE_SIZE)).
82+
83+
## <span id="tungstenMemoryMode"><span id="getTungstenMemoryMode"> MemoryMode
84+
85+
`TaskMemoryManager` can be in `ON_HEAP` or `OFF_HEAP` mode (to avoid extra work for off-heap and hoping that the JIT handles branching well).
86+
87+
`TaskMemoryManager` is given the `MemoryMode` matching the [MemoryMode](MemoryManager.md#tungstenMemoryMode) (of the given [MemoryManager](#memoryManager)) when [created](#creating-instance).
88+
89+
`TaskMemoryManager` uses the `MemoryMode` to match to for the following:
90+
91+
* [allocatePage](#allocatePage)
92+
* [cleanUpAllAllocatedMemory](#cleanUpAllAllocatedMemory)
93+
94+
For `OFF_HEAP` mode, `TaskMemoryManager` has to change offset while [encodePageNumberAndOffset](#encodePageNumberAndOffset) and [getOffsetInPage](#getOffsetInPage).
95+
96+
For `OFF_HEAP` mode, `TaskMemoryManager` returns no [page](#getPage).
97+
98+
The `MemoryMode` is used when:
99+
100+
* `ShuffleExternalSorter` is [created](../shuffle/ShuffleExternalSorter.md)
101+
* `BytesToBytesMap` is [created](BytesToBytesMap.md)
102+
* `UnsafeExternalSorter` is [created](UnsafeExternalSorter.md)
103+
* `Spillable` is requested to [spill](../shuffle/Spillable.md#spill) (only when in `ON_HEAP` mode)
104+
45105
## <span id="acquireExecutionMemory"> Acquiring Execution Memory
46106

47107
```java
@@ -162,181 +222,151 @@ Acquired by [consumer]: [memUsage]
162222

163223
* `MemoryConsumer` is requested to [throw an OutOfMemoryError](MemoryConsumer.md#throwOom)
164224

165-
## Logging
166-
167-
Enable `ALL` logging level for `org.apache.spark.memory.TaskMemoryManager` logger to see what happens inside.
168-
169-
Add the following line to `conf/log4j.properties`:
225+
## <span id="cleanUpAllAllocatedMemory"> Cleaning Up All Allocated Memory
170226

171-
```text
172-
log4j.logger.org.apache.spark.memory.TaskMemoryManager=ALL
227+
```java
228+
long cleanUpAllAllocatedMemory()
173229
```
174230

175-
Refer to [Logging](../spark-logging.md).
231+
The `consumers` collection is then cleared.
176232

177-
## Review Me
233+
`cleanUpAllAllocatedMemory` finds all the registered [MemoryConsumer](MemoryConsumer.md)s (in the [consumers](#consumers) registry) that still keep [some memory used](MemoryConsumer.md#getUsed) and, for every such consumer, prints out the following DEBUG message to the logs:
178234

179-
TaskMemoryManager assumes that:
235+
```text
236+
unreleased [getUsed] memory from [consumer]
237+
```
180238

181-
* The number of bits to address pages (aka `PAGE_NUMBER_BITS`) is `13`
182-
* The number of bits to encode offsets in data pages (aka `OFFSET_BITS`) is `51` (i.e. 64 bits - `PAGE_NUMBER_BITS`)
183-
* The number of entries in the <<pageTable, page table>> and <<allocatedPages, allocated pages>> (aka `PAGE_TABLE_SIZE`) is `8192` (i.e. 1 << `PAGE_NUMBER_BITS`)
184-
* The maximum page size (aka `MAXIMUM_PAGE_SIZE_BYTES`) is `15GB` (i.e. `((1L << 31) - 1) * 8L`)
239+
`cleanUpAllAllocatedMemory` removes all the [consumers](#consumers).
185240

186-
== [[cleanUpAllAllocatedMemory]] Cleaning Up All Allocated Memory
241+
---
187242

188-
[source, java]
189-
----
190-
long cleanUpAllAllocatedMemory()
191-
----
243+
For every `MemoryBlock` in the [pageTable](#pageTable), `cleanUpAllAllocatedMemory` prints out the following DEBUG message to the logs:
192244

193-
`cleanUpAllAllocatedMemory` clears <<pageTable, page table>>.
245+
```text
246+
unreleased page: [page] in task [taskAttemptId]
247+
```
194248

195-
CAUTION: FIXME
249+
`cleanUpAllAllocatedMemory` marks the pages to be freed (`FREED_IN_TMM_PAGE_NUMBER`) and requests the [MemoryManager](#memoryManager) for the [tungstenMemoryAllocator](MemoryManager.md#tungstenMemoryAllocator) to [free up the MemoryBlock](MemoryAllocator.md#free).
196250

197-
All recorded <<consumers, consumers>> are queried for the size of used memory. If the memory used is greater than 0, the following WARN message is printed out to the logs:
251+
`cleanUpAllAllocatedMemory` clears the [pageTable](#pageTable) registry (by assigning `null` values).
198252

199-
```
200-
WARN TaskMemoryManager: leak [bytes] memory from [consumer]
201-
```
253+
---
202254

203-
The `consumers` collection is then cleared.
255+
`cleanUpAllAllocatedMemory` requests the [MemoryManager](#memoryManager) to [release execution memory](MemoryManager.md#releaseExecutionMemory) that is not used by any consumer (with the [acquiredButNotUsed](#acquiredButNotUsed) and the [tungstenMemoryMode](#tungstenMemoryMode)).
204256

205-
MemoryManager.md#releaseExecutionMemory[MemoryManager.releaseExecutionMemory] is executed to release the memory that is not used by any consumer.
257+
In the end, `cleanUpAllAllocatedMemory` requests the [MemoryManager](#memoryManager) to [release all execution memory for the task](MemoryManager.md#releaseAllExecutionMemoryForTask).
206258

207-
Before `cleanUpAllAllocatedMemory` returns, it calls MemoryManager.md#releaseAllExecutionMemoryForTask[MemoryManager.releaseAllExecutionMemoryForTask] that in turn becomes the return value.
259+
---
208260

209-
CAUTION: FIXME Image with the interactions to `MemoryManager`.
261+
`cleanUpAllAllocatedMemory` is used when:
210262

211-
NOTE: `cleanUpAllAllocatedMemory` is used exclusively when `TaskRunner` is requested to executor:TaskRunner.md#run[run] (and cleans up after itself).
263+
* `TaskRunner` is requested to [run a task](../executor/TaskRunner.md#run) (and the task has finished successfully)
212264

213-
== [[allocatePage]] Allocating Memory Block for Tungsten Consumers
265+
## <span id="allocatePage"> Allocating Memory Page
214266

215-
[source, java]
216-
----
267+
```java
217268
MemoryBlock allocatePage(
218269
long size,
219270
MemoryConsumer consumer)
220-
----
221-
222-
NOTE: It only handles *Tungsten Consumers*, i.e. MemoryConsumer.md[MemoryConsumers] in `tungstenMemoryMode` mode.
223-
224-
`allocatePage` allocates a block of memory (aka _page_) smaller than `MAXIMUM_PAGE_SIZE_BYTES` maximum size.
225-
226-
It checks `size` against the internal `MAXIMUM_PAGE_SIZE_BYTES` maximum size. If it is greater than the maximum size, the following `IllegalArgumentException` is thrown:
227-
228-
```
229-
Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes
230271
```
231272

232-
It then <<acquireExecutionMemory, acquires execution memory>> (for the input `size` and `consumer`).
233-
234-
It finishes by returning `null` when no execution memory could be acquired.
235-
236-
With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using <<allocatedPages, allocatedPages>> registry).
273+
`allocatePage` allocates a block of memory (_page_) that is:
237274

238-
If the index is `PAGE_TABLE_SIZE` or higher, <<releaseExecutionMemory, releaseExecutionMemory(acquired, consumer)>> is called and then the following `IllegalStateException` is thrown:
275+
1. Below [MAXIMUM_PAGE_SIZE_BYTES](#MAXIMUM_PAGE_SIZE_BYTES) maximum size
276+
1. For [MemoryConsumer](MemoryConsumer.md)s with the same [MemoryMode](MemoryConsumer.md#getMode) as the [TaskMemoryManager](#tungstenMemoryMode)
239277

240-
```
241-
Have already allocated a maximum of [PAGE_TABLE_SIZE] pages
242-
```
278+
`allocatePage` [acquireExecutionMemory](#acquireExecutionMemory) (for the `size` and the [MemoryConsumer](MemoryConsumer.md)). `allocatePage` returns immediately (with `null`) when this allocation ended up with `0` or less bytes.
243279

244-
It then attempts to allocate a `MemoryBlock` from `Tungsten MemoryAllocator` (calling `memoryManager.tungstenMemoryAllocator().allocate(acquired)`).
280+
`allocatePage` allocates the first clear bit in the [allocatedPages](#allocatedPages) (unless the whole page table is taken and `allocatePage` throws an `IllegalStateException`).
245281

246-
CAUTION: FIXME What is `MemoryAllocator`?
282+
`allocatePage` requests the [MemoryManager](#memoryManager) for the [tungstenMemoryAllocator](MemoryManager.md#tungstenMemoryAllocator) that is requested to [allocate the acquired memory](MemoryAllocator.md#allocate).
247283

248-
When successful, `MemoryBlock` gets assigned `pageNumber` and it gets added to the internal <<pageTable, pageTable>> registry.
284+
`allocatePage` registers the page in the [pageTable](#pageTable).
249285

250-
You should see the following TRACE message in the logs:
286+
In the end, `allocatePage` prints out the following TRACE message to the logs and returns the `MemoryBlock` allocated.
251287

252-
```
253-
TRACE Allocate page number [pageNumber] ([acquired] bytes)
288+
```text
289+
Allocate page number [pageNumber] ([acquired] bytes)
254290
```
255291

256-
The `page` is returned.
292+
### <span id="allocatePage-usage"> Usage
257293

258-
If a `OutOfMemoryError` is thrown when allocating a `MemoryBlock` page, the following WARN message is printed out to the logs:
294+
`allocatePage` is used when:
259295

260-
```
261-
WARN Failed to allocate a page ([acquired] bytes), try again.
262-
```
263-
264-
And `acquiredButNotUsed` gets `acquired` memory space with the `pageNumber` cleared in <<allocatedPages, allocatedPages>> (i.e. the index for `pageNumber` gets `false`).
265-
266-
CAUTION: FIXME Why is the code tracking `acquiredButNotUsed`?
296+
* `MemoryConsumer` is requested to allocate an [array](MemoryConsumer.md#allocateArray) and a [page](MemoryConsumer.md#allocatePage)
267297

268-
Another <<allocatePage, allocatePage>> attempt is recursively tried.
298+
### <span id="allocatePage-TooLargePageException"> TooLargePageException
269299

270-
CAUTION: FIXME Why is there a hope for being able to allocate a page?
300+
For sizes larger than the [MAXIMUM_PAGE_SIZE_BYTES](#MAXIMUM_PAGE_SIZE_BYTES) `allocatePage` throws a `TooLargePageException`.
271301

272-
== [[getMemoryConsumptionForThisTask]] `getMemoryConsumptionForThisTask` Method
302+
### <span id="allocatePage-OutOfMemoryError"> OutOfMemoryError
273303

274-
[source, java]
275-
----
276-
long getMemoryConsumptionForThisTask()
277-
----
304+
Requesting the [tungstenMemoryAllocator](MemoryManager.md#tungstenMemoryAllocator) to [allocate the acquired memory](MemoryAllocator.md#allocate) may throw an `OutOfMemoryError`. If so, `allocatePage` prints out the following WARN message to the logs:
278305

279-
`getMemoryConsumptionForThisTask`...FIXME
306+
```text
307+
Failed to allocate a page ([acquired] bytes), try again.
308+
```
280309

281-
NOTE: `getMemoryConsumptionForThisTask` is used exclusively in Spark tests.
310+
`allocatePage` adds the acquired memory to the [acquiredButNotUsed](#acquiredButNotUsed) and removes the page from the [allocatedPages](#allocatedPages) (by clearing the bit).
282311

283-
== [[freePage]] Freeing Memory Page -- `freePage` Method
312+
In the end, `allocatePage` tries to [allocate the page](#allocatePage) again (recursively).
284313

285-
[source, java]
286-
----
287-
void freePage(MemoryBlock page, MemoryConsumer consumer)
288-
----
314+
## <span id="freePage"> Freeing Memory Page
289315

290-
`pageSizeBytes` simply requests the <<memoryManager, MemoryManager>> for MemoryManager.md#pageSizeBytes[pageSizeBytes].
316+
```java
317+
void freePage(
318+
MemoryBlock page,
319+
MemoryConsumer consumer)
320+
```
291321

292-
NOTE: `pageSizeBytes` is used when `MemoryConsumer` is requested to MemoryConsumer.md#freePage[freePage] and MemoryConsumer.md#throwOom[throwOom].
322+
`pageSizeBytes` requests the [MemoryManager](#memoryManager) for [pageSizeBytes](MemoryManager.md#pageSizeBytes).
293323

294-
== [[getPage]] Getting Page -- `getPage` Method
324+
`pageSizeBytes` is used when:
295325

296-
[source, java]
297-
----
298-
Object getPage(long pagePlusOffsetAddress)
299-
----
326+
* `MemoryConsumer` is requested to [freePage](MemoryConsumer.md#freePage) and [throwOom](MemoryConsumer.md#throwOom)
300327

301-
`getPage`...FIXME
328+
## <span id="getPage"> Getting Page
302329

303-
NOTE: `getPage` is used when...FIXME
330+
```java
331+
Object getPage(
332+
long pagePlusOffsetAddress)
333+
```
304334

305-
== [[getOffsetInPage]] Getting Page Offset -- `getOffsetInPage` Method
335+
`getPage` handles the `ON_HEAP` mode of the [tungstenMemoryMode](#tungstenMemoryMode) only.
306336

307-
[source, java]
308-
----
309-
long getOffsetInPage(long pagePlusOffsetAddress)
310-
----
337+
`getPage` looks up the page (by the given address) in the [page table](#pageTable) and requests it for the base object.
311338

312-
`getPage`...FIXME
339+
`getPage` is used when:
313340

314-
NOTE: `getPage` is used when...FIXME
341+
* `ShuffleExternalSorter` is requested to [writeSortedFile](../shuffle/ShuffleExternalSorter.md#writeSortedFile)
342+
* `Location` (of [BytesToBytesMap](BytesToBytesMap.md)) is requested to `updateAddressesAndSizes`
343+
* `SortComparator` (of [UnsafeInMemorySorter](UnsafeInMemorySorter.md)) is requested to `compare` two record pointers
344+
* `SortedIterator` (of [UnsafeInMemorySorter](UnsafeInMemorySorter.md)) is requested to `loadNext` record
315345

316-
== [[internal-properties]] Internal Properties
346+
## <span id="getOffsetInPage"> getOffsetInPage
317347

318-
[cols="30m,70",options="header",width="100%"]
319-
|===
320-
| Name
321-
| Description
348+
```java
349+
long getOffsetInPage(
350+
long pagePlusOffsetAddress)
351+
```
322352

323-
| acquiredButNotUsed
324-
| [[acquiredButNotUsed]] The size of memory allocated but not used.
353+
`getOffsetInPage` gives the offset associated with the given `pagePlusOffsetAddress` (encoded by `encodePageNumberAndOffset`).
325354

326-
| allocatedPages
327-
| [[allocatedPages]] Collection of flags (`true` or `false` values) of size `PAGE_TABLE_SIZE` with all bits initially disabled (i.e. `false`).
355+
`getOffsetInPage` is used when:
328356

329-
TIP: `allocatedPages` is https://docs.oracle.com/javase/8/docs/api/java/util/BitSet.html[java.util.BitSet].
357+
* `ShuffleExternalSorter` is requested to [writeSortedFile](../shuffle/ShuffleExternalSorter.md#writeSortedFile)
358+
* `Location` (of [BytesToBytesMap](BytesToBytesMap.md)) is requested to `updateAddressesAndSizes`
359+
* `SortComparator` (of [UnsafeInMemorySorter](UnsafeInMemorySorter.md)) is requested to `compare` two record pointers
360+
* `SortedIterator` (of [UnsafeInMemorySorter](UnsafeInMemorySorter.md)) is requested to `loadNext` record
330361

331-
When <<allocatePage, allocatePage>> is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to `true`.
362+
## Logging
332363

333-
| pageTable
334-
| [[pageTable]] The array of size `PAGE_TABLE_SIZE` with indices being `MemoryBlock` objects.
364+
Enable `ALL` logging level for `org.apache.spark.memory.TaskMemoryManager` logger to see what happens inside.
335365

336-
When <<allocatePage, allocating a `MemoryBlock` page for Tungsten consumers>>, the index corresponds to `pageNumber` that points to the `MemoryBlock` page allocated.
366+
Add the following line to `conf/log4j.properties`:
337367

338-
| tungstenMemoryMode
339-
| [[tungstenMemoryMode]] `MemoryMode` (i.e. `OFF_HEAP` or `ON_HEAP`)
368+
```text
369+
log4j.logger.org.apache.spark.memory.TaskMemoryManager=ALL
370+
```
340371

341-
Set to the MemoryManager.md#tungstenMemoryMode[tungstenMemoryMode] of the <<memoryManager, MemoryManager>> while TaskMemoryManager is <<creating-instance, created>>
342-
|===
372+
Refer to [Logging](../spark-logging.md).

docs/storage/MemoryStore.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,13 +301,13 @@ remove(
301301
blockId: BlockId): Boolean
302302
```
303303

304-
`remove` returns `true` when the given block ([BlockId](BlockId.md)) was (found and) removed from the [entries](#entries) registry successfully and the [memory released](MemoryManager.md#releaseStorageMemory) (from the [MemoryManager](#memoryManager)).
304+
`remove` returns `true` when the given block ([BlockId](BlockId.md)) was (found and) removed from the [entries](#entries) registry successfully and the [memory released](../memory/MemoryManager.md#releaseStorageMemory) (from the [MemoryManager](#memoryManager)).
305305

306306
---
307307

308308
`remove` removes (_drops_) the block ([BlockId](BlockId.md)) from the [entries](#entries) registry.
309309

310-
If found and removed, `remove` requests the [MemoryManager](#memoryManager) to [releaseStorageMemory](MemoryManager.md#releaseStorageMemory) and prints out the following DEBUG message to the logs (with the [maxMemory](#maxMemory) and [blocksMemoryUsed](#blocksMemoryUsed)):
310+
If found and removed, `remove` requests the [MemoryManager](#memoryManager) to [releaseStorageMemory](../memory/MemoryManager.md#releaseStorageMemory) and prints out the following DEBUG message to the logs (with the [maxMemory](#maxMemory) and [blocksMemoryUsed](#blocksMemoryUsed)):
311311

312312
```text
313313
Block [blockId] of size [size] dropped from memory (free [memory])

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ nav:
282282
- memory/index.md
283283
- BytesToBytesMap: memory/BytesToBytesMap.md
284284
- ExecutionMemoryPool: memory/ExecutionMemoryPool.md
285+
- MemoryAllocator: memory/MemoryAllocator.md
285286
- MemoryConsumer: memory/MemoryConsumer.md
286287
- MemoryManager: memory/MemoryManager.md
287288
- MemoryPool: memory/MemoryPool.md

0 commit comments

Comments
 (0)