Skip to content

Commit a081323

Browse files
author
Attila Tóth
committed
remove(metadatareader): topic strategy support
This feature is planned to be put into a different PR.
1 parent 7459a8f commit a081323

12 files changed

+27
-906
lines changed

README.md

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -329,53 +329,6 @@ all of them.
329329
complete backlog is read at once.</td>
330330
</tr>
331331

332-
<tr>
333-
<td>
334-
`forwardStrategy`
335-
</td>
336-
<td>
337-
`simple`, `large-first` or `proportional`
338-
</td>
339-
<td>`simple`</td>
340-
<td>Streaming query</td>
341-
<td>If `maxEntriesPerTrigger` is set, this parameter controls
342-
which forwarding strategy is in use during the read of multiple
343-
topics.
344-
<li>
345-
`simple` just divides the allowed number of entries equally
346-
between all topics, regardless of their backlog size
347-
</li>
348-
<li>
349-
`large-first` will load the largest topic backlogs first,
350-
as far as the maximum number of allowed entries permits
351-
</li>
352-
<li>
353-
`proportional` will forward all topics proportional to the
354-
topic backlog/overall backlog ratio
355-
</li>
356-
</td>
357-
</tr>
358-
359-
<tr>
360-
<td>
361-
`ensureEntriesPerTopic`
362-
</td>
363-
<td>Number of entries to forward each topic by during a micro-batch.</td>
364-
<td>0</td>
365-
<td>Streaming query</td>
366-
<td>If multiple topics are read, and the maximum number of
367-
entries is also specified, always forward all topics with the
368-
amount of entries specified here. Using this, users can ensure that topics
369-
with considerably smaller backlogs than others are also forwarded
370-
and read. Note that:
371-
<li>If this number is higher than the maximum allowed entries divided
372-
by the number of topics, then this value is taken into account, overriding
373-
the maximum number of entries per micro-batch.
374-
</li>
375-
<li>This parameter has an effect only for forwarding strategies
376-
`large-first` and `proportional`.</li>
377-
</td>
378-
</tr>
379332
<tr>
380333
<td>
381334
`allowDifferentTopicSchemas`

src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -225,30 +225,14 @@ private[pulsar] case class PulsarMetadataReader(
225225
}.toMap)
226226
}
227227

228-
229-
def forwardOffset(actualOffset: Map[String, MessageId],
230-
strategy: String,
231-
numberOfEntriesToForward: Long,
232-
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
228+
def fetchNextOffsetWithMaxEntries(actualOffset: Map[String, MessageId],
229+
numberOfEntries: Long): SpecificPulsarOffset = {
233230
getTopicPartitions()
234231

235232
// Collect internal stats for all topics
236233
val topicStats = topicPartitions.map( topic => {
237-
val internalStats = admin.topics().getInternalStats(topic)
238-
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
239-
topic -> TopicState(internalStats,
240-
PulsarSourceUtils.getLedgerId(topicActualMessageId),
241-
PulsarSourceUtils.getEntryId(topicActualMessageId))
242-
} ).toMap
243-
244-
val forwarder = strategy match {
245-
case PulsarOptions.ProportionalForwardStrategy =>
246-
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
247-
case PulsarOptions.LargeFirstForwardStrategy =>
248-
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
249-
case _ =>
250-
new LinearForwardStrategy(numberOfEntriesToForward)
251-
}
234+
topic -> admin.topics().getInternalStats(topic)
235+
} ).toMap.asJava
252236

