Description
What happened?
I updated my Beam YAML pipeline in Dataflow with a change that was a no-op in the YAML itself, but since then my pipeline has been unable to progress because of the `encoding_positions_set` error below.

Here's the Terraform diff showing what was actually changed in the update (read: nothing).
```diff
 format: JSON
 schema:
   type: object
-  # NOTE(jingram): `encoding_position` is required in each property
-  # otherwise we get the following error when the pipeline is run:
-  #
-  # ```
-  # ValueError: Schema with id <some uuid> has encoding_positions_set=True, but not all fields have encoding_position set
-  # ```
   properties:
     id:
       type: string
     app_id:
       type: string
     index:
       type: string
     event_name:
       type: string
     event_type:
       type: string
     user_token:
       type: string
     object_ids:
       type: array
       items: { type: string }
```
Don't be fooled by the mention of `encoding_position` in the YAML comment. I was just cleaning up a comment I'd added a few days earlier and pushing the update to prod. I had originally added that comment while trying to fix the exact error in this bug report by specifying `encoding_position` on the schema fields; I thought that had worked, but later realised the issue persisted.
As the Terraform diff shows, all I did was remove a YAML comment. I didn't change any schema, the Pub/Sub message payload or attributes, or the BigQuery table schema, yet for some reason Beam now fails with the `encoding_positions_set` error.
Prior to updating the YAML with the above, my pipeline was suffering from this error, which I haven't yet investigated:
```
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    redacted.MyProvider$MyTransform$MergeThingsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:815)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
    org.apache.beam.sdk.values.PCollectionViews$MultimapViewToMapAdapter.get(PCollectionViews.java:2030)
    java.base/java.util.Collections$UnmodifiableMap.get(Collections.java:1502)
    redacted.MyProvider$MyTransform$MergeThingsFn.processElement(MyTransformProvider.java:264)
passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
```
`b8b0bcee58104e8ba2650ed437816a44-000` is an event ID, and I have `id_attribute: eventID` specified in my pipeline's `ReadFromPubSub`.
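For context on the `Duplicate values` error: the `Caused by` frames show `MergeThingsFn` looking up a key through `PCollectionViews$MultimapViewToMapAdapter.get`, i.e. a side input read as a single-valued map. My understanding (not verified against this Beam version) is that this adapter throws when the key maps to more than one value, so the same event ID reaching the side input twice would trigger it. The pure-Python sketch below models that behaviour and one possible mitigation, deduplicating by key before building the view; `build_multimap`, `map_view_get`, `dedupe_by_key` and the sample events are hypothetical, not Beam APIs or real pipeline data.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Optional, Tuple


def build_multimap(pairs: Iterable[Tuple[Hashable, object]]) -> Dict[Hashable, List[object]]:
    """Group values by key, keeping every value (like a multimap side input)."""
    multimap: Dict[Hashable, List[object]] = defaultdict(list)
    for key, value in pairs:
        multimap[key].append(value)
    return dict(multimap)


def map_view_get(multimap: Dict[Hashable, List[object]], key: Hashable) -> Optional[object]:
    """Single-valued lookup over a multimap.

    Assumed to mirror the Java map adapter: a missing key yields None and a
    key with more than one value is rejected.
    """
    values = multimap.get(key, [])
    if not values:
        return None
    if len(values) > 1:
        raise ValueError(f"Duplicate values for {key}")
    return values[0]


def dedupe_by_key(pairs: Iterable[Tuple[Hashable, object]]) -> List[Tuple[Hashable, object]]:
    """Keep only the first value seen for each key (one possible policy)."""
    seen = set()
    unique = []
    for key, value in pairs:
        if key not in seen:
            seen.add(key)
            unique.append((key, value))
    return unique


if __name__ == "__main__":
    # Hypothetical side-input contents: the same event ID arrives twice.
    events = [("event-1", "a"), ("event-1", "a"), ("event-2", "b")]
    try:
        map_view_get(build_multimap(events), "event-1")
    except ValueError as err:
        print(err)  # Duplicate values for event-1
    print(map_view_get(build_multimap(dedupe_by_key(events)), "event-1"))  # a
```

Keeping the first value per key is an arbitrary choice made for the sketch; a real pipeline might need a different merge policy if duplicate events can carry different payloads.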
That said, I'm not sure whether this error is what broke the subsequent pipeline update, in which case the YAML change is just a red herring.
Here is an example stack trace for the `encoding_positions_set` error:
```
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 689, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 512, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1498, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1614, in create_sink_runner
    return DataOutputOperation(
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in __init__
    self.windowed_coder_impl = windowed_coder.get_impl()
                               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1429, in _create_impl
    self.wrapped_value_coder.get_impl(),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1536, in _create_impl
    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 79, in _create_impl
    return RowCoderImpl(self.schema, self.components)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 1836, in apache_beam.coders.coder_impl.RowCoderImpl.__init__
ValueError: Schema with id 298ab171-007f-4a07-bef1-2231611f4734 has encoding_positions_set=True,
but not all fields have encoding_position set
passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
```
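The `ValueError` at the bottom of this trace is raised while the worker builds a `RowCoderImpl` for a schema. If I read the Beam Python source correctly (an assumption worth checking against the SDK version in use), when `encoding_positions_set` is true the coder requires the fields' `encoding_position` values to be distinct; since those are proto3 integers, a field with no position set reads as `0` and collides with any field that legitimately uses `0`. The sketch below approximates that check with a hypothetical `Field` dataclass and `check_encoding_positions` function standing in for the real schema proto and coder; the field names come from the schema in the diff above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Field:
    """Hypothetical stand-in for a schema field proto.

    encoding_position defaults to 0, mimicking an unset proto3 int32.
    """
    name: str
    encoding_position: int = 0


def check_encoding_positions(fields: List[Field], encoding_positions_set: bool) -> List[int]:
    """Return the field encoding order, approximating the row coder's check."""
    if not encoding_positions_set:
        # No explicit positions: fields are encoded in declaration order.
        return list(range(len(fields)))
    positions = [field.encoding_position for field in fields]
    # Unset positions all read as 0, so they collide and the set shrinks.
    if len(set(positions)) != len(fields):
        raise ValueError(
            "encoding_positions_set=True, but not all fields have encoding_position set"
        )
    return positions


if __name__ == "__main__":
    full = [Field("id", 0), Field("app_id", 1), Field("index", 2)]
    print(check_encoding_positions(full, True))  # [0, 1, 2]

    # app_id never had a position set, so it collides with id at 0.
    partial = [Field("id", 0), Field("app_id"), Field("index", 2)]
    try:
        check_encoding_positions(partial, True)
    except ValueError as err:
        print(err)
```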
I have raised the priority of this ticket; please adjust it as required on your end.
Over the week I've been hitting this error, the only workaround that has worked is to delete the pipeline entirely and recreate it from scratch.
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner