Skip to content

Commit dd8645a

Browse files
author
Attila Tóth
committed
add: parameters to control number and the distribution of messages in a micro-batch
1 parent 29d01c4 commit dd8645a

16 files changed

+1482
-13
lines changed

README.md

+68
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,77 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
325325
This may cause a false alarm. You can set it to `false` when it doesn't work as you expected. <br>
326326

327327
A batch query always fails if it fails to read any data from the provided offsets due to data loss.</td>
328+
</tr>
329+
<tr>
330+
<td>
331+
`maxEntriesPerTrigger`
332+
</td>
333+
<td>
334+
Number of entries to include in a single micro-batch during
335+
streaming.
336+
</td>
337+
<td>-1</td>
338+
<td>Streaming query</td>
339+
<td>This parameter controls how many Pulsar entries are read by
340+
the connector from the topic backlog at once. If the topic
341+
backlog is considerably high, users can use this parameter
342+
to limit the size of the micro-batch. If multiple topics are read,
343+
this parameter controls the complete number of entries fetched from
344+
all of them.
345+
346+
*Note:* Entries might contain multiple messages. The default value of `-1` means that the
347+
complete backlog is read at once.</td>
348+
</tr>
349+
350+
<tr>
351+
<td>
352+
`forwardStrategy`
353+
</td>
354+
<td>
355+
`simple`, `large-first` or `proportional`
356+
</td>
357+
<td>`simple`</td>
358+
<td>Streaming query</td>
359+
<td>If `maxEntriesPerTrigger` is set, this parameter controls
360+
which forwarding strategy is in use during the read of multiple
361+
topics.
362+
<li>
363+
`simple` just divides the allowed number of entries equally
364+
between all topics, regardless of their backlog size
365+
</li>
366+
<li>
367+
`large-first` will load the largest topic backlogs first,
368+
as the maximum number of allowed entries allows
369+
</li>
370+
<li>
371+
`proportional` will forward all topics proportional to the
372+
topic backlog/overall backlog ratio
373+
</li>
374+
</td>
375+
</tr>
328376

377+
<tr>
378+
<td>
379+
`ensureEntriesPerTopic`
380+
</td>
381+
<td>Number to forward each topic with during a micro-batch.</td>
382+
<td>0</td>
383+
<td>Streaming query</td>
384+
<td>If multiple topics are read, and the maximum number of
385+
entries is also specified, always forward all topics with the
386+
amount of entries specified here. Using this, users can ensure that topics
387+
with considerably smaller backlogs than others are also forwarded
388+
and read. Note that:
389+
<li>If this number is higher than the maximum allowed entries divided
390+
by the number of topics, then this value is taken into account, overriding
391+
the maximum number of entries per micro-batch.
392+
</li>
393+
<li>This parameter has an effect only for forwarding strategies
394+
`large-first` and `proportional`.</li>
395+
</td>
329396
</tr>
330397

398+
331399
</table>
332400

333401
#### Authentication

src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

+87-3
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@ package org.apache.spark.sql.pulsar
1515

1616
import java.{util => ju}
1717
import java.io.Closeable
18-
import java.util.{Optional, UUID}
18+
import java.util.Optional
1919
import java.util.concurrent.TimeUnit
2020
import java.util.regex.Pattern
2121

2222
import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
23-
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient, SubscriptionInitialPosition, SubscriptionType}
23+
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
2424
import org.apache.pulsar.client.impl.schema.BytesSchema
25+
import org.apache.pulsar.client.internal.DefaultImplementation
2526
import org.apache.pulsar.common.naming.TopicName
2627
import org.apache.pulsar.common.schema.SchemaInfo
2728

2829
import org.apache.spark.internal.Logging
29-
import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH, TOPIC_OPTION_KEYS}
30+
import org.apache.spark.sql.pulsar.PulsarOptions.TOPIC_OPTION_KEYS
31+
import org.apache.spark.sql.pulsar.topicinternalstats.forward._
3032
import org.apache.spark.sql.types.StructType
3133