253237
SpecificPulsarOffset(topicPartitions.map { topic =>
254238
topic -> PulsarSourceUtils.seekableLatestMid {
@@ -262,39 +246,39 @@ private[pulsar] case class PulsarMetadataReader(
262246
// Get the partition index
263247
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
264248
// Cache topic internal stats
265-
val internalStats = topicStats.get(topic).get.internalStat
249+
val internalStats = topicStats.get(topic)
266250
// Calculate the amount of messages we will pull in
267-
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
268-
// Get a future message ID which corresponds
269-
// to the maximum number of messages
251+
val numberOfEntriesPerTopic = numberOfEntries / topics.size
252+
// Get a next message ID which respects
253+
// the maximum number of messages
270254
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
271255
internalStats,
272256
actualLedgerId,
273257
actualEntryId,
274258
numberOfEntriesPerTopic)
275-
// Build a message id
276-
val forwardedMessageId =
259+
// Build the next message ID
260+
val nextMessageId =
277261
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
278262
// Log state
279-
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
263+
val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil(
280264
internalStats, nextLedgerId, nextEntryId)
281265
val entryCount = internalStats.numberOfEntries
282-
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
283-
val logMessage = s"Pulsar Connector forward on topic. " +
284-
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
266+
val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f"
267+
val logMessage = s"Pulsar Connector offset step forward. " +
268+
s"[$numberOfEntriesPerTopic/$numberOfEntries]" +
285269
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
286-
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
270+
s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]"
287271
log.debug(logMessage)
288272
// Return the message ID
289-
forwardedMessageId
273+
nextMessageId
290274
} catch {
291275
case e: PulsarAdminException if e.getStatusCode == 404 =>
292276
MessageId.earliest
293277
case e: Throwable =>
294278
throw new RuntimeException(
295279
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
296-
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
297-
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
280+
s"(tried to forward ${numberOfEntries} messages " +
281+
s"starting from `$topicActualMessageId`)", e)
298282
}
299283

300284
}

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ private[pulsar] object PulsarOptions {
3434
val PartitionSuffix: String = TopicName.PARTITIONED_TOPIC_SUFFIX
3535

3636
val MaxEntriesPerTrigger = "maxentriespertrigger"
37-
val EnsureEntriesPerTopic = "ensureentriespertopic"
38-
val ForwardStrategy = "forwardstrategy"
39-
val ProportionalForwardStrategy = "proportional"
40-
val LargeFirstForwardStrategy = "large-first"
4137

4238
val TopicOptionKeys = Set(
4339
TopicSingle,

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,7 @@ private[pulsar] class PulsarProvider
113113
failOnDataLoss(caseInsensitiveParams),
114114
subscriptionNamePrefix,
115115
jsonOptions,
116-
maxEntriesPerTrigger(caseInsensitiveParams),
117-
minEntriesPerTopic(caseInsensitiveParams),
118-
forwardStrategy(caseInsensitiveParams)
116+
maxEntriesPerTrigger(caseInsensitiveParams)
119117
)
120118
}
121119

@@ -384,12 +382,6 @@ private[pulsar] object PulsarProvider extends Logging {
384382
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
385383
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong
386384

387-
private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
388-
caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong
389-
390-
private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
391-
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
392-
393385
private def validateGeneralOptions(
394386
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
395387
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ private[pulsar] class PulsarSource(
3737
failOnDataLoss: Boolean,
3838
subscriptionNamePrefix: String,
3939
jsonOptions: JSONOptionsInRead,
40-
maxEntriesPerTrigger: Long,
41-
ensureEntriesPerTopic: Long,
42-
forwardStrategy: String)
40+
maxEntriesPerTrigger: Long)
4341
extends Source
4442
with Logging {
4543

@@ -72,11 +70,11 @@ private[pulsar] class PulsarSource(
7270
} else {
7371
currentTopicOffsets match {
7472
case Some(value) =>
75-
metadataReader.forwardOffset(value,
76-
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
73+
metadataReader.fetchNextOffsetWithMaxEntries(value,
74+
maxEntriesPerTrigger)
7775
case _ =>
78-
metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
79-
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
76+
metadataReader.fetchNextOffsetWithMaxEntries(initialTopicOffsets.topicOffsets,
77+
maxEntriesPerTrigger)
8078
}
8179
}
8280
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")

src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala

Lines changed: 0 additions & 96 deletions
This file was deleted.

src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)