[Bug]: Got error 'ValueError: Schema with id <some uuid> has encoding_positions_set=True, but not all fields have encoding_position set' for no apparent reason #35318

Open
@jonathaningram

Description

What happened?

I updated my Beam YAML pipeline in Dataflow with what was a no-op change to the YAML itself, but since doing so, my pipeline has been unable to progress due to this error.

Here's the Terraform diff showing what was actually changed in the update (read: nothing):

                      format: JSON
                      schema:
                        type: object
              -         # NOTE(jingram): `encoding_position` is required in each property
              -         # otherwise we get the following error when the pipeline is run:
              -         #
              -         # ```
              -         # ValueError: Schema with id <some uuid> has encoding_positions_set=True, but not all fields have encoding_position set
              -         # ```
                        properties:
                          id:
                            type: string
                          app_id:
                            type: string
                          index:
                            type: string
                          event_name:
                            type: string
                          event_type:
                            type: string
                          user_token:
                            type: string
                          object_ids:
                            type: array
                            items: { type: string }
                

Don't be fooled by the mention of encoding_position in the YAML comment. That was just me cleaning up an old comment from a few days ago and pushing the update to prod. I had added the comment at the time while trying to fix the exact error in this bug report by specifying encoding_position on the schema fields (which I thought worked, but I later realised the issue still persisted).
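
For completeness, the workaround that comment referred to looked roughly like this (reconstructed from memory; the exact positions are illustrative):

```yaml
properties:
  id:
    type: string
    encoding_position: 0
  app_id:
    type: string
    encoding_position: 1
  index:
    type: string
    encoding_position: 2
  # ...and so on, numbering each remaining property sequentially
```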

Anyway, in the Terraform diff, all I did was remove a YAML comment. I didn't change any schema. I didn't change the Pub/Sub message payload or attributes. I didn't change the BQ table schema. Yet for some reason Beam now fails with this error.

Prior to updating the YAML with the above, my pipeline was suffering from this error, which I haven't yet investigated:

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
	org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	redacted.MyProvider$MyTransform$MergeThingsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:815)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
	org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
	org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
	org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
	org.apache.beam.sdk.values.PCollectionViews$MultimapViewToMapAdapter.get(PCollectionViews.java:2030)
	java.base/java.util.Collections$UnmodifiableMap.get(Collections.java:1502)
	redacted.MyProvider$MyTransform$MergeThingsFn.processElement(MyTransformProvider.java:264)

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341

b8b0bcee58104e8ba2650ed437816a44-000 is an event ID and I have id_attribute: eventID specified in my pipeline's ReadFromPubSub.
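
For context, the relevant part of my source config looks roughly like this (topic name redacted, other details elided):

```yaml
- type: ReadFromPubSub
  config:
    topic: projects/redacted/topics/redacted
    format: JSON
    id_attribute: eventID
    schema:
      type: object
      properties:
        id: { type: string }
        # ...remaining properties as in the diff above
```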

Anyway, I'm not sure whether this error caused the subsequent pipeline update to bork, or whether the YAML change is just a red herring.

Expand this for an example stack trace of the encoding_positions_set error:

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 689, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 512, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1498, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1614, in create_sink_runner
    return DataOutputOperation(
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in __init__
    self.windowed_coder_impl = windowed_coder.get_impl()
                               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1429, in _create_impl
    self.wrapped_value_coder.get_impl(),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1536, in _create_impl
    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 79, in _create_impl
    return RowCoderImpl(self.schema, self.components)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 1836, in apache_beam.coders.coder_impl.RowCoderImpl.__init__
ValueError: Schema with id 298ab171-007f-4a07-bef1-2231611f4734 has encoding_positions_set=True,
            but not all fields have encoding_position set

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
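
For anyone triaging: the ValueError is raised from RowCoderImpl.__init__ in the Python SDK. As far as I can tell it fires when a schema proto claims encoding_positions_set=True but the fields' encoding_position values are left unset, so they collide at the proto default. A minimal local sketch that trips the same check (illustrative two-field schema, not my actual pipeline schema):

```python
from apache_beam.coders.row_coder import RowCoder
from apache_beam.portability.api import schema_pb2

# The schema claims its encoding positions are set, but both fields leave
# encoding_position at the proto default (0), so the positions collide.
schema = schema_pb2.Schema(
    id='298ab171-007f-4a07-bef1-2231611f4734',
    encoding_positions_set=True,
    fields=[
        schema_pb2.Field(
            name='id',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING)),
        schema_pb2.Field(
            name='app_id',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING)),
    ])

RowCoder(schema).get_impl()
# ValueError: Schema with id 298ab171-... has encoding_positions_set=True,
# but not all fields have encoding_position set
```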

I have increased the priority of this ticket; please adjust as required on your end.

The only workaround I've managed to get working over the week I've been seeing this error is to wipe my pipeline entirely and recreate it from scratch.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