3234
/**
@@ -205,6 +207,88 @@ private[pulsar] case class PulsarMetadataReader(
205207
}.toMap)
206208
}
207209

210+
211+
def forwardOffset(actualOffset: Option[Map[String, MessageId]],
212+
strategy: String,
213+
numberOfEntriesToForward: Long,
214+
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
215+
getTopicPartitions()
216+
217+
// Collect internal stats for all topics
218+
val topicStats = topicPartitions.map( topic => {
219+
val internalStats = admin.topics().getInternalStats(topic)
220+
val topicActualMessageId = actualOffset match {
221+
case Some(value) => value.getOrElse(topic, MessageId.earliest)
222+
case None => MessageId.earliest
223+
}
224+
topic -> TopicState(internalStats,
225+
PulsarSourceUtils.getLedgerId(topicActualMessageId),
226+
PulsarSourceUtils.getEntryId(topicActualMessageId))
227+
} ).toMap
228+
229+
val forwarder = strategy match {
230+
case PulsarOptions.PROPORTIONAL_FORWARD_STRATEGY =>
231+
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
232+
case PulsarOptions.LARGE_FIRST_FORWARD_STRATEGY =>
233+
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
234+
case _ =>
235+
new LinearForwardStrategy(numberOfEntriesToForward)
236+
}
237+
238+
SpecificPulsarOffset(topicPartitions.map { topic =>
239+
topic -> PulsarSourceUtils.seekableLatestMid {
240+
// Fetch actual offset for topic
241+
val topicActualMessageId = actualOffset match {
242+
case Some(value) => value.getOrElse(topic, MessageId.earliest)
243+
case None => MessageId.earliest
244+
}
245+
try {
246+
// Get the actual ledger
247+
val actualLedgerId = PulsarSourceUtils.getLedgerId(topicActualMessageId)
248+
// Get the actual entry ID
249+
val actualEntryId = PulsarSourceUtils.getEntryId(topicActualMessageId)
250+
// Get the partition index
251+
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
252+
// Cache topic internal stats
253+
val internalStats = topicStats.get(topic).get.internalStat
254+
// Calculate the amount of messages we will pull in
255+
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
256+
// Get a future message ID which corresponds
257+
// to the maximum number of messages
258+
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
259+
internalStats,
260+
actualLedgerId,
261+
actualEntryId,
262+
numberOfEntriesPerTopic)
263+
// Build a message id
264+
val forwardedMessageId =
265+
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
266+
// Log state
267+
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
268+
internalStats, nextLedgerId, nextEntryId)
269+
val entryCount = TopicInternalStatsUtils.entryCount(internalStats)
270+
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
271+
val logMessage = s"Pulsar Connector forward on topic. " +
272+
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
273+
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
274+
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
275+
log.debug(logMessage)
276+
// Return the message ID
277+
forwardedMessageId
278+
} catch {
279+
case e: PulsarAdminException if e.getStatusCode == 404 =>
280+
MessageId.earliest
281+
case e: Throwable =>
282+
throw new RuntimeException(
283+
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
284+
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
285+
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
286+
}
287+
288+
}
289+
}.toMap)
290+
}
291+
208292
def fetchLatestOffsetForTopic(topic: String): MessageId = {
209293
PulsarSourceUtils.seekableLatestMid( try {
210294
admin.topics().getLastMessageId(topic)

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ private[pulsar] object PulsarOptions {
3131
val TOPIC_MULTI = "topics"
3232
val TOPIC_PATTERN = "topicspattern"
3333

34+
val MAX_ENTRIES_PER_TRIGGER = "maxentriespertrigger"
35+
val ENSURE_ENTRIES_PER_TOPIC = "ensureentriespertopic"
36+
val FORWARD_STRATEGY = "forwardstrategy"
37+
val PROPORTIONAL_FORWARD_STRATEGY = "proportional"
38+
val LARGE_FIRST_FORWARD_STRATEGY = "large-first"
39+
3440
val PARTITION_SUFFIX = TopicName.PARTITIONED_TOPIC_SUFFIX
3541

3642
val TOPIC_OPTION_KEYS = Set(

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

+13-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ private[pulsar] class PulsarProvider
110110
pollTimeoutMs(caseInsensitiveParams),
111111
failOnDataLoss(caseInsensitiveParams),
112112
subscriptionNamePrefix,
113-
jsonOptions
113+
jsonOptions,
114+
maxEntriesPerTrigger(caseInsensitiveParams),
115+
minEntriesPerTopic(caseInsensitiveParams),
116+
forwardStrategy(caseInsensitiveParams)
114117
)
115118
}
116119

@@ -365,6 +368,15 @@ private[pulsar] object PulsarProvider extends Logging {
365368
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
366369
.toInt
367370

371+
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
372+
caseInsensitiveParams.getOrElse(MAX_ENTRIES_PER_TRIGGER, "-1").toLong
373+
374+
private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
375+
caseInsensitiveParams.getOrElse(ENSURE_ENTRIES_PER_TOPIC, "0").toLong
376+
377+
private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
378+
caseInsensitiveParams.getOrElse(FORWARD_STRATEGY, "simple")
379+
368380
private def validateGeneralOptions(
369381
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
370382
if (!caseInsensitiveParams.contains(SERVICE_URL_OPTION_KEY)) {

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

+13-9
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ private[pulsar] class PulsarSource(
3636
pollTimeoutMs: Int,
3737
failOnDataLoss: Boolean,
3838
subscriptionNamePrefix: String,
39-
jsonOptions: JSONOptionsInRead)
39+
jsonOptions: JSONOptionsInRead,
40+
maxEntriesPerTrigger: Long,
41+
ensureEntriesPerTopic: Long,
42+
forwardStrategy: String)
4043
extends Source
4144
with Logging {
4245

@@ -63,12 +66,15 @@ private[pulsar] class PulsarSource(
6366
override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
6467

6568
override def getOffset: Option[Offset] = {
66-
// Make sure initialTopicOffsets is initialized
6769
initialTopicOffsets
68-
val latest = metadataReader.fetchLatestOffsets()
69-
currentTopicOffsets = Some(latest.topicOffsets)
70-
logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
71-
Some(latest.asInstanceOf[Offset])
70+
val nextOffsets = if (maxEntriesPerTrigger == -1) {
71+
metadataReader.fetchLatestOffsets()
72+
} else {
73+
metadataReader.forwardOffset(currentTopicOffsets,
74+
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
75+
}
76+
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
77+
Some(nextOffsets.asInstanceOf[Offset])
7278
}
7379

7480
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -78,9 +84,7 @@ private[pulsar] class PulsarSource(
7884
logInfo(s"getBatch called with start = $start, end = $end")
7985
val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end)
8086

81-
if (currentTopicOffsets.isEmpty) {
82-
currentTopicOffsets = Some(endTopicOffsets)
83-
}
87+
currentTopicOffsets = Some(endTopicOffsets)
8488

8589
if (start.isDefined && start.get == end) {
8690
return sqlContext.internalCreateDataFrame(

src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala

+30
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,36 @@ private[pulsar] object PulsarSourceUtils extends Logging {
120120
}
121121
}
122122

123+
def getLedgerId(mid: MessageId): Long = {
124+
mid match {
125+
case bmid: BatchMessageIdImpl =>
126+
bmid.getLedgerId
127+
case midi: MessageIdImpl => midi.getLedgerId
128+
case t: TopicMessageIdImpl => getLedgerId(t.getInnerMessageId)
129+
case up: UserProvidedMessageId => up.getLedgerId
130+
}
131+
}
132+
133+
def getEntryId(mid: MessageId): Long = {
134+
mid match {
135+
case bmid: BatchMessageIdImpl =>
136+
bmid.getEntryId
137+
case midi: MessageIdImpl => midi.getEntryId
138+
case t: TopicMessageIdImpl => getEntryId(t.getInnerMessageId)
139+
case up: UserProvidedMessageId => up.getEntryId
140+
}
141+
}
142+
143+
def getPartitionIndex(mid: MessageId): Int = {
144+
mid match {
145+
case bmid: BatchMessageIdImpl =>
146+
bmid.getPartitionIndex
147+
case midi: MessageIdImpl => midi.getPartitionIndex
148+
case t: TopicMessageIdImpl => getPartitionIndex(t.getInnerMessageId)
149+
case up: UserProvidedMessageId => up.getPartitionIndex
150+
}
151+
}
152+
123153
def seekableLatestMid(mid: MessageId): MessageId = {
124154
if (messageExists(mid)) mid else MessageId.earliest
125155
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.spark.sql.pulsar.topicinternalstats.forward
15+
16+
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats
17+
18+
trait ForwardStrategy {
19+
def forward(topics: Map[String, TopicState]): Map[String, Long]
20+
}
21+
22+
case class TopicState(internalStat: PersistentTopicInternalStats,
23+
actualLedgerId: Long,
24+
actualEntryId: Long)

0 commit comments

Comments
 (0)