Skip to content

Commit 0deba91

Browse files
committed
Resolve floating point precision loss problem. Add tests.
1 parent 7c26f40 commit 0deba91

File tree

2 files changed

+64
-8
lines changed

2 files changed

+64
-8
lines changed

sdks/python/apache_beam/transforms/periodicsequence.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from typing import Any
2222
from typing import Optional
2323
from typing import Sequence
24-
from typing import Union
2524

2625
import apache_beam as beam
2726
from apache_beam.io.restriction_trackers import OffsetRange
@@ -41,13 +40,21 @@ class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
4140
def initial_restriction(self, element):
4241
start, end, interval = element
4342
if isinstance(start, Timestamp):
44-
start = start.micros / 1000000
43+
start_micros = start.micros
44+
else:
45+
start_micros = round(start * 1000000)
46+
4547
if isinstance(end, Timestamp):
46-
end = end.micros / 1000000
48+
end_micros = end.micros
49+
else:
50+
end_micros = round(end * 1000000)
51+
52+
interval_micros = round(interval * 1000000)
4753

48-
assert start <= end
54+
assert start_micros <= end_micros
4955
assert interval > 0
50-
total_outputs = round((end - start) / interval)
56+
delta_micros: int = end_micros - start_micros
57+
total_outputs = math.ceil(delta_micros / interval_micros)
5158
return OffsetRange(0, total_outputs)
5259

5360
def create_tracker(self, restriction):
@@ -236,7 +243,7 @@ def _validate_and_adjust_duration(self):
236243
# data's actual end time plus an extra fire interval, because the
237244
# impulse duration's upper bound is exclusive.
238245
end = start + data_duration + self.interval
239-
self.stop_ts = Timestamp.of(end)
246+
self.stop_ts = Timestamp(micros=end * 1000000)
240247
else:
241248
end = self.stop_ts.micros / 1000000
242249
else:

sdks/python/apache_beam/transforms/periodicsequence_test.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import time
2626
import unittest
2727

28+
from parameterized import parameterized
29+
2830
import apache_beam as beam
2931
from apache_beam.io.restriction_trackers import OffsetRange
3032
from apache_beam.testing.test_pipeline import TestPipeline
@@ -159,6 +161,53 @@ def test_processing_time(self):
159161
expected = [0, 2, 4]
160162
assert_that(ret, equal_to(expected, lambda x, y: abs(x - y) < threshold))
161163

164+
@parameterized.expand([0.5, 1, 2, 10])
165+
def test_stop_over_by_epsilon(self, interval):
166+
with TestPipeline() as p:
167+
ret = (
168+
p | PeriodicImpulse(
169+
start_timestamp=Timestamp(seconds=1),
170+
stop_timestamp=Timestamp(seconds=1, micros=1),
171+
data=[1, 2],
172+
fire_interval=interval)
173+
| beam.WindowInto(FixedWindows(interval))
174+
| beam.WithKeys(0)
175+
| beam.GroupByKey())
176+
expected = [
177+
(0, [1]),
178+
]
179+
assert_that(ret, equal_to(expected))
180+
181+
@parameterized.expand([1, 2])
182+
def test_stop_over_by_interval(self, interval):
183+
with TestPipeline() as p:
184+
ret = (
185+
p | PeriodicImpulse(
186+
start_timestamp=Timestamp(seconds=1),
187+
stop_timestamp=Timestamp(seconds=1 + interval),
188+
data=[1, 2],
189+
fire_interval=interval)
190+
| beam.WindowInto(FixedWindows(interval))
191+
| beam.WithKeys(0)
192+
| beam.GroupByKey())
193+
expected = [(0, [1])]
194+
assert_that(ret, equal_to(expected))
195+
196+
@parameterized.expand([1, 2])
197+
def test_stop_over_by_interval_and_epsilon(self, interval):
198+
with TestPipeline() as p:
199+
ret = (
200+
p | PeriodicImpulse(
201+
start_timestamp=Timestamp(seconds=1),
202+
stop_timestamp=Timestamp(seconds=1 + interval, micros=1),
203+
data=[1, 2],
204+
fire_interval=interval)
205+
| beam.WindowInto(FixedWindows(interval))
206+
| beam.WithKeys(0)
207+
| beam.GroupByKey())
208+
expected = [(0, [1]), (0, [2])]
209+
assert_that(ret, equal_to(expected))
210+
162211
def test_interval(self):
163212
with TestPipeline() as p:
164213
ret = (
@@ -176,7 +225,7 @@ def test_repeat(self):
176225
ret = (
177226
p | PeriodicImpulse(
178227
start_timestamp=now,
179-
stop_timestamp=now + 3.0,
228+
stop_timestamp=now + 2.6,
180229
data=[1, 2, 3, 4],
181230
fire_interval=0.5)
182231
| beam.WindowInto(FixedWindows(0.5))
@@ -212,7 +261,7 @@ def test_not_enough_timestamped_value(self):
212261

213262
def test_fuzzy_interval(self):
214263
seed = int(time.time() * 1000)
215-
times = 25
264+
times = 30
216265
logging.warning("random seed=%d", seed)
217266
random.seed(seed)
218267
for _ in range(times):

0 commit comments

Comments
 (0)